From dc2473cd0474dc357399a75b378cafd0c61050c5 Mon Sep 17 00:00:00 2001 From: dustinface <35775977+xdustinface@users.noreply.github.com> Date: Mon, 25 Jan 2021 10:22:28 +0100 Subject: [PATCH] llmq: Some fixes/improvements (#3943) * llmq: Detach dash-q-cachepop from caller There should be no reason to keep this tread attached to its parent, if so, let me know. * llmq: Avoid nullptr access for pindexStart in ScanQuorums * llmq: Add cacheKey in ProcessCommitment * llmq: Erase minable commitments if they have been processed * llmq: Add CLLMQUtils::InitQuorumsCache * llmq: Use unordered_lru_cache for quorumsCache and rename it * llmq: Use unordered_lru_cache for hasMinedCommitmentCache and rename it * llmq: Drop redundant check * llmq: Rename nMaxCount2 -> nScanCommitments * llmq: Refactor storeCache -> fCacheExists * llmq: Rename maxCount -> nCountRequested * llmq: Rename result -> vecResultQuorums * llmq: Return an empty vector if the are zero elements requested * unordered_lru_cache: Add max_size() * llmq: Partially reuse existing cache if more than max is requested * llmq: std::map> for scanQuoumsCache * llmq: Drop params * llmq: Only emplace to cache if there is something available --- src/llmq/quorums.cpp | 94 ++++++++++++++++------------- src/llmq/quorums.h | 8 +-- src/llmq/quorums_blockprocessor.cpp | 30 +++++---- src/llmq/quorums_blockprocessor.h | 5 +- src/llmq/quorums_utils.h | 8 +++ src/unordered_lru_cache.h | 1 + 6 files changed, 86 insertions(+), 60 deletions(-) diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp index d7dec02f7f..008c03c6ef 100644 --- a/src/llmq/quorums.cpp +++ b/src/llmq/quorums.cpp @@ -43,11 +43,6 @@ CQuorum::~CQuorum() { // most likely the thread is already done stopCachePopulatorThread = true; - // watch out to not join the thread when we're called from inside the thread, which might happen on shutdown. This - // is because on shutdown the thread is the last owner of the shared CQuorum instance and thus the destroyer of it. - if (cachePopulatorThread.joinable() && cachePopulatorThread.get_id() != std::this_thread::get_id()) { - cachePopulatorThread.join(); - } } void CQuorum::Init(const CFinalCommitment& _qc, const CBlockIndex* _pindexQuorum, const uint256& _minedBlockHash, const std::vector& _members) @@ -152,6 +147,7 @@ void CQuorum::StartCachePopulatorThread(std::shared_ptr _this) } LogPrint(BCLog::LLMQ, "CQuorum::StartCachePopulatorThread -- done. time=%d\n", t.count()); }); + _this->cachePopulatorThread.detach(); } CQuorumManager::CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) : @@ -159,6 +155,8 @@ CQuorumManager::CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessi blsWorker(_blsWorker), dkgManager(_dkgManager) { + CLLMQUtils::InitQuorumsCache(mapQuorumsCache); + CLLMQUtils::InitQuorumsCache(scanQuorumsCache); } void CQuorumManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload) const @@ -240,7 +238,7 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l CQuorum::StartCachePopulatorThread(quorum); } - quorumsCache.emplace(std::make_pair(llmqType, quorumHash), quorum); + mapQuorumsCache[llmqType].emplace(quorumHash, quorum); return quorum; } @@ -286,62 +284,73 @@ bool CQuorumManager::HasQuorum(Consensus::LLMQType llmqType, const uint256& quor return quorumBlockProcessor->HasMinedCommitment(llmqType, quorumHash); } -std::vector CQuorumManager::ScanQuorums(Consensus::LLMQType llmqType, size_t maxCount) const +std::vector CQuorumManager::ScanQuorums(Consensus::LLMQType llmqType, size_t nCountRequested) const { const CBlockIndex* pindex; { LOCK(cs_main); pindex = chainActive.Tip(); } - return ScanQuorums(llmqType, pindex, maxCount); + return ScanQuorums(llmqType, pindex, nCountRequested); } -std::vector CQuorumManager::ScanQuorums(Consensus::LLMQType llmqType, const CBlockIndex* pindexStart, size_t maxCount) const +std::vector CQuorumManager::ScanQuorums(Consensus::LLMQType llmqType, const CBlockIndex* pindexStart, size_t nCountRequested) const { - auto& params = Params().GetConsensus().llmqs.at(llmqType); + if (pindexStart == nullptr || nCountRequested == 0) { + return {}; + } - auto cacheKey = std::make_pair(llmqType, pindexStart->GetBlockHash()); - const size_t cacheMaxSize = params.signingActiveQuorumCount + 1; + bool fCacheExists{false}; + void* pIndexScanCommitments{(void*)pindexStart}; + size_t nScanCommitments{nCountRequested}; + std::vector vecResultQuorums; - std::vector result; - - if (maxCount <= cacheMaxSize) { + { LOCK(quorumsCacheCs); - if (scanQuorumsCache.get(cacheKey, result)) { - if (result.size() > maxCount) { - result.resize(maxCount); + auto& cache = scanQuorumsCache[llmqType]; + fCacheExists = cache.get(pindexStart->GetBlockHash(), vecResultQuorums); + if (fCacheExists) { + // We have exactly what requested so just return it + if (vecResultQuorums.size() == nCountRequested) { + return vecResultQuorums; } - return result; + // If we have more cached than requested return only a subvector + if (vecResultQuorums.size() > nCountRequested) { + return {vecResultQuorums.begin(), vecResultQuorums.begin() + nCountRequested}; + } + // If we have cached quorums but not enough, subtract what we have from the count and the set correct index where to start + // scanning for the rests + if(vecResultQuorums.size() > 0) { + nScanCommitments -= vecResultQuorums.size(); + pIndexScanCommitments = (void*)vecResultQuorums.back()->pindexQuorum->pprev; + } + } else { + // If there is nothing in cache request at least cache.max_size() because this gets cached then later + nScanCommitments = std::max(nCountRequested, cache.max_size()); } } - - bool storeCache = false; - size_t maxCount2 = maxCount; - if (maxCount2 <= cacheMaxSize) { - maxCount2 = cacheMaxSize; - storeCache = true; - } - - auto quorumIndexes = quorumBlockProcessor->GetMinedCommitmentsUntilBlock(params.type, pindexStart, maxCount2); - result.reserve(quorumIndexes.size()); + // Get the block indexes of the mined commitments to build the required quorums from + auto quorumIndexes = quorumBlockProcessor->GetMinedCommitmentsUntilBlock(llmqType, (const CBlockIndex*)pIndexScanCommitments, nScanCommitments); + vecResultQuorums.reserve(vecResultQuorums.size() + quorumIndexes.size()); for (auto& quorumIndex : quorumIndexes) { assert(quorumIndex); - auto quorum = GetQuorum(params.type, quorumIndex); + auto quorum = GetQuorum(llmqType, quorumIndex); assert(quorum != nullptr); - result.emplace_back(quorum); + vecResultQuorums.emplace_back(quorum); } - if (storeCache) { + size_t nCountResult{vecResultQuorums.size()}; + if (nCountResult > 0 && !fCacheExists) { LOCK(quorumsCacheCs); - scanQuorumsCache.insert(cacheKey, result); + // Don't cache more than cache.max_size() elements + auto& cache = scanQuorumsCache[llmqType]; + size_t nCacheEndIndex = std::min(nCountResult, cache.max_size()); + cache.emplace(pindexStart->GetBlockHash(), {vecResultQuorums.begin(), vecResultQuorums.begin() + nCacheEndIndex}); } - - if (result.size() > maxCount) { - result.resize(maxCount); - } - - return result; + // Don't return more than nCountRequested elements + size_t nResultEndIndex = std::min(nCountResult, nCountRequested); + return {vecResultQuorums.begin(), vecResultQuorums.begin() + nResultEndIndex}; } CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const @@ -372,10 +381,9 @@ CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, const CBlock } LOCK(quorumsCacheCs); - - auto it = quorumsCache.find(std::make_pair(llmqType, quorumHash)); - if (it != quorumsCache.end()) { - return it->second; + CQuorumCPtr pQuorum; + if (mapQuorumsCache[llmqType].get(quorumHash, pQuorum)) { + return pQuorum; } return BuildQuorumFromCommitment(llmqType, pindexQuorum); diff --git a/src/llmq/quorums.h b/src/llmq/quorums.h index 0cc059bfb6..8a64e9ffa4 100644 --- a/src/llmq/quorums.h +++ b/src/llmq/quorums.h @@ -86,8 +86,8 @@ private: CDKGSessionManager& dkgManager; mutable CCriticalSection quorumsCacheCs; - mutable std::map, CQuorumPtr> quorumsCache; - mutable unordered_lru_cache, std::vector, StaticSaltedHasher, 32> scanQuorumsCache; + mutable std::map> mapQuorumsCache; + mutable std::map, StaticSaltedHasher>> scanQuorumsCache; public: CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager); @@ -98,10 +98,10 @@ public: // all these methods will lock cs_main for a short period of time CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const; - std::vector ScanQuorums(Consensus::LLMQType llmqType, size_t maxCount) const; + std::vector ScanQuorums(Consensus::LLMQType llmqType, size_t nCountRequested) const; // this one is cs_main-free - std::vector ScanQuorums(Consensus::LLMQType llmqType, const CBlockIndex* pindexStart, size_t maxCount) const; + std::vector ScanQuorums(Consensus::LLMQType llmqType, const CBlockIndex* pindexStart, size_t nCountRequested) const; private: // all private methods here are cs_main-free diff --git a/src/llmq/quorums_blockprocessor.cpp b/src/llmq/quorums_blockprocessor.cpp index 6b372b011f..e79dac6c62 100644 --- a/src/llmq/quorums_blockprocessor.cpp +++ b/src/llmq/quorums_blockprocessor.cpp @@ -25,6 +25,12 @@ static const std::string DB_MINED_COMMITMENT_BY_INVERSED_HEIGHT = "q_mcih"; static const std::string DB_BEST_BLOCK_UPGRADE = "q_bbu2"; +CQuorumBlockProcessor::CQuorumBlockProcessor(CEvoDB &_evoDb) : + evoDb(_evoDb) +{ + CLLMQUtils::InitQuorumsCache(mapHasMinedCommitmentCache); +} + void CQuorumBlockProcessor::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv) { if (strCommand == NetMsgType::QFCOMMITMENT) { @@ -220,12 +226,15 @@ bool CQuorumBlockProcessor::ProcessCommitment(int nHeight, const uint256& blockH } // Store commitment in DB - evoDb.Write(std::make_pair(DB_MINED_COMMITMENT, std::make_pair(params.type, quorumHash)), std::make_pair(qc, blockHash)); + auto cacheKey = std::make_pair(params.type, quorumHash); + evoDb.Write(std::make_pair(DB_MINED_COMMITMENT, cacheKey), std::make_pair(qc, blockHash)); evoDb.Write(BuildInversedHeightKey(params.type, nHeight), quorumIndex->nHeight); { LOCK(minableCommitmentsCs); - hasMinedCommitmentCache.erase(std::make_pair(params.type, quorumHash)); + mapHasMinedCommitmentCache[qc.llmqType].erase(qc.quorumHash); + minableCommitmentsByQuorum.erase(cacheKey); + minableCommitments.erase(::SerializeHash(qc)); } LogPrint(BCLog::LLMQ, "CQuorumBlockProcessor::%s -- processed commitment from block. type=%d, quorumHash=%s, signers=%s, validMembers=%d, quorumPublicKey=%s\n", __func__, @@ -254,7 +263,7 @@ bool CQuorumBlockProcessor::UndoBlock(const CBlock& block, const CBlockIndex* pi evoDb.Erase(BuildInversedHeightKey((Consensus::LLMQType)qc.llmqType, pindex->nHeight)); { LOCK(minableCommitmentsCs); - hasMinedCommitmentCache.erase(std::make_pair((Consensus::LLMQType)qc.llmqType, qc.quorumHash)); + mapHasMinedCommitmentCache[qc.llmqType].erase(qc.quorumHash); } // if a reorg happened, we should allow to mine this commitment later @@ -392,21 +401,20 @@ uint256 CQuorumBlockProcessor::GetQuorumBlockHash(Consensus::LLMQType llmqType, bool CQuorumBlockProcessor::HasMinedCommitment(Consensus::LLMQType llmqType, const uint256& quorumHash) { - auto cacheKey = std::make_pair(llmqType, quorumHash); + bool fExists; { LOCK(minableCommitmentsCs); - auto cacheIt = hasMinedCommitmentCache.find(cacheKey); - if (cacheIt != hasMinedCommitmentCache.end()) { - return cacheIt->second; + if (mapHasMinedCommitmentCache[llmqType].get(quorumHash, fExists)) { + return fExists; } } - auto key = std::make_pair(DB_MINED_COMMITMENT, std::make_pair(llmqType, quorumHash)); - bool ret = evoDb.Exists(key); + fExists = evoDb.Exists(std::make_pair(DB_MINED_COMMITMENT, std::make_pair(llmqType, quorumHash))); LOCK(minableCommitmentsCs); - hasMinedCommitmentCache.emplace(cacheKey, ret); - return ret; + mapHasMinedCommitmentCache[llmqType].insert(quorumHash, fExists); + + return fExists; } bool CQuorumBlockProcessor::GetMinedCommitment(Consensus::LLMQType llmqType, const uint256& quorumHash, CFinalCommitment& retQc, uint256& retMinedBlockHash) diff --git a/src/llmq/quorums_blockprocessor.h b/src/llmq/quorums_blockprocessor.h index f3fb130ed1..e8aae0934c 100644 --- a/src/llmq/quorums_blockprocessor.h +++ b/src/llmq/quorums_blockprocessor.h @@ -15,6 +15,7 @@ #include #include +#include class CNode; class CConnman; @@ -32,10 +33,10 @@ private: std::map, uint256> minableCommitmentsByQuorum; std::map minableCommitments; - std::unordered_map, bool, StaticSaltedHasher> hasMinedCommitmentCache; + std::map> mapHasMinedCommitmentCache; public: - explicit CQuorumBlockProcessor(CEvoDB& _evoDb) : evoDb(_evoDb) {} + explicit CQuorumBlockProcessor(CEvoDB& _evoDb); bool UpgradeDB(); diff --git a/src/llmq/quorums_utils.h b/src/llmq/quorums_utils.h index 01d6d54419..67484b6e3b 100644 --- a/src/llmq/quorums_utils.h +++ b/src/llmq/quorums_utils.h @@ -89,6 +89,14 @@ public: } return HexStr(vBytes); } + template + static void InitQuorumsCache(CacheType& cache) + { + for (auto& llmq : Params().GetConsensus().llmqs) { + cache.emplace(std::piecewise_construct, std::forward_as_tuple(llmq.first), + std::forward_as_tuple(llmq.second.signingActiveQuorumCount + 1)); + } + } }; const Consensus::LLMQParams& GetLLMQParams(const Consensus::LLMQType llmqType); diff --git a/src/unordered_lru_cache.h b/src/unordered_lru_cache.h index 385da8246b..17778e3202 100644 --- a/src/unordered_lru_cache.h +++ b/src/unordered_lru_cache.h @@ -27,6 +27,7 @@ public: assert(_maxSize != 0); } + size_t max_size() const { return maxSize; } template void _emplace(const Key& key, Value2&& v)