From bedfc262e2331924ceb4113ce90890f279b4cb3a Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Fri, 15 Feb 2019 17:11:50 +0300 Subject: [PATCH] Rework handling of CSigSharesManager worker thread (#2703) --- src/init.cpp | 1 + src/llmq/quorums_init.cpp | 11 +++++----- src/llmq/quorums_init.h | 1 + src/llmq/quorums_signing_shares.cpp | 34 +++++++++++++++++++---------- src/llmq/quorums_signing_shares.h | 8 ++++--- src/test/test_dash.cpp | 1 + 6 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index 618452b73..bb22cf3e6 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -211,6 +211,7 @@ void Interrupt(boost::thread_group& threadGroup) InterruptRPC(); InterruptREST(); InterruptTorControl(); + llmq::InterruptLLMQSystem(); if (g_connman) g_connman->Interrupt(); threadGroup.interrupt_all(); diff --git a/src/llmq/quorums_init.cpp b/src/llmq/quorums_init.cpp index 2b357dae9..30a867b9a 100644 --- a/src/llmq/quorums_init.cpp +++ b/src/llmq/quorums_init.cpp @@ -29,16 +29,17 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests) quorumSigSharesManager = new CSigSharesManager(); quorumSigningManager = new CSigningManager(unitTests); chainLocksHandler = new CChainLocksHandler(scheduler); +} - quorumSigSharesManager->StartWorkerThread(); +void InterruptLLMQSystem() +{ + if (quorumSigSharesManager) { + quorumSigSharesManager->InterruptWorkerThread(); + } } void DestroyLLMQSystem() { - if (quorumSigSharesManager) { - quorumSigSharesManager->StopWorkerThread(); - } - delete chainLocksHandler; chainLocksHandler = nullptr; delete quorumSigningManager; diff --git a/src/llmq/quorums_init.h b/src/llmq/quorums_init.h index 621674063..a01f1622c 100644 --- a/src/llmq/quorums_init.h +++ b/src/llmq/quorums_init.h @@ -15,6 +15,7 @@ namespace llmq static const bool DEFAULT_WATCH_QUORUMS = false; void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests); +void InterruptLLMQSystem(); void DestroyLLMQSystem(); } diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index f07e220bc..41b21bb3d 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -176,6 +176,7 @@ CSigSharesInv CBatchedSigShares::ToInv() const CSigSharesManager::CSigSharesManager() { + StartWorkerThread(); } CSigSharesManager::~CSigSharesManager() @@ -185,24 +186,23 @@ CSigSharesManager::~CSigSharesManager() void CSigSharesManager::StartWorkerThread() { - workThread = std::thread([this]() { - RenameThread("quorum-sigshares"); - WorkThreadMain(); - }); + workThread = std::thread(&TraceThread >, + "sigshares", + std::function(std::bind(&CSigSharesManager::WorkThreadMain, this))); } void CSigSharesManager::StopWorkerThread() { - if (stopWorkThread) { - return; - } - - stopWorkThread = true; if (workThread.joinable()) { workThread.join(); } } +void CSigSharesManager::InterruptWorkerThread() +{ + workInterrupt(); +} + void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman) { // non-masternodes are not interested in sigshares @@ -1096,8 +1096,16 @@ void CSigSharesManager::BanNode(NodeId nodeId) void CSigSharesManager::WorkThreadMain() { - int64_t lastProcessTime = GetTimeMillis(); - while (!stopWorkThread && !ShutdownRequested()) { + workInterrupt.reset(); + + while (!workInterrupt) { + if (!quorumSigningManager || !g_connman || !quorumSigningManager) { + if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) { + return; + } + continue; + } + RemoveBannedNodeStates(); quorumSigningManager->ProcessPendingRecoveredSigs(*g_connman); ProcessPendingSigShares(*g_connman); @@ -1107,7 +1115,9 @@ void CSigSharesManager::WorkThreadMain() quorumSigningManager->Cleanup(); // TODO Wakeup when pending signing is needed? - MilliSleep(100); + if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) { + return; + } } } diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index 19b0f8265..d59756de9 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -194,7 +194,7 @@ private: CCriticalSection cs; std::thread workThread; - std::atomic stopWorkThread{false}; + CThreadInterrupt workInterrupt; std::map sigShares; std::map firstSeenForSessions; @@ -214,8 +214,7 @@ public: CSigSharesManager(); ~CSigSharesManager(); - void StartWorkerThread(); - void StopWorkerThread(); + void InterruptWorkerThread(); public: void ProcessMessage(CNode* pnode, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); @@ -224,6 +223,9 @@ public: void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); private: + void StartWorkerThread(); + void StopWorkerThread(); + void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman); diff --git a/src/test/test_dash.cpp b/src/test/test_dash.cpp index 4c1db970e..e191a67d1 100644 --- a/src/test/test_dash.cpp +++ b/src/test/test_dash.cpp @@ -97,6 +97,7 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha TestingSetup::~TestingSetup() { UnregisterNodeSignals(GetNodeSignals()); + llmq::InterruptLLMQSystem(); threadGroup.interrupt_all(); threadGroup.join_all(); UnloadBlockIndex();