Merge #6045: feat: one more queue for "external" requests from 3rd parties

241f073932 feat: rpc external users are comma separated list (Konstantin Akimov)
68def970ad refactor: re-order arguments options alphabetically (Konstantin Akimov)
c7efd56a07 feat: rpc external users: use 2 queues but no extra threads (Konstantin Akimov)
c575a5808a feat: change handler to '/' for external users, use only rpc user name to choose queue (Konstantin Akimov)
f1c1fd873e feat: implementation for /external handler for RPC (Konstantin Akimov)

Pull request description:

  ## Issue being fixed or feature implemented
  To avoid struggling to response to critical rpc requests, and split them from 3rd parties who uses a node as an external service, there are introduced one more queue of requests that will be served without throttling for instance consensus important rpcs

  ## What was done?
  new command line arguments:
   - `rpcexternaluser` - List of comma-separated usernames for JSON-RPC external connections. If not specified, there's no special queue is created, all requests in one queue
   - `rpcexternalworkqueue=<n>` - Set the depth of the work queue to service external RPC calls

  ## How Has This Been Tested?
  Functional test `rpc_platform_filter.py` is updated to test new functionality

  ## Breaking Changes
  NA

  ## Checklist:
  - [x] I have performed a self-review of my own code
  - [x] I have commented my code, particularly in hard-to-understand areas
  - [x] I have added or updated relevant unit/integration/functional/e2e tests
  - [ ] I have made corresponding changes to the documentation
  - [x] I have assigned this pull request to a milestone

ACKs for top commit:
  UdjinM6:
    utACK 241f073932
  PastaPastaPasta:
    utACK 241f073932

Tree-SHA512: 15b371f24f5302853b85419e2b20c29749d6aae1c98a541d7471f1d3a681643063302c2a5ecce04dfad2da9101ea69d2f08a7e0e11a28609c6011d78273c57a7
This commit is contained in:
pasta 2024-06-24 11:54:10 -05:00
commit 5baa522225
No known key found for this signature in database
GPG Key ID: 52527BEDABE87984
4 changed files with 83 additions and 19 deletions

View File

