llmq: Some fixes/improvements (#3943)

* llmq: Detach dash-q-cachepop from caller

There should be no reason to keep this tread attached
to its parent, if so, let me know.

* llmq: Avoid nullptr access for pindexStart in ScanQuorums

* llmq: Add cacheKey in ProcessCommitment

* llmq: Erase minable commitments if they have been processed

* llmq: Add CLLMQUtils::InitQuorumsCache

* llmq: Use unordered_lru_cache for quorumsCache and rename it

* llmq: Use unordered_lru_cache for hasMinedCommitmentCache and rename it

* llmq: Drop redundant check

* llmq: Rename nMaxCount2 -> nScanCommitments

* llmq: Refactor storeCache -> fCacheExists

* llmq: Rename maxCount -> nCountRequested

* llmq: Rename result -> vecResultQuorums

* llmq: Return an empty vector if the are zero elements requested

* unordered_lru_cache: Add max_size()

* llmq: Partially reuse existing cache if more than max is requested

* llmq: std::map<LLMQType, unordered_lru_cache<...>> for scanQuoumsCache

* llmq: Drop params

* llmq: Only emplace to cache if there is something available
This commit is contained in:
dustinface 2021-01-25 10:22:28 +01:00 committed by GitHub
parent f69d0f4d64
commit dc2473cd04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 86 additions and 60 deletions

View File

@ -43,11 +43,6 @@ CQuorum::~CQuorum()
{
// most likely the thread is already done
stopCachePopulatorThread = true;
// watch out to not join the thread when we're called from inside the thread, which might happen on shutdown. This
// is because on shutdown the thread is the last owner of the shared CQuorum instance and thus the destroyer of it.
if (cachePopulatorThread.joinable() && cachePopulatorThread.get_id() != std::this_thread::get_id()) {
cachePopulatorThread.join();
}
}
void CQuorum::Init(const CFinalCommitment& _qc, const CBlockIndex* _pindexQuorum, const uint256& _minedBlockHash, const std::vector<CDeterministicMNCPtr>& _members)
@ -152,6 +147,7 @@ void CQuorum::StartCachePopulatorThread(std::shared_ptr<CQuorum> _this)
}
LogPrint(BCLog::LLMQ, "CQuorum::StartCachePopulatorThread -- done. time=%d\n", t.count());
});
_this->cachePopulatorThread.detach();
}
CQuorumManager::CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
@ -159,6 +155,8 @@ CQuorumManager::CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessi
blsWorker(_blsWorker),
dkgManager(_dkgManager)
{
CLLMQUtils::InitQuorumsCache(mapQuorumsCache);
CLLMQUtils::InitQuorumsCache(scanQuorumsCache);
}
void CQuorumManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload) const
@ -240,7 +238,7 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l
CQuorum::StartCachePopulatorThread(quorum);
}
quorumsCache.emplace(std::make_pair(llmqType, quorumHash), quorum);
mapQuorumsCache[llmqType].emplace(quorumHash, quorum);
return quorum;
}
@ -286,62 +284,73 @@ bool CQuorumManager::HasQuorum(Consensus::LLMQType llmqType, const uint256& quor
return quorumBlockProcessor->HasMinedCommitment(llmqType, quorumHash);
}
std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqType, size_t maxCount) const
std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqType, size_t nCountRequested) const
{
const CBlockIndex* pindex;
{
LOCK(cs_main);
pindex = chainActive.Tip();
}
return ScanQuorums(llmqType, pindex, maxCount);
return ScanQuorums(llmqType, pindex, nCountRequested);
}
std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqType, const CBlockIndex* pindexStart, size_t maxCount) const
std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqType, const CBlockIndex* pindexStart, size_t nCountRequested) const
{
auto& params = Params().GetConsensus().llmqs.at(llmqType);
if (pindexStart == nullptr || nCountRequested == 0) {
return {};
}
auto cacheKey = std::make_pair(llmqType, pindexStart->GetBlockHash());
const size_t cacheMaxSize = params.signingActiveQuorumCount + 1;
bool fCacheExists{false};
void* pIndexScanCommitments{(void*)pindexStart};
size_t nScanCommitments{nCountRequested};
std::vector<CQuorumCPtr> vecResultQuorums;
std::vector<CQuorumCPtr> result;
if (maxCount <= cacheMaxSize) {
{
LOCK(quorumsCacheCs);
if (scanQuorumsCache.get(cacheKey, result)) {
if (result.size() > maxCount) {
result.resize(maxCount);
auto& cache = scanQuorumsCache[llmqType];
fCacheExists = cache.get(pindexStart->GetBlockHash(), vecResultQuorums);
if (fCacheExists) {
// We have exactly what requested so just return it
if (vecResultQuorums.size() == nCountRequested) {
return vecResultQuorums;
}
return result;
// If we have more cached than requested return only a subvector
if (vecResultQuorums.size() > nCountRequested) {
return {vecResultQuorums.begin(), vecResultQuorums.begin() + nCountRequested};
}
// If we have cached quorums but not enough, subtract what we have from the count and the set correct index where to start
// scanning for the rests
if(vecResultQuorums.size() > 0) {
nScanCommitments -= vecResultQuorums.size();
pIndexScanCommitments = (void*)vecResultQuorums.back()->pindexQuorum->pprev;
}
} else {
// If there is nothing in cache request at least cache.max_size() because this gets cached then later
nScanCommitments = std::max(nCountRequested, cache.max_size());
}
}
bool storeCache = false;
size_t maxCount2 = maxCount;
if (maxCount2 <= cacheMaxSize) {
maxCount2 = cacheMaxSize;
storeCache = true;
}
auto quorumIndexes = quorumBlockProcessor->GetMinedCommitmentsUntilBlock(params.type, pindexStart, maxCount2);
result.reserve(quorumIndexes.size());
// Get the block indexes of the mined commitments to build the required quorums from
auto quorumIndexes = quorumBlockProcessor->GetMinedCommitmentsUntilBlock(llmqType, (const CBlockIndex*)pIndexScanCommitments, nScanCommitments);
vecResultQuorums.reserve(vecResultQuorums.size() + quorumIndexes.size());
for (auto& quorumIndex : quorumIndexes) {
assert(quorumIndex);
auto quorum = GetQuorum(params.type, quorumIndex);
auto quorum = GetQuorum(llmqType, quorumIndex);
assert(quorum != nullptr);
result.emplace_back(quorum);
vecResultQuorums.emplace_back(quorum);
}
if (storeCache) {
size_t nCountResult{vecResultQuorums.size()};
if (nCountResult > 0 && !fCacheExists) {
LOCK(quorumsCacheCs);
scanQuorumsCache.insert(cacheKey, result);
// Don't cache more than cache.max_size() elements
auto& cache = scanQuorumsCache[llmqType];
size_t nCacheEndIndex = std::min(nCountResult, cache.max_size());
cache.emplace(pindexStart->GetBlockHash(), {vecResultQuorums.begin(), vecResultQuorums.begin() + nCacheEndIndex});
}
if (result.size() > maxCount) {
result.resize(maxCount);
}
return result;
// Don't return more than nCountRequested elements
size_t nResultEndIndex = std::min(nCountResult, nCountRequested);
return {vecResultQuorums.begin(), vecResultQuorums.begin() + nResultEndIndex};
}
CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const
@ -372,10 +381,9 @@ CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, const CBlock
}
LOCK(quorumsCacheCs);
auto it = quorumsCache.find(std::make_pair(llmqType, quorumHash));
if (it != quorumsCache.end()) {
return it->second;
CQuorumCPtr pQuorum;
if (mapQuorumsCache[llmqType].get(quorumHash, pQuorum)) {
return pQuorum;
}
return BuildQuorumFromCommitment(llmqType, pindexQuorum);

View File

@ -86,8 +86,8 @@ private:
CDKGSessionManager& dkgManager;
mutable CCriticalSection quorumsCacheCs;
mutable std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumPtr> quorumsCache;
mutable unordered_lru_cache<std::pair<Consensus::LLMQType, uint256>, std::vector<CQuorumCPtr>, StaticSaltedHasher, 32> scanQuorumsCache;
mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, CQuorumCPtr, StaticSaltedHasher>> mapQuorumsCache;
mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::vector<CQuorumCPtr>, StaticSaltedHasher>> scanQuorumsCache;
public:
CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager);
@ -98,10 +98,10 @@ public:
// all these methods will lock cs_main for a short period of time
CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const;
std::vector<CQuorumCPtr> ScanQuorums(Consensus::LLMQType llmqType, size_t maxCount) const;
std::vector<CQuorumCPtr> ScanQuorums(Consensus::LLMQType llmqType, size_t nCountRequested) const;
// this one is cs_main-free
std::vector<CQuorumCPtr> ScanQuorums(Consensus::LLMQType llmqType, const CBlockIndex* pindexStart, size_t maxCount) const;
std::vector<CQuorumCPtr> ScanQuorums(Consensus::LLMQType llmqType, const CBlockIndex* pindexStart, size_t nCountRequested) const;
private:
// all private methods here are cs_main-free

View File

@ -25,6 +25,12 @@ static const std::string DB_MINED_COMMITMENT_BY_INVERSED_HEIGHT = "q_mcih";
static const std::string DB_BEST_BLOCK_UPGRADE = "q_bbu2";
CQuorumBlockProcessor::CQuorumBlockProcessor(CEvoDB &_evoDb) :
evoDb(_evoDb)
{
CLLMQUtils::InitQuorumsCache(mapHasMinedCommitmentCache);
}
void CQuorumBlockProcessor::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv)
{
if (strCommand == NetMsgType::QFCOMMITMENT) {
@ -220,12 +226,15 @@ bool CQuorumBlockProcessor::ProcessCommitment(int nHeight, const uint256& blockH
}
// Store commitment in DB
evoDb.Write(std::make_pair(DB_MINED_COMMITMENT, std::make_pair(params.type, quorumHash)), std::make_pair(qc, blockHash));
auto cacheKey = std::make_pair(params.type, quorumHash);
evoDb.Write(std::make_pair(DB_MINED_COMMITMENT, cacheKey), std::make_pair(qc, blockHash));
evoDb.Write(BuildInversedHeightKey(params.type, nHeight), quorumIndex->nHeight);
{
LOCK(minableCommitmentsCs);
hasMinedCommitmentCache.erase(std::make_pair(params.type, quorumHash));
mapHasMinedCommitmentCache[qc.llmqType].erase(qc.quorumHash);
minableCommitmentsByQuorum.erase(cacheKey);
minableCommitments.erase(::SerializeHash(qc));
}
LogPrint(BCLog::LLMQ, "CQuorumBlockProcessor::%s -- processed commitment from block. type=%d, quorumHash=%s, signers=%s, validMembers=%d, quorumPublicKey=%s\n", __func__,
@ -254,7 +263,7 @@ bool CQuorumBlockProcessor::UndoBlock(const CBlock& block, const CBlockIndex* pi
evoDb.Erase(BuildInversedHeightKey((Consensus::LLMQType)qc.llmqType, pindex->nHeight));
{
LOCK(minableCommitmentsCs);
hasMinedCommitmentCache.erase(std::make_pair((Consensus::LLMQType)qc.llmqType, qc.quorumHash));
mapHasMinedCommitmentCache[qc.llmqType].erase(qc.quorumHash);
}
// if a reorg happened, we should allow to mine this commitment later
@ -392,21 +401,20 @@ uint256 CQuorumBlockProcessor::GetQuorumBlockHash(Consensus::LLMQType llmqType,
bool CQuorumBlockProcessor::HasMinedCommitment(Consensus::LLMQType llmqType, const uint256& quorumHash)
{
auto cacheKey = std::make_pair(llmqType, quorumHash);
bool fExists;
{
LOCK(minableCommitmentsCs);
auto cacheIt = hasMinedCommitmentCache.find(cacheKey);
if (cacheIt != hasMinedCommitmentCache.end()) {
return cacheIt->second;
if (mapHasMinedCommitmentCache[llmqType].get(quorumHash, fExists)) {
return fExists;
}
}
auto key = std::make_pair(DB_MINED_COMMITMENT, std::make_pair(llmqType, quorumHash));
bool ret = evoDb.Exists(key);
fExists = evoDb.Exists(std::make_pair(DB_MINED_COMMITMENT, std::make_pair(llmqType, quorumHash)));
LOCK(minableCommitmentsCs);
hasMinedCommitmentCache.emplace(cacheKey, ret);
return ret;
mapHasMinedCommitmentCache[llmqType].insert(quorumHash, fExists);
return fExists;
}
bool CQuorumBlockProcessor::GetMinedCommitment(Consensus::LLMQType llmqType, const uint256& quorumHash, CFinalCommitment& retQc, uint256& retMinedBlockHash)

View File

@ -15,6 +15,7 @@
#include <map>
#include <unordered_map>
#include <unordered_lru_cache.h>
class CNode;
class CConnman;
@ -32,10 +33,10 @@ private:
std::map<std::pair<Consensus::LLMQType, uint256>, uint256> minableCommitmentsByQuorum;
std::map<uint256, CFinalCommitment> minableCommitments;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, bool, StaticSaltedHasher> hasMinedCommitmentCache;
std::map<Consensus::LLMQType, unordered_lru_cache<uint256, bool, StaticSaltedHasher>> mapHasMinedCommitmentCache;
public:
explicit CQuorumBlockProcessor(CEvoDB& _evoDb) : evoDb(_evoDb) {}
explicit CQuorumBlockProcessor(CEvoDB& _evoDb);
bool UpgradeDB();

View File

@ -89,6 +89,14 @@ public:
}
return HexStr(vBytes);
}
template <typename CacheType>
static void InitQuorumsCache(CacheType& cache)
{
for (auto& llmq : Params().GetConsensus().llmqs) {
cache.emplace(std::piecewise_construct, std::forward_as_tuple(llmq.first),
std::forward_as_tuple(llmq.second.signingActiveQuorumCount + 1));
}
}
};
const Consensus::LLMQParams& GetLLMQParams(const Consensus::LLMQType llmqType);

View File

@ -27,6 +27,7 @@ public:
assert(_maxSize != 0);
}
size_t max_size() const { return maxSize; }
template<typename Value2>
void _emplace(const Key& key, Value2&& v)