diff --git a/src/httpserver.cpp b/src/httpserver.cpp index 509f4993be..e936b1d04d 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -69,14 +69,14 @@ private: Mutex cs; std::condition_variable cond GUARDED_BY(cs); std::deque> queue GUARDED_BY(cs); + std::deque> external_queue GUARDED_BY(cs); bool running GUARDED_BY(cs); const size_t maxDepth; - const bool m_is_external; + const size_t m_external_depth; public: - explicit WorkQueue(size_t _maxDepth, bool is_external) : running(true), - maxDepth(_maxDepth), - m_is_external(is_external) + explicit WorkQueue(size_t _maxDepth, size_t external_depth) : running(true), + maxDepth(_maxDepth), m_external_depth(external_depth) { } /** Precondition: worker threads have all stopped (they have been joined). @@ -85,13 +85,19 @@ public: { } /** Enqueue a work item */ - bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs) + bool Enqueue(WorkItem* item, bool is_external) EXCLUSIVE_LOCKS_REQUIRED(!cs) { LOCK(cs); - if (!running || queue.size() >= maxDepth) { + if (!running) { return false; } - queue.emplace_back(std::unique_ptr(item)); + if (is_external) { + if (external_queue.size() >= m_external_depth) return false; + external_queue.emplace_back(std::unique_ptr(item)); + } else { + if (queue.size() >= maxDepth) return false; + queue.emplace_back(std::unique_ptr(item)); + } cond.notify_one(); return true; } @@ -102,15 +108,18 @@ public: std::unique_ptr i; { WAIT_LOCK(cs, lock); - while (running && queue.empty()) + while (running && external_queue.empty() && queue.empty()) cond.wait(lock); - if (!running && queue.empty()) + if (!running && external_queue.empty() && queue.empty()) break; - i = std::move(queue.front()); - queue.pop_front(); - } - if (m_is_external) { - LogPrintf("HTTP: Calling handler for external user...\n"); + if (!queue.empty()) { + i = std::move(queue.front()); + queue.pop_front(); + } else { + i = std::move(external_queue.front()); + external_queue.pop_front(); + LogPrintf("HTTP: Calling handler for external user...\n"); + } } (*i)(); } @@ -145,7 +154,6 @@ static struct evhttp* eventHTTP = nullptr; static std::vector rpc_allow_subnets; //! Work queue for handling longer requests off the event loop thread static std::unique_ptr> g_work_queue{nullptr}; -static std::unique_ptr> g_work_queue_external{nullptr}; //! Handlers for (sub)paths static std::vector pathHandlers; //! Bound listening sockets @@ -284,19 +292,17 @@ static void http_request_cb(struct evhttp_request* req, void* arg) auto item{std::make_unique(std::move(hreq), path, i->handler)}; assert(g_work_queue); - // We have queue created only if RPC arg 'rpcexternaluser' is specified - if (is_external_request && g_work_queue_external) { - if (g_work_queue_external->Enqueue(item.get())) { - item.release(); - } else { - LogPrintf("WARNING: request rejected because http work queue depth of externals exceeded, it can be increased with the -rpcexternalworkqueue= setting\n"); - item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth of externals exceeded"); - } - } else if (g_work_queue->Enqueue(item.get())) { + if (g_work_queue->Enqueue(item.get(), is_external_request)) { item.release(); /* if true, queue took ownership */ } else { - LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n"); - item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); + if (is_external_request) + { + LogPrintf("WARNING: request rejected because http work queue depth of externals exceeded, it can be increased with the -rpcexternalworkqueue= setting\n"); + item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth of externals exceeded"); + } else { + LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n"); + item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); + } } } else { hreq->WriteReply(HTTP_NOT_FOUND); @@ -425,14 +431,14 @@ bool InitHTTPServer() LogPrint(BCLog::HTTP, "Initialized HTTP server\n"); int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); - int workQueueDepthExternal = std::max((long)gArgs.GetArg("-rpcexternalworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); - LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth); - - g_work_queue = std::make_unique>(workQueueDepth, false); + int workQueueDepthExternal = 0; if (!gArgs.GetArg("-rpcexternaluser", "").empty()) { LogPrintf("HTTP: creating external work queue of depth %d\n", workQueueDepthExternal); - g_work_queue_external = std::make_unique>(workQueueDepthExternal, true); + workQueueDepthExternal = std::max((long)gArgs.GetArg("-rpcexternalworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); } + LogPrintf("HTTP: creating work queue of depth %d external_depth %d\n", workQueueDepth, workQueueDepthExternal); + + g_work_queue = std::make_unique>(workQueueDepth, workQueueDepthExternal); // transfer ownership to eventBase/HTTP via .release() eventBase = base_ctr.release(); eventHTTP = http_ctr.release(); @@ -460,18 +466,12 @@ void StartHTTPServer() { LogPrint(BCLog::HTTP, "Starting HTTP server\n"); int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L); - int rpcThreadsExternals = std::max((long)gArgs.GetArg("-rpcexternalthreads", DEFAULT_HTTP_THREADS), 1L); LogPrintf("HTTP: starting %d worker threads\n", rpcThreads); g_thread_http = std::thread(ThreadHTTP, eventBase); for (int i = 0; i < rpcThreads; i++) { g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i); } - if (g_work_queue_external) { - for (int i = 0; i < rpcThreadsExternals; i++) { - g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue_external.get(), i); - } - } } void InterruptHTTPServer() @@ -481,9 +481,6 @@ void InterruptHTTPServer() // Reject requests on current connections evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr); } - if (g_work_queue_external) { - g_work_queue_external->Interrupt(); - } if (g_work_queue) { g_work_queue->Interrupt(); } @@ -492,9 +489,6 @@ void InterruptHTTPServer() void StopHTTPServer() { LogPrint(BCLog::HTTP, "Stopping HTTP server\n"); - if (g_work_queue_external) { - g_work_queue_external->Interrupt(); - } if (g_work_queue) { LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n"); for (auto& thread : g_thread_http_workers) { @@ -520,7 +514,6 @@ void StopHTTPServer() event_base_free(eventBase); eventBase = nullptr; } - g_work_queue_external.reset(); g_work_queue.reset(); LogPrint(BCLog::HTTP, "Stopped HTTP server\n"); } diff --git a/src/init.cpp b/src/init.cpp index e780da3341..85c9e5a0fa 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -768,7 +768,6 @@ void SetupServerArgs(NodeContext& node) argsman.AddArg("-rpcport=", strprintf("Listen for JSON-RPC connections on (default: %u, testnet: %u, regtest: %u)", defaultBaseParams->RPCPort(), testnetBaseParams->RPCPort(), regtestBaseParams->RPCPort()), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::RPC); argsman.AddArg("-rpcservertimeout=", strprintf("Timeout during HTTP requests (default: %d)", DEFAULT_HTTP_SERVER_TIMEOUT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC); argsman.AddArg("-rpcthreads=", strprintf("Set the number of threads to service RPC calls (default: %d)", DEFAULT_HTTP_THREADS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC); - argsman.AddArg("-rpcexternalthreads=", strprintf("Set the number of threads to service RPC calls from external consumers (default: %d)", DEFAULT_HTTP_THREADS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC); argsman.AddArg("-rpcuser=", "Username for JSON-RPC connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC); argsman.AddArg("-rpcexternaluser=", "Username for JSON-RPC external connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC); argsman.AddArg("-rpcwhitelist=", "Set a whitelist to filter incoming RPC calls for a specific user. The field comes in the format: :,,...,. If multiple whitelists are set for a given user, they are set-intersected. See -rpcwhitelistdefault documentation for information on default whitelist behavior.", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);