llmq: Use thread pool for quorum cache and data recovery threads (#4008)

* llmq: Use thread pool for quorum cache and data recovery threads

Move quorum data and cache thread handling into CQuorumManager.

* llmq: Fix explicit capture list

* llmq: Directly push the lambdas to workerPool

Co-authored-by: xdustinface <xdustinfacex@gmail.com>
UdjinM6 2021-02-20 23:43:43 +03:00 committed by GitHub
parent 10b0c70079
commit c631d042c7
3 changed files with 211 additions and 191 deletions
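
For orientation before the diff: instead of each CQuorum spawning and detaching its own std::thread for cache population and data recovery, CQuorumManager now owns a shared ctpl::thread_pool (the CTPL pool pulled in via <ctpl.h>) plus a CThreadInterrupt that every pooled task polls, so Stop() can wind all background work down in one place. A minimal sketch of that pattern follows; the std::atomic<bool> stands in for CThreadInterrupt, and everything outside ctpl::thread_pool's resize/push/clear_queue/stop calls is illustrative only.

#include <ctpl.h>

#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>

int main()
{
    ctpl::thread_pool pool;
    std::atomic<bool> interrupt{false}; // stand-in for CThreadInterrupt

    pool.resize(2); // Start() sizes the pool once at startup

    // Long-running work is pushed as a lambda; CTPL passes the worker's
    // thread id as the first argument.
    pool.push([&interrupt](int threadId) {
        for (int i = 0; i < 100 && !interrupt; ++i) {
            std::printf("worker %d: step %d\n", threadId, i);
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    });

    // Stop() mirrors this sequence: raise the interrupt, drop queued tasks,
    // then join the workers that are still running.
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    interrupt = true;
    pool.clear_queue();
    pool.stop(true);
    return 0;
}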

View File

@@ -44,16 +44,11 @@ static uint256 MakeQuorumKey(const CQuorum& q)
return hw.GetHash();
}
CQuorum::CQuorum(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker) : params(_params), blsCache(_blsWorker), stopQuorumThreads(false)
CQuorum::CQuorum(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker) : params(_params), blsCache(_blsWorker)
{
interruptQuorumDataReceived.reset();
}
CQuorum::~CQuorum()
{
interruptQuorumDataReceived();
stopQuorumThreads = true;
}
CQuorum::~CQuorum() = default;
void CQuorum::Init(const CFinalCommitment& _qc, const CBlockIndex* _pindexQuorum, const uint256& _minedBlockHash, const std::vector<CDeterministicMNCPtr>& _members)
{
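
The destructor can become = default above because lifetime and cancellation move out of CQuorum: the pooled tasks capture the shared_ptr (CQuorumCPtr) by value, which keeps the quorum alive until the task returns, and interruption is handled centrally by CQuorumManager's quorumThreadInterrupt. A sketch of the shared_ptr-capture side of that, with placeholder types (only ctpl::thread_pool is taken from the diff):

#include <ctpl.h>

#include <cstdio>
#include <memory>

struct Quorum {
    ~Quorum() { std::printf("Quorum destroyed\n"); }
    void PrecomputeShare(int i) const { std::printf("share %d\n", i); }
};

int main()
{
    ctpl::thread_pool pool(1);
    {
        auto quorum = std::make_shared<Quorum>();
        // The lambda's by-value copy holds a reference for as long as the
        // task exists, even after this local shared_ptr goes out of scope.
        pool.push([quorum](int /*threadId*/) {
            for (int i = 0; i < 3; ++i) quorum->PrecomputeShare(i);
        });
    } // local owner gone; the queued task still keeps Quorum alive
    pool.stop(true); // wait for the task; its copy is then released and Quorum is destroyed
    return 0;
}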
@@ -155,177 +150,6 @@ bool CQuorum::ReadContributions(CEvoDB& evoDb)
return true;
}
void CQuorum::StartCachePopulatorThread(std::shared_ptr<CQuorum> _this)
{
if (_this->quorumVvec == nullptr) {
return;
}
cxxtimer::Timer t(true);
LogPrint(BCLog::LLMQ, "CQuorum::StartCachePopulatorThread -- start\n");
// this thread will exit after some time
// when then later some other thread tries to get keys, it will be much faster
_this->cachePopulatorThread = std::thread([_this, t]() {
RenameThread("dash-q-cachepop");
for (size_t i = 0; i < _this->members.size() && !_this->stopQuorumThreads && !ShutdownRequested(); i++) {
if (_this->qc.validMembers[i]) {
_this->GetPubKeyShare(i);
}
}
LogPrint(BCLog::LLMQ, "CQuorum::StartCachePopulatorThread -- done. time=%d\n", t.count());
});
_this->cachePopulatorThread.detach();
}
size_t CQuorum::GetQuorumRecoveryStartOffset(const CBlockIndex* pIndex) const
{
const size_t nActiveQuorums = params.signingActiveQuorumCount + 1;
const std::vector<CQuorumCPtr> vecQuorums = quorumManager->ScanQuorums(qc.llmqType, pIndex, nActiveQuorums);
assert(vecQuorums.size() > 0);
std::set<uint256> setAllTypeMembers;
for (auto& pQuorum : vecQuorums) {
auto& vecValid = pQuorum->qc.validMembers;
for (size_t i = 0; i< vecValid.size(); ++i) {
if (vecValid[i]) {
setAllTypeMembers.emplace(pQuorum->members[i]->proTxHash);
}
}
}
std::vector<uint256> vecAllTypeMembers{setAllTypeMembers.begin(), setAllTypeMembers.end()};
std::sort(vecAllTypeMembers.begin(), vecAllTypeMembers.end());
size_t nMyIndex{0};
for (size_t i = 0; i< vecAllTypeMembers.size(); ++i) {
if (activeMasternodeInfo.proTxHash == vecAllTypeMembers[i]) {
nMyIndex = i;
break;
}
}
return nMyIndex % qc.validMembers.size();
}
void CQuorum::StartQuorumDataRecoveryThread(const CQuorumCPtr _this, const CBlockIndex* pIndex, uint16_t nDataMaskIn)
{
if (_this->fQuorumDataRecoveryThreadRunning) {
LogPrint(BCLog::LLMQ, "CQuorum::%s -- Already running\n", __func__);
return;
}
_this->fQuorumDataRecoveryThreadRunning = true;
std::thread([_this, pIndex, nDataMaskIn]() {
const auto& strThreadName = "dash-q-recovery";
RenameThread(strThreadName);
size_t nTries{0};
uint16_t nDataMask{nDataMaskIn};
int64_t nTimeLastSuccess{0};
uint256* pCurrentMemberHash{nullptr};
std::vector<uint256> vecMemberHashes;
const size_t nMyStartOffset{_this->GetQuorumRecoveryStartOffset(pIndex)};
const int64_t nRequestTimeout{10};
auto printLog = [&](const std::string& strMessage) {
const std::string strMember{pCurrentMemberHash == nullptr ? "nullptr" : pCurrentMemberHash->ToString()};
LogPrint(BCLog::LLMQ, "%s -- %s - for llmqType %d, quorumHash %s, nDataMask (%d/%d), pCurrentMemberHash %s, nTries %d\n",
strThreadName, strMessage, _this->qc.llmqType, _this->qc.quorumHash.ToString(), nDataMask, nDataMaskIn, strMember, nTries);
};
printLog("Start");
while (!masternodeSync.IsBlockchainSynced() && !_this->stopQuorumThreads && !ShutdownRequested()) {
_this->interruptQuorumDataReceived.reset();
_this->interruptQuorumDataReceived.sleep_for(std::chrono::seconds(nRequestTimeout));
}
if (_this->stopQuorumThreads || ShutdownRequested()) {
printLog("Aborted");
return;
}
vecMemberHashes.reserve(_this->qc.validMembers.size());
for (auto& member : _this->members) {
if (_this->IsValidMember(member->proTxHash) && member->proTxHash != activeMasternodeInfo.proTxHash) {
vecMemberHashes.push_back(member->proTxHash);
}
}
std::sort(vecMemberHashes.begin(), vecMemberHashes.end());
printLog("Try to request");
while (nDataMask > 0 && !_this->stopQuorumThreads && !ShutdownRequested()) {
if (nDataMask & llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR && _this->quorumVvec != nullptr) {
nDataMask &= ~llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
printLog("Received quorumVvec");
}
if (nDataMask & llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS && _this->skShare.IsValid()) {
nDataMask &= ~llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
printLog("Received skShare");
}
if (nDataMask == 0) {
printLog("Success");
break;
}
if ((GetAdjustedTime() - nTimeLastSuccess) > nRequestTimeout) {
if (nTries >= vecMemberHashes.size()) {
printLog("All tried but failed");
break;
}
// Access the member list of the quorum with the calculated offset applied to balance the load equally
pCurrentMemberHash = &vecMemberHashes[(nMyStartOffset + nTries++) % vecMemberHashes.size()];
{
LOCK(cs_data_requests);
auto it = mapQuorumDataRequests.find(std::make_pair(*pCurrentMemberHash, true));
if (it != mapQuorumDataRequests.end() && !it->second.IsExpired()) {
printLog("Already asked");
continue;
}
}
// Sleep a bit depending on the start offset to balance out multiple requests to same masternode
_this->interruptQuorumDataReceived.sleep_for(std::chrono::milliseconds(nMyStartOffset * 100));
nTimeLastSuccess = GetAdjustedTime();
g_connman->AddPendingMasternode(*pCurrentMemberHash);
printLog("Connect");
}
g_connman->ForEachNode([&](CNode* pNode) {
if (pCurrentMemberHash == nullptr || pNode->verifiedProRegTxHash != *pCurrentMemberHash) {
return;
}
if (quorumManager->RequestQuorumData(pNode, _this->qc.llmqType, _this->pindexQuorum, nDataMask, activeMasternodeInfo.proTxHash)) {
nTimeLastSuccess = GetAdjustedTime();
printLog("Requested");
} else {
LOCK(cs_data_requests);
auto it = mapQuorumDataRequests.find(std::make_pair(pNode->verifiedProRegTxHash, true));
if (it == mapQuorumDataRequests.end()) {
printLog("Failed");
pNode->fDisconnect = true;
pCurrentMemberHash = nullptr;
return;
} else if (it->second.IsProcessed()) {
printLog("Processed");
pNode->fDisconnect = true;
pCurrentMemberHash = nullptr;
return;
} else {
printLog("Waiting");
return;
}
}
});
_this->interruptQuorumDataReceived.reset();
_this->interruptQuorumDataReceived.sleep_for(std::chrono::seconds(1));
}
_this->fQuorumDataRecoveryThreadRunning = false;
printLog("Done");
}).detach();
}
CQuorumManager::CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
evoDb(_evoDb),
blsWorker(_blsWorker),
@@ -333,6 +157,27 @@ CQuorumManager::CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessi
{
CLLMQUtils::InitQuorumsCache(mapQuorumsCache);
CLLMQUtils::InitQuorumsCache(scanQuorumsCache);
quorumThreadInterrupt.reset();
}
CQuorumManager::~CQuorumManager()
{
Stop();
}
void CQuorumManager::Start()
{
int workerCount = std::thread::hardware_concurrency() / 2;
workerCount = std::max(std::min(1, workerCount), 4);
workerPool.resize(workerCount);
RenameThreadPool(workerPool, "dash-q-mngr");
}
void CQuorumManager::Stop()
{
quorumThreadInterrupt();
workerPool.clear_queue();
workerPool.stop(true);
}
void CQuorumManager::TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex) const
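
One line in Start() above is worth a second look: std::max(std::min(1, workerCount), 4) evaluates to 4 no matter what std::thread::hardware_concurrency() returns, because std::min(1, workerCount) can never exceed 1. If the intent is "half the hardware threads, bounded between 1 and 4", the clamp would presumably read as in this sketch (not the committed code):

#include <algorithm>
#include <thread>

// Presumed intent: half the hardware threads, but at least 1 and at most 4.
int QuorumWorkerCount()
{
    const int workerCount = static_cast<int>(std::thread::hardware_concurrency()) / 2;
    return std::max(std::min(workerCount, 4), 1); // C++17: std::clamp(workerCount, 1, 4)
}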
@@ -383,7 +228,7 @@ void CQuorumManager::TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex)
}
// Finally start the thread which triggers the requests for this quorum
CQuorum::StartQuorumDataRecoveryThread(pQuorum, pIndex, nDataMask);
StartQuorumDataRecoveryThread(pQuorum, pIndex, nDataMask);
}
}
}
@@ -477,7 +322,7 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l
// pre-populate caches in the background
// recovering public key shares is quite expensive and would result in serious lags for the first few signing
// sessions if the shares would be calculated on-demand
CQuorum::StartCachePopulatorThread(quorum);
StartCachePopulatorThread(quorum);
}
mapQuorumsCache[llmqType].insert(quorumHash, quorum);
@@ -663,6 +508,32 @@ CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, const CBlock
return BuildQuorumFromCommitment(llmqType, pindexQuorum);
}
size_t CQuorumManager::GetQuorumRecoveryStartOffset(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex) const
{
const size_t nActiveQuorums = pQuorum->params.signingActiveQuorumCount + 1;
const std::vector<CQuorumCPtr> vecQuorums = ScanQuorums(pQuorum->params.type, pIndex, nActiveQuorums);
assert(vecQuorums.size() > 0);
std::set<uint256> setAllTypeMembers;
for (auto& q : vecQuorums) {
auto& vecValid = q->qc.validMembers;
for (size_t i = 0; i < vecValid.size(); ++i) {
if (vecValid[i]) {
setAllTypeMembers.emplace(q->members[i]->proTxHash);
}
}
}
std::vector<uint256> vecAllTypeMembers{setAllTypeMembers.begin(), setAllTypeMembers.end()};
std::sort(vecAllTypeMembers.begin(), vecAllTypeMembers.end());
size_t nIndex{0};
for (size_t i = 0; i < vecAllTypeMembers.size(); ++i) {
if (activeMasternodeInfo.proTxHash == vecAllTypeMembers[i]) {
nIndex = i;
break;
}
}
return nIndex % pQuorum->qc.validMembers.size();
}
void CQuorumManager::ProcessMessage(CNode* pFrom, const std::string& strCommand, CDataStream& vRecv)
{
auto strFunc = __func__;
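
The GetQuorumRecoveryStartOffset() added above is what spreads recovery traffic: each masternode ranks its own proTxHash within the sorted, de-duplicated member set of all active quorums of that type and starts requesting at that rank modulo the quorum's member count, so different nodes begin with different members. A self-contained sketch of the same arithmetic (identifiers are illustrative, not from the diff):

#include <algorithm>
#include <cstdio>
#include <string>
#include <vector>

// Same idea as GetQuorumRecoveryStartOffset: rank "my" id within the sorted,
// de-duplicated member set of all active quorums, then wrap by quorum size.
size_t RecoveryStartOffset(std::vector<std::string> allMembers,
                           const std::string& myId, size_t quorumSize)
{
    std::sort(allMembers.begin(), allMembers.end());
    allMembers.erase(std::unique(allMembers.begin(), allMembers.end()), allMembers.end());
    size_t myIndex = 0;
    for (size_t i = 0; i < allMembers.size(); ++i) {
        if (allMembers[i] == myId) { myIndex = i; break; }
    }
    return myIndex % quorumSize;
}

int main()
{
    // Seven distinct members across active quorums, a quorum of 5 members:
    // ids ranked 0..6 start at offsets 0,1,2,3,4,0,1 respectively.
    std::vector<std::string> members{"a", "b", "c", "d", "e", "f", "g"};
    std::printf("offset for f: %zu\n", RecoveryStartOffset(members, "f", 5)); // 0
    std::printf("offset for c: %zu\n", RecoveryStartOffset(members, "c", 5)); // 2
    return 0;
}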
@@ -810,7 +681,7 @@ void CQuorumManager::ProcessMessage(CNode* pFrom, const std::string& strCommand,
vRecv >> verficationVector;
if (pQuorum->SetVerificationVector(verficationVector)) {
CQuorum::StartCachePopulatorThread(pQuorum);
StartCachePopulatorThread(pQuorum);
} else {
errorHandler("Invalid quorum verification vector");
return;
@@ -850,9 +721,145 @@ void CQuorumManager::ProcessMessage(CNode* pFrom, const std::string& strCommand,
}
}
pQuorum->WriteContributions(evoDb);
pQuorum->interruptQuorumDataReceived();
return;
}
}
void CQuorumManager::StartCachePopulatorThread(const CQuorumCPtr pQuorum) const
{
if (pQuorum->quorumVvec == nullptr) {
return;
}
cxxtimer::Timer t(true);
LogPrint(BCLog::LLMQ, "CQuorumManager::StartCachePopulatorThread -- start\n");
// when then later some other thread tries to get keys, it will be much faster
workerPool.push([pQuorum, t, this](int threadId) {
for (size_t i = 0; i < pQuorum->members.size() && !quorumThreadInterrupt; i++) {
if (pQuorum->qc.validMembers[i]) {
pQuorum->GetPubKeyShare(i);
}
}
LogPrint(BCLog::LLMQ, "CQuorumManager::StartCachePopulatorThread -- done. time=%d\n", t.count());
});
}
void CQuorumManager::StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, uint16_t nDataMaskIn) const
{
if (pQuorum->fQuorumDataRecoveryThreadRunning) {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Already running\n", __func__);
return;
}
pQuorum->fQuorumDataRecoveryThreadRunning = true;
workerPool.push([pQuorum, pIndex, nDataMaskIn, this](int threadId) {
size_t nTries{0};
uint16_t nDataMask{nDataMaskIn};
int64_t nTimeLastSuccess{0};
uint256* pCurrentMemberHash{nullptr};
std::vector<uint256> vecMemberHashes;
const size_t nMyStartOffset{GetQuorumRecoveryStartOffset(pQuorum, pIndex)};
const int64_t nRequestTimeout{10};
auto printLog = [&](const std::string& strMessage) {
const std::string strMember{pCurrentMemberHash == nullptr ? "nullptr" : pCurrentMemberHash->ToString()};
LogPrint(BCLog::LLMQ, "CQuorumManager::StartQuorumDataRecoveryThread -- %s - for llmqType %d, quorumHash %s, nDataMask (%d/%d), pCurrentMemberHash %s, nTries %d\n",
strMessage, pQuorum->qc.llmqType, pQuorum->qc.quorumHash.ToString(), nDataMask, nDataMaskIn, strMember, nTries);
};
printLog("Start");
while (!masternodeSync.IsBlockchainSynced() && !quorumThreadInterrupt) {
quorumThreadInterrupt.sleep_for(std::chrono::seconds(nRequestTimeout));
}
if (quorumThreadInterrupt) {
printLog("Aborted");
return;
}
vecMemberHashes.reserve(pQuorum->qc.validMembers.size());
for (auto& member : pQuorum->members) {
if (pQuorum->IsValidMember(member->proTxHash) && member->proTxHash != activeMasternodeInfo.proTxHash) {
vecMemberHashes.push_back(member->proTxHash);
}
}
std::sort(vecMemberHashes.begin(), vecMemberHashes.end());
printLog("Try to request");
while (nDataMask > 0 && !quorumThreadInterrupt) {
if (nDataMask & llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR && pQuorum->quorumVvec != nullptr) {
nDataMask &= ~llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
printLog("Received quorumVvec");
}
if (nDataMask & llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS && pQuorum->skShare.IsValid()) {
nDataMask &= ~llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS;
printLog("Received skShare");
}
if (nDataMask == 0) {
printLog("Success");
break;
}
if ((GetAdjustedTime() - nTimeLastSuccess) > nRequestTimeout) {
if (nTries >= vecMemberHashes.size()) {
printLog("All tried but failed");
break;
}
// Access the member list of the quorum with the calculated offset applied to balance the load equally
pCurrentMemberHash = &vecMemberHashes[(nMyStartOffset + nTries++) % vecMemberHashes.size()];
{
LOCK(cs_data_requests);
auto it = mapQuorumDataRequests.find(std::make_pair(*pCurrentMemberHash, true));
if (it != mapQuorumDataRequests.end() && !it->second.IsExpired()) {
printLog("Already asked");
continue;
}
}
// Sleep a bit depending on the start offset to balance out multiple requests to same masternode
quorumThreadInterrupt.sleep_for(std::chrono::milliseconds(nMyStartOffset * 100));
nTimeLastSuccess = GetAdjustedTime();
g_connman->AddPendingMasternode(*pCurrentMemberHash);
printLog("Connect");
}
g_connman->ForEachNode([&](CNode* pNode) {
if (pCurrentMemberHash == nullptr || pNode->verifiedProRegTxHash != *pCurrentMemberHash) {
return;
}
if (quorumManager->RequestQuorumData(pNode, pQuorum->qc.llmqType, pQuorum->pindexQuorum, nDataMask, activeMasternodeInfo.proTxHash)) {
nTimeLastSuccess = GetAdjustedTime();
printLog("Requested");
} else {
LOCK(cs_data_requests);
auto it = mapQuorumDataRequests.find(std::make_pair(pNode->verifiedProRegTxHash, true));
if (it == mapQuorumDataRequests.end()) {
printLog("Failed");
pNode->fDisconnect = true;
pCurrentMemberHash = nullptr;
return;
} else if (it->second.IsProcessed()) {
printLog("Processed");
pNode->fDisconnect = true;
pCurrentMemberHash = nullptr;
return;
} else {
printLog("Waiting");
return;
}
}
});
quorumThreadInterrupt.sleep_for(std::chrono::seconds(1));
}
pQuorum->fQuorumDataRecoveryThreadRunning = false;
printLog("Done");
});
}
} // namespace llmq
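
A note on the sleeps in the tasks above: they go through quorumThreadInterrupt.sleep_for() rather than std::this_thread::sleep_for(), so a sleeping task wakes as soon as CQuorumManager::Stop() fires the interrupt and workerPool.stop(true) does not have to wait out a 10-second poll. A simplified, illustrative reduction of that mechanism (CThreadInterrupt in the real code) could look like:

#include <chrono>
#include <condition_variable>
#include <mutex>

// Illustrative reduction of the CThreadInterrupt idea: a flag plus a
// condition variable so sleeping tasks wake immediately on interrupt.
class SimpleInterrupt
{
public:
    explicit operator bool() const
    {
        std::lock_guard<std::mutex> lock(mut);
        return flag;
    }
    void operator()() // raise the interrupt and wake all sleepers
    {
        {
            std::lock_guard<std::mutex> lock(mut);
            flag = true;
        }
        cond.notify_all();
    }
    // Returns false if interrupted before the timeout elapsed.
    bool sleep_for(std::chrono::milliseconds rel_time)
    {
        std::unique_lock<std::mutex> lock(mut);
        return !cond.wait_for(lock, rel_time, [this] { return flag; });
    }

private:
    mutable std::mutex mut;
    std::condition_variable cond;
    bool flag{false};
};

StartQuorumDataRecoveryThread follows exactly this shape: its loops test !quorumThreadInterrupt and its waits call quorumThreadInterrupt.sleep_for(), so the single quorumThreadInterrupt() call in Stop() both breaks the loops and cuts the sleeps short.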

View File

@@ -17,6 +17,8 @@
#include <bls/bls.h>
#include <bls/bls_worker.h>
#include <ctpl.h>
namespace llmq
{
@@ -154,9 +156,6 @@ private:
// Recovery of public key shares is very slow, so we start a background thread that pre-populates a cache so that
// the public key shares are ready when needed later
mutable CBLSWorkerCache blsCache;
std::atomic<bool> stopQuorumThreads;
std::thread cachePopulatorThread;
mutable CThreadInterrupt interruptQuorumDataReceived;
mutable std::atomic<bool> fQuorumDataRecoveryThreadRunning{false};
public:
@@ -177,12 +176,6 @@ public:
private:
void WriteContributions(CEvoDB& evoDb);
bool ReadContributions(CEvoDB& evoDb);
static void StartCachePopulatorThread(std::shared_ptr<CQuorum> _this);
static void StartQuorumDataRecoveryThread(const CQuorumCPtr _this, const CBlockIndex* pIndex, uint16_t nDataMask);
/// Returns the start offset for the masternode with the given proTxHash. This offset is applied when picking data recovery members of a quorum's
/// memberlist and is calculated based on a list of all member of all active quorums for the given llmqType in a way that each member
/// should receive the same number of request if all active llmqType members requests data from one llmqType quorum.
size_t GetQuorumRecoveryStartOffset(const CBlockIndex* pIndex) const;
};
/**
@@ -202,8 +195,15 @@ private:
mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, CQuorumPtr, StaticSaltedHasher>> mapQuorumsCache;
mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::vector<CQuorumCPtr>, StaticSaltedHasher>> scanQuorumsCache;
mutable ctpl::thread_pool workerPool;
mutable CThreadInterrupt quorumThreadInterrupt;
public:
CQuorumManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager);
~CQuorumManager();
void Start();
void Stop();
void TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex) const;
@@ -230,6 +230,13 @@ private:
bool BuildQuorumContributions(const CFinalCommitment& fqc, std::shared_ptr<CQuorum>& quorum) const;
CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const CBlockIndex* pindex) const;
/// Returns the start offset for the masternode with the given proTxHash. This offset is applied when picking data recovery members of a quorum's
/// memberlist and is calculated based on a list of all member of all active quorums for the given llmqType in a way that each member
/// should receive the same number of request if all active llmqType members requests data from one llmqType quorum.
size_t GetQuorumRecoveryStartOffset(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex) const;
void StartCachePopulatorThread(const CQuorumCPtr pQuorum) const;
void StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, uint16_t nDataMask) const;
};
extern CQuorumManager* quorumManager;
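
On the new members above: workerPool and quorumThreadInterrupt are declared mutable because the methods that schedule background work (StartCachePopulatorThread, StartQuorumDataRecoveryThread and the const call paths that reach them) are const member functions. A minimal, illustrative sketch of that pattern, reusing only ctpl::thread_pool from the diff:

#include <ctpl.h>

#include <cstdio>

class Manager
{
public:
    Manager() { workerPool.resize(1); }
    ~Manager() { workerPool.stop(true); }

    // A logically-const query may still kick off background prefetching;
    // 'mutable' lets it touch the pool without dropping constness.
    int Get(int key) const
    {
        workerPool.push([key](int /*threadId*/) {
            std::printf("prefetching neighbours of %d\n", key);
        });
        return key * 2; // placeholder for the actual lookup
    }

private:
    mutable ctpl::thread_pool workerPool;
};

int main()
{
    const Manager mgr;
    std::printf("value: %d\n", mgr.Get(21));
    return 0;
}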

View File

@@ -73,6 +73,9 @@ void StartLLMQSystem()
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StartThreads();
}
if (quorumManager) {
quorumManager->Start();
}
if (quorumSigSharesManager) {
quorumSigSharesManager->RegisterAsRecoveredSigsListener();
quorumSigSharesManager->StartWorkerThread();
@@ -97,6 +100,9 @@ void StopLLMQSystem()
quorumSigSharesManager->StopWorkerThread();
quorumSigSharesManager->UnregisterAsRecoveredSigsListener();
}
if (quorumManager) {
quorumManager->Stop();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StopThreads();
}