@ -146,7 +146,7 @@ static bool RPCAuthorized(const std::string& strAuth, std::string& strAuthUserna
return multiUserAuthorized(strUserPass); return multiUserAuthorized(strUserPass);
} }
static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req) static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req, bool external = false)
{ {
// JSONRPC handles only POST // JSONRPC handles only POST
if (req->GetRequestMethod() != HTTPRequest::POST) { if (req->GetRequestMethod() != HTTPRequest::POST) {

View File

@ -69,12 +69,14 @@ private:
Mutex cs; Mutex cs;
std::condition_variable cond GUARDED_BY(cs); std::condition_variable cond GUARDED_BY(cs);
std::deque<std::unique_ptr<WorkItem>> queue GUARDED_BY(cs); std::deque<std::unique_ptr<WorkItem>> queue GUARDED_BY(cs);
std::deque<std::unique_ptr<WorkItem>> external_queue GUARDED_BY(cs);
bool running GUARDED_BY(cs); bool running GUARDED_BY(cs);
const size_t maxDepth; const size_t maxDepth;
const size_t m_external_depth;
public: public:
explicit WorkQueue(size_t _maxDepth) : running(true), explicit WorkQueue(size_t _maxDepth, size_t external_depth) : running(true),
maxDepth(_maxDepth) maxDepth(_maxDepth), m_external_depth(external_depth)
{ {
} }
/** Precondition: worker threads have all stopped (they have been joined). /** Precondition: worker threads have all stopped (they have been joined).
@ -83,13 +85,19 @@ public:
{ {
} }
/** Enqueue a work item */ /** Enqueue a work item */
bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs) bool Enqueue(WorkItem* item, bool is_external) EXCLUSIVE_LOCKS_REQUIRED(!cs)
{ {
LOCK(cs); LOCK(cs);
if (!running || queue.size() >= maxDepth) { if (!running) {
return false; return false;
} }
if (is_external) {
if (external_queue.size() >= m_external_depth) return false;
external_queue.emplace_back(std::unique_ptr<WorkItem>(item));
} else {
if (queue.size() >= maxDepth) return false;
queue.emplace_back(std::unique_ptr<WorkItem>(item)); queue.emplace_back(std::unique_ptr<WorkItem>(item));
}
cond.notify_one(); cond.notify_one();
return true; return true;
} }
@ -100,12 +108,18 @@ public:
std::unique_ptr<WorkItem> i; std::unique_ptr<WorkItem> i;
{ {
WAIT_LOCK(cs, lock); WAIT_LOCK(cs, lock);
while (running && queue.empty()) while (running && external_queue.empty() && queue.empty())
cond.wait(lock); cond.wait(lock);
if (!running && queue.empty()) if (!running && external_queue.empty() && queue.empty())
break; break;
if (!queue.empty()) {
i = std::move(queue.front()); i = std::move(queue.front());
queue.pop_front(); queue.pop_front();
} else {
i = std::move(external_queue.front());
external_queue.pop_front();
LogPrintf("HTTP: Calling handler for external user...\n");
}
} }
(*i)(); (*i)();
} }
@ -140,6 +154,8 @@ static struct evhttp* eventHTTP = nullptr;
static std::vector<CSubNet> rpc_allow_subnets; static std::vector<CSubNet> rpc_allow_subnets;
//! Work queue for handling longer requests off the event loop thread //! Work queue for handling longer requests off the event loop thread
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr}; static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr};
//! List of 'external' RPC users
static std::vector<std::string> g_external_usernames;
//! Handlers for (sub)paths //! Handlers for (sub)paths
static std::vector<HTTPPathHandler> pathHandlers; static std::vector<HTTPPathHandler> pathHandlers;
//! Bound listening sockets //! Bound listening sockets
@ -252,17 +268,40 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
break; break;
} }
} }
const bool is_external_request = [&hreq]() -> bool {
if (g_external_usernames.empty()) return false;
const std::string strAuth = hreq->GetHeader("authorization").second;
if (strAuth.substr(0, 6) != "Basic ")
return false;
std::string strUserPass64 = TrimString(strAuth.substr(6));
bool invalid;
std::string strUserPass = DecodeBase64(strUserPass64, &invalid);
if (invalid) return false;
if (strUserPass.find(':') == std::string::npos) return false;
const std::string username{strUserPass.substr(0, strUserPass.find(':'))};
return find(g_external_usernames.begin(), g_external_usernames.end(), username) != g_external_usernames.end();
}();
// Dispatch to worker thread // Dispatch to worker thread
if (i != iend) { if (i != iend) {
auto item{std::make_unique<HTTPWorkItem>(std::move(hreq), path, i->handler)}; auto item{std::make_unique<HTTPWorkItem>(std::move(hreq), path, i->handler)};
assert(g_work_queue); assert(g_work_queue);
if (g_work_queue->Enqueue(item.get())) {
if (g_work_queue->Enqueue(item.get(), is_external_request)) {
item.release(); /* if true, queue took ownership */ item.release(); /* if true, queue took ownership */
} else {
if (is_external_request)
{
LogPrintf("WARNING: request rejected because http work queue depth of externals exceeded, it can be increased with the -rpcexternalworkqueue= setting\n");
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth of externals exceeded");
} else { } else {
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n"); LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded"); item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
} }
}
} else { } else {
hreq->WriteReply(HTTP_NOT_FOUND); hreq->WriteReply(HTTP_NOT_FOUND);
} }
@ -390,9 +429,13 @@ bool InitHTTPServer()
LogPrint(BCLog::HTTP, "Initialized HTTP server\n"); LogPrint(BCLog::HTTP, "Initialized HTTP server\n");
int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L); int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth); int workQueueDepthExternal = 0;
if (const std::string rpc_externaluser{gArgs.GetArg("-rpcexternaluser", "")}; !rpc_externaluser.empty()) {
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth); workQueueDepthExternal = std::max((long)gArgs.GetArg("-rpcexternalworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
g_external_usernames = SplitString(rpc_externaluser, ',');
}
LogPrintf("HTTP: creating work queue of depth %d external_depth %d\n", workQueueDepth, workQueueDepthExternal);
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth, workQueueDepthExternal);
// transfer ownership to eventBase/HTTP via .release() // transfer ownership to eventBase/HTTP via .release()
eventBase = base_ctr.release(); eventBase = base_ctr.release();
eventHTTP = http_ctr.release(); eventHTTP = http_ctr.release();

View File

