Rework handling of CSigSharesManager worker thread (#2703)

This commit is contained in:
UdjinM6 2019-02-15 17:11:50 +03:00 committed by Alexander Block
parent 3e4286a584
commit bedfc262e2
6 changed files with 36 additions and 20 deletions

View File

@ -211,6 +211,7 @@ void Interrupt(boost::thread_group& threadGroup)
InterruptRPC(); InterruptRPC();
InterruptREST(); InterruptREST();
InterruptTorControl(); InterruptTorControl();
llmq::InterruptLLMQSystem();
if (g_connman) if (g_connman)
g_connman->Interrupt(); g_connman->Interrupt();
threadGroup.interrupt_all(); threadGroup.interrupt_all();

View File

@ -29,16 +29,17 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests)
quorumSigSharesManager = new CSigSharesManager(); quorumSigSharesManager = new CSigSharesManager();
quorumSigningManager = new CSigningManager(unitTests); quorumSigningManager = new CSigningManager(unitTests);
chainLocksHandler = new CChainLocksHandler(scheduler); chainLocksHandler = new CChainLocksHandler(scheduler);
}
quorumSigSharesManager->StartWorkerThread(); void InterruptLLMQSystem()
{
if (quorumSigSharesManager) {
quorumSigSharesManager->InterruptWorkerThread();
}
} }
void DestroyLLMQSystem() void DestroyLLMQSystem()
{ {
if (quorumSigSharesManager) {
quorumSigSharesManager->StopWorkerThread();
}
delete chainLocksHandler; delete chainLocksHandler;
chainLocksHandler = nullptr; chainLocksHandler = nullptr;
delete quorumSigningManager; delete quorumSigningManager;

View File

@ -15,6 +15,7 @@ namespace llmq
static const bool DEFAULT_WATCH_QUORUMS = false; static const bool DEFAULT_WATCH_QUORUMS = false;
void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests); void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests);
void InterruptLLMQSystem();
void DestroyLLMQSystem(); void DestroyLLMQSystem();
} }

View File

@ -176,6 +176,7 @@ CSigSharesInv CBatchedSigShares::ToInv() const
CSigSharesManager::CSigSharesManager() CSigSharesManager::CSigSharesManager()
{ {
StartWorkerThread();
} }
CSigSharesManager::~CSigSharesManager() CSigSharesManager::~CSigSharesManager()
@ -185,24 +186,23 @@ CSigSharesManager::~CSigSharesManager()
void CSigSharesManager::StartWorkerThread() void CSigSharesManager::StartWorkerThread()
{ {
workThread = std::thread([this]() { workThread = std::thread(&TraceThread<std::function<void()> >,
RenameThread("quorum-sigshares"); "sigshares",
WorkThreadMain(); std::function<void()>(std::bind(&CSigSharesManager::WorkThreadMain, this)));
});
} }
void CSigSharesManager::StopWorkerThread() void CSigSharesManager::StopWorkerThread()
{ {
if (stopWorkThread) {
return;
}
stopWorkThread = true;
if (workThread.joinable()) { if (workThread.joinable()) {
workThread.join(); workThread.join();
} }
} }
void CSigSharesManager::InterruptWorkerThread()
{
workInterrupt();
}
void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman) void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman)
{ {
// non-masternodes are not interested in sigshares // non-masternodes are not interested in sigshares
@ -1096,8 +1096,16 @@ void CSigSharesManager::BanNode(NodeId nodeId)
void CSigSharesManager::WorkThreadMain() void CSigSharesManager::WorkThreadMain()
{ {
int64_t lastProcessTime = GetTimeMillis(); workInterrupt.reset();
while (!stopWorkThread && !ShutdownRequested()) {
while (!workInterrupt) {
if (!quorumSigningManager || !g_connman || !quorumSigningManager) {
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
continue;
}
RemoveBannedNodeStates(); RemoveBannedNodeStates();
quorumSigningManager->ProcessPendingRecoveredSigs(*g_connman); quorumSigningManager->ProcessPendingRecoveredSigs(*g_connman);
ProcessPendingSigShares(*g_connman); ProcessPendingSigShares(*g_connman);
@ -1107,7 +1115,9 @@ void CSigSharesManager::WorkThreadMain()
quorumSigningManager->Cleanup(); quorumSigningManager->Cleanup();
// TODO Wakeup when pending signing is needed? // TODO Wakeup when pending signing is needed?
MilliSleep(100); if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
} }
} }

View File

@ -194,7 +194,7 @@ private:
CCriticalSection cs; CCriticalSection cs;
std::thread workThread; std::thread workThread;
std::atomic<bool> stopWorkThread{false}; CThreadInterrupt workInterrupt;
std::map<SigShareKey, CSigShare> sigShares; std::map<SigShareKey, CSigShare> sigShares;
std::map<uint256, int64_t> firstSeenForSessions; std::map<uint256, int64_t> firstSeenForSessions;
@ -214,8 +214,7 @@ public:
CSigSharesManager(); CSigSharesManager();
~CSigSharesManager(); ~CSigSharesManager();
void StartWorkerThread(); void InterruptWorkerThread();
void StopWorkerThread();
public: public:
void ProcessMessage(CNode* pnode, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); void ProcessMessage(CNode* pnode, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
@ -224,6 +223,9 @@ public:
void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
private: private:
void StartWorkerThread();
void StopWorkerThread();
void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman); void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman);

View File

@ -97,6 +97,7 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha
TestingSetup::~TestingSetup() TestingSetup::~TestingSetup()
{ {
UnregisterNodeSignals(GetNodeSignals()); UnregisterNodeSignals(GetNodeSignals());
llmq::InterruptLLMQSystem();
threadGroup.interrupt_all(); threadGroup.interrupt_all();
threadGroup.join_all(); threadGroup.join_all();
UnloadBlockIndex(); UnloadBlockIndex();