diff --git a/src/httprpc.cpp b/src/httprpc.cpp index 4990259365..a691e87c00 100644 --- a/src/httprpc.cpp +++ b/src/httprpc.cpp @@ -146,7 +146,7 @@ static bool RPCAuthorized(const std::string& strAuth, std::string& strAuthUserna return multiUserAuthorized(strUserPass); } -static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req) +static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req, bool external = false) { // JSONRPC handles only POST if (req->GetRequestMethod() != HTTPRequest::POST) { diff --git a/src/httpserver.cpp b/src/httpserver.cpp index d0a5ceddba..9fbd0be71b 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -69,12 +69,14 @@ private: Mutex cs; std::condition_variable cond GUARDED_BY(cs); std::deque> queue GUARDED_BY(cs); + std::deque> external_queue GUARDED_BY(cs); bool running GUARDED_BY(cs); const size_t maxDepth; + const size_t m_external_depth; public: - explicit WorkQueue(size_t _maxDepth) : running(true), - maxDepth(_maxDepth) + explicit WorkQueue(size_t _maxDepth, size_t external_depth) : running(true), + maxDepth(_maxDepth), m_external_depth(external_depth) { } /** Precondition: worker threads have all stopped (they have been joined). @@ -83,13 +85,19 @@ public: { } /** Enqueue a work item */ - bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs) + bool Enqueue(WorkItem* item, bool is_external) EXCLUSIVE_LOCKS_REQUIRED(!cs) { LOCK(cs); - if (!running || queue.size() >= maxDepth) { + if (!running) { return false; } - queue.emplace_back(std::unique_ptr(item)); + if (is_external) { + if (external_queue.size() >= m_external_depth) return false; + external_queue.emplace_back(std::unique_ptr(item)); + } else { + if (queue.size() >= maxDepth) return false; + queue.emplace_back(std::unique_ptr(item)); + } cond.notify_one(); return true; } @@ -100,12 +108,18 @@ public: std::unique_ptr i; { WAIT_LOCK(cs, lock); - while (running && queue.empty()) + while (running && external_queue.empty() && queue.empty()) cond.wait(lock); - if (!running && queue.empty()) + if (!running && external_queue.empty() && queue.empty()) break; - i = std::move(queue.front()); - queue.pop_front(); + if (!queue.empty()) { + i = std::move(queue.front()); + queue.pop_front(); + } else { + i = std::move(external_queue.front()); + external_queue.pop_front(); + LogPrintf("HTTP: Calling handler for external user...\n"); + } } (*i)(); } @@ -140,6 +154,8 @@ static struct evhttp* eventHTTP = nullptr; static std::vector rpc_allow_subnets; //! Work queue for handling longer requests off the event loop thread static std::unique_ptr> g_work_queue{nullptr}; +//! List of 'external' RPC users +static std::vector g_external_usernames; //! Handlers for (sub)paths static std::vector pathHandlers; //! Bound listening sockets @@ -252,16 +268,39 @@ static void http_request_cb(struct evhttp_request* req, void* arg) break; } } + const bool is_external_request = [&hreq]() -> bool { + if (g_external_usernames.empty()) return false; + + const std::string strAuth = hreq->GetHeader("authorization").second; + if (strAuth.substr(0, 6) != "Basic ") + return false; + + std::string strUserPass64 = TrimString(strAuth.substr(6)); + bool invalid; + std::string strUserPass = DecodeBase64(strUserPass64, &invalid); + if (invalid) return false; + + if (strUserPass.find(':') == std::string::npos) return false; + const std::string username{strUserPass.substr(0, strUserPass.find(':'))}; + return find(g_external_usernames.begin(), g_external_usernames.end(), username) != g_external_usernames.end(); + }(); // Dispatch to worker thread if (i != iend) { auto item{std::make_unique(std::move(hreq), path, i->handler)}; assert(g_work_queue); - if (g_work_queue->Enqueue(item.get())) { + + if (g_work_queue->Enqueue(item.get(), is_external_request)) { item.release(); /* if true, queue took ownership */ } else { - LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n"); - item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); + if (is_external_request) + { + LogPrintf("WARNING: request rejected because http work queue depth of externals exceeded, it can be increased with the -rpcexternalworkqueue= setting\n"); + item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth of externals exceeded"); + } else { + LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n"); + item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); + } } } else { hreq->WriteReply(HTTP_NOT_FOUND); @@ -390,9 +429,13 @@ bool InitHTTPServer() LogPrint(BCLog::HTTP, "Initialized HTTP server\n"); int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); - LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth); - - g_work_queue = std::make_unique>(workQueueDepth); + int workQueueDepthExternal = 0; + if (const std::string rpc_externaluser{gArgs.GetArg("-rpcexternaluser", "")}; !rpc_externaluser.empty()) { + workQueueDepthExternal = std::max((long)gArgs.GetArg("-rpcexternalworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); + g_external_usernames = SplitString(rpc_externaluser, ','); + } + LogPrintf("HTTP: creating work queue of depth %d external_depth %d\n", workQueueDepth, workQueueDepthExternal); + g_work_queue = std::make_unique>(workQueueDepth, workQueueDepthExternal); // transfer ownership to eventBase/HTTP via .release() eventBase = base_ctr.release(); eventHTTP = http_ctr.release(); diff --git a/src/init.cpp b/src/init.cpp index 3fc7eef7c8..674468b798 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -767,6 +767,8 @@ void SetupServerArgs(NodeContext& node) argsman.AddArg("-rpcauth=", "Username and HMAC-SHA-256 hashed password for JSON-RPC connections. The field comes in the format: :$. A canonical python script is included in share/rpcuser. The client then connects normally using the rpcuser=/rpcpassword= pair of arguments. This option can be specified multiple times", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC); argsman.AddArg("-rpcbind=[:port]", "Bind to given address to listen for JSON-RPC connections. Do not expose the RPC server to untrusted networks such as the public internet! This option is ignored unless -rpcallowip is also passed. Port is optional and overrides -rpcport. Use [host]:port notation for IPv6. This option can be specified multiple times (default: 127.0.0.1 and ::1 i.e., localhost, or if -rpcallowip has been specified, 0.0.0.0 and :: i.e., all addresses)", ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY | ArgsManager::SENSITIVE, OptionsCategory::RPC); argsman.AddArg("-rpccookiefile=", "Location of the auth cookie. Relative paths will be prefixed by a net-specific datadir location. (default: data dir)", ArgsManager::ALLOW_ANY, OptionsCategory::RPC); + argsman.AddArg("-rpcexternaluser=", "List of comma-separated usernames for JSON-RPC external connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC); + argsman.AddArg("-rpcexternalworkqueue=", strprintf("Set the depth of the work queue to service external RPC calls (default: %d)", DEFAULT_HTTP_WORKQUEUE), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC); argsman.AddArg("-rpcpassword=", "Password for JSON-RPC connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC); argsman.AddArg("-rpcport=", strprintf("Listen for JSON-RPC connections on (default: %u, testnet: %u, regtest: %u)", defaultBaseParams->RPCPort(), testnetBaseParams->RPCPort(), regtestBaseParams->RPCPort()), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::RPC); argsman.AddArg("-rpcservertimeout=", strprintf("Timeout during HTTP requests (default: %d)", DEFAULT_HTTP_SERVER_TIMEOUT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC); diff --git a/test/functional/rpc_platform_filter.py b/test/functional/rpc_platform_filter.py index 3ec7ed9dd9..200ed96f48 100755 --- a/test/functional/rpc_platform_filter.py +++ b/test/functional/rpc_platform_filter.py @@ -42,7 +42,7 @@ class HTTPBasicsTest(BitcoinTestFramework): def run_test(self): url = urllib.parse.urlparse(self.nodes[0].url) - def test_command(method, params, auth, expexted_status, should_not_match=False): + def test_command(method, params, auth, expected_status, should_not_match=False): conn = http.client.HTTPConnection(url.hostname, url.port) conn.connect() body = {"method": method} @@ -51,9 +51,9 @@ class HTTPBasicsTest(BitcoinTestFramework): conn.request('POST', '/', json.dumps(body), {"Authorization": "Basic " + str_to_b64str(auth)}) resp = conn.getresponse() if should_not_match: - assert resp.status != expexted_status + assert resp.status != expected_status else: - assert_equal(resp.status, expexted_status) + assert_equal(resp.status, expected_status) conn.close() whitelisted = ["getassetunlockstatuses", @@ -114,5 +114,24 @@ class HTTPBasicsTest(BitcoinTestFramework): test_command("debug", ["1"], rpcuser_authpair_operator, 200) + self.log.info("Restart node with -rpcexternaluser") + self.restart_node(0, extra_args=["-rpcexternaluser=platform-user"]) + + external_log_str = "HTTP: Calling handler for external user" + expected_log_str = "ThreadRPCServer method=" + with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str, external_log_str]): + test_command("getbestblockhash", [], rpcuser_authpair_platform, 200) + with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str], unexpected_msgs = [external_log_str]): + test_command("getbestblockhash", [], rpcuser_authpair_operator, 200) + + self.log.info("Restart node with multiple external users") + self.restart_node(0, extra_args=["-rpcexternaluser=platform-user,operator"]) + with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str, external_log_str]): + test_command("getbestblockhash", [], rpcuser_authpair_platform, 200) + with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str, external_log_str]): + test_command("getbestblockhash", [], rpcuser_authpair_operator, 200) + + + if __name__ == '__main__': HTTPBasicsTest().main()