Separate init/destroy and start/stop steps in LLMQ flow (#2709)

This commit is contained in:
UdjinM6 2019-02-17 14:39:43 +03:00 committed by GitHub
parent 9f58690322
commit 26db020d17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 88 additions and 19 deletions

View File

@ -236,6 +236,7 @@ void PrepareShutdown()
StopREST(); StopREST();
StopRPC(); StopRPC();
StopHTTPServer(); StopHTTPServer();
llmq::StopLLMQSystem();
// fRPCInWarmup should be `false` if we completed the loading sequence // fRPCInWarmup should be `false` if we completed the loading sequence
// before a shutdown request was received // before a shutdown request was received
@ -2066,6 +2067,8 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
#endif // ENABLE_WALLET #endif // ENABLE_WALLET
} }
llmq::StartLLMQSystem();
// ********************************************************* Step 12: start node // ********************************************************* Step 12: start node
//// debug print //// debug print

View File

@ -28,10 +28,18 @@ std::string CChainLockSig::ToString() const
CChainLocksHandler::CChainLocksHandler(CScheduler* _scheduler) : CChainLocksHandler::CChainLocksHandler(CScheduler* _scheduler) :
scheduler(_scheduler) scheduler(_scheduler)
{ {
quorumSigningManager->RegisterRecoveredSigsListener(this);
} }
CChainLocksHandler::~CChainLocksHandler() CChainLocksHandler::~CChainLocksHandler()
{
}
void CChainLocksHandler::RegisterAsRecoveredSigsListener()
{
quorumSigningManager->RegisterRecoveredSigsListener(this);
}
void CChainLocksHandler::UnregisterAsRecoveredSigsListener()
{ {
quorumSigningManager->UnregisterRecoveredSigsListener(this); quorumSigningManager->UnregisterRecoveredSigsListener(this);
} }

View File

@ -68,7 +68,9 @@ public:
CChainLocksHandler(CScheduler* _scheduler); CChainLocksHandler(CScheduler* _scheduler);
~CChainLocksHandler(); ~CChainLocksHandler();
public: void RegisterAsRecoveredSigsListener();
void UnregisterAsRecoveredSigsListener();
bool AlreadyHave(const CInv& inv); bool AlreadyHave(const CInv& inv);
bool GetChainLockByHash(const uint256& hash, CChainLockSig& ret); bool GetChainLockByHash(const uint256& hash, CChainLockSig& ret);

View File

@ -110,6 +110,10 @@ UniValue CDKGDebugSessionStatus::ToJson(int detailLevel) const
CDKGDebugManager::CDKGDebugManager(CScheduler* _scheduler) : CDKGDebugManager::CDKGDebugManager(CScheduler* _scheduler) :
scheduler(_scheduler) scheduler(_scheduler)
{
}
void CDKGDebugManager::StartScheduler()
{ {
if (scheduler) { if (scheduler) {
scheduler->scheduleEvery([&]() { scheduler->scheduleEvery([&]() {

View File

@ -158,6 +158,8 @@ private:
public: public:
CDKGDebugManager(CScheduler* _scheduler); CDKGDebugManager(CScheduler* _scheduler);
void StartScheduler();
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
bool PreVerifyDebugStatusMessage(const uint256& hash, CDKGDebugStatus& status, bool& retBan); bool PreVerifyDebugStatusMessage(const uint256& hash, CDKGDebugStatus& status, bool& retBan);
void ScheduleProcessPending(); void ScheduleProcessPending();

View File

@ -24,18 +24,26 @@ static const std::string DB_SKCONTRIB = "qdkg_S";
CDKGSessionManager::CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker) : CDKGSessionManager::CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker) :
evoDb(_evoDb), evoDb(_evoDb),
blsWorker(_blsWorker) blsWorker(_blsWorker)
{
}
CDKGSessionManager::~CDKGSessionManager()
{
}
void CDKGSessionManager::StartMessageHandlerPool()
{ {
for (const auto& qt : Params().GetConsensus().llmqs) { for (const auto& qt : Params().GetConsensus().llmqs) {
dkgSessionHandlers.emplace(std::piecewise_construct, dkgSessionHandlers.emplace(std::piecewise_construct,
std::forward_as_tuple(qt.first), std::forward_as_tuple(qt.first),
std::forward_as_tuple(qt.second, _evoDb, messageHandlerPool, blsWorker, *this)); std::forward_as_tuple(qt.second, evoDb, messageHandlerPool, blsWorker, *this));
} }
messageHandlerPool.resize(2); messageHandlerPool.resize(2);
RenameThreadPool(messageHandlerPool, "quorum-msg"); RenameThreadPool(messageHandlerPool, "quorum-msg");
} }
CDKGSessionManager::~CDKGSessionManager() void CDKGSessionManager::StopMessageHandlerPool()
{ {
messageHandlerPool.stop(true); messageHandlerPool.stop(true);
} }

View File

@ -50,6 +50,9 @@ public:
CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker); CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker);
~CDKGSessionManager(); ~CDKGSessionManager();
void StartMessageHandlerPool();
void StopMessageHandlerPool();
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload); void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);

View File

@ -31,13 +31,6 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests)
chainLocksHandler = new CChainLocksHandler(scheduler); chainLocksHandler = new CChainLocksHandler(scheduler);
} }
void InterruptLLMQSystem()
{
if (quorumSigSharesManager) {
quorumSigSharesManager->InterruptWorkerThread();
}
}
void DestroyLLMQSystem() void DestroyLLMQSystem()
{ {
delete chainLocksHandler; delete chainLocksHandler;
@ -56,4 +49,40 @@ void DestroyLLMQSystem()
quorumDKGDebugManager = nullptr; quorumDKGDebugManager = nullptr;
} }
void StartLLMQSystem()
{
if (quorumDKGDebugManager) {
quorumDKGDebugManager->StartScheduler();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StartMessageHandlerPool();
}
if (quorumSigSharesManager) {
quorumSigSharesManager->StartWorkerThread();
}
if (chainLocksHandler) {
chainLocksHandler->RegisterAsRecoveredSigsListener();
}
}
void StopLLMQSystem()
{
if (chainLocksHandler) {
chainLocksHandler->UnregisterAsRecoveredSigsListener();
}
if (quorumSigSharesManager) {
quorumSigSharesManager->StopWorkerThread();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StopMessageHandlerPool();
}
}
void InterruptLLMQSystem()
{
if (quorumSigSharesManager) {
quorumSigSharesManager->InterruptWorkerThread();
}
}
} }