@ -767,6 +767,8 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-rpcauth=<userpw>", "Username and HMAC-SHA-256 hashed password for JSON-RPC connections. The field <userpw> comes in the format: <USERNAME>:<SALT>$<HASH>. A canonical python script is included in share/rpcuser. The client then connects normally using the rpcuser=<USERNAME>/rpcpassword=<PASSWORD> pair of arguments. This option can be specified multiple times", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC); argsman.AddArg("-rpcauth=<userpw>", "Username and HMAC-SHA-256 hashed password for JSON-RPC connections. The field <userpw> comes in the format: <USERNAME>:<SALT>$<HASH>. A canonical python script is included in share/rpcuser. The client then connects normally using the rpcuser=<USERNAME>/rpcpassword=<PASSWORD> pair of arguments. This option can be specified multiple times", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
argsman.AddArg("-rpcbind=<addr>[:port]", "Bind to given address to listen for JSON-RPC connections. Do not expose the RPC server to untrusted networks such as the public internet! This option is ignored unless -rpcallowip is also passed. Port is optional and overrides -rpcport. Use [host]:port notation for IPv6. This option can be specified multiple times (default: 127.0.0.1 and ::1 i.e., localhost, or if -rpcallowip has been specified, 0.0.0.0 and :: i.e., all addresses)", ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY | ArgsManager::SENSITIVE, OptionsCategory::RPC); argsman.AddArg("-rpcbind=<addr>[:port]", "Bind to given address to listen for JSON-RPC connections. Do not expose the RPC server to untrusted networks such as the public internet! This option is ignored unless -rpcallowip is also passed. Port is optional and overrides -rpcport. Use [host]:port notation for IPv6. This option can be specified multiple times (default: 127.0.0.1 and ::1 i.e., localhost, or if -rpcallowip has been specified, 0.0.0.0 and :: i.e., all addresses)", ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
argsman.AddArg("-rpccookiefile=<loc>", "Location of the auth cookie. Relative paths will be prefixed by a net-specific datadir location. (default: data dir)", ArgsManager::ALLOW_ANY, OptionsCategory::RPC); argsman.AddArg("-rpccookiefile=<loc>", "Location of the auth cookie. Relative paths will be prefixed by a net-specific datadir location. (default: data dir)", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
argsman.AddArg("-rpcexternaluser=<users>", "List of comma-separated usernames for JSON-RPC external connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
argsman.AddArg("-rpcexternalworkqueue=<n>", strprintf("Set the depth of the work queue to service external RPC calls (default: %d)", DEFAULT_HTTP_WORKQUEUE), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
argsman.AddArg("-rpcpassword=<pw>", "Password for JSON-RPC connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC); argsman.AddArg("-rpcpassword=<pw>", "Password for JSON-RPC connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
argsman.AddArg("-rpcport=<port>", strprintf("Listen for JSON-RPC connections on <port> (default: %u, testnet: %u, regtest: %u)", defaultBaseParams->RPCPort(), testnetBaseParams->RPCPort(), regtestBaseParams->RPCPort()), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::RPC); argsman.AddArg("-rpcport=<port>", strprintf("Listen for JSON-RPC connections on <port> (default: %u, testnet: %u, regtest: %u)", defaultBaseParams->RPCPort(), testnetBaseParams->RPCPort(), regtestBaseParams->RPCPort()), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::RPC);
argsman.AddArg("-rpcservertimeout=<n>", strprintf("Timeout during HTTP requests (default: %d)", DEFAULT_HTTP_SERVER_TIMEOUT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC); argsman.AddArg("-rpcservertimeout=<n>", strprintf("Timeout during HTTP requests (default: %d)", DEFAULT_HTTP_SERVER_TIMEOUT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);

View File

@ -42,7 +42,7 @@ class HTTPBasicsTest(BitcoinTestFramework):
def run_test(self): def run_test(self):
url = urllib.parse.urlparse(self.nodes[0].url) url = urllib.parse.urlparse(self.nodes[0].url)
def test_command(method, params, auth, expexted_status, should_not_match=False): def test_command(method, params, auth, expected_status, should_not_match=False):
conn = http.client.HTTPConnection(url.hostname, url.port) conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect() conn.connect()
body = {"method": method} body = {"method": method}
@ -51,9 +51,9 @@ class HTTPBasicsTest(BitcoinTestFramework):
conn.request('POST', '/', json.dumps(body), {"Authorization": "Basic " + str_to_b64str(auth)}) conn.request('POST', '/', json.dumps(body), {"Authorization": "Basic " + str_to_b64str(auth)})
resp = conn.getresponse() resp = conn.getresponse()
if should_not_match: if should_not_match:
assert resp.status != expexted_status assert resp.status != expected_status
else: else:
assert_equal(resp.status, expexted_status) assert_equal(resp.status, expected_status)
conn.close() conn.close()
whitelisted = ["getassetunlockstatuses", whitelisted = ["getassetunlockstatuses",
@ -114,5 +114,24 @@ class HTTPBasicsTest(BitcoinTestFramework):
test_command("debug", ["1"], rpcuser_authpair_operator, 200) test_command("debug", ["1"], rpcuser_authpair_operator, 200)
self.log.info("Restart node with -rpcexternaluser")
self.restart_node(0, extra_args=["-rpcexternaluser=platform-user"])
external_log_str = "HTTP: Calling handler for external user"
expected_log_str = "ThreadRPCServer method="
with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str, external_log_str]):
test_command("getbestblockhash", [], rpcuser_authpair_platform, 200)
with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str], unexpected_msgs = [external_log_str]):
test_command("getbestblockhash", [], rpcuser_authpair_operator, 200)
self.log.info("Restart node with multiple external users")
self.restart_node(0, extra_args=["-rpcexternaluser=platform-user,operator"])
with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str, external_log_str]):
test_command("getbestblockhash", [], rpcuser_authpair_platform, 200)
with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str, external_log_str]):
test_command("getbestblockhash", [], rpcuser_authpair_operator, 200)
if __name__ == '__main__': if __name__ == '__main__':
HTTPBasicsTest().main() HTTPBasicsTest().main()