mirror of
https://github.com/dashpay/dash.git
synced 2024-12-24 19:42:46 +01:00
Merge pull request #3153 from codablock/pr_rpc_stop
Backport fixes for "stop" RPC behavior
This commit is contained in:
commit
b4e19f8dd5
@ -4,6 +4,7 @@
|
||||
|
||||
#include "httpserver.h"
|
||||
|
||||
#include "init.h"
|
||||
#include "chainparamsbase.h"
|
||||
#include "compat.h"
|
||||
#include "util.h"
|
||||
@ -20,7 +21,6 @@
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <signal.h>
|
||||
#include <future>
|
||||
|
||||
#include <event2/thread.h>
|
||||
#include <event2/buffer.h>
|
||||
@ -37,6 +37,10 @@
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
|
||||
/** Maximum size of http request (request line + headers) */
|
||||
static const size_t MAX_HEADERS_SIZE = 8192;
|
||||
|
||||
@ -73,34 +77,13 @@ private:
|
||||
std::deque<std::unique_ptr<WorkItem>> queue;
|
||||
bool running;
|
||||
size_t maxDepth;
|
||||
int numThreads;
|
||||
|
||||
/** RAII object to keep track of number of running worker threads */
|
||||
class ThreadCounter
|
||||
{
|
||||
public:
|
||||
WorkQueue &wq;
|
||||
ThreadCounter(WorkQueue &w): wq(w)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(wq.cs);
|
||||
wq.numThreads += 1;
|
||||
}
|
||||
~ThreadCounter()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(wq.cs);
|
||||
wq.numThreads -= 1;
|
||||
wq.cond.notify_all();
|
||||
}
|
||||
};
|
||||
|
||||
public:
|
||||
WorkQueue(size_t _maxDepth) : running(true),
|
||||
maxDepth(_maxDepth),
|
||||
numThreads(0)
|
||||
maxDepth(_maxDepth)
|
||||
{
|
||||
}
|
||||
/** Precondition: worker threads have all stopped
|
||||
* (call WaitExit)
|
||||
/** Precondition: worker threads have all stopped (they have been joined).
|
||||
*/
|
||||
~WorkQueue()
|
||||
{
|
||||
@ -119,7 +102,6 @@ public:
|
||||
/** Thread function */
|
||||
void Run()
|
||||
{
|
||||
ThreadCounter count(*this);
|
||||
while (true) {
|
||||
std::unique_ptr<WorkItem> i;
|
||||
{
|
||||
@ -141,14 +123,6 @@ public:
|
||||
running = false;
|
||||
cond.notify_all();
|
||||
}
|
||||
/** Wait for worker threads to exit */
|
||||
void WaitExit()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(cs);
|
||||
while (numThreads > 0){
|
||||
cond.wait(lock);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct HTTPPathHandler
|
||||
@ -450,20 +424,17 @@ bool UpdateHTTPServerLogging(bool enable) {
|
||||
}
|
||||
|
||||
std::thread threadHTTP;
|
||||
std::future<bool> threadResult;
|
||||
static std::vector<std::thread> g_thread_http_workers;
|
||||
|
||||
bool StartHTTPServer()
|
||||
{
|
||||
LogPrint(BCLog::HTTP, "Starting HTTP server\n");
|
||||
int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
|
||||
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
|
||||
std::packaged_task<bool(event_base*, evhttp*)> task(ThreadHTTP);
|
||||
threadResult = task.get_future();
|
||||
threadHTTP = std::thread(std::move(task), eventBase, eventHTTP);
|
||||
threadHTTP = std::thread(ThreadHTTP, eventBase, eventHTTP);
|
||||
|
||||
for (int i = 0; i < rpcThreads; i++) {
|
||||
std::thread rpc_worker(HTTPWorkQueueRun, workQueue);
|
||||
rpc_worker.detach();
|
||||
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@ -472,10 +443,6 @@ void InterruptHTTPServer()
|
||||
{
|
||||
LogPrint(BCLog::HTTP, "Interrupting HTTP server\n");
|
||||
if (eventHTTP) {
|
||||
// Unlisten sockets
|
||||
for (evhttp_bound_socket *socket : boundSockets) {
|
||||
evhttp_del_accept_socket(eventHTTP, socket);
|
||||
}
|
||||
// Reject requests on current connections
|
||||
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
|
||||
}
|
||||
@ -488,27 +455,21 @@ void StopHTTPServer()
|
||||
LogPrint(BCLog::HTTP, "Stopping HTTP server\n");
|
||||
if (workQueue) {
|
||||
LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
|
||||
#ifndef WIN32
|
||||
// ToDo: Disabling WaitExit() for Windows platforms is an ugly workaround for the wallet not
|
||||
// closing during a repair-restart. It doesn't hurt, though, because threadHTTP.timed_join
|
||||
// below takes care of this and sends a loopbreak.
|
||||
workQueue->WaitExit();
|
||||
#endif
|
||||
for (auto& thread: g_thread_http_workers) {
|
||||
thread.join();
|
||||
}
|
||||
g_thread_http_workers.clear();
|
||||
delete workQueue;
|
||||
workQueue = nullptr;
|
||||
}
|
||||
// Unlisten sockets, these are what make the event loop running, which means
|
||||
// that after this and all connections are closed the event loop will quit.
|
||||
for (evhttp_bound_socket *socket : boundSockets) {
|
||||
evhttp_del_accept_socket(eventHTTP, socket);
|
||||
}
|
||||
boundSockets.clear();
|
||||
if (eventBase) {
|
||||
LogPrint(BCLog::HTTP, "Waiting for HTTP event thread to exit\n");
|
||||
// Give event loop a few seconds to exit (to send back last RPC responses), then break it
|
||||
// Before this was solved with event_base_loopexit, but that didn't work as expected in
|
||||
// at least libevent 2.0.21 and always introduced a delay. In libevent
|
||||
// master that appears to be solved, so in the future that solution
|
||||
// could be used again (if desirable).
|
||||
// (see discussion in https://github.com/bitcoin/bitcoin/pull/6990)
|
||||
if (threadResult.valid() && threadResult.wait_for(std::chrono::milliseconds(2000)) == std::future_status::timeout) {
|
||||
LogPrintf("HTTP event loop did not exit within allotted time, sending loopbreak\n");
|
||||
event_base_loopbreak(eventBase);
|
||||
}
|
||||
threadHTTP.join();
|
||||
}
|
||||
if (eventHTTP) {
|
||||
@ -613,6 +574,9 @@ void HTTPRequest::WriteHeader(const std::string& hdr, const std::string& value)
|
||||
void HTTPRequest::WriteReply(int nStatus, const std::string& strReply)
|
||||
{
|
||||
assert(!replySent && req);
|
||||
if (ShutdownRequested()) {
|
||||
WriteHeader("Connection", "close");
|
||||
}
|
||||
// Send event to main http thread to send reply message
|
||||
struct evbuffer* evb = evhttp_request_get_output_buffer(req);
|
||||
assert(evb);
|
||||
|
@ -182,6 +182,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
|
||||
{ "echojson", 7, "arg7" },
|
||||
{ "echojson", 8, "arg8" },
|
||||
{ "echojson", 9, "arg9" },
|
||||
{ "stop", 0, "wait" },
|
||||
};
|
||||
|
||||
class CRPCConvertTable
|
||||
|
@ -293,6 +293,9 @@ UniValue help(const JSONRPCRequest& jsonRequest)
|
||||
UniValue stop(const JSONRPCRequest& jsonRequest)
|
||||
{
|
||||
// Accept the deprecated and ignored 'detach' boolean argument
|
||||
// Also accept the hidden 'wait' integer argument (milliseconds)
|
||||
// For instance, 'stop 1000' makes the call wait 1 second before returning
|
||||
// to the client (intended for testing)
|
||||
if (jsonRequest.fHelp || jsonRequest.params.size() > 1)
|
||||
throw std::runtime_error(
|
||||
"stop\n"
|
||||
@ -300,6 +303,9 @@ UniValue stop(const JSONRPCRequest& jsonRequest)
|
||||
// Event loop will exit after current HTTP requests have been handled, so
|
||||
// this reply will get back to the client.
|
||||
StartShutdown();
|
||||
if (jsonRequest.params[0].isNum()) {
|
||||
MilliSleep(jsonRequest.params[0].get_int());
|
||||
}
|
||||
return "Dash Core server stopping";
|
||||
}
|
||||
|
||||
@ -327,7 +333,7 @@ static const CRPCCommand vRPCCommands[] =
|
||||
// --------------------- ------------------------ ----------------------- ------ ----------
|
||||
/* Overall control/query calls */
|
||||
{ "control", "help", &help, true, {"command"} },
|
||||
{ "control", "stop", &stop, true, {} },
|
||||
{ "control", "stop", &stop, true, {"wait"} },
|
||||
{ "control", "uptime", &uptime, true, {} },
|
||||
};
|
||||
|
||||
|
28
test/functional/feature_shutdown.py
Executable file
28
test/functional/feature_shutdown.py
Executable file
@ -0,0 +1,28 @@
|
||||
#!/usr/bin/env python3
|
||||
# Copyright (c) 2018 The Bitcoin Core developers
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
"""Test bitcoind shutdown."""
|
||||
|
||||
from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.util import assert_equal, get_rpc_proxy
|
||||
from threading import Thread
|
||||
|
||||
def test_long_call(node):
|
||||
block = node.waitfornewblock()
|
||||
assert_equal(block['height'], 0)
|
||||
|
||||
class ShutdownTest(BitcoinTestFramework):
|
||||
|
||||
def set_test_params(self):
|
||||
self.setup_clean_chain = True
|
||||
self.num_nodes = 1
|
||||
|
||||
def run_test(self):
|
||||
node = get_rpc_proxy(self.nodes[0].url, 1, timeout=600, coveragedir=self.nodes[0].coverage_dir)
|
||||
Thread(target=test_long_call, args=(node,)).start()
|
||||
# wait 1 second to ensure event loop waits for current connections to close
|
||||
self.stop_node(0, wait=1000)
|
||||
|
||||
if __name__ == '__main__':
|
||||
ShutdownTest().main()
|
@ -280,16 +280,16 @@ class BitcoinTestFramework(object):
|
||||
for node in self.nodes:
|
||||
coverage.write_all_rpc_commands(self.options.coveragedir, node.rpc)
|
||||
|
||||
def stop_node(self, i):
|
||||
def stop_node(self, i, wait=0):
|
||||
"""Stop a dashd test node"""
|
||||
self.nodes[i].stop_node()
|
||||
self.nodes[i].stop_node(wait=wait)
|
||||
self.nodes[i].wait_until_stopped()
|
||||
|
||||
def stop_nodes(self):
|
||||
def stop_nodes(self, wait=0):
|
||||
"""Stop multiple dashd test nodes"""
|
||||
for node in self.nodes:
|
||||
# Issue RPC to stop nodes
|
||||
node.stop_node()
|
||||
node.stop_node(wait=wait)
|
||||
|
||||
for node in self.nodes:
|
||||
# Wait for nodes to stop
|
||||
|
@ -119,13 +119,13 @@ class TestNode():
|
||||
wallet_path = "wallet/%s" % wallet_name
|
||||
return self.rpc / wallet_path
|
||||
|
||||
def stop_node(self):
|
||||
def stop_node(self, wait=0):
|
||||
"""Stop the node."""
|
||||
if not self.running:
|
||||
return
|
||||
self.log.debug("Stopping node")
|
||||
try:
|
||||
self.stop()
|
||||
self.stop(wait=wait)
|
||||
except http.client.CannotSendRequest:
|
||||
self.log.exception("Unable to stop node.")
|
||||
del self.p2ps[:]
|
||||
|
@ -137,6 +137,7 @@ BASE_SCRIPTS= [
|
||||
'resendwallettransactions.py',
|
||||
'minchainwork.py',
|
||||
'p2p-acceptblock.py', # NOTE: needs dash_hash to pass
|
||||
'feature_shutdown.py',
|
||||
]
|
||||
|
||||
EXTENDED_SCRIPTS = [
|
||||
|
Loading…
Reference in New Issue
Block a user