diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp
index 2c1d7e028e..83b4733df1 100644
--- a/src/llmq/quorums.cpp
+++ b/src/llmq/quorums.cpp
@@ -44,16 +44,11 @@ static uint256 MakeQuorumKey(const CQuorum& q)
     return hw.GetHash();
 }
 
-CQuorum::CQuorum(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker) : params(_params), blsCache(_blsWorker), stopQuorumThreads(false)
+CQuorum::CQuorum(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker) : params(_params), blsCache(_blsWorker)
 {
-    interruptQuorumDataReceived.reset();
 }
 
-CQuorum::~CQuorum()
-{
-    interruptQuorumDataReceived();
-    stopQuorumThreads = true;
-}
+CQuorum::~CQuorum() = default;
 
 void CQuorum::Init(const CFinalCommitment& _qc, const CBlockIndex* _pindexQuorum, const uint256& _minedBlockHash, const std::vector<CDeterministicMNCPtr>& _members)
 {
@@ -155,177 +150,6 @@ bool CQuorum::ReadContributions(CEvoDB& evoDb)
     return true;
 }
 
-void CQuorum::StartCachePopulatorThread(std::shared_ptr<CQuorum> _this)
-{
-    if (_this->quorumVvec == nullptr) {
-        return;
-    }
-
-    cxxtimer::Timer t(true);
-    LogPrint(BCLog::LLMQ, "CQuorum::StartCachePopulatorThread -- start\n");
-
-    // this thread will exit after some time
-    // when then later some other thread tries to get keys, it will be much faster
-    _this->cachePopulatorThread = std::thread([_this, t]() {
-        RenameThread("dash-q-cachepop");
-        for (size_t i = 0; i < _this->members.size() && !_this->stopQuorumThreads && !ShutdownRequested(); i++) {
-            if (_this->qc.validMembers[i]) {
-                _this->GetPubKeyShare(i);
-            }
-        }
-        LogPrint(BCLog::LLMQ, "CQuorum::StartCachePopulatorThread -- done. time=%d\n", t.count());
-    });
-    _this->cachePopulatorThread.detach();
-}
-
-size_t CQuorum::GetQuorumRecoveryStartOffset(const CBlockIndex* pIndex) const
-{
-    const size_t nActiveQuorums = params.signingActiveQuorumCount + 1;
-    const std::vector<CQuorumCPtr> vecQuorums = quorumManager->ScanQuorums(qc.llmqType, pIndex, nActiveQuorums);
-    assert(vecQuorums.size() > 0);
-    std::set<uint256> setAllTypeMembers;
-    for (auto& pQuorum : vecQuorums) {
-        auto& vecValid = pQuorum->qc.validMembers;
-        for (size_t i = 0; i < vecValid.size(); ++i) {
-            if (vecValid[i]) {
-                setAllTypeMembers.emplace(pQuorum->members[i]->proTxHash);
-            }
-        }
-    }
-    std::vector<uint256> vecAllTypeMembers{setAllTypeMembers.begin(), setAllTypeMembers.end()};
-    std::sort(vecAllTypeMembers.begin(), vecAllTypeMembers.end());
-    size_t nMyIndex{0};
-    for (size_t i = 0; i < vecAllTypeMembers.size(); ++i) {
-        if (activeMasternodeInfo.proTxHash == vecAllTypeMembers[i]) {
-            nMyIndex = i;
-            break;
-        }
-    }
-    return nMyIndex % qc.validMembers.size();
-}
-
-void CQuorum::StartQuorumDataRecoveryThread(const CQuorumCPtr _this, const CBlockIndex* pIndex, uint16_t nDataMaskIn)
-{
-    if (_this->fQuorumDataRecoveryThreadRunning) {
-        LogPrint(BCLog::LLMQ, "CQuorum::%s -- Already running\n", __func__);
-        return;
-    }
-    _this->fQuorumDataRecoveryThreadRunning = true;
-
-    std::thread([_this, pIndex, nDataMaskIn]() {
-        const auto& strThreadName = "dash-q-recovery";
-        RenameThread(strThreadName);
-
-        size_t nTries{0};
-        uint16_t nDataMask{nDataMaskIn};
-        int64_t nTimeLastSuccess{0};
-        uint256* pCurrentMemberHash{nullptr};
-        std::vector<uint256> vecMemberHashes;
-        const size_t nMyStartOffset{_this->GetQuorumRecoveryStartOffset(pIndex)};
-        const int64_t nRequestTimeout{10};
-
-        auto printLog = [&](const std::string& strMessage) {
-            const std::string strMember{pCurrentMemberHash == nullptr ? "nullptr" : pCurrentMemberHash->ToString()};
-            LogPrint(BCLog::LLMQ, "%s -- %s - for llmqType %d, quorumHash %s, nDataMask (%d/%d), pCurrentMemberHash %s, nTries %d\n",
-                strThreadName, strMessage, _this->qc.llmqType, _this->qc.quorumHash.ToString(), nDataMask, nDataMaskIn, strMember, nTries);
-        };
-        printLog("Start");
-
-        while (!masternodeSync.IsBlockchainSynced() && !_this->stopQuorumThreads && !ShutdownRequested()) {
-            _this->interruptQuorumDataReceived.reset();
-            _this->interruptQuorumDataReceived.sleep_for(std::chrono::seconds(nRequestTimeout));
-        }
-
-        if (_this->stopQuorumThreads || ShutdownRequested()) {
-            printLog("Aborted");
-            return;
-        }
-
-        vecMemberHashes.reserve(_this->qc.validMembers.size());
-        for (auto& member : _this->members) {
-            if (_this->IsValidMember(member->proTxHash) && member->proTxHash != activeMasternodeInfo.proTxHash) {
-                vecMemberHashes.push_back(member->proTxHash);
-            }
-        }
-        std::sort(vecMemberHashes.begin(), vecMemberHashes.end());
-
-        printLog("Try to request");
-
-        while (nDataMask > 0 && !_this->stopQuorumThreads && !ShutdownRequested()) {
-
-            if (nDataMask & llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR && _this->quorumVvec != nullptr) {
-                nDataMask &= ~llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
-                printLog("Received quorumVvec");
-            }
-
-            if (nDataMask & llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS && _this->skShare.IsValid()) {
-                nDataMask &= ~llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
-                printLog("Received skShare");
-            }
-
-            if (nDataMask == 0) {
-                printLog("Success");
-                break;
-            }
-
-            if ((GetAdjustedTime() - nTimeLastSuccess) > nRequestTimeout) {
-                if (nTries >= vecMemberHashes.size()) {
-                    printLog("All tried but failed");
-                    break;
-                }
-                // Access the member list of the quorum with the calculated offset applied to balance the load equally
-                pCurrentMemberHash = &vecMemberHashes[(nMyStartOffset + nTries++) % vecMemberHashes.size()];
-                {
-                    LOCK(cs_data_requests);
-                    auto it = mapQuorumDataRequests.find(std::make_pair(*pCurrentMemberHash, true));
-                    if (it != mapQuorumDataRequests.end() && !it->second.IsExpired()) {
-                        printLog("Already asked");
-                        continue;
-                    }
-                }
-                // Sleep a bit depending on the start offset to balance out multiple requests to same masternode
-                _this->interruptQuorumDataReceived.sleep_for(std::chrono::milliseconds(nMyStartOffset * 100));
-                nTimeLastSuccess = GetAdjustedTime();
-                g_connman->AddPendingMasternode(*pCurrentMemberHash);
-                printLog("Connect");
-            }
-
-            g_connman->ForEachNode([&](CNode* pNode) {
-
-                if (pCurrentMemberHash == nullptr || pNode->verifiedProRegTxHash != *pCurrentMemberHash) {
-                    return;
-                }
-
-                if (quorumManager->RequestQuorumData(pNode, _this->qc.llmqType, _this->pindexQuorum, nDataMask, activeMasternodeInfo.proTxHash)) {
-                    nTimeLastSuccess = GetAdjustedTime();
-                    printLog("Requested");
-                } else {
-                    LOCK(cs_data_requests);
-                    auto it = mapQuorumDataRequests.find(std::make_pair(pNode->verifiedProRegTxHash, true));
-                    if (it == mapQuorumDataRequests.end()) {
-                        printLog("Failed");
-                        pNode->fDisconnect = true;
-                        pCurrentMemberHash = nullptr;
-                        return;
-                    } else if (it->second.IsProcessed()) {
-                        printLog("Processed");
-                        pNode->fDisconnect = true;
-                        pCurrentMemberHash = nullptr;
-                        return;
-                    } else {
-                        printLog("Waiting");
-                        return;
-                    }
-                }
-            });
-            _this->interruptQuorumDataReceived.reset();
-            _this->interruptQuorumDataReceived.sleep_for(std::chrono::seconds(1));
-        }
-        _this->fQuorumDataRecoveryThreadRunning = false;
-        printLog("Done");
-    }).detach();
-}
-
 CQuorumManager::CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
     evoDb(_evoDb),
     blsWorker(_blsWorker),
@@ -333,6 +157,27 @@ CQuorumManager::CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessi
 {
     CLLMQUtils::InitQuorumsCache(mapQuorumsCache);
     CLLMQUtils::InitQuorumsCache(scanQuorumsCache);
+    quorumThreadInterrupt.reset();
+}
+
+CQuorumManager::~CQuorumManager()
+{
+    Stop();
+}
+
+void CQuorumManager::Start()
+{
+    int workerCount = std::thread::hardware_concurrency() / 2;
+    workerCount = std::max(std::min(4, workerCount), 1);
+    workerPool.resize(workerCount);
+    RenameThreadPool(workerPool, "dash-q-mngr");
+}
+
+void CQuorumManager::Stop()
+{
+    quorumThreadInterrupt();
+    workerPool.clear_queue();
+    workerPool.stop(true);
 }
 
 void CQuorumManager::TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex) const
@@ -383,7 +228,7 @@ void CQuorumManager::TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex)
             }
 
             // Finally start the thread which triggers the requests for this quorum
-            CQuorum::StartQuorumDataRecoveryThread(pQuorum, pIndex, nDataMask);
+            StartQuorumDataRecoveryThread(pQuorum, pIndex, nDataMask);
         }
     }
 }
@@ -477,7 +322,7 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l
         // pre-populate caches in the background
         // recovering public key shares is quite expensive and would result in serious lags for the first few signing
        // sessions if the shares would be calculated on-demand
-        CQuorum::StartCachePopulatorThread(quorum);
+        StartCachePopulatorThread(quorum);
     }
 
     mapQuorumsCache[llmqType].insert(quorumHash, quorum);
@@ -663,6 +508,32 @@ CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, const CBlock
     return BuildQuorumFromCommitment(llmqType, pindexQuorum);
 }
 
+size_t CQuorumManager::GetQuorumRecoveryStartOffset(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex) const
+{
+    const size_t nActiveQuorums = pQuorum->params.signingActiveQuorumCount + 1;
+    const std::vector<CQuorumCPtr> vecQuorums = ScanQuorums(pQuorum->params.type, pIndex, nActiveQuorums);
+    assert(vecQuorums.size() > 0);
+    std::set<uint256> setAllTypeMembers;
+    for (auto& q : vecQuorums) {
+        auto& vecValid = q->qc.validMembers;
+        for (size_t i = 0; i < vecValid.size(); ++i) {
+            if (vecValid[i]) {
+                setAllTypeMembers.emplace(q->members[i]->proTxHash);
+            }
+        }
+    }
+    std::vector<uint256> vecAllTypeMembers{setAllTypeMembers.begin(), setAllTypeMembers.end()};
+    std::sort(vecAllTypeMembers.begin(), vecAllTypeMembers.end());
+    size_t nIndex{0};
+    for (size_t i = 0; i < vecAllTypeMembers.size(); ++i) {
+        if (activeMasternodeInfo.proTxHash == vecAllTypeMembers[i]) {
+            nIndex = i;
+            break;
+        }
+    }
+    return nIndex % pQuorum->qc.validMembers.size();
+}
+
 void CQuorumManager::ProcessMessage(CNode* pFrom, const std::string& strCommand, CDataStream& vRecv)
 {
     auto strFunc = __func__;
@@ -810,7 +681,7 @@ void CQuorumManager::ProcessMessage(CNode* pFrom, const std::string& strCommand,
         vRecv >> verficationVector;
 
         if (pQuorum->SetVerificationVector(verficationVector)) {
-            CQuorum::StartCachePopulatorThread(pQuorum);
+            StartCachePopulatorThread(pQuorum);
         } else {
             errorHandler("Invalid quorum verification vector");
             return;
@@ -850,9 +721,145 @@ void CQuorumManager::ProcessMessage(CNode* pFrom, const std::string& strCommand,
             }
         }
         pQuorum->WriteContributions(evoDb);
-        pQuorum->interruptQuorumDataReceived();
         return;
     }
 }
 
+void CQuorumManager::StartCachePopulatorThread(const CQuorumCPtr pQuorum) const
+{
+    if (pQuorum->quorumVvec == nullptr) {
+        return;
+    }
+
+    cxxtimer::Timer t(true);
+    LogPrint(BCLog::LLMQ, "CQuorumManager::StartCachePopulatorThread -- start\n");
+
+    // when some other thread later tries to get keys, it will be much faster
+    workerPool.push([pQuorum, t, this](int threadId) {
+        for (size_t i = 0; i < pQuorum->members.size() && !quorumThreadInterrupt; i++) {
+            if (pQuorum->qc.validMembers[i]) {
+                pQuorum->GetPubKeyShare(i);
+            }
+        }
+        LogPrint(BCLog::LLMQ, "CQuorumManager::StartCachePopulatorThread -- done. time=%d\n", t.count());
+    });
+}
+
+void CQuorumManager::StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, uint16_t nDataMaskIn) const
+{
+    if (pQuorum->fQuorumDataRecoveryThreadRunning) {
+        LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Already running\n", __func__);
+        return;
+    }
+    pQuorum->fQuorumDataRecoveryThreadRunning = true;
+
+    workerPool.push([pQuorum, pIndex, nDataMaskIn, this](int threadId) {
+        size_t nTries{0};
+        uint16_t nDataMask{nDataMaskIn};
+        int64_t nTimeLastSuccess{0};
+        uint256* pCurrentMemberHash{nullptr};
+        std::vector<uint256> vecMemberHashes;
+        const size_t nMyStartOffset{GetQuorumRecoveryStartOffset(pQuorum, pIndex)};
+        const int64_t nRequestTimeout{10};
+
+        auto printLog = [&](const std::string& strMessage) {
+            const std::string strMember{pCurrentMemberHash == nullptr ? "nullptr" : pCurrentMemberHash->ToString()};
+            LogPrint(BCLog::LLMQ, "CQuorumManager::StartQuorumDataRecoveryThread -- %s - for llmqType %d, quorumHash %s, nDataMask (%d/%d), pCurrentMemberHash %s, nTries %d\n",
+                strMessage, pQuorum->qc.llmqType, pQuorum->qc.quorumHash.ToString(), nDataMask, nDataMaskIn, strMember, nTries);
+        };
+        printLog("Start");
+
+        while (!masternodeSync.IsBlockchainSynced() && !quorumThreadInterrupt) {
+            quorumThreadInterrupt.sleep_for(std::chrono::seconds(nRequestTimeout));
+        }
+
+        if (quorumThreadInterrupt) {
+            printLog("Aborted");
+            return;
+        }
+
+        vecMemberHashes.reserve(pQuorum->qc.validMembers.size());
+        for (auto& member : pQuorum->members) {
+            if (pQuorum->IsValidMember(member->proTxHash) && member->proTxHash != activeMasternodeInfo.proTxHash) {
+                vecMemberHashes.push_back(member->proTxHash);
+            }
+        }
+        std::sort(vecMemberHashes.begin(), vecMemberHashes.end());
+
+        printLog("Try to request");
+
+        while (nDataMask > 0 && !quorumThreadInterrupt) {
+
+            if (nDataMask & llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR && pQuorum->quorumVvec != nullptr) {
+                nDataMask &= ~llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
+                printLog("Received quorumVvec");
+            }
+
+            if (nDataMask & llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS && pQuorum->skShare.IsValid()) {
+                nDataMask &= ~llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
+                printLog("Received skShare");
+            }
+
+            if (nDataMask == 0) {
+                printLog("Success");
+                break;
+            }
+
+            if ((GetAdjustedTime() - nTimeLastSuccess) > nRequestTimeout) {
+                if (nTries >= vecMemberHashes.size()) {
+                    printLog("All tried but failed");
+                    break;
+                }
+                // Access the member list of the quorum with the calculated offset applied to balance the load equally
+                pCurrentMemberHash = &vecMemberHashes[(nMyStartOffset + nTries++) % vecMemberHashes.size()];
+                {
+                    LOCK(cs_data_requests);
+                    auto it = mapQuorumDataRequests.find(std::make_pair(*pCurrentMemberHash, true));
+                    if (it != mapQuorumDataRequests.end() && !it->second.IsExpired()) {
+                        printLog("Already asked");
+                        continue;
+                    }
+                }
+                // Sleep a bit depending on the start offset to balance out multiple requests to the same masternode
+                quorumThreadInterrupt.sleep_for(std::chrono::milliseconds(nMyStartOffset * 100));
+                nTimeLastSuccess = GetAdjustedTime();
+                g_connman->AddPendingMasternode(*pCurrentMemberHash);
+                printLog("Connect");
+            }
+
+            g_connman->ForEachNode([&](CNode* pNode) {
+
+                if (pCurrentMemberHash == nullptr || pNode->verifiedProRegTxHash != *pCurrentMemberHash) {
+                    return;
+                }
+
+                if (quorumManager->RequestQuorumData(pNode, pQuorum->qc.llmqType, pQuorum->pindexQuorum, nDataMask, activeMasternodeInfo.proTxHash)) {
+                    nTimeLastSuccess = GetAdjustedTime();
+                    printLog("Requested");
+                } else {
+                    LOCK(cs_data_requests);
+                    auto it = mapQuorumDataRequests.find(std::make_pair(pNode->verifiedProRegTxHash, true));
+                    if (it == mapQuorumDataRequests.end()) {
+                        printLog("Failed");
+                        pNode->fDisconnect = true;
+                        pCurrentMemberHash = nullptr;
+                        return;
+                    } else if (it->second.IsProcessed()) {
+                        printLog("Processed");
+                        pNode->fDisconnect = true;
+                        pCurrentMemberHash = nullptr;
+                        return;
+                    } else {
+                        printLog("Waiting");
+                        return;
+                    }
+                }
+            });
+            quorumThreadInterrupt.sleep_for(std::chrono::seconds(1));
+        }
+        pQuorum->fQuorumDataRecoveryThreadRunning = false;
+        printLog("Done");
+    });
+}
+
 } // namespace llmq
diff --git a/src/llmq/quorums.h b/src/llmq/quorums.h
index 85bf415b8e..290a96616a 100644
--- a/src/llmq/quorums.h
+++ b/src/llmq/quorums.h
@@ -17,6 +17,8 @@
 #include
 #include
 
+#include <ctpl.h>
+
 namespace llmq
 {
 
@@ -154,9 +156,6 @@ private:
     // Recovery of public key shares is very slow, so we start a background thread that pre-populates a cache so that
     // the public key shares are ready when needed later
     mutable CBLSWorkerCache blsCache;
-    std::atomic<bool> stopQuorumThreads;
-    std::thread cachePopulatorThread;
-    mutable CThreadInterrupt interruptQuorumDataReceived;
     mutable std::atomic<bool> fQuorumDataRecoveryThreadRunning{false};
 
 public:
@@ -177,12 +176,6 @@ public:
 private:
     void WriteContributions(CEvoDB& evoDb);
     bool ReadContributions(CEvoDB& evoDb);
-    static void StartCachePopulatorThread(std::shared_ptr<CQuorum> _this);
-    static void StartQuorumDataRecoveryThread(const CQuorumCPtr _this, const CBlockIndex* pIndex, uint16_t nDataMask);
-    /// Returns the start offset for the masternode with the given proTxHash. This offset is applied when picking data recovery members of a quorum's
-    /// memberlist and is calculated based on a list of all member of all active quorums for the given llmqType in a way that each member
-    /// should receive the same number of request if all active llmqType members requests data from one llmqType quorum.
-    size_t GetQuorumRecoveryStartOffset(const CBlockIndex* pIndex) const;
 };
 
 /**
@@ -202,8 +195,15 @@ private:
     mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, CQuorumPtr, StaticSaltedHasher>> mapQuorumsCache;
     mutable std::map<Consensus::LLMQType, unordered_lru_cache<std::pair<uint256, size_t>, std::vector<CQuorumCPtr>, StaticSaltedHasher>> scanQuorumsCache;
 
+    mutable ctpl::thread_pool workerPool;
+    mutable CThreadInterrupt quorumThreadInterrupt;
+
 public:
     CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager);
+    ~CQuorumManager();
+
+    void Start();
+    void Stop();
 
     void TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex) const;
 
@@ -230,6 +230,13 @@ private:
     bool BuildQuorumContributions(const CFinalCommitment& fqc, std::shared_ptr<CQuorum>& quorum) const;
     CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const CBlockIndex* pindex) const;
 
+    /// Returns the start offset for the masternode with the given proTxHash. This offset is applied when picking data recovery members of a quorum's
+    /// member list and is calculated from a list of all members of all active quorums for the given llmqType, such that each member
+    /// should receive the same number of requests if all active llmqType members request data from one llmqType quorum.
+    size_t GetQuorumRecoveryStartOffset(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex) const;
+
+    void StartCachePopulatorThread(const CQuorumCPtr pQuorum) const;
+    void StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, uint16_t nDataMask) const;
 };
 
 extern CQuorumManager* quorumManager;
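Aside -- an illustrative sketch, not part of the patch: the load-balancing idea documented on GetQuorumRecoveryStartOffset() above. Each recovering member sorts the union of all active quorum members for the LLMQ type, finds its own position in that list, and reduces it modulo the quorum size, so different members begin their request rounds at different quorum members instead of all asking the same one first. All names below are hypothetical, and proTxHashes are modeled as plain strings:

    #include <algorithm>
    #include <cstdio>
    #include <set>
    #include <string>
    #include <vector>

    // Hypothetical helper mirroring the patch's offset logic.
    size_t RecoveryStartOffset(const std::set<std::string>& allTypeMembers,
                               const std::string& myProTxHash, size_t quorumSize)
    {
        // std::set already iterates in sorted order; the explicit sort mirrors the patch.
        std::vector<std::string> sorted{allTypeMembers.begin(), allTypeMembers.end()};
        std::sort(sorted.begin(), sorted.end());
        size_t myIndex{0};
        for (size_t i = 0; i < sorted.size(); ++i) {
            if (sorted[i] == myProTxHash) {
                myIndex = i;
                break;
            }
        }
        return myIndex % quorumSize; // index of the first member this node will ask
    }

    int main()
    {
        const std::set<std::string> members{"a1", "b2", "c3", "d4", "e5"};
        for (const auto& me : members) {
            std::printf("%s starts at offset %zu\n", me.c_str(), RecoveryStartOffset(members, me, 3));
        }
        return 0; // prints offsets 0, 1, 2, 0, 1 -- requests spread across the quorum
    }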
+ size_t GetQuorumRecoveryStartOffset(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex) const; + + void StartCachePopulatorThread(const CQuorumCPtr pQuorum) const; + void StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, uint16_t nDataMask) const; }; extern CQuorumManager* quorumManager; diff --git a/src/llmq/quorums_init.cpp b/src/llmq/quorums_init.cpp index 845c8b5bc6..b0c962202c 100644 --- a/src/llmq/quorums_init.cpp +++ b/src/llmq/quorums_init.cpp @@ -73,6 +73,9 @@ void StartLLMQSystem() if (quorumDKGSessionManager) { quorumDKGSessionManager->StartThreads(); } + if (quorumManager) { + quorumManager->Start(); + } if (quorumSigSharesManager) { quorumSigSharesManager->RegisterAsRecoveredSigsListener(); quorumSigSharesManager->StartWorkerThread(); @@ -97,6 +100,9 @@ void StopLLMQSystem() quorumSigSharesManager->StopWorkerThread(); quorumSigSharesManager->UnregisterAsRecoveredSigsListener(); } + if (quorumManager) { + quorumManager->Stop(); + } if (quorumDKGSessionManager) { quorumDKGSessionManager->StopThreads(); }