feat: rpc external users: use 2 queues but no extra threads

This commit is contained in:
Konstantin Akimov 2024-06-12 17:05:50 +07:00
parent c575a5808a
commit c7efd56a07
No known key found for this signature in database
GPG Key ID: 2176C4A5D01EA524
2 changed files with 37 additions and 45 deletions

View File

@ -69,14 +69,14 @@ private:
Mutex cs;
std::condition_variable cond GUARDED_BY(cs);
std::deque<std::unique_ptr<WorkItem>> queue GUARDED_BY(cs);
std::deque<std::unique_ptr<WorkItem>> external_queue GUARDED_BY(cs);
bool running GUARDED_BY(cs);
const size_t maxDepth;
const bool m_is_external;
const size_t m_external_depth;
public:
explicit WorkQueue(size_t _maxDepth, bool is_external) : running(true),
maxDepth(_maxDepth),
m_is_external(is_external)
explicit WorkQueue(size_t _maxDepth, size_t external_depth) : running(true),
maxDepth(_maxDepth), m_external_depth(external_depth)
{
}
/** Precondition: worker threads have all stopped (they have been joined).
@ -85,13 +85,19 @@ public:
{
}
/** Enqueue a work item */
bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs)
bool Enqueue(WorkItem* item, bool is_external) EXCLUSIVE_LOCKS_REQUIRED(!cs)
{
LOCK(cs);
if (!running || queue.size() >= maxDepth) {
if (!running) {
return false;
}
queue.emplace_back(std::unique_ptr<WorkItem>(item));
if (is_external) {
if (external_queue.size() >= m_external_depth) return false;
external_queue.emplace_back(std::unique_ptr<WorkItem>(item));
} else {
if (queue.size() >= maxDepth) return false;
queue.emplace_back(std::unique_ptr<WorkItem>(item));
}
cond.notify_one();
return true;
}
@ -102,15 +108,18 @@ public:
std::unique_ptr<WorkItem> i;
{
WAIT_LOCK(cs, lock);
while (running && queue.empty())
while (running && external_queue.empty() && queue.empty())
cond.wait(lock);
if (!running && queue.empty())
if (!running && external_queue.empty() && queue.empty())
break;
i = std::move(queue.front());
queue.pop_front();
}
if (m_is_external) {
LogPrintf("HTTP: Calling handler for external user...\n");
if (!queue.empty()) {
i = std::move(queue.front());
queue.pop_front();
} else {
i = std::move(external_queue.front());
external_queue.pop_front();
LogPrintf("HTTP: Calling handler for external user...\n");
}
}
(*i)();
}
@ -145,7 +154,6 @@ static struct evhttp* eventHTTP = nullptr;
static std::vector<CSubNet> rpc_allow_subnets;
//! Work queue for handling longer requests off the event loop thread
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr};
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue_external{nullptr};
//! Handlers for (sub)paths
static std::vector<HTTPPathHandler> pathHandlers;
//! Bound listening sockets
@ -284,19 +292,17 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
auto item{std::make_unique<HTTPWorkItem>(std::move(hreq), path, i->handler)};
assert(g_work_queue);
// We have queue created only if RPC arg 'rpcexternaluser' is specified
if (is_external_request && g_work_queue_external) {
if (g_work_queue_external->Enqueue(item.get())) {
item.release();
} else {
LogPrintf("WARNING: request rejected because http work queue depth of externals exceeded, it can be increased with the -rpcexternalworkqueue= setting\n");
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth of externals exceeded");
}
} else if (g_work_queue->Enqueue(item.get())) {
if (g_work_queue->Enqueue(item.get(), is_external_request)) {
item.release(); /* if true, queue took ownership */
} else {
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
if (is_external_request)
{
LogPrintf("WARNING: request rejected because http work queue depth of externals exceeded, it can be increased with the -rpcexternalworkqueue= setting\n");
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth of externals exceeded");
} else {
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
}
}
} else {
hreq->WriteReply(HTTP_NOT_FOUND);
@ -425,14 +431,14 @@ bool InitHTTPServer()
LogPrint(BCLog::HTTP, "Initialized HTTP server\n");
int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
int workQueueDepthExternal = std::max((long)gArgs.GetArg("-rpcexternalworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth, false);
int workQueueDepthExternal = 0;
if (!gArgs.GetArg("-rpcexternaluser", "").empty()) {
LogPrintf("HTTP: creating external work queue of depth %d\n", workQueueDepthExternal);
g_work_queue_external = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepthExternal, true);
workQueueDepthExternal = std::max((long)gArgs.GetArg("-rpcexternalworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
}
LogPrintf("HTTP: creating work queue of depth %d external_depth %d\n", workQueueDepth, workQueueDepthExternal);
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth, workQueueDepthExternal);
// transfer ownership to eventBase/HTTP via .release()
eventBase = base_ctr.release();
eventHTTP = http_ctr.release();
@ -460,18 +466,12 @@ void StartHTTPServer()
{
LogPrint(BCLog::HTTP, "Starting HTTP server\n");
int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
int rpcThreadsExternals = std::max((long)gArgs.GetArg("-rpcexternalthreads", DEFAULT_HTTP_THREADS), 1L);
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
g_thread_http = std::thread(ThreadHTTP, eventBase);
for (int i = 0; i < rpcThreads; i++) {
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i);
}
if (g_work_queue_external) {
for (int i = 0; i < rpcThreadsExternals; i++) {
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue_external.get(), i);
}
}
}
void InterruptHTTPServer()
@ -481,9 +481,6 @@ void InterruptHTTPServer()
// Reject requests on current connections
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
}
if (g_work_queue_external) {
g_work_queue_external->Interrupt();
}
if (g_work_queue) {
g_work_queue->Interrupt();
}
@ -492,9 +489,6 @@ void InterruptHTTPServer()
void StopHTTPServer()
{
LogPrint(BCLog::HTTP, "Stopping HTTP server\n");
if (g_work_queue_external) {
g_work_queue_external->Interrupt();
}
if (g_work_queue) {
LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
for (auto& thread : g_thread_http_workers) {
@ -520,7 +514,6 @@ void StopHTTPServer()
event_base_free(eventBase);
eventBase = nullptr;
}
g_work_queue_external.reset();
g_work_queue.reset();
LogPrint(BCLog::HTTP, "Stopped HTTP server\n");
}

View File

@ -768,7 +768,6 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-rpcport=<port>", strprintf("Listen for JSON-RPC connections on <port> (default: %u, testnet: %u, regtest: %u)", defaultBaseParams->RPCPort(), testnetBaseParams->RPCPort(), regtestBaseParams->RPCPort()), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::RPC);
argsman.AddArg("-rpcservertimeout=<n>", strprintf("Timeout during HTTP requests (default: %d)", DEFAULT_HTTP_SERVER_TIMEOUT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
argsman.AddArg("-rpcthreads=<n>", strprintf("Set the number of threads to service RPC calls (default: %d)", DEFAULT_HTTP_THREADS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
argsman.AddArg("-rpcexternalthreads=<n>", strprintf("Set the number of threads to service RPC calls from external consumers (default: %d)", DEFAULT_HTTP_THREADS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
argsman.AddArg("-rpcuser=<user>", "Username for JSON-RPC connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
argsman.AddArg("-rpcexternaluser=<user>", "Username for JSON-RPC external connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
argsman.AddArg("-rpcwhitelist=<whitelist>", "Set a whitelist to filter incoming RPC calls for a specific user. The field <whitelist> comes in the format: <USERNAME>:<rpc 1>,<rpc 2>,...,<rpc n>. If multiple whitelists are set for a given user, they are set-intersected. See -rpcwhitelistdefault documentation for information on default whitelist behavior.", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);