refactor: move CConnman, PeerManager out of CSigSharesManager ctor

Co-authored-by: Konstantin Akimov <knstqq@gmail.com>
This commit is contained in:
Kittywhiskers Van Gogh 2024-12-05 22:42:14 +00:00
parent 7498a38076
commit 82d1aed1d6
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
7 changed files with 94 additions and 83 deletions

View File

@ -34,7 +34,7 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
*quorum_block_processor, mn_activeman, mn_sync, sporkman, unit_tests, *quorum_block_processor, mn_activeman, mn_sync, sporkman, unit_tests,
wipe)}, wipe)},
sigman{std::make_unique<llmq::CSigningManager>(mn_activeman, chainman.ActiveChainstate(), *qman, unit_tests, wipe)}, sigman{std::make_unique<llmq::CSigningManager>(mn_activeman, chainman.ActiveChainstate(), *qman, unit_tests, wipe)},
shareman{std::make_unique<llmq::CSigSharesManager>(connman, *sigman, mn_activeman, *qman, sporkman, peerman)}, shareman{std::make_unique<llmq::CSigSharesManager>(*sigman, mn_activeman, *qman, sporkman)},
clhandler{[&]() -> llmq::CChainLocksHandler* const { clhandler{[&]() -> llmq::CChainLocksHandler* const {
assert(llmq::chainLocksHandler == nullptr); assert(llmq::chainLocksHandler == nullptr);
llmq::chainLocksHandler = std::make_unique<llmq::CChainLocksHandler>(chainman.ActiveChainstate(), *qman, llmq::chainLocksHandler = std::make_unique<llmq::CChainLocksHandler>(chainman.ActiveChainstate(), *qman,
@ -83,7 +83,7 @@ void LLMQContext::Start(CConnman& connman, PeerManager& peerman)
} }
qman->Start(); qman->Start();
shareman->RegisterAsRecoveredSigsListener(); shareman->RegisterAsRecoveredSigsListener();
shareman->StartWorkerThread(); shareman->StartWorkerThread(connman, peerman);
sigman->StartWorkerThread(peerman); sigman->StartWorkerThread(peerman);
llmq::chainLocksHandler->Start(); llmq::chainLocksHandler->Start();

View File

@ -433,7 +433,8 @@ std::set<NodeId> BatchVerifyMessageSigs(CDKGSession& session, const std::vector<
return ret; return ret;
} }
static void RelayInvToParticipants(const CDKGSession& session, CConnman& connman, PeerManager& peerman, const CInv& inv) static void RelayInvToParticipants(const CDKGSession& session, const CConnman& connman, PeerManager& peerman,
const CInv& inv)
{ {
CDKGLogger logger(session, __func__, __LINE__); CDKGLogger logger(session, __func__, __LINE__);
std::stringstream ss; std::stringstream ss;
@ -466,7 +467,7 @@ static void RelayInvToParticipants(const CDKGSession& session, CConnman& connman
} }
template <typename Message, int MessageType> template <typename Message, int MessageType>
bool ProcessPendingMessageBatch(CConnman& connman, CDKGSession& session, CDKGPendingMessages& pendingMessages, bool ProcessPendingMessageBatch(const CConnman& connman, CDKGSession& session, CDKGPendingMessages& pendingMessages,
PeerManager& peerman, size_t maxCount) PeerManager& peerman, size_t maxCount)
{ {
auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount); auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount);

View File

@ -178,14 +178,15 @@ void CSigSharesNodeState::RemoveSession(const uint256& signHash)
////////////////////// //////////////////////
void CSigSharesManager::StartWorkerThread() void CSigSharesManager::StartWorkerThread(CConnman& connman, PeerManager& peerman)
{ {
// can't start new thread if we have one running already // can't start new thread if we have one running already
if (workThread.joinable()) { if (workThread.joinable()) {
assert(false); assert(false);
} }
workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); }); workThread = std::thread(&util::TraceThread, "sigshares",
[this, &connman, &peerman] { WorkThreadMain(connman, peerman); });
} }
void CSigSharesManager::StopWorkerThread() void CSigSharesManager::StopWorkerThread()
@ -215,7 +216,8 @@ void CSigSharesManager::InterruptWorkerThread()
workInterrupt(); workInterrupt();
} }
void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager& sporkman, const std::string& msg_type, CDataStream& vRecv) void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const CSporkManager& sporkman,
const std::string& msg_type, CDataStream& vRecv)
{ {
// non-masternodes are not interested in sigshares // non-masternodes are not interested in sigshares
if (m_mn_activeman == nullptr) return; if (m_mn_activeman == nullptr) return;
@ -227,12 +229,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
if (receivedSigShares.size() > MAX_MSGS_SIG_SHARES) { if (receivedSigShares.size() > MAX_MSGS_SIG_SHARES) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, receivedSigShares.size(), MAX_MSGS_SIG_SHARES, pfrom.GetId()); LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, receivedSigShares.size(), MAX_MSGS_SIG_SHARES, pfrom.GetId());
BanNode(pfrom.GetId()); BanNode(pfrom.GetId(), peerman);
return; return;
} }
for (const auto& sigShare : receivedSigShares) { for (const auto& sigShare : receivedSigShares) {
ProcessMessageSigShare(pfrom.GetId(), sigShare); ProcessMessageSigShare(pfrom.GetId(), peerman, sigShare);
} }
} }
@ -241,12 +243,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
vRecv >> msgs; vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) { if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId()); LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId());
BanNode(pfrom.GetId()); BanNode(pfrom.GetId(), peerman);
return; return;
} }
if (!ranges::all_of(msgs, if (!ranges::all_of(msgs,
[this, &pfrom](const auto& ann){ return ProcessMessageSigSesAnn(pfrom, ann); })) { [this, &pfrom](const auto& ann){ return ProcessMessageSigSesAnn(pfrom, ann); })) {
BanNode(pfrom.GetId()); BanNode(pfrom.GetId(), peerman);
return; return;
} }
} else if (msg_type == NetMsgType::QSIGSHARESINV) { } else if (msg_type == NetMsgType::QSIGSHARESINV) {
@ -254,12 +256,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
vRecv >> msgs; vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) { if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom.GetId()); LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom.GetId());
BanNode(pfrom.GetId()); BanNode(pfrom.GetId(), peerman);
return; return;
} }
if (!ranges::all_of(msgs, if (!ranges::all_of(msgs,
[this, &pfrom](const auto& inv){ return ProcessMessageSigSharesInv(pfrom, inv); })) { [this, &pfrom](const auto& inv){ return ProcessMessageSigSharesInv(pfrom, inv); })) {
BanNode(pfrom.GetId()); BanNode(pfrom.GetId(), peerman);
return; return;
} }
} else if (msg_type == NetMsgType::QGETSIGSHARES) { } else if (msg_type == NetMsgType::QGETSIGSHARES) {
@ -267,12 +269,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
vRecv >> msgs; vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) { if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom.GetId()); LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom.GetId());
BanNode(pfrom.GetId()); BanNode(pfrom.GetId(), peerman);
return; return;
} }
if (!ranges::all_of(msgs, if (!ranges::all_of(msgs,
[this, &pfrom](const auto& inv){ return ProcessMessageGetSigShares(pfrom, inv); })) { [this, &pfrom](const auto& inv){ return ProcessMessageGetSigShares(pfrom, inv); })) {
BanNode(pfrom.GetId()); BanNode(pfrom.GetId(), peerman);
return; return;
} }
} else if (msg_type == NetMsgType::QBSIGSHARES) { } else if (msg_type == NetMsgType::QBSIGSHARES) {
@ -284,12 +286,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
} }
if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) { if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId()); LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId());
BanNode(pfrom.GetId()); BanNode(pfrom.GetId(), peerman);
return; return;
} }
if (!ranges::all_of(msgs, if (!ranges::all_of(msgs,
[this, &pfrom](const auto& bs){ return ProcessMessageBatchedSigShares(pfrom, bs); })) { [this, &pfrom](const auto& bs){ return ProcessMessageBatchedSigShares(pfrom, bs); })) {
BanNode(pfrom.GetId()); BanNode(pfrom.GetId(), peerman);
return; return;
} }
} }
@ -454,7 +456,7 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode& pfrom, const
return true; return true;
} }
void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare) void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, PeerManager& peerman, const CSigShare& sigShare)
{ {
assert(m_mn_activeman); assert(m_mn_activeman);
@ -479,12 +481,12 @@ void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& s
if (sigShare.getQuorumMember() >= quorum->members.size()) { if (sigShare.getQuorumMember() >= quorum->members.size()) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember out of bounds\n", __func__); LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember out of bounds\n", __func__);
BanNode(fromId); BanNode(fromId, peerman);
return; return;
} }
if (!quorum->qc->validMembers[sigShare.getQuorumMember()]) { if (!quorum->qc->validMembers[sigShare.getQuorumMember()]) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember not valid\n", __func__); LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember not valid\n", __func__);
BanNode(fromId); BanNode(fromId, peerman);
return; return;
} }
@ -620,7 +622,7 @@ bool CSigSharesManager::CollectPendingSigSharesToVerify(
return true; return true;
} }
bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman) bool CSigSharesManager::ProcessPendingSigShares(PeerManager& peerman, const CConnman& connman)
{ {
std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes; std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums; std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
@ -646,7 +648,7 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
// we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive
// deserialization in the message thread // deserialization in the message thread
if (!sigShare.sigShare.Get().IsValid()) { if (!sigShare.sigShare.Get().IsValid()) {
BanNode(nodeId); BanNode(nodeId, peerman);
// don't process any additional shares from this node // don't process any additional shares from this node
break; break;
} }
@ -678,25 +680,26 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- invalid sig shares from other node, banning peer=%d\n", LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- invalid sig shares from other node, banning peer=%d\n",
__func__, nodeId); __func__, nodeId);
// this will also cause re-requesting of the shares that were sent by this node // this will also cause re-requesting of the shares that were sent by this node
BanNode(nodeId); BanNode(nodeId, peerman);
continue; continue;
} }
ProcessPendingSigShares(v, quorums, connman); ProcessPendingSigShares(v, quorums, peerman, connman);
} }
return sigSharesByNodes.size() >= nMaxBatchSize; return sigSharesByNodes.size() >= nMaxBatchSize;
} }
// It's ensured that no duplicates are passed to this method // It's ensured that no duplicates are passed to this method
void CSigSharesManager::ProcessPendingSigShares(const std::vector<CSigShare>& sigSharesToProcess, void CSigSharesManager::ProcessPendingSigShares(
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums, const std::vector<CSigShare>& sigSharesToProcess,
const CConnman& connman) const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
PeerManager& peerman, const CConnman& connman)
{ {
cxxtimer::Timer t(true); cxxtimer::Timer t(true);
for (const auto& sigShare : sigSharesToProcess) { for (const auto& sigShare : sigSharesToProcess) {
auto quorumKey = std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash()); auto quorumKey = std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash());
ProcessSigShare(sigShare, connman, quorums.at(quorumKey)); ProcessSigShare(peerman, sigShare, connman, quorums.at(quorumKey));
} }
t.stop(); t.stop();
@ -705,7 +708,8 @@ void CSigSharesManager::ProcessPendingSigShares(const std::vector<CSigShare>& si
} }
// sig shares are already verified when entering this method // sig shares are already verified when entering this method
void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnman& connman, const CQuorumCPtr& quorum) void CSigSharesManager::ProcessSigShare(PeerManager& peerman, const CSigShare& sigShare, const CConnman& connman,
const CQuorumCPtr& quorum)
{ {
assert(m_mn_activeman); assert(m_mn_activeman);
@ -754,11 +758,12 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnma
} }
if (canTryRecovery) { if (canTryRecovery) {
TryRecoverSig(quorum, sigShare.getId(), sigShare.getMsgHash()); TryRecoverSig(peerman, quorum, sigShare.getId(), sigShare.getMsgHash());
} }
} }
void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) void CSigSharesManager::TryRecoverSig(PeerManager& peerman, const CQuorumCPtr& quorum, const uint256& id,
const uint256& msgHash)
{ {
if (sigman.HasRecoveredSigForId(quorum->params.type, id)) { if (sigman.HasRecoveredSigForId(quorum->params.type, id)) {
return; return;
@ -817,7 +822,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256&
} }
} }
sigman.ProcessRecoveredSig(rs, *m_peerman); sigman.ProcessRecoveredSig(rs, peerman);
} }
CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256 &id, int attempt) CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256 &id, int attempt)
@ -1027,7 +1032,9 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map<No
} }
} }
void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce) void CSigSharesManager::CollectSigSharesToAnnounce(
const CConnman& connman,
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
{ {
AssertLockHeld(cs); AssertLockHeld(cs);
@ -1035,8 +1042,8 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, st
// TODO: remove NO_THREAD_SAFETY_ANALYSIS // TODO: remove NO_THREAD_SAFETY_ANALYSIS
// using here template ForEach makes impossible to use lock annotation // using here template ForEach makes impossible to use lock annotation
sigSharesQueuedToAnnounce.ForEach([this, &quorumNodesMap, &sigSharesToAnnounce](const SigShareKey& sigShareKey, sigSharesQueuedToAnnounce.ForEach([this, &connman, &quorumNodesMap,
bool) NO_THREAD_SAFETY_ANALYSIS { &sigSharesToAnnounce](const SigShareKey& sigShareKey, bool) NO_THREAD_SAFETY_ANALYSIS {
AssertLockHeld(cs); AssertLockHeld(cs);
const auto& signHash = sigShareKey.first; const auto& signHash = sigShareKey.first;
auto quorumMember = sigShareKey.second; auto quorumMember = sigShareKey.second;
@ -1084,7 +1091,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, st
sigSharesQueuedToAnnounce.Clear(); sigSharesQueuedToAnnounce.Clear();
} }
bool CSigSharesManager::SendMessages() bool CSigSharesManager::SendMessages(CConnman& connman)
{ {
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>> sigSharesToRequest; std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>> sigSharesToRequest;
std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>> sigShareBatchesToSend; std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>> sigShareBatchesToSend;
@ -1113,7 +1120,7 @@ bool CSigSharesManager::SendMessages()
LOCK(cs); LOCK(cs);
CollectSigSharesToRequest(sigSharesToRequest); CollectSigSharesToRequest(sigSharesToRequest);
CollectSigSharesToSend(sigShareBatchesToSend); CollectSigSharesToSend(sigShareBatchesToSend);
CollectSigSharesToAnnounce(sigSharesToAnnounce); CollectSigSharesToAnnounce(connman, sigSharesToAnnounce);
CollectSigSharesToSendConcentrated(sigSharesToSend, snap.Nodes()); CollectSigSharesToSendConcentrated(sigSharesToSend, snap.Nodes());
for (auto& [nodeId, sigShareMap] : sigSharesToRequest) { for (auto& [nodeId, sigShareMap] : sigSharesToRequest) {
@ -1254,7 +1261,7 @@ CSigShare CSigSharesManager::RebuildSigShare(const CSigSharesNodeState::SessionI
return sigShare; return sigShare;
} }
void CSigSharesManager::Cleanup() void CSigSharesManager::Cleanup(const CConnman& connman)
{ {
int64_t now = GetTime<std::chrono::seconds>().count(); int64_t now = GetTime<std::chrono::seconds>().count();
if (now - lastCleanupTime < 5) { if (now - lastCleanupTime < 5) {
@ -1407,13 +1414,13 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash)
timeSeenForSessions.erase(signHash); timeSeenForSessions.erase(signHash);
} }
void CSigSharesManager::RemoveBannedNodeStates() void CSigSharesManager::RemoveBannedNodeStates(PeerManager& peerman)
{ {
// Called regularly to cleanup local node states for banned nodes // Called regularly to cleanup local node states for banned nodes
LOCK(cs); LOCK(cs);
for (auto it = nodeStates.begin(); it != nodeStates.end();) { for (auto it = nodeStates.begin(); it != nodeStates.end();) {
if (Assert(m_peerman)->IsBanned(it->first)) { if (peerman.IsBanned(it->first)) {
// re-request sigshares from other nodes // re-request sigshares from other nodes
// TODO: remove NO_THREAD_SAFETY_ANALYSIS // TODO: remove NO_THREAD_SAFETY_ANALYSIS
// using here template ForEach makes impossible to use lock annotation // using here template ForEach makes impossible to use lock annotation
@ -1428,23 +1435,21 @@ void CSigSharesManager::RemoveBannedNodeStates()
} }
} }
void CSigSharesManager::BanNode(NodeId nodeId) void CSigSharesManager::BanNode(NodeId nodeId, PeerManager& peerman)
{ {
if (nodeId == -1) { if (nodeId == -1) {
return; return;
} }
{ peerman.Misbehaving(nodeId, 100);
Assert(m_peerman)->Misbehaving(nodeId, 100);
}
LOCK(cs); LOCK(cs);
auto it = nodeStates.find(nodeId); auto it = nodeStates.find(nodeId);
if (it == nodeStates.end()) { if (it == nodeStates.end()) {
return; return;
} }
auto& nodeState = it->second;
auto& nodeState = it->second;
// Whatever we requested from him, let's request it from someone else now // Whatever we requested from him, let's request it from someone else now
// TODO: remove NO_THREAD_SAFETY_ANALYSIS // TODO: remove NO_THREAD_SAFETY_ANALYSIS
// using here template ForEach makes impossible to use lock annotation // using here template ForEach makes impossible to use lock annotation
@ -1453,26 +1458,25 @@ void CSigSharesManager::BanNode(NodeId nodeId)
sigSharesRequested.Erase(k); sigSharesRequested.Erase(k);
}); });
nodeState.requestedSigShares.Clear(); nodeState.requestedSigShares.Clear();
nodeState.banned = true; nodeState.banned = true;
} }
void CSigSharesManager::WorkThreadMain() void CSigSharesManager::WorkThreadMain(CConnman& connman, PeerManager& peerman)
{ {
int64_t lastSendTime = 0; int64_t lastSendTime = 0;
while (!workInterrupt) { while (!workInterrupt) {
RemoveBannedNodeStates(); RemoveBannedNodeStates(peerman);
bool fMoreWork = ProcessPendingSigShares(connman); bool fMoreWork = ProcessPendingSigShares(peerman, connman);
SignPendingSigShares(); SignPendingSigShares(connman, peerman);
if (GetTimeMillis() - lastSendTime > 100) { if (GetTimeMillis() - lastSendTime > 100) {
SendMessages(); SendMessages(connman);
lastSendTime = GetTimeMillis(); lastSendTime = GetTimeMillis();
} }
Cleanup(); Cleanup(connman);
// TODO Wakeup when pending signing is needed? // TODO Wakeup when pending signing is needed?
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
@ -1487,7 +1491,7 @@ void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id,
pendingSigns.emplace_back(quorum, id, msgHash); pendingSigns.emplace_back(quorum, id, msgHash);
} }
void CSigSharesManager::SignPendingSigShares() void CSigSharesManager::SignPendingSigShares(const CConnman& connman, PeerManager& peerman)
{ {
std::vector<PendingSignatureData> v; std::vector<PendingSignatureData> v;
WITH_LOCK(cs_pendingSigns, v.swap(pendingSigns)); WITH_LOCK(cs_pendingSigns, v.swap(pendingSigns));
@ -1497,7 +1501,7 @@ void CSigSharesManager::SignPendingSigShares()
if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) { if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) {
auto sigShare = *opt_sigShare; auto sigShare = *opt_sigShare;
ProcessSigShare(sigShare, connman, pQuorum); ProcessSigShare(peerman, sigShare, connman, pQuorum);
if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) { if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) {
LOCK(cs); LOCK(cs);

View File

@ -404,34 +404,35 @@ private:
FastRandomContext rnd GUARDED_BY(cs); FastRandomContext rnd GUARDED_BY(cs);
CConnman& connman;
CSigningManager& sigman; CSigningManager& sigman;
const CActiveMasternodeManager* const m_mn_activeman; const CActiveMasternodeManager* const m_mn_activeman;
const CQuorumManager& qman; const CQuorumManager& qman;
const CSporkManager& m_sporkman; const CSporkManager& m_sporkman;
const std::unique_ptr<PeerManager>& m_peerman;
int64_t lastCleanupTime{0}; int64_t lastCleanupTime{0};
std::atomic<uint32_t> recoveredSigsCounter{0}; std::atomic<uint32_t> recoveredSigsCounter{0};
public: public:
explicit CSigSharesManager(CConnman& _connman, CSigningManager& _sigman, const CActiveMasternodeManager* const mn_activeman, explicit CSigSharesManager(CSigningManager& _sigman, const CActiveMasternodeManager* const mn_activeman,
const CQuorumManager& _qman, const CSporkManager& sporkman, const std::unique_ptr<PeerManager>& peerman) : const CQuorumManager& _qman, const CSporkManager& sporkman) :
connman(_connman), sigman(_sigman), m_mn_activeman(mn_activeman), qman(_qman), m_sporkman(sporkman), m_peerman(peerman) sigman(_sigman),
m_mn_activeman(mn_activeman),
qman(_qman),
m_sporkman(sporkman)
{ {
workInterrupt.reset(); workInterrupt.reset();
}; };
CSigSharesManager() = delete; CSigSharesManager() = delete;
~CSigSharesManager() override = default; ~CSigSharesManager() override = default;
void StartWorkerThread(); void StartWorkerThread(CConnman& connman, PeerManager& peerman);
void StopWorkerThread(); void StopWorkerThread();
void RegisterAsRecoveredSigsListener(); void RegisterAsRecoveredSigsListener();
void UnregisterAsRecoveredSigsListener(); void UnregisterAsRecoveredSigsListener();
void InterruptWorkerThread(); void InterruptWorkerThread();
void ProcessMessage(const CNode& pnode, const CSporkManager& sporkman, const std::string& msg_type, CDataStream& vRecv); void ProcessMessage(const CNode& pnode, PeerManager& peerman, const CSporkManager& sporkman,
const std::string& msg_type, CDataStream& vRecv);
void AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); void AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
std::optional<CSigShare> CreateSigShare(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) const; std::optional<CSigShare> CreateSigShare(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) const;
@ -447,7 +448,7 @@ private:
bool ProcessMessageSigSharesInv(const CNode& pfrom, const CSigSharesInv& inv); bool ProcessMessageSigSharesInv(const CNode& pfrom, const CSigSharesInv& inv);
bool ProcessMessageGetSigShares(const CNode& pfrom, const CSigSharesInv& inv); bool ProcessMessageGetSigShares(const CNode& pfrom, const CSigSharesInv& inv);
bool ProcessMessageBatchedSigShares(const CNode& pfrom, const CBatchedSigShares& batchedSigShares); bool ProcessMessageBatchedSigShares(const CNode& pfrom, const CBatchedSigShares& batchedSigShares);
void ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare); void ProcessMessageSigShare(NodeId fromId, PeerManager& peerman, const CSigShare& sigShare);
static bool VerifySigSharesInv(Consensus::LLMQType llmqType, const CSigSharesInv& inv); static bool VerifySigSharesInv(Consensus::LLMQType llmqType, const CSigSharesInv& inv);
static bool PreVerifyBatchedSigShares(const CActiveMasternodeManager& mn_activeman, const CQuorumManager& quorum_manager, static bool PreVerifyBatchedSigShares(const CActiveMasternodeManager& mn_activeman, const CQuorumManager& quorum_manager,
@ -456,31 +457,36 @@ private:
bool CollectPendingSigSharesToVerify( bool CollectPendingSigSharesToVerify(
size_t maxUniqueSessions, std::unordered_map<NodeId, std::vector<CSigShare>>& retSigShares, size_t maxUniqueSessions, std::unordered_map<NodeId, std::vector<CSigShare>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums); std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
bool ProcessPendingSigShares(const CConnman& connman); bool ProcessPendingSigShares(PeerManager& peerman, const CConnman& connman);
void ProcessPendingSigShares(const std::vector<CSigShare>& sigSharesToProcess, void ProcessPendingSigShares(
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums, const std::vector<CSigShare>& sigSharesToProcess,
const CConnman& connman); const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
PeerManager& peerman, const CConnman& connman);
void ProcessSigShare(const CSigShare& sigShare, const CConnman& connman, const CQuorumCPtr& quorum); void ProcessSigShare(PeerManager& peerman, const CSigShare& sigShare, const CConnman& connman,
void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); const CQuorumCPtr& quorum);
void TryRecoverSig(PeerManager& peerman, const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo); bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo);
static CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const std::pair<uint16_t, CBLSLazySignature>& in); static CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const std::pair<uint16_t, CBLSLazySignature>& in);
void Cleanup(); void Cleanup(const CConnman& connman);
void RemoveSigSharesForSession(const uint256& signHash) EXCLUSIVE_LOCKS_REQUIRED(cs); void RemoveSigSharesForSession(const uint256& signHash) EXCLUSIVE_LOCKS_REQUIRED(cs);
void RemoveBannedNodeStates(); void RemoveBannedNodeStates(PeerManager& peerman);
void BanNode(NodeId nodeId); void BanNode(NodeId nodeId, PeerManager& peerman);
bool SendMessages(); bool SendMessages(CConnman& connman);
void CollectSigSharesToRequest(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToRequest) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToRequest(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToRequest) EXCLUSIVE_LOCKS_REQUIRED(cs);
void CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>>& sigSharesToSend) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>>& sigSharesToSend) EXCLUSIVE_LOCKS_REQUIRED(cs);
void CollectSigSharesToSendConcentrated(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToSendConcentrated(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs);
void CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToAnnounce(
void SignPendingSigShares(); const CConnman& connman,
void WorkThreadMain(); std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
EXCLUSIVE_LOCKS_REQUIRED(cs);
void SignPendingSigShares(const CConnman& connman, PeerManager& peerman);
void WorkThreadMain(CConnman& connman, PeerManager& peerman);
}; };
} // namespace llmq } // namespace llmq

View File

@ -4651,13 +4651,13 @@ void CConnman::SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, con
}); });
} }
bool CConnman::HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) bool CConnman::HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const
{ {
LOCK(cs_vPendingMasternodes); LOCK(cs_vPendingMasternodes);
return masternodeQuorumNodes.count(std::make_pair(llmqType, quorumHash)); return masternodeQuorumNodes.count(std::make_pair(llmqType, quorumHash));
} }
std::set<uint256> CConnman::GetMasternodeQuorums(Consensus::LLMQType llmqType) std::set<uint256> CConnman::GetMasternodeQuorums(Consensus::LLMQType llmqType) const
{ {
LOCK(cs_vPendingMasternodes); LOCK(cs_vPendingMasternodes);
std::set<uint256> result; std::set<uint256> result;
@ -4700,7 +4700,7 @@ void CConnman::RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const u
masternodeQuorumRelayMembers.erase(std::make_pair(llmqType, quorumHash)); masternodeQuorumRelayMembers.erase(std::make_pair(llmqType, quorumHash));
} }
bool CConnman::IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list) bool CConnman::IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list) const
{ {
// Let's see if this is an outgoing connection to an address that is known to be a masternode // Let's see if this is an outgoing connection to an address that is known to be a masternode
// We however only need to know this if the node did not authenticate itself as a MN yet // We however only need to know this if the node did not authenticate itself as a MN yet

View File

@ -1503,12 +1503,12 @@ public:
bool AddPendingMasternode(const uint256& proTxHash); bool AddPendingMasternode(const uint256& proTxHash);
void SetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes); void SetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes);
void SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes); void SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes);
bool HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash); bool HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const;
std::set<uint256> GetMasternodeQuorums(Consensus::LLMQType llmqType); std::set<uint256> GetMasternodeQuorums(Consensus::LLMQType llmqType) const;
// also returns QWATCH nodes // also returns QWATCH nodes
std::set<NodeId> GetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const; std::set<NodeId> GetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const;
void RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash); void RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash);
bool IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list); bool IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list) const;
bool IsMasternodeQuorumRelayMember(const uint256& protxHash); bool IsMasternodeQuorumRelayMember(const uint256& protxHash);
void AddPendingProbeConnections(const std::set<uint256>& proTxHashes); void AddPendingProbeConnections(const std::set<uint256>& proTxHashes);

View File

@ -5264,7 +5264,7 @@ void PeerManagerImpl::ProcessMessage(
PostProcessMessage(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId()); PostProcessMessage(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId());
ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, *this, is_masternode, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, *this, is_masternode, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom);
m_llmq_ctx->shareman->ProcessMessage(pfrom, m_sporkman, msg_type, vRecv); m_llmq_ctx->shareman->ProcessMessage(pfrom, *this, m_sporkman, msg_type, vRecv);
ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, *this, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, *this, msg_type, vRecv), pfrom);
if (msg_type == NetMsgType::CLSIG) { if (msg_type == NetMsgType::CLSIG) {