refactor: significant Mutex refactoring

Includes:
RecursiveMutex -> Mutex,
renaming of `cs` to something more meaningful,
usage of atomics where trivially possible,
introduce a method CQuorum::SetVerificationVector to avoid needing to lock an internal mutex externally

fix: avoid cs_vvec_shShare double-lock

Co-authored-by: UdjinM6 <udjinm6@users.noreply.github.com>
This commit is contained in:
pasta 2024-03-23 19:55:52 -05:00
parent c3f34dcd98
commit acd0f49d7b
No known key found for this signature in database
GPG Key ID: 52527BEDABE87984
11 changed files with 132 additions and 150 deletions

View File

@ -138,13 +138,13 @@ UniValue CDKGDebugStatus::ToJson(CDeterministicMNManager& dmnman, const Chainsta
void CDKGDebugManager::GetLocalDebugStatus(llmq::CDKGDebugStatus& ret) const
{
LOCK(cs);
LOCK(cs_lockStatus);
ret = localStatus;
}
void CDKGDebugManager::ResetLocalSessionStatus(Consensus::LLMQType llmqType, int quorumIndex)
{
LOCK(cs);
LOCK(cs_lockStatus);
auto it = localStatus.sessions.find(std::make_pair(llmqType, quorumIndex));
if (it == localStatus.sessions.end()) {
@ -157,7 +157,7 @@ void CDKGDebugManager::ResetLocalSessionStatus(Consensus::LLMQType llmqType, int
void CDKGDebugManager::InitLocalSessionStatus(const Consensus::LLMQParams& llmqParams, int quorumIndex, const uint256& quorumHash, int quorumHeight)
{
LOCK(cs);
LOCK(cs_lockStatus);
auto it = localStatus.sessions.find(std::make_pair(llmqParams.type, quorumIndex));
if (it == localStatus.sessions.end()) {
@ -176,7 +176,7 @@ void CDKGDebugManager::InitLocalSessionStatus(const Consensus::LLMQParams& llmqP
void CDKGDebugManager::UpdateLocalSessionStatus(Consensus::LLMQType llmqType, int quorumIndex, std::function<bool(CDKGDebugSessionStatus& status)>&& func)
{
LOCK(cs);
LOCK(cs_lockStatus);
auto it = localStatus.sessions.find(std::make_pair(llmqType, quorumIndex));
if (it == localStatus.sessions.end()) {
@ -190,7 +190,7 @@ void CDKGDebugManager::UpdateLocalSessionStatus(Consensus::LLMQType llmqType, in
void CDKGDebugManager::UpdateLocalMemberStatus(Consensus::LLMQType llmqType, int quorumIndex, size_t memberIdx, std::function<bool(CDKGDebugMemberStatus& status)>&& func)
{
LOCK(cs);
LOCK(cs_lockStatus);
auto it = localStatus.sessions.find(std::make_pair(llmqType, quorumIndex));
if (it == localStatus.sessions.end()) {

View File

@ -96,8 +96,8 @@ public:
class CDKGDebugManager
{
private:
mutable RecursiveMutex cs;
CDKGDebugStatus localStatus GUARDED_BY(cs);
mutable Mutex cs_lockStatus;
CDKGDebugStatus localStatus GUARDED_BY(cs_lockStatus);
public:
CDKGDebugManager();

View File

@ -261,8 +261,6 @@ bool CDKGSession::PreVerifyMessage(const CDKGContribution& qc, bool& retBan) con
void CDKGSession::ReceiveMessage(const CDKGContribution& qc, bool& retBan)
{
LOCK(cs_pending);
CDKGLogger logger(*this, __func__, __LINE__);
retBan = false;
@ -336,15 +334,11 @@ void CDKGSession::ReceiveMessage(const CDKGContribution& qc, bool& retBan)
logger.Batch("decrypted our contribution share. time=%d", t2.count());
bool verifyPending = false;
receivedSkContributions[member->idx] = skContribution;
vecEncryptedContributions[member->idx] = qc.contributions;
LOCK(cs_pending);
pendingContributionVerifications.emplace_back(member->idx);
if (pendingContributionVerifications.size() >= 32) {
verifyPending = true;
}
if (verifyPending) {
VerifyPendingContributions();
}
}

View File

@ -308,13 +308,13 @@ private:
// we expect to only receive a single vvec and contribution per member, but we must also be able to relay
// conflicting messages as otherwise an attacker might be able to broadcast conflicting (valid+invalid) messages
// and thus split the quorum. Such members are later removed from the quorum.
mutable RecursiveMutex invCs;
mutable Mutex invCs;
std::map<uint256, CDKGContribution> contributions GUARDED_BY(invCs);
std::map<uint256, CDKGComplaint> complaints GUARDED_BY(invCs);
std::map<uint256, CDKGJustification> justifications GUARDED_BY(invCs);
std::map<uint256, CDKGPrematureCommitment> prematureCommitments GUARDED_BY(invCs);
mutable RecursiveMutex cs_pending;
mutable Mutex cs_pending;
std::vector<size_t> pendingContributionVerifications GUARDED_BY(cs_pending);
// filled by ReceivePrematureCommitment and used by FinalizeCommitments

View File

@ -76,7 +76,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman,
EraseObjectRequest(from, CInv(invType, hash));
}
LOCK(cs);
LOCK(cs_messages);
if (messagesPerNode[from] >= maxMessagesPerNode) {
// TODO ban?
@ -95,7 +95,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman,
std::list<CDKGPendingMessages::BinaryMessage> CDKGPendingMessages::PopPendingMessages(size_t maxCount)
{
LOCK(cs);
LOCK(cs_messages);
std::list<BinaryMessage> ret;
while (!pendingMessages.empty() && ret.size() < maxCount) {
@ -108,7 +108,7 @@ std::list<CDKGPendingMessages::BinaryMessage> CDKGPendingMessages::PopPendingMes
bool CDKGPendingMessages::HasSeen(const uint256& hash) const
{
LOCK(cs);
LOCK(cs_messages);
return seenMessages.count(hash) != 0;
}
@ -120,7 +120,7 @@ void CDKGPendingMessages::Misbehaving(const NodeId from, const int score)
void CDKGPendingMessages::Clear()
{
LOCK(cs);
LOCK(cs_messages);
pendingMessages.clear();
messagesPerNode.clear();
seenMessages.clear();
@ -135,7 +135,7 @@ void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
if (quorumIndex > 0 && !IsQuorumRotationEnabled(params, pindexNew)) {
return;
}
LOCK(cs);
LOCK(cs_phase_qhash);
int quorumStageInt = (pindexNew->nHeight - quorumIndex) % params.dkgInterval;
@ -207,7 +207,7 @@ bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pQuorumBaseBlockIndex)
std::pair<QuorumPhase, uint256> CDKGSessionHandler::GetPhaseAndQuorumHash() const
{
LOCK(cs);
LOCK(cs_phase_qhash);
return std::make_pair(phase, quorumHash);
}
@ -304,9 +304,8 @@ void CDKGSessionHandler::SleepBeforePhase(QuorumPhase curPhase,
int64_t sleepTime = (int64_t)(adjustedPhaseSleepTimePerMember * curSession->GetMyMemberIndex().value_or(0));
int64_t endTime = GetTimeMillis() + sleepTime;
int heightTmp{-1};
int heightStart{-1};
heightTmp = heightStart = WITH_LOCK(cs, return currentHeight);
int heightTmp{currentHeight.load()};
int heightStart{heightTmp};
LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - starting sleep for %d ms, curPhase=%d\n", __func__, params.name, quorumIndex, sleepTime, ToUnderlying(curPhase));
@ -315,22 +314,20 @@ void CDKGSessionHandler::SleepBeforePhase(QuorumPhase curPhase,
LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - aborting due to stop/shutdown requested\n", __func__, params.name, quorumIndex);
throw AbortPhaseException();
}
{
LOCK(cs);
if (currentHeight > heightTmp) {
// New block(s) just came in
int64_t expectedBlockTime = (currentHeight - heightStart) * Params().GetConsensus().nPowTargetSpacing * 1000;
if (expectedBlockTime > sleepTime) {
// Blocks came faster than we expected, jump into the phase func asap
break;
}
heightTmp = currentHeight;
}
if (phase != curPhase || quorumHash != expectedQuorumHash) {
// Something went wrong and/or we missed quite a few blocks and it's just too late now
LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - aborting due unexpected phase/expectedQuorumHash change\n", __func__, params.name, quorumIndex);
throw AbortPhaseException();
auto cur_height = currentHeight.load();
if (cur_height > heightTmp) {
// New block(s) just came in
int64_t expectedBlockTime = (cur_height - heightStart) * Params().GetConsensus().nPowTargetSpacing * 1000;
if (expectedBlockTime > sleepTime) {
// Blocks came faster than we expected, jump into the phase func asap
break;
}
heightTmp = cur_height;
}
if (WITH_LOCK(cs_phase_qhash, return phase != curPhase || quorumHash != expectedQuorumHash)) {
// Something went wrong and/or we missed quite a few blocks and it's just too late now
LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - aborting due unexpected phase/expectedQuorumHash change\n", __func__, params.name, quorumIndex);
throw AbortPhaseException();
}
if (!runWhileWaiting()) {
UninterruptibleSleep(std::chrono::milliseconds{100});
@ -505,18 +502,13 @@ bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendi
void CDKGSessionHandler::HandleDKGRound()
{
uint256 curQuorumHash;
WaitForNextPhase(std::nullopt, QuorumPhase::Initialized);
{
LOCK(cs);
pendingContributions.Clear();
pendingComplaints.Clear();
pendingJustifications.Clear();
pendingPrematureCommitments.Clear();
curQuorumHash = quorumHash;
}
pendingContributions.Clear();
pendingComplaints.Clear();
pendingJustifications.Clear();
pendingPrematureCommitments.Clear();
uint256 curQuorumHash = WITH_LOCK(cs_phase_qhash, return quorumHash);
const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(cs_main, return m_chainstate.m_blockman.LookupBlockIndex(curQuorumHash));

View File

@ -54,13 +54,13 @@ public:
using BinaryMessage = std::pair<NodeId, std::shared_ptr<CDataStream>>;
private:
mutable RecursiveMutex cs;
std::atomic<PeerManager*> m_peerman{nullptr};
const int invType;
size_t maxMessagesPerNode GUARDED_BY(cs);
std::list<BinaryMessage> pendingMessages GUARDED_BY(cs);
std::map<NodeId, size_t> messagesPerNode GUARDED_BY(cs);
std::set<uint256> seenMessages GUARDED_BY(cs);
const size_t maxMessagesPerNode;
mutable Mutex cs_messages;
std::list<BinaryMessage> pendingMessages GUARDED_BY(cs_messages);
std::map<NodeId, size_t> messagesPerNode GUARDED_BY(cs_messages);
std::set<uint256> seenMessages GUARDED_BY(cs_messages);
public:
explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) :
@ -117,7 +117,6 @@ private:
friend class CDKGSessionManager;
private:
mutable RecursiveMutex cs;
std::atomic<bool> stopRequested{false};
CBLSWorker& blsWorker;
@ -134,9 +133,10 @@ private:
const Consensus::LLMQParams params;
const int quorumIndex;
QuorumPhase phase GUARDED_BY(cs) {QuorumPhase::Idle};
int currentHeight GUARDED_BY(cs) {-1};
uint256 quorumHash GUARDED_BY(cs);
std::atomic<int> currentHeight {-1};
mutable Mutex cs_phase_qhash;
QuorumPhase phase GUARDED_BY(cs_phase_qhash) {QuorumPhase::Idle};
uint256 quorumHash GUARDED_BY(cs_phase_qhash);
std::unique_ptr<CDKGSession> curSession;
std::thread phaseHandlerThread;

View File

@ -293,7 +293,7 @@ bool CDKGSessionManager::GetContribution(const uint256& hash, CDKGContribution&
for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Initialized || dkgType.phase > QuorumPhase::Contribute) {
continue;
}
@ -314,7 +314,7 @@ bool CDKGSessionManager::GetComplaint(const uint256& hash, CDKGComplaint& ret) c
for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Contribute || dkgType.phase > QuorumPhase::Complain) {
continue;
}
@ -335,7 +335,7 @@ bool CDKGSessionManager::GetJustification(const uint256& hash, CDKGJustification
for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Complain || dkgType.phase > QuorumPhase::Justify) {
continue;
}
@ -356,7 +356,7 @@ bool CDKGSessionManager::GetPrematureCommitment(const uint256& hash, CDKGPrematu
for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Justify || dkgType.phase > QuorumPhase::Commit) {
continue;
}

View File

@ -93,7 +93,7 @@ bool CQuorum::SetVerificationVector(const std::vector<CBLSPublicKey>& quorumVecI
{
const auto quorumVecInSerialized = ::SerializeHash(quorumVecIn);
LOCK(cs);
LOCK(cs_vvec_shShare);
if (quorumVecInSerialized != qc->quorumVvecHash) {
return false;
}
@ -106,7 +106,7 @@ bool CQuorum::SetSecretKeyShare(const CBLSSecretKey& secretKeyShare, const CActi
if (!secretKeyShare.IsValid() || (secretKeyShare.GetPublicKey() != GetPubKeyShare(GetMemberIndex(mn_activeman.GetProTxHash())))) {
return false;
}
LOCK(cs);
LOCK(cs_vvec_shShare);
skShare = secretKeyShare;
return true;
}
@ -131,8 +131,8 @@ bool CQuorum::IsValidMember(const uint256& proTxHash) const
CBLSPublicKey CQuorum::GetPubKeyShare(size_t memberIdx) const
{
LOCK(cs);
if (!HasVerificationVector() || memberIdx >= members.size() || !qc->validMembers[memberIdx]) {
LOCK(cs_vvec_shShare);
if (!HasVerificationVectorInternal() || memberIdx >= members.size() || !qc->validMembers[memberIdx]) {
return CBLSPublicKey();
}
const auto& m = members[memberIdx];
@ -140,13 +140,18 @@ CBLSPublicKey CQuorum::GetPubKeyShare(size_t memberIdx) const
}
bool CQuorum::HasVerificationVector() const {
LOCK(cs);
LOCK(cs_vvec_shShare);
return HasVerificationVectorInternal();
}
bool CQuorum::HasVerificationVectorInternal() const {
AssertLockHeld(cs_vvec_shShare);
return quorumVvec != nullptr;
}
CBLSSecretKey CQuorum::GetSkShare() const
{
LOCK(cs);
LOCK(cs_vvec_shShare);
return skShare;
}
@ -165,8 +170,8 @@ void CQuorum::WriteContributions(CEvoDB& evoDb) const
{
uint256 dbKey = MakeQuorumKey(*this);
LOCK(cs);
if (HasVerificationVector()) {
LOCK(cs_vvec_shShare);
if (HasVerificationVectorInternal()) {
CDataStream s(SER_DISK, CLIENT_VERSION);
WriteCompactSize(s, quorumVvec->size());
for (auto& pubkey : *quorumVvec) {
@ -196,7 +201,7 @@ bool CQuorum::ReadContributions(CEvoDB& evoDb)
qv.emplace_back(pubkey);
}
LOCK(cs);
LOCK(cs_vvec_shShare);
quorumVvec = std::make_shared<std::vector<CBLSPublicKey>>(std::move(qv));
// We ignore the return value here as it is ok if this fails. If it fails, it usually means that we are not a
// member of the quorum but observed the whole DKG process to have the quorum verification vector.
@ -434,17 +439,14 @@ bool CQuorumManager::BuildQuorumContributions(const CFinalCommitmentPtr& fqc, co
}
cxxtimer::Timer t2(true);
LOCK(quorum->cs);
quorum->quorumVvec = blsWorker.BuildQuorumVerificationVector(vvecs);
quorum->SetVerificationVector(blsWorker.BuildQuorumVerificationVector(vvecs));
if (!quorum->HasVerificationVector()) {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- failed to build quorumVvec\n", __func__);
// without the quorum vvec, there can't be a skShare, so we fail here. Failure is not fatal here, as it still
// allows to use the quorum as a non-member (verification through the quorum pub key)
return false;
}
quorum->skShare = blsWorker.AggregateSecretKeys(skContributions);
if (!quorum->skShare.IsValid()) {
quorum->skShare.Reset();
if (!quorum->SetSecretKeyShare(blsWorker.AggregateSecretKeys(skContributions), *m_mn_activeman)) {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- failed to build skShare\n", __func__);
// We don't bail out here as this is not a fatal error and still allows us to recover public key shares (as we
// have a valid quorum vvec at this point)
@ -747,7 +749,7 @@ PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, const std::string& msg_t
return sendQDATA(CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING, request_limit_exceeded);
}
WITH_LOCK(pQuorum->cs, ssResponseData << *pQuorum->quorumVvec);
WITH_LOCK(pQuorum->cs_vvec_shShare, ssResponseData << *pQuorum->quorumVvec);
}
// Check if request wants ENCRYPTED_CONTRIBUTIONS data
@ -821,7 +823,7 @@ PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, const std::string& msg_t
if (request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS) {
assert(m_mn_activeman);
if (WITH_LOCK(pQuorum->cs, return pQuorum->quorumVvec->size() != size_t(pQuorum->params.threshold))) {
if (WITH_LOCK(pQuorum->cs_vvec_shShare, return pQuorum->quorumVvec->size() != size_t(pQuorum->params.threshold))) {
return errorHandler("No valid quorum verification vector available", 0); // Don't bump score because we asked for it
}

View File

@ -20,6 +20,7 @@
#include <atomic>
#include <map>
#include <utility>
class CActiveMasternodeManager;
class CBlockIndex;
@ -182,10 +183,10 @@ private:
mutable CBLSWorkerCache blsCache;
mutable std::atomic<bool> fQuorumDataRecoveryThreadRunning{false};
mutable RecursiveMutex cs;
mutable Mutex cs_vvec_shShare;
// These are only valid when we either participated in the DKG or fully watched it
BLSVerificationVectorPtr quorumVvec GUARDED_BY(cs);
CBLSSecretKey skShare GUARDED_BY(cs);
BLSVerificationVectorPtr quorumVvec GUARDED_BY(cs_vvec_shShare);
CBLSSecretKey skShare GUARDED_BY(cs_vvec_shShare);
public:
CQuorum(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker);
@ -193,9 +194,13 @@ public:
void Init(CFinalCommitmentPtr _qc, const CBlockIndex* _pQuorumBaseBlockIndex, const uint256& _minedBlockHash, Span<CDeterministicMNCPtr> _members);
bool SetVerificationVector(const std::vector<CBLSPublicKey>& quorumVecIn);
void SetVerificationVector(BLSVerificationVectorPtr vvec_in) {
LOCK(cs_vvec_shShare);
quorumVvec = std::move(vvec_in);
}
bool SetSecretKeyShare(const CBLSSecretKey& secretKeyShare, const CActiveMasternodeManager& mn_activeman);
bool HasVerificationVector() const;
bool HasVerificationVector() const LOCKS_EXCLUDED(cs_vvec_shShare);
bool IsMember(const uint256& proTxHash) const;
bool IsValidMember(const uint256& proTxHash) const;
int GetMemberIndex(const uint256& proTxHash) const;
@ -204,6 +209,7 @@ public:
CBLSSecretKey GetSkShare() const;
private:
bool HasVerificationVectorInternal() const EXCLUSIVE_LOCKS_REQUIRED(cs_vvec_shShare);
void WriteContributions(CEvoDB& evoDb) const;
bool ReadContributions(CEvoDB& evoDb);
};

View File

@ -247,7 +247,7 @@ bool CRecoveredSigsDb::HasRecoveredSigForId(Consensus::LLMQType llmqType, const
auto cacheKey = std::make_pair(llmqType, id);
bool ret;
{
LOCK(cs);
LOCK(cs_cache);
if (hasSigForIdCache.get(cacheKey, ret)) {
return ret;
}
@ -257,7 +257,7 @@ bool CRecoveredSigsDb::HasRecoveredSigForId(Consensus::LLMQType llmqType, const
auto k = std::make_tuple(std::string("rs_r"), llmqType, id);
ret = db->Exists(k);
LOCK(cs);
LOCK(cs_cache);
hasSigForIdCache.insert(cacheKey, ret);
return ret;
}
@ -266,7 +266,7 @@ bool CRecoveredSigsDb::HasRecoveredSigForSession(const uint256& signHash) const
{
bool ret;
{
LOCK(cs);
LOCK(cs_cache);
if (hasSigForSessionCache.get(signHash, ret)) {
return ret;
}
@ -275,7 +275,7 @@ bool CRecoveredSigsDb::HasRecoveredSigForSession(const uint256& signHash) const
auto k = std::make_tuple(std::string("rs_s"), signHash);
ret = db->Exists(k);
LOCK(cs);
LOCK(cs_cache);
hasSigForSessionCache.insert(signHash, ret);
return ret;
}
@ -284,7 +284,7 @@ bool CRecoveredSigsDb::HasRecoveredSigForHash(const uint256& hash) const
{
bool ret;
{
LOCK(cs);
LOCK(cs_cache);
if (hasSigForHashCache.get(hash, ret)) {
return ret;
}
@ -293,7 +293,7 @@ bool CRecoveredSigsDb::HasRecoveredSigForHash(const uint256& hash) const
auto k = std::make_tuple(std::string("rs_h"), hash);
ret = db->Exists(k);
LOCK(cs);
LOCK(cs_cache);
hasSigForHashCache.insert(hash, ret);
return ret;
}
@ -361,7 +361,7 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig)
db->WriteBatch(batch);
{
LOCK(cs);
LOCK(cs_cache);
hasSigForIdCache.insert(std::make_pair(recSig.getLlmqType(), recSig.getId()), true);
hasSigForSessionCache.insert(signHash, true);
hasSigForHashCache.insert(recSig.GetHash(), true);
@ -370,8 +370,6 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig)
void CRecoveredSigsDb::RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType llmqType, const uint256& id, bool deleteHashKey, bool deleteTimeKey)
{
AssertLockHeld(cs);
CRecoveredSig recSig;
if (!ReadRecoveredSig(llmqType, id, recSig)) {
return;
@ -401,6 +399,7 @@ void CRecoveredSigsDb::RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType l
}
}
LOCK(cs_cache);
hasSigForIdCache.erase(std::make_pair(recSig.getLlmqType(), recSig.getId()));
hasSigForSessionCache.erase(signHash);
if (deleteHashKey) {
@ -412,7 +411,6 @@ void CRecoveredSigsDb::RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType l
// This will leave the byHash key in-place so that HasRecoveredSigForHash still returns true
void CRecoveredSigsDb::TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id)
{
LOCK(cs);
CDBBatch batch(*db);
RemoveRecoveredSig(batch, llmqType, id, false, false);
db->WriteBatch(batch);
@ -451,15 +449,12 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)
}
CDBBatch batch(*db);
{
LOCK(cs);
for (const auto& e : toDelete) {
RemoveRecoveredSig(batch, e.first, e.second, true, false);
for (const auto& e : toDelete) {
RemoveRecoveredSig(batch, e.first, e.second, true, false);
if (batch.SizeEstimate() >= (1 << 24)) {
db->WriteBatch(batch);
batch.Clear();
}
if (batch.SizeEstimate() >= (1 << 24)) {
db->WriteBatch(batch);
batch.Clear();
}
}
@ -551,7 +546,7 @@ bool CSigningManager::AlreadyHave(const CInv& inv) const
return false;
}
{
LOCK(cs);
LOCK(cs_pending);
if (pendingReconstructedRecoveredSigs.count(inv.hash)) {
return true;
}
@ -607,7 +602,7 @@ PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const
LogPrint(BCLog::LLMQ, "CSigningManager::%s -- signHash=%s, id=%s, msgHash=%s, node=%d\n", __func__,
recoveredSig->buildSignHash().ToString(), recoveredSig->getId().ToString(), recoveredSig->getMsgHash().ToString(), pfrom.GetId());
LOCK(cs);
LOCK(cs_pending);
if (pendingReconstructedRecoveredSigs.count(recoveredSig->GetHash())) {
// no need to perform full verification
LogPrint(BCLog::LLMQ, "CSigningManager::%s -- already pending reconstructed sig, signHash=%s, id=%s, msgHash=%s, node=%d\n", __func__,
@ -649,7 +644,7 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify(
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums)
{
{
LOCK(cs);
LOCK(cs_pending);
if (pendingRecoveredSigs.empty()) {
return;
}
@ -714,7 +709,7 @@ void CSigningManager::ProcessPendingReconstructedRecoveredSigs()
{
decltype(pendingReconstructedRecoveredSigs) m;
{
LOCK(cs);
LOCK(cs_pending);
m = std::move(pendingReconstructedRecoveredSigs);
}
for (const auto& p : m) {
@ -795,43 +790,36 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecovered
return;
}
std::vector<CRecoveredSigsListener*> listeners;
{
LOCK(cs);
listeners = recoveredSigsListeners;
auto signHash = recoveredSig->buildSignHash();
auto signHash = recoveredSig->buildSignHash();
LogPrint(BCLog::LLMQ, "CSigningManager::%s -- valid recSig. signHash=%s, id=%s, msgHash=%s\n", __func__,
signHash.ToString(), recoveredSig->getId().ToString(), recoveredSig->getMsgHash().ToString());
LogPrint(BCLog::LLMQ, "CSigningManager::%s -- valid recSig. signHash=%s, id=%s, msgHash=%s\n", __func__,
signHash.ToString(), recoveredSig->getId().ToString(), recoveredSig->getMsgHash().ToString());
if (db.HasRecoveredSigForId(llmqType, recoveredSig->getId())) {
CRecoveredSig otherRecoveredSig;
if (db.GetRecoveredSigById(llmqType, recoveredSig->getId(), otherRecoveredSig)) {
auto otherSignHash = otherRecoveredSig.buildSignHash();
if (signHash != otherSignHash) {
// this should really not happen, as each masternode is participating in only one vote,
// even if it's a member of multiple quorums. so a majority is only possible on one quorum and one msgHash per id
LogPrintf("CSigningManager::%s -- conflicting recoveredSig for signHash=%s, id=%s, msgHash=%s, otherSignHash=%s\n", __func__,
signHash.ToString(), recoveredSig->getId().ToString(), recoveredSig->getMsgHash().ToString(), otherSignHash.ToString());
} else {
// Looks like we're trying to process a recSig that is already known. This might happen if the same
// recSig comes in through regular QRECSIG messages and at the same time through some other message
// which allowed to reconstruct a recSig (e.g. ISLOCK). In this case, just bail out.
}
return;
if (db.HasRecoveredSigForId(llmqType, recoveredSig->getId())) {
CRecoveredSig otherRecoveredSig;
if (db.GetRecoveredSigById(llmqType, recoveredSig->getId(), otherRecoveredSig)) {
auto otherSignHash = otherRecoveredSig.buildSignHash();
if (signHash != otherSignHash) {
// this should really not happen, as each masternode is participating in only one vote,
// even if it's a member of multiple quorums. so a majority is only possible on one quorum and one msgHash per id
LogPrintf("CSigningManager::%s -- conflicting recoveredSig for signHash=%s, id=%s, msgHash=%s, otherSignHash=%s\n", __func__,
signHash.ToString(), recoveredSig->getId().ToString(), recoveredSig->getMsgHash().ToString(), otherSignHash.ToString());
} else {
// This case is very unlikely. It can only happen when cleanup caused this specific recSig to vanish
// between the HasRecoveredSigForId and GetRecoveredSigById call. If that happens, treat it as if we
// never had that recSig
// Looks like we're trying to process a recSig that is already known. This might happen if the same
// recSig comes in through regular QRECSIG messages and at the same time through some other message
// which allowed to reconstruct a recSig (e.g. ISLOCK). In this case, just bail out.
}
return;
} else {
// This case is very unlikely. It can only happen when cleanup caused this specific recSig to vanish
// between the HasRecoveredSigForId and GetRecoveredSigById call. If that happens, treat it as if we
// never had that recSig
}
db.WriteRecoveredSig(*recoveredSig);
pendingReconstructedRecoveredSigs.erase(recoveredSig->GetHash());
}
db.WriteRecoveredSig(*recoveredSig);
WITH_LOCK(cs_pending, pendingReconstructedRecoveredSigs.erase(recoveredSig->GetHash()));
if (m_mn_activeman != nullptr) {
CInv inv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash());
connman.ForEachNode([&](const CNode* pnode) {
@ -841,6 +829,7 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecovered
});
}
auto listeners = WITH_LOCK(cs_listeners, return recoveredSigsListeners);
for (auto& l : listeners) {
l->HandleNewRecoveredSig(*recoveredSig);
}
@ -850,7 +839,7 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecovered
void CSigningManager::PushReconstructedRecoveredSig(const std::shared_ptr<const llmq::CRecoveredSig>& recoveredSig)
{
LOCK(cs);
LOCK(cs_pending);
pendingReconstructedRecoveredSigs.emplace(std::piecewise_construct, std::forward_as_tuple(recoveredSig->GetHash()), std::forward_as_tuple(recoveredSig));
}
@ -876,13 +865,13 @@ void CSigningManager::Cleanup()
void CSigningManager::RegisterRecoveredSigsListener(CRecoveredSigsListener* l)
{
LOCK(cs);
LOCK(cs_listeners);
recoveredSigsListeners.emplace_back(l);
}
void CSigningManager::UnregisterRecoveredSigsListener(CRecoveredSigsListener* l)
{
LOCK(cs);
LOCK(cs_listeners);
auto itRem = std::remove(recoveredSigsListeners.begin(), recoveredSigsListeners.end(), l);
recoveredSigsListeners.erase(itRem, recoveredSigsListeners.end());
}
@ -917,8 +906,6 @@ bool CSigningManager::AsyncSignIfMember(Consensus::LLMQType llmqType, CSigShares
}
{
LOCK(cs);
bool hasVoted = db.HasVotedOnId(llmqType, id);
if (hasVoted) {
uint256 prevMsgHash;

View File

@ -114,10 +114,10 @@ class CRecoveredSigsDb
private:
std::unique_ptr<CDBWrapper> db{nullptr};
mutable RecursiveMutex cs;
mutable unordered_lru_cache<std::pair<Consensus::LLMQType, uint256>, bool, StaticSaltedHasher, 30000> hasSigForIdCache GUARDED_BY(cs);
mutable unordered_lru_cache<uint256, bool, StaticSaltedHasher, 30000> hasSigForSessionCache GUARDED_BY(cs);
mutable unordered_lru_cache<uint256, bool, StaticSaltedHasher, 30000> hasSigForHashCache GUARDED_BY(cs);
mutable Mutex cs_cache;
mutable unordered_lru_cache<std::pair<Consensus::LLMQType, uint256>, bool, StaticSaltedHasher, 30000> hasSigForIdCache GUARDED_BY(cs_cache);
mutable unordered_lru_cache<uint256, bool, StaticSaltedHasher, 30000> hasSigForSessionCache GUARDED_BY(cs_cache);
mutable unordered_lru_cache<uint256, bool, StaticSaltedHasher, 30000> hasSigForHashCache GUARDED_BY(cs_cache);
public:
explicit CRecoveredSigsDb(bool fMemory, bool fWipe);
@ -145,7 +145,7 @@ private:
void MigrateRecoveredSigs();
bool ReadRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, CRecoveredSig& ret) const;
void RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType llmqType, const uint256& id, bool deleteHashKey, bool deleteTimeKey) EXCLUSIVE_LOCKS_REQUIRED(cs);
void RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType llmqType, const uint256& id, bool deleteHashKey, bool deleteTimeKey);
};
class CRecoveredSigsListener
@ -159,7 +159,6 @@ public:
class CSigningManager
{
private:
mutable RecursiveMutex cs;
CRecoveredSigsDb db;
CConnman& connman;
@ -167,15 +166,17 @@ private:
const CQuorumManager& qman;
const std::unique_ptr<PeerManager>& m_peerman;
mutable Mutex cs_pending;
// Incoming and not verified yet
std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>> pendingRecoveredSigs GUARDED_BY(cs);
std::unordered_map<uint256, std::shared_ptr<const CRecoveredSig>, StaticSaltedHasher> pendingReconstructedRecoveredSigs GUARDED_BY(cs);
std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>> pendingRecoveredSigs GUARDED_BY(cs_pending);
std::unordered_map<uint256, std::shared_ptr<const CRecoveredSig>, StaticSaltedHasher> pendingReconstructedRecoveredSigs GUARDED_BY(cs_pending);
FastRandomContext rnd GUARDED_BY(cs);
FastRandomContext rnd GUARDED_BY(cs_pending);
int64_t lastCleanupTime{0};
std::vector<CRecoveredSigsListener*> recoveredSigsListeners GUARDED_BY(cs);
mutable Mutex cs_listeners;
std::vector<CRecoveredSigsListener*> recoveredSigsListeners GUARDED_BY(cs_listeners);
public:
CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CQuorumManager& _qman,