refactor: move PeerManager out of CSigningManager ctor

This commit is contained in:
Kittywhiskers Van Gogh 2024-12-05 22:25:34 +00:00
parent 7ebc61e375
commit 7498a38076
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
5 changed files with 39 additions and 34 deletions

View File

@ -33,8 +33,7 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
qman{std::make_unique<llmq::CQuorumManager>(*bls_worker, chainman.ActiveChainstate(), dmnman, *qdkgsman, evo_db, qman{std::make_unique<llmq::CQuorumManager>(*bls_worker, chainman.ActiveChainstate(), dmnman, *qdkgsman, evo_db,
*quorum_block_processor, mn_activeman, mn_sync, sporkman, unit_tests, *quorum_block_processor, mn_activeman, mn_sync, sporkman, unit_tests,
wipe)}, wipe)},
sigman{std::make_unique<llmq::CSigningManager>(mn_activeman, chainman.ActiveChainstate(), *qman, peerman, sigman{std::make_unique<llmq::CSigningManager>(mn_activeman, chainman.ActiveChainstate(), *qman, unit_tests, wipe)},
unit_tests, wipe)},
shareman{std::make_unique<llmq::CSigSharesManager>(connman, *sigman, mn_activeman, *qman, sporkman, peerman)}, shareman{std::make_unique<llmq::CSigSharesManager>(connman, *sigman, mn_activeman, *qman, sporkman, peerman)},
clhandler{[&]() -> llmq::CChainLocksHandler* const { clhandler{[&]() -> llmq::CChainLocksHandler* const {
assert(llmq::chainLocksHandler == nullptr); assert(llmq::chainLocksHandler == nullptr);
@ -85,7 +84,7 @@ void LLMQContext::Start(CConnman& connman, PeerManager& peerman)
qman->Start(); qman->Start();
shareman->RegisterAsRecoveredSigsListener(); shareman->RegisterAsRecoveredSigsListener();
shareman->StartWorkerThread(); shareman->StartWorkerThread();
sigman->StartWorkerThread(); sigman->StartWorkerThread(peerman);
llmq::chainLocksHandler->Start(); llmq::chainLocksHandler->Start();
llmq::quorumInstantSendManager->Start(); llmq::quorumInstantSendManager->Start();

View File

@ -349,8 +349,11 @@ void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge)
////////////////// //////////////////
CSigningManager::CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate, CSigningManager::CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate,
const CQuorumManager& _qman, const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe) : const CQuorumManager& _qman, bool fMemory, bool fWipe) :
db(fMemory, fWipe), m_mn_activeman(mn_activeman), m_chainstate(chainstate), qman(_qman), m_peerman(peerman) db(fMemory, fWipe),
m_mn_activeman(mn_activeman),
m_chainstate(chainstate),
qman(_qman)
{ {
} }
@ -381,13 +384,14 @@ bool CSigningManager::GetRecoveredSigForGetData(const uint256& hash, CRecoveredS
return true; return true;
} }
PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const std::string& msg_type,
CDataStream& vRecv)
{ {
if (msg_type == NetMsgType::QSIGREC) { if (msg_type == NetMsgType::QSIGREC) {
auto recoveredSig = std::make_shared<CRecoveredSig>(); auto recoveredSig = std::make_shared<CRecoveredSig>();
vRecv >> *recoveredSig; vRecv >> *recoveredSig;
return ProcessMessageRecoveredSig(pfrom, recoveredSig); return ProcessMessageRecoveredSig(pfrom, peerman, recoveredSig);
} }
return {}; return {};
} }
@ -416,10 +420,11 @@ static bool PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CR
return true; return true;
} }
PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr<const CRecoveredSig>& recoveredSig) PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, PeerManager& peerman,
const std::shared_ptr<const CRecoveredSig>& recoveredSig)
{ {
WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(pfrom.GetId(), WITH_LOCK(::cs_main,
CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash()))); peerman.EraseObjectRequest(pfrom.GetId(), CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash())));
bool ban = false; bool ban = false;
if (!PreVerifyRecoveredSig(qman, *recoveredSig, ban)) { if (!PreVerifyRecoveredSig(qman, *recoveredSig, ban)) {
@ -517,22 +522,22 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify(
} }
} }
void CSigningManager::ProcessPendingReconstructedRecoveredSigs() void CSigningManager::ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman)
{ {
decltype(pendingReconstructedRecoveredSigs) m; decltype(pendingReconstructedRecoveredSigs) m;
WITH_LOCK(cs_pending, swap(m, pendingReconstructedRecoveredSigs)); WITH_LOCK(cs_pending, swap(m, pendingReconstructedRecoveredSigs));
for (const auto& p : m) { for (const auto& p : m) {
ProcessRecoveredSig(p.second); ProcessRecoveredSig(p.second, peerman);
} }
} }
bool CSigningManager::ProcessPendingRecoveredSigs() bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman)
{ {
std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>> recSigsByNode; std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>> recSigsByNode;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums; std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
ProcessPendingReconstructedRecoveredSigs(); ProcessPendingReconstructedRecoveredSigs(peerman);
const size_t nMaxBatchSize{32}; const size_t nMaxBatchSize{32};
CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums); CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums);
@ -575,7 +580,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs()
if (batchVerifier.badSources.count(nodeId)) { if (batchVerifier.badSources.count(nodeId)) {
LogPrint(BCLog::LLMQ, "CSigningManager::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId); LogPrint(BCLog::LLMQ, "CSigningManager::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId);
Assert(m_peerman)->Misbehaving(nodeId, 100); peerman.Misbehaving(nodeId, 100);
continue; continue;
} }
@ -584,7 +589,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs()
continue; continue;
} }
ProcessRecoveredSig(recSig); ProcessRecoveredSig(recSig, peerman);
} }
} }
@ -592,7 +597,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs()
} }
// signature must be verified already // signature must be verified already
void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig) void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig, PeerManager& peerman)
{ {
auto llmqType = recoveredSig->getLlmqType(); auto llmqType = recoveredSig->getLlmqType();
@ -631,12 +636,12 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecovered
WITH_LOCK(cs_pending, pendingReconstructedRecoveredSigs.erase(recoveredSig->GetHash())); WITH_LOCK(cs_pending, pendingReconstructedRecoveredSigs.erase(recoveredSig->GetHash()));
if (m_mn_activeman != nullptr) { if (m_mn_activeman != nullptr) {
Assert(m_peerman)->RelayRecoveredSig(recoveredSig->GetHash()); peerman.RelayRecoveredSig(recoveredSig->GetHash());
} }
auto listeners = WITH_LOCK(cs_listeners, return recoveredSigsListeners); auto listeners = WITH_LOCK(cs_listeners, return recoveredSigsListeners);
for (auto& l : listeners) { for (auto& l : listeners) {
Assert(m_peerman)->PostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig)); peerman.PostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig));
} }
GetMainSignals().NotifyRecoveredSig(recoveredSig); GetMainSignals().NotifyRecoveredSig(recoveredSig);
@ -799,14 +804,14 @@ bool CSigningManager::GetVoteForId(Consensus::LLMQType llmqType, const uint256&
return db.GetVoteForId(llmqType, id, msgHashRet); return db.GetVoteForId(llmqType, id, msgHashRet);
} }
void CSigningManager::StartWorkerThread() void CSigningManager::StartWorkerThread(PeerManager& peerman)
{ {
// can't start new thread if we have one running already // can't start new thread if we have one running already
if (workThread.joinable()) { if (workThread.joinable()) {
assert(false); assert(false);
} }
workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); }); workThread = std::thread(&util::TraceThread, "sigshares", [this, &peerman] { WorkThreadMain(peerman); });
} }
void CSigningManager::StopWorkerThread() void CSigningManager::StopWorkerThread()
@ -826,10 +831,10 @@ void CSigningManager::InterruptWorkerThread()
workInterrupt(); workInterrupt();
} }
void CSigningManager::WorkThreadMain() void CSigningManager::WorkThreadMain(PeerManager& peerman)
{ {
while (!workInterrupt) { while (!workInterrupt) {
bool fMoreWork = ProcessPendingRecoveredSigs(); bool fMoreWork = ProcessPendingRecoveredSigs(peerman);
Cleanup(); Cleanup();

View File

@ -162,7 +162,6 @@ private:
const CActiveMasternodeManager* const m_mn_activeman; const CActiveMasternodeManager* const m_mn_activeman;
const CChainState& m_chainstate; const CChainState& m_chainstate;
const CQuorumManager& qman; const CQuorumManager& qman;
const std::unique_ptr<PeerManager>& m_peerman;
mutable Mutex cs_pending; mutable Mutex cs_pending;
// Incoming and not verified yet // Incoming and not verified yet
@ -178,12 +177,12 @@ private:
public: public:
CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate, CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate,
const CQuorumManager& _qman, const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe); const CQuorumManager& _qman, bool fMemory, bool fWipe);
bool AlreadyHave(const CInv& inv) const; bool AlreadyHave(const CInv& inv) const;
bool GetRecoveredSigForGetData(const uint256& hash, CRecoveredSig& ret) const; bool GetRecoveredSigForGetData(const uint256& hash, CRecoveredSig& ret) const;
PeerMsgRet ProcessMessage(const CNode& pnode, const std::string& msg_type, CDataStream& vRecv); PeerMsgRet ProcessMessage(const CNode& pnode, PeerManager& peerman, const std::string& msg_type, CDataStream& vRecv);
// This is called when a recovered signature was was reconstructed from another P2P message and is known to be valid // This is called when a recovered signature was was reconstructed from another P2P message and is known to be valid
// This is the case for example when a signature appears as part of InstantSend or ChainLocks // This is the case for example when a signature appears as part of InstantSend or ChainLocks
@ -196,16 +195,18 @@ public:
void TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id); void TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id);
private: private:
PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr<const CRecoveredSig>& recoveredSig); PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, PeerManager& peerman,
const std::shared_ptr<const CRecoveredSig>& recoveredSig);
void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions,
std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>>& retSigShares, std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums); std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
void ProcessPendingReconstructedRecoveredSigs(); void ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman);
bool ProcessPendingRecoveredSigs(); // called from the worker thread of CSigSharesManager bool ProcessPendingRecoveredSigs(PeerManager& peerman); // called from the worker thread of CSigSharesManager
public: public:
// TODO - should not be public! // TODO - should not be public!
void ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig); void ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig, PeerManager& peerman);
private: private:
void Cleanup(); // called from the worker thread of CSigSharesManager void Cleanup(); // called from the worker thread of CSigSharesManager
@ -228,10 +229,10 @@ public:
private: private:
std::thread workThread; std::thread workThread;
CThreadInterrupt workInterrupt; CThreadInterrupt workInterrupt;
void WorkThreadMain(); void WorkThreadMain(PeerManager& peerman);
public: public:
void StartWorkerThread(); void StartWorkerThread(PeerManager& peerman);
void StopWorkerThread(); void StopWorkerThread();
void InterruptWorkerThread(); void InterruptWorkerThread();
}; };

View File

@ -817,7 +817,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256&
} }
} }
sigman.ProcessRecoveredSig(rs); sigman.ProcessRecoveredSig(rs, *m_peerman);
} }
CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256 &id, int attempt) CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256 &id, int attempt)

View File

@ -5265,7 +5265,7 @@ void PeerManagerImpl::ProcessMessage(
ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, *this, is_masternode, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, *this, is_masternode, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom);
m_llmq_ctx->shareman->ProcessMessage(pfrom, m_sporkman, msg_type, vRecv); m_llmq_ctx->shareman->ProcessMessage(pfrom, m_sporkman, msg_type, vRecv);
ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, *this, msg_type, vRecv), pfrom);
if (msg_type == NetMsgType::CLSIG) { if (msg_type == NetMsgType::CLSIG) {
if (llmq::AreChainLocksEnabled(m_sporkman)) { if (llmq::AreChainLocksEnabled(m_sporkman)) {