View File

@ -14,10 +14,14 @@ namespace llmq
// If true, we will connect to all new quorums and watch their communication // If true, we will connect to all new quorums and watch their communication
static const bool DEFAULT_WATCH_QUORUMS = false; static const bool DEFAULT_WATCH_QUORUMS = false;
// Init/destroy LLMQ globals
void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests); void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests);
void InterruptLLMQSystem();
void DestroyLLMQSystem(); void DestroyLLMQSystem();
// Manage scheduled tasks, threads, listeners etc.
void StartLLMQSystem();
void StopLLMQSystem();
void InterruptLLMQSystem();
} }
#endif //DASH_QUORUMS_INIT_H #endif //DASH_QUORUMS_INIT_H

View File

@ -148,16 +148,20 @@ CSigSharesInv CBatchedSigShares::ToInv() const
CSigSharesManager::CSigSharesManager() CSigSharesManager::CSigSharesManager()
{ {
StartWorkerThread(); workInterrupt.reset();
} }
CSigSharesManager::~CSigSharesManager() CSigSharesManager::~CSigSharesManager()
{ {
StopWorkerThread();
} }
void CSigSharesManager::StartWorkerThread() void CSigSharesManager::StartWorkerThread()
{ {
// can't start new thread if we have one running already
if (workThread.joinable()) {
assert(false);
}
workThread = std::thread(&TraceThread<std::function<void()> >, workThread = std::thread(&TraceThread<std::function<void()> >,
"sigshares", "sigshares",
std::function<void()>(std::bind(&CSigSharesManager::WorkThreadMain, this))); std::function<void()>(std::bind(&CSigSharesManager::WorkThreadMain, this)));
@ -165,6 +169,11 @@ void CSigSharesManager::StartWorkerThread()
void CSigSharesManager::StopWorkerThread() void CSigSharesManager::StopWorkerThread()
{ {
// make sure to call InterruptWorkerThread() first
if (!workInterrupt) {
assert(false);
}
if (workThread.joinable()) { if (workThread.joinable()) {
workThread.join(); workThread.join();
} }
@ -1086,8 +1095,6 @@ void CSigSharesManager::BanNode(NodeId nodeId)
void CSigSharesManager::WorkThreadMain() void CSigSharesManager::WorkThreadMain()
{ {
workInterrupt.reset();
int64_t lastSendTime = 0; int64_t lastSendTime = 0;
while (!workInterrupt) { while (!workInterrupt) {

View File

@ -361,6 +361,8 @@ public:
CSigSharesManager(); CSigSharesManager();
~CSigSharesManager(); ~CSigSharesManager();
void StartWorkerThread();
void StopWorkerThread();
void InterruptWorkerThread(); void InterruptWorkerThread();
public: public:
@ -370,9 +372,6 @@ public:
void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
private: private:
void StartWorkerThread();
void StopWorkerThread();
void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman); void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman);