llmq: Fix thread handling in CDKGSessionManager and CDKGSessionHandler (#3601)

* llqm: Fix thread handling in CDKGSessionManager and CDKGSessionHandler

* llmq: Removed unused thread_pool from CDKGSessionManager

* Tweak `CDKGSessionHandler::StartThread()`

* llmq: Simplify CDKGSessionHandler's thread naming

* llmq: Make sure CDKGSessionHandler uses a valid LLMQ type

Co-Authored-By: UdjinM6 <UdjinM6@users.noreply.github.com>

Co-authored-by: UdjinM6 <UdjinM6@users.noreply.github.com>
This commit is contained in:
dustinface 2020-07-16 22:30:59 +02:00 committed by GitHub
parent d5410fd4a9
commit d157718243
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 42 additions and 27 deletions

View File

@ -84,9 +84,8 @@ void CDKGPendingMessages::Clear()
//////
CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
params(_params),
messageHandlerPool(_messageHandlerPool),
blsWorker(_blsWorker),
dkgManager(_dkgManager),
curSession(std::make_shared<CDKGSession>(_params, _blsWorker, _dkgManager)),
@ -95,18 +94,13 @@ CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, ctp
pendingJustifications((size_t)_params.size * 2, MSG_QUORUM_JUSTIFICATION),
pendingPrematureCommitments((size_t)_params.size * 2, MSG_QUORUM_PREMATURE_COMMITMENT)
{
phaseHandlerThread = std::thread([this] {
RenameThread(strprintf("dash-q-phase-%d", (uint8_t)params.type).c_str());
PhaseHandlerThread();
});
if (params.type == Consensus::LLMQ_NONE) {
throw std::runtime_error("Can't initialize CDKGSessionHandler with LLMQ_NONE type.");
}
}
CDKGSessionHandler::~CDKGSessionHandler()
{
stopRequested = true;
if (phaseHandlerThread.joinable()) {
phaseHandlerThread.join();
}
}
void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
@ -145,6 +139,24 @@ void CDKGSessionHandler::ProcessMessage(CNode* pfrom, const std::string& strComm
}
}
void CDKGSessionHandler::StartThread()
{
if (phaseHandlerThread.joinable()) {
throw std::runtime_error("Tried to start an already started CDKGSessionHandler thread.");
}
std::string threadName = strprintf("q-phase-%d", params.type);
phaseHandlerThread = std::thread(&TraceThread<std::function<void()> >, threadName, std::function<void()>(std::bind(&CDKGSessionHandler::PhaseHandlerThread, this)));
}
void CDKGSessionHandler::StopThread()
{
stopRequested = true;
if (phaseHandlerThread.joinable()) {
phaseHandlerThread.join();
}
}
bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pindexQuorum)
{
//AssertLockHeld(cs_main);

View File

@ -103,7 +103,6 @@ private:
std::atomic<bool> stopRequested{false};
const Consensus::LLMQParams& params;
ctpl::thread_pool& messageHandlerPool;
CBLSWorker& blsWorker;
CDKGSessionManager& dkgManager;
@ -120,12 +119,15 @@ private:
CDKGPendingMessages pendingPrematureCommitments;
public:
CDKGSessionHandler(const Consensus::LLMQParams& _params, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager);
CDKGSessionHandler(const Consensus::LLMQParams& _params, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager);
~CDKGSessionHandler();
void UpdatedBlockTip(const CBlockIndex *pindexNew);
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
void StartThread();
void StopThread();
private:
bool InitNewQuorum(const CBlockIndex* pindexQuorum);

View File

@ -24,27 +24,29 @@ CDKGSessionManager::CDKGSessionManager(CDBWrapper& _llmqDb, CBLSWorker& _blsWork
llmqDb(_llmqDb),
blsWorker(_blsWorker)
{
for (const auto& qt : Params().GetConsensus().llmqs) {
dkgSessionHandlers.emplace(std::piecewise_construct,
std::forward_as_tuple(qt.first),
std::forward_as_tuple(qt.second, blsWorker, *this));
}
}
CDKGSessionManager::~CDKGSessionManager()
{
}
void CDKGSessionManager::StartMessageHandlerPool()
void CDKGSessionManager::StartThreads()
{
for (const auto& qt : Params().GetConsensus().llmqs) {
dkgSessionHandlers.emplace(std::piecewise_construct,
std::forward_as_tuple(qt.first),
std::forward_as_tuple(qt.second, messageHandlerPool, blsWorker, *this));
for (auto& it : dkgSessionHandlers) {
it.second.StartThread();
}
messageHandlerPool.resize(2);
RenameThreadPool(messageHandlerPool, "dash-q-msg");
}
void CDKGSessionManager::StopMessageHandlerPool()
void CDKGSessionManager::StopThreads()
{
messageHandlerPool.stop(true);
for (auto& it : dkgSessionHandlers) {
it.second.StopThread();
}
}
void CDKGSessionManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload)

View File

@ -23,7 +23,6 @@ class CDKGSessionManager
private:
CDBWrapper& llmqDb;
CBLSWorker& blsWorker;
ctpl::thread_pool messageHandlerPool;
std::map<Consensus::LLMQType, CDKGSessionHandler> dkgSessionHandlers;
@ -50,8 +49,8 @@ public:
CDKGSessionManager(CDBWrapper& _llmqDb, CBLSWorker& _blsWorker);
~CDKGSessionManager();
void StartMessageHandlerPool();
void StopMessageHandlerPool();
void StartThreads();
void StopThreads();
void UpdatedBlockTip(const CBlockIndex *pindexNew, bool fInitialDownload);

View File

@ -70,7 +70,7 @@ void StartLLMQSystem()
blsWorker->Start();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StartMessageHandlerPool();
quorumDKGSessionManager->StartThreads();
}
if (quorumSigSharesManager) {
quorumSigSharesManager->RegisterAsRecoveredSigsListener();
@ -97,7 +97,7 @@ void StopLLMQSystem()
quorumSigSharesManager->UnregisterAsRecoveredSigsListener();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StopMessageHandlerPool();
quorumDKGSessionManager->StopThreads();
}
if (blsWorker) {
blsWorker->Stop();