diff --git a/src/coinjoin/client.cpp b/src/coinjoin/client.cpp index 9ee0fa0d91..85c35bc8a7 100644 --- a/src/coinjoin/client.cpp +++ b/src/coinjoin/client.cpp @@ -30,18 +30,20 @@ #include #include -PeerMsgRet CCoinJoinClientQueueManager::ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv) +PeerMsgRet CCoinJoinClientQueueManager::ProcessMessage(const CNode& peer, CConnman& connman, PeerManager& peerman, + std::string_view msg_type, CDataStream& vRecv) { if (m_is_masternode) return {}; if (!m_mn_sync.IsBlockchainSynced()) return {}; if (msg_type == NetMsgType::DSQUEUE) { - return CCoinJoinClientQueueManager::ProcessDSQueue(peer, vRecv); + return CCoinJoinClientQueueManager::ProcessDSQueue(peer, connman, peerman, vRecv); } return {}; } -PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataStream& vRecv) +PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CConnman& connman, PeerManager& peerman, + CDataStream& vRecv) { assert(m_mn_metaman.IsValid()); @@ -50,7 +52,7 @@ PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataS { LOCK(cs_main); - Assert(peerman)->EraseObjectRequest(peer.GetId(), CInv(MSG_DSQ, dsq.GetHash())); + peerman.EraseObjectRequest(peer.GetId(), CInv(MSG_DSQ, dsq.GetHash())); } if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) { @@ -102,8 +104,9 @@ PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataS } // if the queue is ready, submit if we can - if (dsq.fReady && m_walletman.ForAnyCJClientMan([this, &dmn](std::unique_ptr& clientman) { - return clientman->TrySubmitDenominate(dmn->pdmnState->addr, this->connman); + if (dsq.fReady && + m_walletman.ForAnyCJClientMan([this, &connman, &dmn](std::unique_ptr& clientman) { + return clientman->TrySubmitDenominate(dmn->pdmnState->addr, connman); })) { LogPrint(BCLog::COINJOIN, "DSQUEUE -- CoinJoin queue (%s) is ready on masternode %s\n", dsq.ToString(), dmn->pdmnState->addr.ToStringAddrPort()); @@ -132,7 +135,7 @@ PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataS WITH_LOCK(cs_vecqueue, vecCoinJoinQueue.push_back(dsq)); } } // cs_ProcessDSQueue - peerman->RelayDSQ(dsq); + peerman.RelayDSQ(dsq); return {}; } @@ -1917,11 +1920,11 @@ void CoinJoinWalletManager::Add(const std::shared_ptr& wallet) g_wallet_init_interface.InitCoinJoinSettings(*this); } -void CoinJoinWalletManager::DoMaintenance() +void CoinJoinWalletManager::DoMaintenance(CConnman& connman) { LOCK(cs_wallet_manager_map); for (auto& [_, clientman] : m_wallet_manager_map) { - clientman->DoMaintenance(m_chainman, m_connman, m_mempool); + clientman->DoMaintenance(m_chainman, connman, m_mempool); } } diff --git a/src/coinjoin/client.h b/src/coinjoin/client.h index 0dd1cccb90..2eccc1cfc2 100644 --- a/src/coinjoin/client.h +++ b/src/coinjoin/client.h @@ -76,11 +76,10 @@ public: using wallet_name_cjman_map = std::map>; public: - CoinJoinWalletManager(ChainstateManager& chainman, CConnman& connman, CDeterministicMNManager& dmnman, - CMasternodeMetaMan& mn_metaman, const CTxMemPool& mempool, const CMasternodeSync& mn_sync, + CoinJoinWalletManager(ChainstateManager& chainman, CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman, + const CTxMemPool& mempool, const CMasternodeSync& mn_sync, const std::unique_ptr& queueman, bool is_masternode) : m_chainman(chainman), - m_connman(connman), m_dmnman(dmnman), m_mn_metaman(mn_metaman), m_mempool(mempool), @@ -97,7 +96,7 @@ public: } void Add(const std::shared_ptr& wallet); - void DoMaintenance(); + void DoMaintenance(CConnman& connman); void Remove(const std::string& name); void Flush(const std::string& name); @@ -122,7 +121,6 @@ public: private: ChainstateManager& m_chainman; - CConnman& m_connman; CDeterministicMNManager& m_dmnman; CMasternodeMetaMan& m_mn_metaman; const CTxMemPool& m_mempool; @@ -234,8 +232,6 @@ public: class CCoinJoinClientQueueManager : public CCoinJoinBaseManager { private: - CConnman& connman; - std::unique_ptr& peerman; CoinJoinWalletManager& m_walletman; CDeterministicMNManager& m_dmnman; CMasternodeMetaMan& m_mn_metaman; @@ -245,20 +241,18 @@ private: const bool m_is_masternode; public: - explicit CCoinJoinClientQueueManager(CConnman& _connman, std::unique_ptr& _peerman, - CoinJoinWalletManager& walletman, CDeterministicMNManager& dmnman, + explicit CCoinJoinClientQueueManager(CoinJoinWalletManager& walletman, CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman, const CMasternodeSync& mn_sync, bool is_masternode) : - connman(_connman), - peerman(_peerman), m_walletman(walletman), m_dmnman(dmnman), m_mn_metaman(mn_metaman), m_mn_sync(mn_sync), m_is_masternode{is_masternode} {}; - PeerMsgRet ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue); - PeerMsgRet ProcessDSQueue(const CNode& peer, CDataStream& vRecv); + PeerMsgRet ProcessMessage(const CNode& peer, CConnman& connman, PeerManager& peerman, std::string_view msg_type, + CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue); + PeerMsgRet ProcessDSQueue(const CNode& peer, CConnman& connman, PeerManager& peerman, CDataStream& vRecv); void DoMaintenance(); }; diff --git a/src/coinjoin/context.cpp b/src/coinjoin/context.cpp index 8ce72e3b97..8ab6f9e636 100644 --- a/src/coinjoin/context.cpp +++ b/src/coinjoin/context.cpp @@ -15,12 +15,11 @@ CJContext::CJContext(ChainstateManager& chainman, CConnman& connman, CDeterminis std::unique_ptr& peerman, bool relay_txes) : dstxman{std::make_unique()}, #ifdef ENABLE_WALLET - walletman{std::make_unique(chainman, connman, dmnman, mn_metaman, mempool, mn_sync, queueman, - /* is_masternode = */ mn_activeman != nullptr)}, - queueman{relay_txes - ? std::make_unique(connman, peerman, *walletman, dmnman, mn_metaman, - mn_sync, /* is_masternode = */ mn_activeman != nullptr) - : nullptr}, + walletman{std::make_unique(chainman, dmnman, mn_metaman, mempool, mn_sync, queueman, + /*is_masternode=*/mn_activeman != nullptr)}, + queueman{relay_txes ? std::make_unique(*walletman, dmnman, mn_metaman, mn_sync, + /*is_masternode=*/mn_activeman != nullptr) + : nullptr}, #endif // ENABLE_WALLET server{std::make_unique(chainman, connman, dmnman, *dstxman, mn_metaman, mempool, mn_activeman, mn_sync, peerman)} diff --git a/src/dsnotificationinterface.cpp b/src/dsnotificationinterface.cpp index 701f96ccaa..3f412026f8 100644 --- a/src/dsnotificationinterface.cpp +++ b/src/dsnotificationinterface.cpp @@ -93,7 +93,7 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con m_llmq_ctx->isman->UpdatedBlockTip(pindexNew); m_llmq_ctx->clhandler->UpdatedBlockTip(); - m_llmq_ctx->qman->UpdatedBlockTip(pindexNew, fInitialDownload); + m_llmq_ctx->qman->UpdatedBlockTip(pindexNew, m_connman, fInitialDownload); m_llmq_ctx->qdkgsman->UpdatedBlockTip(pindexNew, fInitialDownload); m_llmq_ctx->ehfSignalsHandler->UpdatedBlockTip(pindexNew, /* is_masternode = */ m_mn_activeman != nullptr); @@ -107,7 +107,7 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& { assert(m_cj_ctx && m_llmq_ctx); - m_llmq_ctx->isman->TransactionAddedToMempool(ptx); + m_llmq_ctx->isman->TransactionAddedToMempool(m_peerman, ptx); m_llmq_ctx->clhandler->TransactionAddedToMempool(ptx, nAcceptTime); m_cj_ctx->dstxman->TransactionAddedToMempool(ptx); } diff --git a/src/evo/deterministicmns.h b/src/evo/deterministicmns.h index 52119350f1..81621d8bd9 100644 --- a/src/evo/deterministicmns.h +++ b/src/evo/deterministicmns.h @@ -30,7 +30,6 @@ class CBlock; class CBlockIndex; class CChainState; class CCoinsViewCache; -class CConnman; class CEvoDB; class TxValidationState; @@ -583,7 +582,6 @@ private: std::atomic to_cleanup {0}; CChainState& m_chainstate; - CConnman& connman; CEvoDB& m_evoDb; std::unordered_map mnListsCache GUARDED_BY(cs); @@ -592,8 +590,11 @@ private: const CBlockIndex* m_initial_snapshot_index GUARDED_BY(cs) {nullptr}; public: - explicit CDeterministicMNManager(CChainState& chainstate, CConnman& _connman, CEvoDB& evoDb) : - m_chainstate(chainstate), connman(_connman), m_evoDb(evoDb) {} + explicit CDeterministicMNManager(CChainState& chainstate, CEvoDB& evoDb) : + m_chainstate(chainstate), + m_evoDb(evoDb) + { + } ~CDeterministicMNManager() = default; bool ProcessBlock(const CBlock& block, gsl::not_null pindex, BlockValidationState& state, diff --git a/src/init.cpp b/src/init.cpp index 092bc971e2..ef6fb6a82e 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -1893,7 +1893,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // Same logic as above with pblocktree node.dmnman.reset(); - node.dmnman = std::make_unique(chainman.ActiveChainstate(), *node.connman, *node.evodb); + node.dmnman = std::make_unique(chainman.ActiveChainstate(), *node.evodb); node.mempool->ConnectManagers(node.dmnman.get()); node.cpoolman.reset(); @@ -1907,12 +1907,10 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.llmq_ctx->Stop(); } node.llmq_ctx.reset(); - node.llmq_ctx = std::make_unique(chainman, *node.connman, *node.dmnman, *node.evodb, *node.mn_metaman, *node.mnhf_manager, *node.sporkman, - *node.mempool, node.mn_activeman.get(), *node.mn_sync, node.peerman, /* unit_tests = */ false, /* wipe = */ fReset || fReindexChainState); + node.llmq_ctx = std::make_unique(chainman, *node.dmnman, *node.evodb, *node.mn_metaman, *node.mnhf_manager, *node.sporkman, + *node.mempool, node.mn_activeman.get(), *node.mn_sync, /*unit_tests=*/false, /*wipe=*/fReset || fReindexChainState); // Enable CMNHFManager::{Process, Undo}Block node.mnhf_manager->ConnectManagers(node.chainman.get(), node.llmq_ctx->qman.get()); - // Have to start it early to let VerifyDB check ChainLock signatures in coinbase - node.llmq_ctx->Start(); node.chain_helper.reset(); node.chain_helper = std::make_unique(*node.cpoolman, *node.dmnman, *node.mnhf_manager, *node.govman, *(node.llmq_ctx->quorum_block_processor), *node.chainman, @@ -2287,6 +2285,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // ********************************************************* Step 10a: schedule Dash-specific tasks + node.llmq_ctx->Start(*node.connman, *node.peerman); + node.scheduler->scheduleEvery(std::bind(&CNetFulfilledRequestManager::DoMaintenance, std::ref(*node.netfulfilledman)), std::chrono::minutes{1}); node.scheduler->scheduleEvery(std::bind(&CMasternodeSync::DoMaintenance, std::ref(*node.mn_sync), std::cref(*node.peerman), std::cref(*node.govman)), std::chrono::seconds{1}); node.scheduler->scheduleEvery(std::bind(&CMasternodeUtils::DoMaintenance, std::ref(*node.connman), std::ref(*node.dmnman), std::ref(*node.mn_sync), std::ref(*node.cj_ctx)), std::chrono::minutes{1}); @@ -2302,7 +2302,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) #ifdef ENABLE_WALLET } else if (!ignores_incoming_txs) { node.scheduler->scheduleEvery(std::bind(&CCoinJoinClientQueueManager::DoMaintenance, std::ref(*node.cj_ctx->queueman)), std::chrono::seconds{1}); - node.scheduler->scheduleEvery(std::bind(&CoinJoinWalletManager::DoMaintenance, std::ref(*node.cj_ctx->walletman)), std::chrono::seconds{1}); + node.scheduler->scheduleEvery(std::bind(&CoinJoinWalletManager::DoMaintenance, std::ref(*node.cj_ctx->walletman), std::ref(*node.connman)), std::chrono::seconds{1}); #endif // ENABLE_WALLET } diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp index 2d58262f80..8e8307b5c5 100644 --- a/src/llmq/context.cpp +++ b/src/llmq/context.cpp @@ -18,24 +18,22 @@ #include #include -LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterministicMNManager& dmnman, - CEvoDB& evo_db, CMasternodeMetaMan& mn_metaman, CMNHFManager& mnhfman, CSporkManager& sporkman, +LLMQContext::LLMQContext(ChainstateManager& chainman, CDeterministicMNManager& dmnman, CEvoDB& evo_db, + CMasternodeMetaMan& mn_metaman, CMNHFManager& mnhfman, CSporkManager& sporkman, CTxMemPool& mempool, const CActiveMasternodeManager* const mn_activeman, - const CMasternodeSync& mn_sync, const std::unique_ptr& peerman, bool unit_tests, - bool wipe) : + const CMasternodeSync& mn_sync, bool unit_tests, bool wipe) : is_masternode{mn_activeman != nullptr}, bls_worker{std::make_shared()}, dkg_debugman{std::make_unique()}, quorum_block_processor{std::make_unique(chainman.ActiveChainstate(), dmnman, evo_db)}, - qdkgsman{std::make_unique(*bls_worker, chainman.ActiveChainstate(), connman, dmnman, - *dkg_debugman, mn_metaman, *quorum_block_processor, - mn_activeman, sporkman, peerman, unit_tests, wipe)}, - qman{std::make_unique(*bls_worker, chainman.ActiveChainstate(), connman, dmnman, *qdkgsman, - evo_db, *quorum_block_processor, mn_activeman, mn_sync, sporkman, - unit_tests, wipe)}, - sigman{std::make_unique(mn_activeman, chainman.ActiveChainstate(), *qman, peerman, - unit_tests, wipe)}, - shareman{std::make_unique(connman, *sigman, mn_activeman, *qman, sporkman, peerman)}, + qdkgsman{std::make_unique(*bls_worker, chainman.ActiveChainstate(), dmnman, *dkg_debugman, + mn_metaman, *quorum_block_processor, mn_activeman, sporkman, + unit_tests, wipe)}, + qman{std::make_unique(*bls_worker, chainman.ActiveChainstate(), dmnman, *qdkgsman, evo_db, + *quorum_block_processor, mn_activeman, mn_sync, sporkman, unit_tests, + wipe)}, + sigman{std::make_unique(mn_activeman, chainman.ActiveChainstate(), *qman, unit_tests, wipe)}, + shareman{std::make_unique(*sigman, mn_activeman, *qman, sporkman)}, clhandler{[&]() -> llmq::CChainLocksHandler* const { assert(llmq::chainLocksHandler == nullptr); llmq::chainLocksHandler = std::make_unique(chainman.ActiveChainstate(), *qman, @@ -48,15 +46,19 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm llmq::quorumInstantSendManager = std::make_unique(*llmq::chainLocksHandler, chainman.ActiveChainstate(), *qman, *sigman, *shareman, sporkman, - mempool, mn_sync, peerman, - is_masternode, unit_tests, wipe); + mempool, mn_sync, is_masternode, + unit_tests, wipe); return llmq::quorumInstantSendManager.get(); }()}, ehfSignalsHandler{std::make_unique(chainman, mnhfman, *sigman, *shareman, *qman)} { + // Have to start it early to let VerifyDB check ChainLock signatures in coinbase + bls_worker->Start(); } LLMQContext::~LLMQContext() { + bls_worker->Stop(); + // LLMQContext doesn't own these objects, but still need to care of them for consistency: llmq::quorumInstantSendManager.reset(); llmq::chainLocksHandler.reset(); @@ -70,21 +72,21 @@ void LLMQContext::Interrupt() { llmq::quorumInstantSendManager->InterruptWorkerThread(); } -void LLMQContext::Start() { +void LLMQContext::Start(CConnman& connman, PeerManager& peerman) +{ assert(clhandler == llmq::chainLocksHandler.get()); assert(isman == llmq::quorumInstantSendManager.get()); - bls_worker->Start(); if (is_masternode) { - qdkgsman->StartThreads(); + qdkgsman->StartThreads(connman, peerman); } qman->Start(); shareman->RegisterAsRecoveredSigsListener(); - shareman->StartWorkerThread(); - sigman->StartWorkerThread(); + shareman->StartWorkerThread(connman, peerman); + sigman->StartWorkerThread(peerman); llmq::chainLocksHandler->Start(); - llmq::quorumInstantSendManager->Start(); + llmq::quorumInstantSendManager->Start(peerman); } void LLMQContext::Stop() { @@ -101,5 +103,4 @@ void LLMQContext::Stop() { if (is_masternode) { qdkgsman->StopThreads(); } - bls_worker->Stop(); } diff --git a/src/llmq/context.h b/src/llmq/context.h index de0a76a0bc..0e92e6a90f 100644 --- a/src/llmq/context.h +++ b/src/llmq/context.h @@ -39,14 +39,14 @@ private: public: LLMQContext() = delete; LLMQContext(const LLMQContext&) = delete; - LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterministicMNManager& dmnman, CEvoDB& evo_db, + LLMQContext(ChainstateManager& chainman, CDeterministicMNManager& dmnman, CEvoDB& evo_db, CMasternodeMetaMan& mn_metaman, CMNHFManager& mnhfman, CSporkManager& sporkman, CTxMemPool& mempool, - const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync, - const std::unique_ptr& peerman, bool unit_tests, bool wipe); + const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync, bool unit_tests, + bool wipe); ~LLMQContext(); void Interrupt(); - void Start(); + void Start(CConnman& connman, PeerManager& peerman); void Stop(); /** Guaranteed if LLMQContext is initialized then all members are valid too diff --git a/src/llmq/dkgsession.cpp b/src/llmq/dkgsession.cpp index b1b01014d3..b63ebf45a4 100644 --- a/src/llmq/dkgsession.cpp +++ b/src/llmq/dkgsession.cpp @@ -72,14 +72,12 @@ CDKGMember::CDKGMember(const CDeterministicMNCPtr& _dmn, size_t _idx) : } CDKGSession::CDKGSession(const CBlockIndex* pQuorumBaseBlockIndex, const Consensus::LLMQParams& _params, - CBLSWorker& _blsWorker, CConnman& _connman, CDeterministicMNManager& dmnman, - CDKGSessionManager& _dkgManager, CDKGDebugManager& _dkgDebugManager, - CMasternodeMetaMan& mn_metaman, const CActiveMasternodeManager* const mn_activeman, - const CSporkManager& sporkman) : + CBLSWorker& _blsWorker, CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, + CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, + const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman) : params(_params), blsWorker(_blsWorker), cache(_blsWorker), - connman(_connman), m_dmnman(dmnman), dkgManager(_dkgManager), dkgDebugManager(_dkgDebugManager), @@ -156,7 +154,7 @@ bool CDKGSession::Init(const uint256& _myProTxHash, int _quorumIndex) return true; } -void CDKGSession::Contribute(CDKGPendingMessages& pendingMessages) +void CDKGSession::Contribute(CDKGPendingMessages& pendingMessages, PeerManager& peerman) { CDKGLogger logger(*this, __func__, __LINE__); @@ -174,10 +172,10 @@ void CDKGSession::Contribute(CDKGPendingMessages& pendingMessages) logger.Batch("generated contributions. time=%d", t1.count()); logger.Flush(); - SendContributions(pendingMessages); + SendContributions(pendingMessages, peerman); } -void CDKGSession::SendContributions(CDKGPendingMessages& pendingMessages) +void CDKGSession::SendContributions(CDKGPendingMessages& pendingMessages, PeerManager& peerman) { CDKGLogger logger(*this, __func__, __LINE__); @@ -226,7 +224,7 @@ void CDKGSession::SendContributions(CDKGPendingMessages& pendingMessages) return true; }); - pendingMessages.PushPendingMessage(-1, nullptr, qc); + pendingMessages.PushPendingMessage(-1, qc, peerman); } // only performs cheap verifications, but not the signature of the message. this is checked with batched verification @@ -417,7 +415,7 @@ void CDKGSession::VerifyPendingContributions() pendingContributionVerifications.clear(); } -void CDKGSession::VerifyAndComplain(CDKGPendingMessages& pendingMessages) +void CDKGSession::VerifyAndComplain(CConnman& connman, CDKGPendingMessages& pendingMessages, PeerManager& peerman) { if (!AreWeMember()) { return; @@ -453,12 +451,12 @@ void CDKGSession::VerifyAndComplain(CDKGPendingMessages& pendingMessages) logger.Batch("verified contributions. time=%d", t1.count()); logger.Flush(); - VerifyConnectionAndMinProtoVersions(); + VerifyConnectionAndMinProtoVersions(connman); - SendComplaint(pendingMessages); + SendComplaint(pendingMessages, peerman); } -void CDKGSession::VerifyConnectionAndMinProtoVersions() const +void CDKGSession::VerifyConnectionAndMinProtoVersions(CConnman& connman) const { assert(m_mn_metaman.IsValid()); @@ -499,7 +497,7 @@ void CDKGSession::VerifyConnectionAndMinProtoVersions() const } } -void CDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages) +void CDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages, PeerManager& peerman) { CDKGLogger logger(*this, __func__, __LINE__); @@ -538,7 +536,7 @@ void CDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages) return true; }); - pendingMessages.PushPendingMessage(-1, nullptr, qc); + pendingMessages.PushPendingMessage(-1, qc, peerman); } // only performs cheap verifications, but not the signature of the message. this is checked with batched verification @@ -645,7 +643,7 @@ std::optional CDKGSession::ReceiveMessage(const CDKGComplaint& qc) return inv; } -void CDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages) +void CDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages, PeerManager& peerman) { if (!AreWeMember()) { return; @@ -682,11 +680,12 @@ void CDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages) logger.Flush(); if (!justifyFor.empty()) { - SendJustification(pendingMessages, justifyFor); + SendJustification(pendingMessages, peerman, justifyFor); } } -void CDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, const std::set& forMembers) +void CDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, + const std::set& forMembers) { CDKGLogger logger(*this, __func__, __LINE__); @@ -731,7 +730,7 @@ void CDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, const return true; }); - pendingMessages.PushPendingMessage(-1, nullptr, qj); + pendingMessages.PushPendingMessage(-1, qj, peerman); } // only performs cheap verifications, but not the signature of the message. this is checked with batched verification @@ -885,7 +884,7 @@ std::optional CDKGSession::ReceiveMessage(const CDKGJustification& qj) return inv; } -void CDKGSession::VerifyAndCommit(CDKGPendingMessages& pendingMessages) +void CDKGSession::VerifyAndCommit(CDKGPendingMessages& pendingMessages, PeerManager& peerman) { if (!AreWeMember()) { return; @@ -927,10 +926,10 @@ void CDKGSession::VerifyAndCommit(CDKGPendingMessages& pendingMessages) logger.Flush(); - SendCommitment(pendingMessages); + SendCommitment(pendingMessages, peerman); } -void CDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages) +void CDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages, PeerManager& peerman) { CDKGLogger logger(*this, __func__, __LINE__); @@ -1041,7 +1040,7 @@ void CDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages) return true; }); - pendingMessages.PushPendingMessage(-1, nullptr, qc); + pendingMessages.PushPendingMessage(-1, qc, peerman); } // only performs cheap verifications, but not the signature of the message. this is checked with batched verification diff --git a/src/llmq/dkgsession.h b/src/llmq/dkgsession.h index b92db5646a..82a1263b92 100644 --- a/src/llmq/dkgsession.h +++ b/src/llmq/dkgsession.h @@ -22,6 +22,7 @@ class CConnman; class CDeterministicMN; class CMasternodeMetaMan; class CSporkManager; +class PeerManager; using CDeterministicMNCPtr = std::shared_ptr; @@ -279,7 +280,6 @@ private: CBLSWorker& blsWorker; CBLSWorkerCache cache; - CConnman& connman; CDeterministicMNManager& m_dmnman; CDKGSessionManager& dkgManager; CDKGDebugManager& dkgDebugManager; @@ -326,9 +326,9 @@ private: public: CDKGSession(const CBlockIndex* pQuorumBaseBlockIndex, const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, - CConnman& _connman, CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, - CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, - const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman); + CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CDKGDebugManager& _dkgDebugManager, + CMasternodeMetaMan& mn_metaman, const CActiveMasternodeManager* const mn_activeman, + const CSporkManager& sporkman); // TODO: remove Init completely bool Init(const uint256& _myProTxHash, int _quorumIndex); @@ -349,28 +349,28 @@ public: */ // Phase 1: contribution - void Contribute(CDKGPendingMessages& pendingMessages); - void SendContributions(CDKGPendingMessages& pendingMessages); + void Contribute(CDKGPendingMessages& pendingMessages, PeerManager& peerman); + void SendContributions(CDKGPendingMessages& pendingMessages, PeerManager& peerman); bool PreVerifyMessage(const CDKGContribution& qc, bool& retBan) const; std::optional ReceiveMessage(const CDKGContribution& qc); void VerifyPendingContributions() EXCLUSIVE_LOCKS_REQUIRED(cs_pending); // Phase 2: complaint - void VerifyAndComplain(CDKGPendingMessages& pendingMessages); - void VerifyConnectionAndMinProtoVersions() const; - void SendComplaint(CDKGPendingMessages& pendingMessages); + void VerifyAndComplain(CConnman& connman, CDKGPendingMessages& pendingMessages, PeerManager& peerman); + void VerifyConnectionAndMinProtoVersions(CConnman& connman) const; + void SendComplaint(CDKGPendingMessages& pendingMessages, PeerManager& peerman); bool PreVerifyMessage(const CDKGComplaint& qc, bool& retBan) const; std::optional ReceiveMessage(const CDKGComplaint& qc); // Phase 3: justification - void VerifyAndJustify(CDKGPendingMessages& pendingMessages); - void SendJustification(CDKGPendingMessages& pendingMessages, const std::set& forMembers); + void VerifyAndJustify(CDKGPendingMessages& pendingMessages, PeerManager& peerman); + void SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, const std::set& forMembers); bool PreVerifyMessage(const CDKGJustification& qj, bool& retBan) const; std::optional ReceiveMessage(const CDKGJustification& qj); // Phase 4: commit - void VerifyAndCommit(CDKGPendingMessages& pendingMessages); - void SendCommitment(CDKGPendingMessages& pendingMessages); + void VerifyAndCommit(CDKGPendingMessages& pendingMessages, PeerManager& peerman); + void SendCommitment(CDKGPendingMessages& pendingMessages, PeerManager& peerman); bool PreVerifyMessage(const CDKGPrematureCommitment& qc, bool& retBan) const; std::optional ReceiveMessage(const CDKGPrematureCommitment& qc); diff --git a/src/llmq/dkgsessionhandler.cpp b/src/llmq/dkgsessionhandler.cpp index 09e24bf429..a773af25f7 100644 --- a/src/llmq/dkgsessionhandler.cpp +++ b/src/llmq/dkgsessionhandler.cpp @@ -24,16 +24,13 @@ namespace llmq { -CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, - CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, - CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman, - CQuorumBlockProcessor& _quorumBlockProcessor, - const CActiveMasternodeManager* const mn_activeman, - const CSporkManager& sporkman, const std::unique_ptr& peerman, +CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman, + CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, + CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, + const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, const Consensus::LLMQParams& _params, int _quorumIndex) : blsWorker(_blsWorker), m_chainstate(chainstate), - connman(_connman), m_dmnman(dmnman), dkgDebugManager(_dkgDebugManager), dkgManager(_dkgManager), @@ -41,11 +38,10 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai quorumBlockProcessor(_quorumBlockProcessor), m_mn_activeman(mn_activeman), m_sporkman(sporkman), - m_peerman(peerman), params(_params), quorumIndex(_quorumIndex), - curSession(std::make_unique(nullptr, _params, _blsWorker, _connman, dmnman, _dkgManager, - _dkgDebugManager, m_mn_metaman, m_mn_activeman, sporkman)), + curSession(std::make_unique(nullptr, _params, _blsWorker, dmnman, _dkgManager, _dkgDebugManager, + m_mn_metaman, m_mn_activeman, sporkman)), pendingContributions( (size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages) @@ -60,18 +56,8 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai CDKGSessionHandler::~CDKGSessionHandler() = default; -void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman, CDataStream& vRecv) +void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv, PeerManager& peerman) { - // if peer is not -1 we should always pass valid peerman - assert(from == -1 || peerman != nullptr); - if (peerman != nullptr) { - if (m_peerman == nullptr) { - m_peerman = peerman; - } - // we should never use one different PeerManagers for same queue - assert(m_peerman == peerman); - } - // this will also consume the data, even if we bail out early auto pm = std::make_shared(std::move(vRecv)); @@ -80,7 +66,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman, uint256 hash = hw.GetHash(); if (from != -1) { - WITH_LOCK(::cs_main, Assert(m_peerman.load())->EraseObjectRequest(from, CInv(invType, hash))); + WITH_LOCK(::cs_main, peerman.EraseObjectRequest(from, CInv(invType, hash))); } LOCK(cs_messages); @@ -119,10 +105,10 @@ bool CDKGPendingMessages::HasSeen(const uint256& hash) const return seenMessages.count(hash) != 0; } -void CDKGPendingMessages::Misbehaving(const NodeId from, const int score) +void CDKGPendingMessages::Misbehaving(const NodeId from, const int score, PeerManager& peerman) { if (from == -1) return; - m_peerman.load()->Misbehaving(from, score); + peerman.Misbehaving(from, score); } void CDKGPendingMessages::Clear() @@ -162,28 +148,30 @@ void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew) params.name, quorumIndex, currentHeight, pQuorumBaseBlockIndex->nHeight, ToUnderlying(oldPhase), ToUnderlying(phase)); } -void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv) +void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const std::string& msg_type, + CDataStream& vRecv) { // We don't handle messages in the calling thread as deserialization/processing of these would block everything if (msg_type == NetMsgType::QCONTRIB) { - pendingContributions.PushPendingMessage(pfrom.GetId(), peerman, vRecv); + pendingContributions.PushPendingMessage(pfrom.GetId(), vRecv, peerman); } else if (msg_type == NetMsgType::QCOMPLAINT) { - pendingComplaints.PushPendingMessage(pfrom.GetId(), peerman, vRecv); + pendingComplaints.PushPendingMessage(pfrom.GetId(), vRecv, peerman); } else if (msg_type == NetMsgType::QJUSTIFICATION) { - pendingJustifications.PushPendingMessage(pfrom.GetId(), peerman, vRecv); + pendingJustifications.PushPendingMessage(pfrom.GetId(), vRecv, peerman); } else if (msg_type == NetMsgType::QPCOMMITMENT) { - pendingPrematureCommitments.PushPendingMessage(pfrom.GetId(), peerman, vRecv); + pendingPrematureCommitments.PushPendingMessage(pfrom.GetId(), vRecv, peerman); } } -void CDKGSessionHandler::StartThread() +void CDKGSessionHandler::StartThread(CConnman& connman, PeerManager& peerman) { if (phaseHandlerThread.joinable()) { throw std::runtime_error("Tried to start an already started CDKGSessionHandler thread."); } m_thread_name = strprintf("llmq-%d-%d", ToUnderlying(params.type), quorumIndex); - phaseHandlerThread = std::thread(util::TraceThread, m_thread_name.c_str(), [this] { PhaseHandlerThread(); }); + phaseHandlerThread = std::thread(&util::TraceThread, m_thread_name.c_str(), + [this, &connman, &peerman] { PhaseHandlerThread(connman, peerman); }); } void CDKGSessionHandler::StopThread() @@ -200,7 +188,7 @@ bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pQuorumBaseBlockIndex) return false; } - curSession = std::make_unique(pQuorumBaseBlockIndex, params, blsWorker, connman, m_dmnman, dkgManager, + curSession = std::make_unique(pQuorumBaseBlockIndex, params, blsWorker, m_dmnman, dkgManager, dkgDebugManager, m_mn_metaman, m_mn_activeman, m_sporkman); if (!curSession->Init(m_mn_activeman->GetProTxHash(), quorumIndex)) { @@ -445,7 +433,8 @@ std::set BatchVerifyMessageSigs(CDKGSession& session, const std::vector< return ret; } -static void RelayInvToParticipants(const CDKGSession& session, CConnman& connman, PeerManager& peerman, const CInv& inv) +static void RelayInvToParticipants(const CDKGSession& session, const CConnman& connman, PeerManager& peerman, + const CInv& inv) { CDKGLogger logger(session, __func__, __LINE__); std::stringstream ss; @@ -476,9 +465,10 @@ static void RelayInvToParticipants(const CDKGSession& session, CConnman& connman logger.Batch("forMember[%s] NOTrelayMembers[%s]", session.ProTx().ToString().substr(0, 4), ss2.str()); logger.Flush(); } + template -bool ProcessPendingMessageBatch(CConnman& connman, PeerManager* peerman, CDKGSession& session, - CDKGPendingMessages& pendingMessages, size_t maxCount) +bool ProcessPendingMessageBatch(const CConnman& connman, CDKGSession& session, CDKGPendingMessages& pendingMessages, + PeerManager& peerman, size_t maxCount) { auto msgs = pendingMessages.PopAndDeserializeMessages(maxCount); if (msgs.empty()) { @@ -493,7 +483,7 @@ bool ProcessPendingMessageBatch(CConnman& connman, PeerManager* peerman, CDKGSes if (!p.second) { LogPrint(BCLog::LLMQ_DKG, "%s -- failed to deserialize message, peer=%d\n", __func__, nodeId); { - pendingMessages.Misbehaving(nodeId, 100); + pendingMessages.Misbehaving(nodeId, 100, peerman); } continue; } @@ -502,7 +492,7 @@ bool ProcessPendingMessageBatch(CConnman& connman, PeerManager* peerman, CDKGSes if (ban) { LogPrint(BCLog::LLMQ_DKG, "%s -- banning node due to failed preverification, peer=%d\n", __func__, nodeId); { - pendingMessages.Misbehaving(nodeId, 100); + pendingMessages.Misbehaving(nodeId, 100, peerman); } } LogPrint(BCLog::LLMQ_DKG, "%s -- skipping message due to failed preverification, peer=%d\n", __func__, nodeId); @@ -519,7 +509,7 @@ bool ProcessPendingMessageBatch(CConnman& connman, PeerManager* peerman, CDKGSes LOCK(cs_main); for (auto nodeId : badNodes) { LogPrint(BCLog::LLMQ_DKG, "%s -- failed to verify signature, peer=%d\n", __func__, nodeId); - pendingMessages.Misbehaving(nodeId, 100); + pendingMessages.Misbehaving(nodeId, 100, peerman); } } @@ -529,15 +519,15 @@ bool ProcessPendingMessageBatch(CConnman& connman, PeerManager* peerman, CDKGSes continue; } const std::optional inv = session.ReceiveMessage(*p.second); - if (inv && peerman) { - RelayInvToParticipants(session, connman, *peerman, *inv); + if (inv) { + RelayInvToParticipants(session, connman, peerman, *inv); } } return true; } -void CDKGSessionHandler::HandleDKGRound() +void CDKGSessionHandler::HandleDKGRound(CConnman& connman, PeerManager& peerman) { WaitForNextPhase(std::nullopt, QuorumPhase::Initialized); @@ -570,60 +560,53 @@ void CDKGSessionHandler::HandleDKGRound() WaitForNextPhase(QuorumPhase::Initialized, QuorumPhase::Contribute, curQuorumHash); // Contribute - auto fContributeStart = [this]() { - curSession->Contribute(pendingContributions); - }; - auto fContributeWait = [this] { - return ProcessPendingMessageBatch(connman, m_peerman.get(), *curSession, - pendingContributions, 8); + auto fContributeStart = [this, &peerman]() { curSession->Contribute(pendingContributions, peerman); }; + auto fContributeWait = [this, &connman, &peerman] { + return ProcessPendingMessageBatch(connman, *curSession, + pendingContributions, peerman, 8); }; HandlePhase(QuorumPhase::Contribute, QuorumPhase::Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait); // Complain - auto fComplainStart = [this]() { - curSession->VerifyAndComplain(pendingComplaints); + auto fComplainStart = [this, &connman, &peerman]() { + curSession->VerifyAndComplain(connman, pendingComplaints, peerman); }; - auto fComplainWait = [this] { - return ProcessPendingMessageBatch(connman, m_peerman.get(), *curSession, - pendingComplaints, 8); + auto fComplainWait = [this, &connman, &peerman] { + return ProcessPendingMessageBatch(connman, *curSession, pendingComplaints, + peerman, 8); }; HandlePhase(QuorumPhase::Complain, QuorumPhase::Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait); // Justify - auto fJustifyStart = [this]() { - curSession->VerifyAndJustify(pendingJustifications); - }; - auto fJustifyWait = [this] { - return ProcessPendingMessageBatch(connman, m_peerman.get(), - *curSession, - pendingJustifications, 8); + auto fJustifyStart = [this, &peerman]() { curSession->VerifyAndJustify(pendingJustifications, peerman); }; + auto fJustifyWait = [this, &connman, &peerman] { + return ProcessPendingMessageBatch(connman, *curSession, + pendingJustifications, peerman, 8); }; HandlePhase(QuorumPhase::Justify, QuorumPhase::Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait); // Commit - auto fCommitStart = [this]() { - curSession->VerifyAndCommit(pendingPrematureCommitments); - }; - auto fCommitWait = [this] { + auto fCommitStart = [this, &peerman]() { curSession->VerifyAndCommit(pendingPrematureCommitments, peerman); }; + auto fCommitWait = [this, &connman, &peerman] { return ProcessPendingMessageBatch( - connman, m_peerman.get(), *curSession, pendingPrematureCommitments, 8); + connman, *curSession, pendingPrematureCommitments, peerman, 8); }; HandlePhase(QuorumPhase::Commit, QuorumPhase::Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait); auto finalCommitments = curSession->FinalizeCommitments(); for (const auto& fqc : finalCommitments) { if (auto inv_opt = quorumBlockProcessor.AddMineableCommitment(fqc); inv_opt.has_value()) { - Assert(m_peerman.get())->RelayInv(inv_opt.value()); + peerman.RelayInv(inv_opt.value()); } } } -void CDKGSessionHandler::PhaseHandlerThread() +void CDKGSessionHandler::PhaseHandlerThread(CConnman& connman, PeerManager& peerman) { while (!stopRequested) { try { LogPrint(BCLog::LLMQ_DKG, "CDKGSessionHandler::%s -- %s qi[%d] - starting HandleDKGRound\n", __func__, params.name, quorumIndex); - HandleDKGRound(); + HandleDKGRound(connman, peerman); } catch (AbortPhaseException& e) { dkgDebugManager.UpdateLocalSessionStatus(params.type, quorumIndex, [&](CDKGDebugSessionStatus& status) { status.statusBits.aborted = true; diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index 31ae25b047..b28ae87e44 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -7,8 +7,6 @@ #include // for NodeId -#include - #include #include #include @@ -65,7 +63,6 @@ public: using BinaryMessage = std::pair>; private: - std::atomic m_peerman{nullptr}; const int invType; const size_t maxMessagesPerNode; mutable Mutex cs_messages; @@ -77,18 +74,18 @@ public: explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) : invType(_invType), maxMessagesPerNode(_maxMessagesPerNode) {}; - void PushPendingMessage(NodeId from, PeerManager* peerman, CDataStream& vRecv); + void PushPendingMessage(NodeId from, CDataStream& vRecv, PeerManager& peerman); std::list PopPendingMessages(size_t maxCount); bool HasSeen(const uint256& hash) const; - void Misbehaving(NodeId from, int score); + void Misbehaving(NodeId from, int score, PeerManager& peerman); void Clear(); - template - void PushPendingMessage(NodeId from, PeerManager* peerman, Message& msg) + template + void PushPendingMessage(NodeId from, Message& msg, PeerManager& peerman) { CDataStream ds(SER_NETWORK, PROTOCOL_VERSION); ds << msg; - PushPendingMessage(from, peerman, ds); + PushPendingMessage(from, ds, peerman); } // Might return nullptr messages, which indicates that deserialization failed for some reason @@ -132,7 +129,6 @@ private: CBLSWorker& blsWorker; CChainState& m_chainstate; - CConnman& connman; CDeterministicMNManager& m_dmnman; CDKGDebugManager& dkgDebugManager; CDKGSessionManager& dkgManager; @@ -140,7 +136,6 @@ private: CQuorumBlockProcessor& quorumBlockProcessor; const CActiveMasternodeManager* const m_mn_activeman; const CSporkManager& m_sporkman; - const std::unique_ptr& m_peerman; const Consensus::LLMQParams params; const int quorumIndex; @@ -160,16 +155,16 @@ private: CDKGPendingMessages pendingPrematureCommitments; public: - CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, + CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, - const CSporkManager& sporkman, const std::unique_ptr& peerman, const Consensus::LLMQParams& _params, int _quorumIndex); + const CSporkManager& sporkman, const Consensus::LLMQParams& _params, int _quorumIndex); ~CDKGSessionHandler(); void UpdatedBlockTip(const CBlockIndex *pindexNew); - void ProcessMessage(const CNode& pfrom, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv); + void ProcessMessage(const CNode& pfrom, PeerManager& peerman, const std::string& msg_type, CDataStream& vRecv); - void StartThread(); + void StartThread(CConnman& connman, PeerManager& peerman); void StopThread(); bool GetContribution(const uint256& hash, CDKGContribution& ret) const; @@ -194,8 +189,8 @@ private: void WaitForNewQuorum(const uint256& oldQuorumHash) const; void SleepBeforePhase(QuorumPhase curPhase, const uint256& expectedQuorumHash, double randomSleepFactor, const WhileWaitFunc& runWhileWaiting) const; void HandlePhase(QuorumPhase curPhase, QuorumPhase nextPhase, const uint256& expectedQuorumHash, double randomSleepFactor, const StartPhaseFunc& startPhaseFunc, const WhileWaitFunc& runWhileWaiting); - void HandleDKGRound(); - void PhaseHandlerThread(); + void HandleDKGRound(CConnman& connman, PeerManager& peerman); + void PhaseHandlerThread(CConnman& connman, PeerManager& peerman); }; } // namespace llmq diff --git a/src/llmq/dkgsessionmgr.cpp b/src/llmq/dkgsessionmgr.cpp index b9e33f30f9..d6fb0715c1 100644 --- a/src/llmq/dkgsessionmgr.cpp +++ b/src/llmq/dkgsessionmgr.cpp @@ -29,14 +29,14 @@ static const std::string DB_VVEC = "qdkg_V"; static const std::string DB_SKCONTRIB = "qdkg_S"; static const std::string DB_ENC_CONTRIB = "qdkg_E"; -CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, - CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, - const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, - const std::unique_ptr& peerman, bool unitTests, bool fWipe) : +CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman, + CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, + CQuorumBlockProcessor& _quorumBlockProcessor, + const CActiveMasternodeManager* const mn_activeman, + const CSporkManager& sporkman, bool unitTests, bool fWipe) : db(std::make_unique(unitTests ? "" : (gArgs.GetDataDirNet() / "llmq/dkgdb"), 1 << 20, unitTests, fWipe)), blsWorker(_blsWorker), m_chainstate(chainstate), - connman(_connman), m_dmnman(dmnman), dkgDebugManager(_dkgDebugManager), quorumBlockProcessor(_quorumBlockProcessor), @@ -51,19 +51,20 @@ CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chai for (const auto& params : consensus_params.llmqs) { auto session_count = (params.useRotation) ? params.signingActiveQuorumCount : 1; for (const auto i : irange::range(session_count)) { - dkgSessionHandlers.emplace(std::piecewise_construct, - std::forward_as_tuple(params.type, i), - std::forward_as_tuple(blsWorker, m_chainstate, connman, dmnman, dkgDebugManager, *this, mn_metaman, - quorumBlockProcessor, mn_activeman, spork_manager, peerman, params, i)); + dkgSessionHandlers.emplace(std::piecewise_construct, std::forward_as_tuple(params.type, i), + std::forward_as_tuple(blsWorker, m_chainstate, dmnman, dkgDebugManager, *this, + mn_metaman, quorumBlockProcessor, mn_activeman, + spork_manager, params, i)); } } } CDKGSessionManager::~CDKGSessionManager() = default; -void CDKGSessionManager::StartThreads() + +void CDKGSessionManager::StartThreads(CConnman& connman, PeerManager& peerman) { for (auto& it : dkgSessionHandlers) { - it.second.StartThread(); + it.second.StartThread(connman, peerman); } } @@ -90,7 +91,8 @@ void CDKGSessionManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fIni } } -PeerMsgRet CDKGSessionManager::ProcessMessage(CNode& pfrom, PeerManager* peerman, bool is_masternode, const std::string& msg_type, CDataStream& vRecv) +PeerMsgRet CDKGSessionManager::ProcessMessage(CNode& pfrom, PeerManager& peerman, bool is_masternode, + const std::string& msg_type, CDataStream& vRecv) { static Mutex cs_indexedQuorumsCache; static std::map> indexedQuorumsCache GUARDED_BY(cs_indexedQuorumsCache); diff --git a/src/llmq/dkgsessionmgr.h b/src/llmq/dkgsessionmgr.h index 8ca0f9acd0..eaa5dc79de 100644 --- a/src/llmq/dkgsessionmgr.h +++ b/src/llmq/dkgsessionmgr.h @@ -42,7 +42,6 @@ private: CBLSWorker& blsWorker; CChainState& m_chainstate; - CConnman& connman; CDeterministicMNManager& m_dmnman; CDKGDebugManager& dkgDebugManager; CQuorumBlockProcessor& quorumBlockProcessor; @@ -71,18 +70,19 @@ private: mutable std::map contributionsCache GUARDED_BY(contributionsCacheCs); public: - CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, - CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, - const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, - const std::unique_ptr& peerman, bool unitTests, bool fWipe); + CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman, + CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, + CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, + const CSporkManager& sporkman, bool unitTests, bool fWipe); ~CDKGSessionManager(); - void StartThreads(); + void StartThreads(CConnman& connman, PeerManager& peerman); void StopThreads(); void UpdatedBlockTip(const CBlockIndex *pindexNew, bool fInitialDownload); - PeerMsgRet ProcessMessage(CNode& pfrom, PeerManager* peerman, bool is_masternode, const std::string& msg_type, CDataStream& vRecv); + PeerMsgRet ProcessMessage(CNode& pfrom, PeerManager& peerman, bool is_masternode, const std::string& msg_type, + CDataStream& vRecv); bool AlreadyHave(const CInv& inv) const; bool GetContribution(const uint256& hash, CDKGContribution& ret) const; bool GetComplaint(const uint256& hash, CDKGComplaint& ret) const; diff --git a/src/llmq/instantsend.cpp b/src/llmq/instantsend.cpp index d027ff94c3..f5d9a9299b 100644 --- a/src/llmq/instantsend.cpp +++ b/src/llmq/instantsend.cpp @@ -430,14 +430,14 @@ std::vector CInstantSendDb::RemoveChainedInstantSendLocks(const uint256 //////////////// -void CInstantSendManager::Start() +void CInstantSendManager::Start(PeerManager& peerman) { // can't start new thread if we have one running already if (workThread.joinable()) { assert(false); } - workThread = std::thread(&util::TraceThread, "isman", [this] { WorkThreadMain(); }); + workThread = std::thread(&util::TraceThread, "isman", [this, &peerman] { WorkThreadMain(peerman); }); sigman.RegisterRecoveredSigsListener(this); } @@ -732,21 +732,23 @@ void CInstantSendManager::HandleNewInstantSendLockRecoveredSig(const llmq::CReco pendingInstantSendLocks.emplace(hash, std::make_pair(-1, islock)); } -PeerMsgRet CInstantSendManager::ProcessMessage(const CNode& pfrom, std::string_view msg_type, CDataStream& vRecv) +PeerMsgRet CInstantSendManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, std::string_view msg_type, + CDataStream& vRecv) { if (IsInstantSendEnabled() && msg_type == NetMsgType::ISDLOCK) { const auto islock = std::make_shared(); vRecv >> *islock; - return ProcessMessageInstantSendLock(pfrom, islock); + return ProcessMessageInstantSendLock(pfrom, peerman, islock); } return {}; } -PeerMsgRet CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, const llmq::CInstantSendLockPtr& islock) +PeerMsgRet CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, PeerManager& peerman, + const llmq::CInstantSendLockPtr& islock) { auto hash = ::SerializeHash(*islock); - WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(pfrom.GetId(), CInv(MSG_ISDLOCK, hash))); + WITH_LOCK(::cs_main, peerman.EraseObjectRequest(pfrom.GetId(), CInv(MSG_ISDLOCK, hash))); if (!islock->TriviallyValid()) { return tl::unexpected{100}; @@ -800,7 +802,7 @@ bool CInstantSendLock::TriviallyValid() const return true; } -bool CInstantSendManager::ProcessPendingInstantSendLocks() +bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman) { decltype(pendingInstantSendLocks) pend; bool fMoreWork{false}; @@ -845,7 +847,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks() auto dkgInterval = llmq_params.dkgInterval; // First check against the current active set and don't ban - auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, 0, pend, false); + auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, peerman, /*signOffset=*/0, pend, false); if (!badISLocks.empty()) { LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__); @@ -858,13 +860,15 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks() } } // Now check against the previous active set and perform banning if this fails - ProcessPendingInstantSendLocks(llmq_params, dkgInterval, pend, true); + ProcessPendingInstantSendLocks(llmq_params, peerman, dkgInterval, pend, true); } return fMoreWork; } -std::unordered_set CInstantSendManager::ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, int signOffset, const std::unordered_map, StaticSaltedHasher>& pend, bool ban) +std::unordered_set CInstantSendManager::ProcessPendingInstantSendLocks( + const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset, + const std::unordered_map, StaticSaltedHasher>& pend, bool ban) { CBLSBatchVerifier batchVerifier(false, true, 8); std::unordered_map recSigs; @@ -936,7 +940,7 @@ std::unordered_set CInstantSendManager::ProcessPend for (const auto& nodeId : batchVerifier.badSources) { // Let's not be too harsh, as the peer might simply be unlucky and might have sent us an old lock which // does not validate anymore due to changed quorums - Assert(m_peerman)->Misbehaving(nodeId, 20); + peerman.Misbehaving(nodeId, 20); } } for (const auto& p : pend) { @@ -951,7 +955,7 @@ std::unordered_set CInstantSendManager::ProcessPend continue; } - ProcessInstantSendLock(nodeId, hash, islock); + ProcessInstantSendLock(nodeId, peerman, hash, islock); // See comment further on top. We pass a reconstructed recovered sig to the signing manager to avoid // double-verification of the sig. @@ -969,7 +973,8 @@ std::unordered_set CInstantSendManager::ProcessPend return badISLocks; } -void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLockPtr& islock) +void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash, + const CInstantSendLockPtr& islock) { LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n", __func__, islock->txid.ToString(), hash.ToString(), from); @@ -1030,28 +1035,28 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has CInv inv(MSG_ISDLOCK, hash); if (tx != nullptr) { - Assert(m_peerman)->RelayInvFiltered(inv, *tx, ISDLOCK_PROTO_VERSION); + peerman.RelayInvFiltered(inv, *tx, ISDLOCK_PROTO_VERSION); } else { // we don't have the TX yet, so we only filter based on txid. Later when that TX arrives, we will re-announce // with the TX taken into account. - Assert(m_peerman)->RelayInvFiltered(inv, islock->txid, ISDLOCK_PROTO_VERSION); + peerman.RelayInvFiltered(inv, islock->txid, ISDLOCK_PROTO_VERSION); } ResolveBlockConflicts(hash, *islock); if (tx != nullptr) { - RemoveMempoolConflictsForLock(hash, *islock); + RemoveMempoolConflictsForLock(peerman, hash, *islock); LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- notify about lock %s for tx %s\n", __func__, hash.ToString(), tx->GetHash().ToString()); GetMainSignals().NotifyTransactionLock(tx, islock); // bump mempool counter to make sure newly locked txes are picked up by getblocktemplate mempool.AddTransactionsUpdated(1); } else { - m_peerman->AskPeersForTransaction(islock->txid, m_is_masternode); + peerman.AskPeersForTransaction(islock->txid, m_is_masternode); } } -void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx) +void CInstantSendManager::TransactionAddedToMempool(PeerManager& peerman, const CTransactionRef& tx) { if (!IsInstantSendEnabled() || !m_mn_sync.IsBlockchainSynced() || tx->vin.empty()) { return; @@ -1080,7 +1085,7 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx) // TX is not locked, so make sure it is tracked AddNonLockedTx(tx, nullptr); } else { - RemoveMempoolConflictsForLock(::SerializeHash(*islock), *islock); + RemoveMempoolConflictsForLock(peerman, ::SerializeHash(*islock), *islock); } } @@ -1292,7 +1297,8 @@ void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex) } } -void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock) +void CInstantSendManager::RemoveMempoolConflictsForLock(PeerManager& peerman, const uint256& hash, + const CInstantSendLock& islock) { std::unordered_map toDelete; @@ -1321,7 +1327,7 @@ void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, con for (const auto& p : toDelete) { RemoveConflictedTx(*p.second); } - m_peerman->AskPeersForTransaction(islock.txid, m_is_masternode); + peerman.AskPeersForTransaction(islock.txid, m_is_masternode); } } @@ -1583,10 +1589,10 @@ size_t CInstantSendManager::GetInstantSendLockCount() const return db.GetInstantSendLockCount(); } -void CInstantSendManager::WorkThreadMain() +void CInstantSendManager::WorkThreadMain(PeerManager& peerman) { while (!workInterrupt) { - bool fMoreWork = ProcessPendingInstantSendLocks(); + bool fMoreWork = ProcessPendingInstantSendLocks(peerman); ProcessPendingRetryLockTxs(); if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { diff --git a/src/llmq/instantsend.h b/src/llmq/instantsend.h index 2b83878e9a..158ce1b699 100644 --- a/src/llmq/instantsend.h +++ b/src/llmq/instantsend.h @@ -206,7 +206,6 @@ private: CSporkManager& spork_manager; CTxMemPool& mempool; const CMasternodeSync& m_mn_sync; - const std::unique_ptr& m_peerman; const bool m_is_masternode; @@ -256,9 +255,8 @@ private: public: explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CQuorumManager& _qman, CSigningManager& _sigman, CSigSharesManager& _shareman, CSporkManager& sporkman, - CTxMemPool& _mempool, const CMasternodeSync& mn_sync, - const std::unique_ptr& peerman, bool is_masternode, bool unitTests, - bool fWipe) : + CTxMemPool& _mempool, const CMasternodeSync& mn_sync, bool is_masternode, + bool unitTests, bool fWipe) : db(unitTests, fWipe), clhandler(_clhandler), m_chainstate(chainstate), @@ -268,14 +266,13 @@ public: spork_manager(sporkman), mempool(_mempool), m_mn_sync(mn_sync), - m_peerman(peerman), m_is_masternode{is_masternode} { workInterrupt.reset(); } ~CInstantSendManager() = default; - void Start(); + void Start(PeerManager& peerman); void Stop(); void InterruptWorkerThread() { workInterrupt(); }; @@ -295,18 +292,15 @@ private: const Consensus::Params& params) EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests); void TrySignInstantSendLock(const CTransaction& tx) EXCLUSIVE_LOCKS_REQUIRED(!cs_creating); - PeerMsgRet ProcessMessageInstantSendLock(const CNode& pfrom, const CInstantSendLockPtr& islock); - bool ProcessPendingInstantSendLocks() + PeerMsgRet ProcessMessageInstantSendLock(const CNode& pfrom, PeerManager& peerman, const CInstantSendLockPtr& islock); + bool ProcessPendingInstantSendLocks(PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); - std::unordered_set ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, - int signOffset, - const std::unordered_map, - StaticSaltedHasher>& pend, - bool ban) + std::unordered_set ProcessPendingInstantSendLocks( + const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset, + const std::unordered_map, StaticSaltedHasher>& pend, bool ban) EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); - void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLockPtr& islock) + void ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash, const CInstantSendLockPtr& islock) EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined) @@ -318,14 +312,14 @@ private: void TruncateRecoveredSigsForInputs(const CInstantSendLock& islock) EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests); - void RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock) + void RemoveMempoolConflictsForLock(PeerManager& peerman, const uint256& hash, const CInstantSendLock& islock) EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingRetry); void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock) EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); void ProcessPendingRetryLockTxs() EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingRetry); - void WorkThreadMain() + void WorkThreadMain(PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); void HandleFullyConfirmedBlock(const CBlockIndex* pindex) @@ -339,9 +333,9 @@ public: [[nodiscard]] MessageProcessingResult HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_pendingLocks); - PeerMsgRet ProcessMessage(const CNode& pfrom, std::string_view msg_type, CDataStream& vRecv); + PeerMsgRet ProcessMessage(const CNode& pfrom, PeerManager& peerman, std::string_view msg_type, CDataStream& vRecv); - void TransactionAddedToMempool(const CTransactionRef& tx) + void TransactionAddedToMempool(PeerManager& peerman, const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); void TransactionRemovedFromMempool(const CTransactionRef& tx); void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp index 1b9902a5a8..85309f0bd3 100644 --- a/src/llmq/quorums.cpp +++ b/src/llmq/quorums.cpp @@ -210,8 +210,8 @@ bool CQuorum::ReadContributions(const CDBWrapper& db) return true; } -CQuorumManager::CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, - CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CEvoDB& _evoDb, +CQuorumManager::CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman, + CDKGSessionManager& _dkgManager, CEvoDB& _evoDb, CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync, const CSporkManager& sporkman, bool unit_tests, bool wipe) : @@ -219,7 +219,6 @@ CQuorumManager::CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, unit_tests, wipe)), blsWorker(_blsWorker), m_chainstate(chainstate), - connman(_connman), m_dmnman(dmnman), dkgManager(_dkgManager), quorumBlockProcessor(_quorumBlockProcessor), @@ -249,7 +248,7 @@ void CQuorumManager::Stop() workerPool.stop(true); } -void CQuorumManager::TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex) const +void CQuorumManager::TriggerQuorumDataRecoveryThreads(CConnman& connman, const CBlockIndex* pIndex) const { if ((m_mn_activeman == nullptr && !IsWatchQuorumsEnabled()) || !QuorumDataRecoveryEnabled() || pIndex == nullptr) { return; @@ -296,17 +295,17 @@ void CQuorumManager::TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex) } // Finally start the thread which triggers the requests for this quorum - StartQuorumDataRecoveryThread(pQuorum, pIndex, nDataMask); + StartQuorumDataRecoveryThread(connman, pQuorum, pIndex, nDataMask); } } } -void CQuorumManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload) const +void CQuorumManager::UpdatedBlockTip(const CBlockIndex* pindexNew, CConnman& connman, bool fInitialDownload) const { if (!m_mn_sync.IsBlockchainSynced()) return; for (const auto& params : Params().GetConsensus().llmqs) { - CheckQuorumConnections(params, pindexNew); + CheckQuorumConnections(connman, params, pindexNew); } if (m_mn_activeman != nullptr || IsWatchQuorumsEnabled()) { @@ -322,11 +321,12 @@ void CQuorumManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitial } } - TriggerQuorumDataRecoveryThreads(pindexNew); + TriggerQuorumDataRecoveryThreads(connman, pindexNew); StartCleanupOldQuorumDataThread(pindexNew); } -void CQuorumManager::CheckQuorumConnections(const Consensus::LLMQParams& llmqParams, const CBlockIndex* pindexNew) const +void CQuorumManager::CheckQuorumConnections(CConnman& connman, const Consensus::LLMQParams& llmqParams, + const CBlockIndex* pindexNew) const { if (m_mn_activeman == nullptr && !IsWatchQuorumsEnabled()) return; @@ -469,7 +469,9 @@ bool CQuorumManager::HasQuorum(Consensus::LLMQType llmqType, const CQuorumBlockP return quorum_block_processor.HasMinedCommitment(llmqType, quorumHash); } -bool CQuorumManager::RequestQuorumData(CNode* pfrom, Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex, uint16_t nDataMask, const uint256& proTxHash) const +bool CQuorumManager::RequestQuorumData(CNode* pfrom, CConnman& connman, Consensus::LLMQType llmqType, + const CBlockIndex* pQuorumBaseBlockIndex, uint16_t nDataMask, + const uint256& proTxHash) const { if (pfrom == nullptr) { LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Invalid pfrom: nullptr\n", __func__); @@ -697,7 +699,7 @@ size_t CQuorumManager::GetQuorumRecoveryStartOffset(const CQuorumCPtr pQuorum, c return nIndex % pQuorum->qc->validMembers.size(); } -PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) +PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& connman, const std::string& msg_type, CDataStream& vRecv) { auto strFunc = __func__; auto errorHandler = [&](const std::string& strError, int nScore = 10) -> PeerMsgRet { @@ -709,7 +711,6 @@ PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, const std::string& msg_t }; if (msg_type == NetMsgType::QGETDATA) { - if (m_mn_activeman == nullptr || (pfrom.GetVerifiedProRegTxHash().IsNull() && !pfrom.qwatch)) { return errorHandler("Not a verified masternode or a qwatch connection"); } @@ -912,7 +913,8 @@ void CQuorumManager::StartCachePopulatorThread(const CQuorumCPtr pQuorum) const }); } -void CQuorumManager::StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, uint16_t nDataMaskIn) const +void CQuorumManager::StartQuorumDataRecoveryThread(CConnman& connman, const CQuorumCPtr pQuorum, + const CBlockIndex* pIndex, uint16_t nDataMaskIn) const { assert(m_mn_activeman); @@ -922,7 +924,7 @@ void CQuorumManager::StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, co } pQuorum->fQuorumDataRecoveryThreadRunning = true; - workerPool.push([pQuorum, pIndex, nDataMaskIn, this](int threadId) { + workerPool.push([&connman, pQuorum, pIndex, nDataMaskIn, this](int threadId) { size_t nTries{0}; uint16_t nDataMask{nDataMaskIn}; int64_t nTimeLastSuccess{0}; @@ -958,7 +960,6 @@ void CQuorumManager::StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, co printLog("Try to request"); while (nDataMask > 0 && !quorumThreadInterrupt) { - if (nDataMask & llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR && pQuorum->HasVerificationVector()) { nDataMask &= ~llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR; @@ -1005,7 +1006,8 @@ void CQuorumManager::StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, co return; } - if (RequestQuorumData(pNode, pQuorum->qc->llmqType, pQuorum->m_quorum_base_block_index, nDataMask, proTxHash)) { + if (RequestQuorumData(pNode, connman, pQuorum->qc->llmqType, pQuorum->m_quorum_base_block_index, + nDataMask, proTxHash)) { nTimeLastSuccess = GetTime().count(); printLog("Requested"); } else { diff --git a/src/llmq/quorums.h b/src/llmq/quorums.h index 892c0ecd27..6e8db117db 100644 --- a/src/llmq/quorums.h +++ b/src/llmq/quorums.h @@ -238,7 +238,6 @@ private: CBLSWorker& blsWorker; CChainState& m_chainstate; - CConnman& connman; CDeterministicMNManager& m_dmnman; CDKGSessionManager& dkgManager; CQuorumBlockProcessor& quorumBlockProcessor; @@ -261,7 +260,7 @@ private: mutable CThreadInterrupt quorumThreadInterrupt; public: - CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, + CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CEvoDB& _evoDb, CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync, const CSporkManager& sporkman, bool unit_tests, bool wipe); @@ -270,15 +269,17 @@ public: void Start(); void Stop(); - void TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex) const; + void TriggerQuorumDataRecoveryThreads(CConnman& connman, const CBlockIndex* pIndex) const; - void UpdatedBlockTip(const CBlockIndex *pindexNew, bool fInitialDownload) const; + void UpdatedBlockTip(const CBlockIndex* pindexNew, CConnman& connman, bool fInitialDownload) const; - PeerMsgRet ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv); + PeerMsgRet ProcessMessage(CNode& pfrom, CConnman& connman, const std::string& msg_type, CDataStream& vRecv); static bool HasQuorum(Consensus::LLMQType llmqType, const CQuorumBlockProcessor& quorum_block_processor, const uint256& quorumHash); - bool RequestQuorumData(CNode* pfrom, Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex, uint16_t nDataMask, const uint256& proTxHash = uint256()) const; + bool RequestQuorumData(CNode* pfrom, CConnman& connman, Consensus::LLMQType llmqType, + const CBlockIndex* pQuorumBaseBlockIndex, uint16_t nDataMask, + const uint256& proTxHash = uint256()) const; // all these methods will lock cs_main for a short period of time CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const; @@ -289,7 +290,8 @@ public: private: // all private methods here are cs_main-free - void CheckQuorumConnections(const Consensus::LLMQParams& llmqParams, const CBlockIndex *pindexNew) const; + void CheckQuorumConnections(CConnman& connman, const Consensus::LLMQParams& llmqParams, + const CBlockIndex* pindexNew) const; CQuorumPtr BuildQuorumFromCommitment(Consensus::LLMQType llmqType, gsl::not_null pQuorumBaseBlockIndex, bool populate_cache) const; bool BuildQuorumContributions(const CFinalCommitmentPtr& fqc, const std::shared_ptr& quorum) const; @@ -301,7 +303,8 @@ private: size_t GetQuorumRecoveryStartOffset(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex) const; void StartCachePopulatorThread(const CQuorumCPtr pQuorum) const; - void StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, uint16_t nDataMask) const; + void StartQuorumDataRecoveryThread(CConnman& connman, const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, + uint16_t nDataMask) const; void StartCleanupOldQuorumDataThread(const CBlockIndex* pIndex) const; void MigrateOldQuorumDB(CEvoDB& evoDb) const; diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index edc8e2f74a..ca4033867d 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -349,8 +349,11 @@ void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge) ////////////////// CSigningManager::CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate, - const CQuorumManager& _qman, const std::unique_ptr& peerman, bool fMemory, bool fWipe) : - db(fMemory, fWipe), m_mn_activeman(mn_activeman), m_chainstate(chainstate), qman(_qman), m_peerman(peerman) + const CQuorumManager& _qman, bool fMemory, bool fWipe) : + db(fMemory, fWipe), + m_mn_activeman(mn_activeman), + m_chainstate(chainstate), + qman(_qman) { } @@ -381,13 +384,14 @@ bool CSigningManager::GetRecoveredSigForGetData(const uint256& hash, CRecoveredS return true; } -PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) +PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const std::string& msg_type, + CDataStream& vRecv) { if (msg_type == NetMsgType::QSIGREC) { auto recoveredSig = std::make_shared(); vRecv >> *recoveredSig; - return ProcessMessageRecoveredSig(pfrom, recoveredSig); + return ProcessMessageRecoveredSig(pfrom, peerman, recoveredSig); } return {}; } @@ -416,10 +420,11 @@ static bool PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CR return true; } -PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr& recoveredSig) +PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, PeerManager& peerman, + const std::shared_ptr& recoveredSig) { - WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(pfrom.GetId(), - CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash()))); + WITH_LOCK(::cs_main, + peerman.EraseObjectRequest(pfrom.GetId(), CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash()))); bool ban = false; if (!PreVerifyRecoveredSig(qman, *recoveredSig, ban)) { @@ -517,22 +522,22 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify( } } -void CSigningManager::ProcessPendingReconstructedRecoveredSigs() +void CSigningManager::ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman) { decltype(pendingReconstructedRecoveredSigs) m; WITH_LOCK(cs_pending, swap(m, pendingReconstructedRecoveredSigs)); for (const auto& p : m) { - ProcessRecoveredSig(p.second); + ProcessRecoveredSig(p.second, peerman); } } -bool CSigningManager::ProcessPendingRecoveredSigs() +bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman) { std::unordered_map>> recSigsByNode; std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; - ProcessPendingReconstructedRecoveredSigs(); + ProcessPendingReconstructedRecoveredSigs(peerman); const size_t nMaxBatchSize{32}; CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums); @@ -575,7 +580,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs() if (batchVerifier.badSources.count(nodeId)) { LogPrint(BCLog::LLMQ, "CSigningManager::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId); - Assert(m_peerman)->Misbehaving(nodeId, 100); + peerman.Misbehaving(nodeId, 100); continue; } @@ -584,7 +589,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs() continue; } - ProcessRecoveredSig(recSig); + ProcessRecoveredSig(recSig, peerman); } } @@ -592,7 +597,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs() } // signature must be verified already -void CSigningManager::ProcessRecoveredSig(const std::shared_ptr& recoveredSig) +void CSigningManager::ProcessRecoveredSig(const std::shared_ptr& recoveredSig, PeerManager& peerman) { auto llmqType = recoveredSig->getLlmqType(); @@ -631,12 +636,12 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptrGetHash())); if (m_mn_activeman != nullptr) { - Assert(m_peerman)->RelayRecoveredSig(recoveredSig->GetHash()); + peerman.RelayRecoveredSig(recoveredSig->GetHash()); } auto listeners = WITH_LOCK(cs_listeners, return recoveredSigsListeners); for (auto& l : listeners) { - Assert(m_peerman)->PostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig)); + peerman.PostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig)); } GetMainSignals().NotifyRecoveredSig(recoveredSig); @@ -799,14 +804,14 @@ bool CSigningManager::GetVoteForId(Consensus::LLMQType llmqType, const uint256& return db.GetVoteForId(llmqType, id, msgHashRet); } -void CSigningManager::StartWorkerThread() +void CSigningManager::StartWorkerThread(PeerManager& peerman) { // can't start new thread if we have one running already if (workThread.joinable()) { assert(false); } - workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); }); + workThread = std::thread(&util::TraceThread, "sigshares", [this, &peerman] { WorkThreadMain(peerman); }); } void CSigningManager::StopWorkerThread() @@ -826,10 +831,10 @@ void CSigningManager::InterruptWorkerThread() workInterrupt(); } -void CSigningManager::WorkThreadMain() +void CSigningManager::WorkThreadMain(PeerManager& peerman) { while (!workInterrupt) { - bool fMoreWork = ProcessPendingRecoveredSigs(); + bool fMoreWork = ProcessPendingRecoveredSigs(peerman); Cleanup(); diff --git a/src/llmq/signing.h b/src/llmq/signing.h index e044c51bf6..1b7891c7db 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -162,7 +162,6 @@ private: const CActiveMasternodeManager* const m_mn_activeman; const CChainState& m_chainstate; const CQuorumManager& qman; - const std::unique_ptr& m_peerman; mutable Mutex cs_pending; // Incoming and not verified yet @@ -178,12 +177,12 @@ private: public: CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate, - const CQuorumManager& _qman, const std::unique_ptr& peerman, bool fMemory, bool fWipe); + const CQuorumManager& _qman, bool fMemory, bool fWipe); bool AlreadyHave(const CInv& inv) const; bool GetRecoveredSigForGetData(const uint256& hash, CRecoveredSig& ret) const; - PeerMsgRet ProcessMessage(const CNode& pnode, const std::string& msg_type, CDataStream& vRecv); + PeerMsgRet ProcessMessage(const CNode& pnode, PeerManager& peerman, const std::string& msg_type, CDataStream& vRecv); // This is called when a recovered signature was was reconstructed from another P2P message and is known to be valid // This is the case for example when a signature appears as part of InstantSend or ChainLocks @@ -196,16 +195,18 @@ public: void TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id); private: - PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr& recoveredSig); + PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, PeerManager& peerman, + const std::shared_ptr& recoveredSig); void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::unordered_map>>& retSigShares, std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums); - void ProcessPendingReconstructedRecoveredSigs(); - bool ProcessPendingRecoveredSigs(); // called from the worker thread of CSigSharesManager + void ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman); + bool ProcessPendingRecoveredSigs(PeerManager& peerman); // called from the worker thread of CSigSharesManager public: // TODO - should not be public! - void ProcessRecoveredSig(const std::shared_ptr& recoveredSig); + void ProcessRecoveredSig(const std::shared_ptr& recoveredSig, PeerManager& peerman); + private: void Cleanup(); // called from the worker thread of CSigSharesManager @@ -228,10 +229,10 @@ public: private: std::thread workThread; CThreadInterrupt workInterrupt; - void WorkThreadMain(); + void WorkThreadMain(PeerManager& peerman); public: - void StartWorkerThread(); + void StartWorkerThread(PeerManager& peerman); void StopWorkerThread(); void InterruptWorkerThread(); }; diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 2b72df3e2e..6bcc0e8996 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -178,14 +178,15 @@ void CSigSharesNodeState::RemoveSession(const uint256& signHash) ////////////////////// -void CSigSharesManager::StartWorkerThread() +void CSigSharesManager::StartWorkerThread(CConnman& connman, PeerManager& peerman) { // can't start new thread if we have one running already if (workThread.joinable()) { assert(false); } - workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); }); + workThread = std::thread(&util::TraceThread, "sigshares", + [this, &connman, &peerman] { WorkThreadMain(connman, peerman); }); } void CSigSharesManager::StopWorkerThread() @@ -215,7 +216,8 @@ void CSigSharesManager::InterruptWorkerThread() workInterrupt(); } -void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager& sporkman, const std::string& msg_type, CDataStream& vRecv) +void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const CSporkManager& sporkman, + const std::string& msg_type, CDataStream& vRecv) { // non-masternodes are not interested in sigshares if (m_mn_activeman == nullptr) return; @@ -227,12 +229,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager& if (receivedSigShares.size() > MAX_MSGS_SIG_SHARES) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, receivedSigShares.size(), MAX_MSGS_SIG_SHARES, pfrom.GetId()); - BanNode(pfrom.GetId()); + BanNode(pfrom.GetId(), peerman); return; } for (const auto& sigShare : receivedSigShares) { - ProcessMessageSigShare(pfrom.GetId(), sigShare); + ProcessMessageSigShare(pfrom.GetId(), peerman, sigShare); } } @@ -241,12 +243,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager& vRecv >> msgs; if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId()); - BanNode(pfrom.GetId()); + BanNode(pfrom.GetId(), peerman); return; } if (!ranges::all_of(msgs, [this, &pfrom](const auto& ann){ return ProcessMessageSigSesAnn(pfrom, ann); })) { - BanNode(pfrom.GetId()); + BanNode(pfrom.GetId(), peerman); return; } } else if (msg_type == NetMsgType::QSIGSHARESINV) { @@ -254,12 +256,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager& vRecv >> msgs; if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom.GetId()); - BanNode(pfrom.GetId()); + BanNode(pfrom.GetId(), peerman); return; } if (!ranges::all_of(msgs, [this, &pfrom](const auto& inv){ return ProcessMessageSigSharesInv(pfrom, inv); })) { - BanNode(pfrom.GetId()); + BanNode(pfrom.GetId(), peerman); return; } } else if (msg_type == NetMsgType::QGETSIGSHARES) { @@ -267,12 +269,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager& vRecv >> msgs; if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom.GetId()); - BanNode(pfrom.GetId()); + BanNode(pfrom.GetId(), peerman); return; } if (!ranges::all_of(msgs, [this, &pfrom](const auto& inv){ return ProcessMessageGetSigShares(pfrom, inv); })) { - BanNode(pfrom.GetId()); + BanNode(pfrom.GetId(), peerman); return; } } else if (msg_type == NetMsgType::QBSIGSHARES) { @@ -284,12 +286,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager& } if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId()); - BanNode(pfrom.GetId()); + BanNode(pfrom.GetId(), peerman); return; } if (!ranges::all_of(msgs, [this, &pfrom](const auto& bs){ return ProcessMessageBatchedSigShares(pfrom, bs); })) { - BanNode(pfrom.GetId()); + BanNode(pfrom.GetId(), peerman); return; } } @@ -454,7 +456,7 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode& pfrom, const return true; } -void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare) +void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, PeerManager& peerman, const CSigShare& sigShare) { assert(m_mn_activeman); @@ -479,12 +481,12 @@ void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& s if (sigShare.getQuorumMember() >= quorum->members.size()) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember out of bounds\n", __func__); - BanNode(fromId); + BanNode(fromId, peerman); return; } if (!quorum->qc->validMembers[sigShare.getQuorumMember()]) { LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember not valid\n", __func__); - BanNode(fromId); + BanNode(fromId, peerman); return; } @@ -620,7 +622,7 @@ bool CSigSharesManager::CollectPendingSigSharesToVerify( return true; } -bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman) +bool CSigSharesManager::ProcessPendingSigShares(PeerManager& peerman, const CConnman& connman) { std::unordered_map> sigSharesByNodes; std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; @@ -646,7 +648,7 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman) // we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive // deserialization in the message thread if (!sigShare.sigShare.Get().IsValid()) { - BanNode(nodeId); + BanNode(nodeId, peerman); // don't process any additional shares from this node break; } @@ -678,25 +680,26 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman) LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- invalid sig shares from other node, banning peer=%d\n", __func__, nodeId); // this will also cause re-requesting of the shares that were sent by this node - BanNode(nodeId); + BanNode(nodeId, peerman); continue; } - ProcessPendingSigShares(v, quorums, connman); + ProcessPendingSigShares(v, quorums, peerman, connman); } return sigSharesByNodes.size() >= nMaxBatchSize; } // It's ensured that no duplicates are passed to this method -void CSigSharesManager::ProcessPendingSigShares(const std::vector& sigSharesToProcess, - const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums, - const CConnman& connman) +void CSigSharesManager::ProcessPendingSigShares( + const std::vector& sigSharesToProcess, + const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums, + PeerManager& peerman, const CConnman& connman) { cxxtimer::Timer t(true); for (const auto& sigShare : sigSharesToProcess) { auto quorumKey = std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash()); - ProcessSigShare(sigShare, connman, quorums.at(quorumKey)); + ProcessSigShare(peerman, sigShare, connman, quorums.at(quorumKey)); } t.stop(); @@ -705,7 +708,8 @@ void CSigSharesManager::ProcessPendingSigShares(const std::vector& si } // sig shares are already verified when entering this method -void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnman& connman, const CQuorumCPtr& quorum) +void CSigSharesManager::ProcessSigShare(PeerManager& peerman, const CSigShare& sigShare, const CConnman& connman, + const CQuorumCPtr& quorum) { assert(m_mn_activeman); @@ -754,11 +758,12 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnma } if (canTryRecovery) { - TryRecoverSig(quorum, sigShare.getId(), sigShare.getMsgHash()); + TryRecoverSig(peerman, quorum, sigShare.getId(), sigShare.getMsgHash()); } } -void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) +void CSigSharesManager::TryRecoverSig(PeerManager& peerman, const CQuorumCPtr& quorum, const uint256& id, + const uint256& msgHash) { if (sigman.HasRecoveredSigForId(quorum->params.type, id)) { return; @@ -817,7 +822,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& } } - sigman.ProcessRecoveredSig(rs); + sigman.ProcessRecoveredSig(rs, peerman); } CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256 &id, int attempt) @@ -1027,7 +1032,9 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map>& sigSharesToAnnounce) +void CSigSharesManager::CollectSigSharesToAnnounce( + const CConnman& connman, + std::unordered_map>& sigSharesToAnnounce) { AssertLockHeld(cs); @@ -1035,8 +1042,8 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map> sigSharesToRequest; std::unordered_map> sigShareBatchesToSend; @@ -1113,7 +1120,7 @@ bool CSigSharesManager::SendMessages() LOCK(cs); CollectSigSharesToRequest(sigSharesToRequest); CollectSigSharesToSend(sigShareBatchesToSend); - CollectSigSharesToAnnounce(sigSharesToAnnounce); + CollectSigSharesToAnnounce(connman, sigSharesToAnnounce); CollectSigSharesToSendConcentrated(sigSharesToSend, snap.Nodes()); for (auto& [nodeId, sigShareMap] : sigSharesToRequest) { @@ -1254,7 +1261,7 @@ CSigShare CSigSharesManager::RebuildSigShare(const CSigSharesNodeState::SessionI return sigShare; } -void CSigSharesManager::Cleanup() +void CSigSharesManager::Cleanup(const CConnman& connman) { int64_t now = GetTime().count(); if (now - lastCleanupTime < 5) { @@ -1407,13 +1414,13 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash) timeSeenForSessions.erase(signHash); } -void CSigSharesManager::RemoveBannedNodeStates() +void CSigSharesManager::RemoveBannedNodeStates(PeerManager& peerman) { // Called regularly to cleanup local node states for banned nodes LOCK(cs); for (auto it = nodeStates.begin(); it != nodeStates.end();) { - if (Assert(m_peerman)->IsBanned(it->first)) { + if (peerman.IsBanned(it->first)) { // re-request sigshares from other nodes // TODO: remove NO_THREAD_SAFETY_ANALYSIS // using here template ForEach makes impossible to use lock annotation @@ -1428,23 +1435,21 @@ void CSigSharesManager::RemoveBannedNodeStates() } } -void CSigSharesManager::BanNode(NodeId nodeId) +void CSigSharesManager::BanNode(NodeId nodeId, PeerManager& peerman) { if (nodeId == -1) { return; } - { - Assert(m_peerman)->Misbehaving(nodeId, 100); - } + peerman.Misbehaving(nodeId, 100); LOCK(cs); auto it = nodeStates.find(nodeId); if (it == nodeStates.end()) { return; } - auto& nodeState = it->second; + auto& nodeState = it->second; // Whatever we requested from him, let's request it from someone else now // TODO: remove NO_THREAD_SAFETY_ANALYSIS // using here template ForEach makes impossible to use lock annotation @@ -1453,26 +1458,25 @@ void CSigSharesManager::BanNode(NodeId nodeId) sigSharesRequested.Erase(k); }); nodeState.requestedSigShares.Clear(); - nodeState.banned = true; } -void CSigSharesManager::WorkThreadMain() +void CSigSharesManager::WorkThreadMain(CConnman& connman, PeerManager& peerman) { int64_t lastSendTime = 0; while (!workInterrupt) { - RemoveBannedNodeStates(); + RemoveBannedNodeStates(peerman); - bool fMoreWork = ProcessPendingSigShares(connman); - SignPendingSigShares(); + bool fMoreWork = ProcessPendingSigShares(peerman, connman); + SignPendingSigShares(connman, peerman); if (GetTimeMillis() - lastSendTime > 100) { - SendMessages(); + SendMessages(connman); lastSendTime = GetTimeMillis(); } - Cleanup(); + Cleanup(connman); // TODO Wakeup when pending signing is needed? if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { @@ -1487,7 +1491,7 @@ void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id, pendingSigns.emplace_back(quorum, id, msgHash); } -void CSigSharesManager::SignPendingSigShares() +void CSigSharesManager::SignPendingSigShares(const CConnman& connman, PeerManager& peerman) { std::vector v; WITH_LOCK(cs_pendingSigns, v.swap(pendingSigns)); @@ -1497,7 +1501,7 @@ void CSigSharesManager::SignPendingSigShares() if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) { auto sigShare = *opt_sigShare; - ProcessSigShare(sigShare, connman, pQuorum); + ProcessSigShare(peerman, sigShare, connman, pQuorum); if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) { LOCK(cs); diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index cd772b85a0..51e4a142d0 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -404,34 +404,35 @@ private: FastRandomContext rnd GUARDED_BY(cs); - CConnman& connman; CSigningManager& sigman; const CActiveMasternodeManager* const m_mn_activeman; const CQuorumManager& qman; const CSporkManager& m_sporkman; - const std::unique_ptr& m_peerman; - int64_t lastCleanupTime{0}; std::atomic recoveredSigsCounter{0}; public: - explicit CSigSharesManager(CConnman& _connman, CSigningManager& _sigman, const CActiveMasternodeManager* const mn_activeman, - const CQuorumManager& _qman, const CSporkManager& sporkman, const std::unique_ptr& peerman) : - connman(_connman), sigman(_sigman), m_mn_activeman(mn_activeman), qman(_qman), m_sporkman(sporkman), m_peerman(peerman) + explicit CSigSharesManager(CSigningManager& _sigman, const CActiveMasternodeManager* const mn_activeman, + const CQuorumManager& _qman, const CSporkManager& sporkman) : + sigman(_sigman), + m_mn_activeman(mn_activeman), + qman(_qman), + m_sporkman(sporkman) { workInterrupt.reset(); }; CSigSharesManager() = delete; ~CSigSharesManager() override = default; - void StartWorkerThread(); + void StartWorkerThread(CConnman& connman, PeerManager& peerman); void StopWorkerThread(); void RegisterAsRecoveredSigsListener(); void UnregisterAsRecoveredSigsListener(); void InterruptWorkerThread(); - void ProcessMessage(const CNode& pnode, const CSporkManager& sporkman, const std::string& msg_type, CDataStream& vRecv); + void ProcessMessage(const CNode& pnode, PeerManager& peerman, const CSporkManager& sporkman, + const std::string& msg_type, CDataStream& vRecv); void AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); std::optional CreateSigShare(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) const; @@ -447,7 +448,7 @@ private: bool ProcessMessageSigSharesInv(const CNode& pfrom, const CSigSharesInv& inv); bool ProcessMessageGetSigShares(const CNode& pfrom, const CSigSharesInv& inv); bool ProcessMessageBatchedSigShares(const CNode& pfrom, const CBatchedSigShares& batchedSigShares); - void ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare); + void ProcessMessageSigShare(NodeId fromId, PeerManager& peerman, const CSigShare& sigShare); static bool VerifySigSharesInv(Consensus::LLMQType llmqType, const CSigSharesInv& inv); static bool PreVerifyBatchedSigShares(const CActiveMasternodeManager& mn_activeman, const CQuorumManager& quorum_manager, @@ -456,31 +457,36 @@ private: bool CollectPendingSigSharesToVerify( size_t maxUniqueSessions, std::unordered_map>& retSigShares, std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums); - bool ProcessPendingSigShares(const CConnman& connman); + bool ProcessPendingSigShares(PeerManager& peerman, const CConnman& connman); - void ProcessPendingSigShares(const std::vector& sigSharesToProcess, - const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums, - const CConnman& connman); + void ProcessPendingSigShares( + const std::vector& sigSharesToProcess, + const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums, + PeerManager& peerman, const CConnman& connman); - void ProcessSigShare(const CSigShare& sigShare, const CConnman& connman, const CQuorumCPtr& quorum); - void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); + void ProcessSigShare(PeerManager& peerman, const CSigShare& sigShare, const CConnman& connman, + const CQuorumCPtr& quorum); + void TryRecoverSig(PeerManager& peerman, const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo); static CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const std::pair& in); - void Cleanup(); + void Cleanup(const CConnman& connman); void RemoveSigSharesForSession(const uint256& signHash) EXCLUSIVE_LOCKS_REQUIRED(cs); - void RemoveBannedNodeStates(); + void RemoveBannedNodeStates(PeerManager& peerman); - void BanNode(NodeId nodeId); + void BanNode(NodeId nodeId, PeerManager& peerman); - bool SendMessages(); + bool SendMessages(CConnman& connman); void CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToSend(std::unordered_map>& sigSharesToSend) EXCLUSIVE_LOCKS_REQUIRED(cs); void CollectSigSharesToSendConcentrated(std::unordered_map>& sigSharesToSend, const std::vector& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs); - void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) EXCLUSIVE_LOCKS_REQUIRED(cs); - void SignPendingSigShares(); - void WorkThreadMain(); + void CollectSigSharesToAnnounce( + const CConnman& connman, + std::unordered_map>& sigSharesToAnnounce) + EXCLUSIVE_LOCKS_REQUIRED(cs); + void SignPendingSigShares(const CConnman& connman, PeerManager& peerman); + void WorkThreadMain(CConnman& connman, PeerManager& peerman); }; } // namespace llmq diff --git a/src/net.cpp b/src/net.cpp index 45ebdafbf9..7c2804ed5e 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -4651,13 +4651,13 @@ void CConnman::SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, con }); } -bool CConnman::HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) +bool CConnman::HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const { LOCK(cs_vPendingMasternodes); return masternodeQuorumNodes.count(std::make_pair(llmqType, quorumHash)); } -std::set CConnman::GetMasternodeQuorums(Consensus::LLMQType llmqType) +std::set CConnman::GetMasternodeQuorums(Consensus::LLMQType llmqType) const { LOCK(cs_vPendingMasternodes); std::set result; @@ -4700,7 +4700,7 @@ void CConnman::RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const u masternodeQuorumRelayMembers.erase(std::make_pair(llmqType, quorumHash)); } -bool CConnman::IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list) +bool CConnman::IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list) const { // Let's see if this is an outgoing connection to an address that is known to be a masternode // We however only need to know this if the node did not authenticate itself as a MN yet diff --git a/src/net.h b/src/net.h index c0a2b82cde..9085771b53 100644 --- a/src/net.h +++ b/src/net.h @@ -1503,12 +1503,12 @@ public: bool AddPendingMasternode(const uint256& proTxHash); void SetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set& proTxHashes); void SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set& proTxHashes); - bool HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash); - std::set GetMasternodeQuorums(Consensus::LLMQType llmqType); + bool HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const; + std::set GetMasternodeQuorums(Consensus::LLMQType llmqType) const; // also returns QWATCH nodes std::set GetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const; void RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash); - bool IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list); + bool IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list) const; bool IsMasternodeQuorumRelayMember(const uint256& protxHash); void AddPendingProbeConnections(const std::set& proTxHashes); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index c20c508be8..4fc702a822 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -5251,7 +5251,7 @@ void PeerManagerImpl::ProcessMessage( { //probably one the extensions #ifdef ENABLE_WALLET - ProcessPeerMsgRet(m_cj_ctx->queueman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); + ProcessPeerMsgRet(m_cj_ctx->queueman->ProcessMessage(pfrom, m_connman, *this, msg_type, vRecv), pfrom); m_cj_ctx->walletman->ForEachCJClientMan([this, &pfrom, &msg_type, &vRecv](std::unique_ptr& clientman) { clientman->ProcessMessage(pfrom, m_chainman.ActiveChainstate(), m_connman, m_mempool, msg_type, vRecv); }); @@ -5262,10 +5262,10 @@ void PeerManagerImpl::ProcessMessage( ProcessPeerMsgRet(m_govman.ProcessMessage(pfrom, m_connman, *this, msg_type, vRecv), pfrom); ProcessPeerMsgRet(CMNAuth::ProcessMessage(pfrom, peer->m_their_services, m_connman, m_mn_metaman, m_mn_activeman, m_chainman.ActiveChain(), m_mn_sync, m_dmnman->GetListAtChainTip(), msg_type, vRecv), pfrom); PostProcessMessage(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId()); - ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, this, is_masternode, msg_type, vRecv), pfrom); - ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); - m_llmq_ctx->shareman->ProcessMessage(pfrom, m_sporkman, msg_type, vRecv); - ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); + ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, *this, is_masternode, msg_type, vRecv), pfrom); + ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom); + m_llmq_ctx->shareman->ProcessMessage(pfrom, *this, m_sporkman, msg_type, vRecv); + ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, *this, msg_type, vRecv), pfrom); if (msg_type == NetMsgType::CLSIG) { if (llmq::AreChainLocksEnabled(m_sporkman)) { @@ -5278,7 +5278,7 @@ void PeerManagerImpl::ProcessMessage( return; // CLSIG } - ProcessPeerMsgRet(m_llmq_ctx->isman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); + ProcessPeerMsgRet(m_llmq_ctx->isman->ProcessMessage(pfrom, *this, msg_type, vRecv), pfrom); return; } diff --git a/src/rpc/quorums.cpp b/src/rpc/quorums.cpp index e4c12e8b21..5289d027a2 100644 --- a/src/rpc/quorums.cpp +++ b/src/rpc/quorums.cpp @@ -818,7 +818,7 @@ static RPCHelpMan quorum_getdata() const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(cs_main, return chainman.m_blockman.LookupBlockIndex(quorumHash)); return connman.ForNode(nodeId, [&](CNode* pNode) { - return llmq_ctx.qman->RequestQuorumData(pNode, llmqType, pQuorumBaseBlockIndex, nDataMask, proTxHash); + return llmq_ctx.qman->RequestQuorumData(pNode, connman, llmqType, pQuorumBaseBlockIndex, nDataMask, proTxHash); }); }, }; diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 25b355ef71..3e84fc87dd 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -107,16 +107,16 @@ void DashTestSetup(NodeContext& node, const CChainParams& chainparams) { CChainState& chainstate = Assert(node.chainman)->ActiveChainstate(); - node.dmnman = std::make_unique(chainstate, *node.connman, *node.evodb); + node.dmnman = std::make_unique(chainstate, *node.evodb); node.mempool->ConnectManagers(node.dmnman.get()); node.cj_ctx = std::make_unique(*node.chainman, *node.connman, *node.dmnman, *node.mn_metaman, *node.mempool, - /* mn_activeman = */ nullptr, *node.mn_sync, node.peerman, /* relay_txes = */ true); + /*mn_activeman=*/nullptr, *node.mn_sync, node.peerman, /*relay_txes=*/true); #ifdef ENABLE_WALLET node.coinjoin_loader = interfaces::MakeCoinJoinLoader(*node.cj_ctx->walletman); #endif // ENABLE_WALLET - node.llmq_ctx = std::make_unique(*node.chainman, *node.connman, *node.dmnman, *node.evodb, *node.mn_metaman, *node.mnhf_manager, *node.sporkman, *node.mempool, - /* mn_activeman = */ nullptr, *node.mn_sync, node.peerman, /* unit_tests = */ true, /* wipe = */ false); + node.llmq_ctx = std::make_unique(*node.chainman, *node.dmnman, *node.evodb, *node.mn_metaman, *node.mnhf_manager, *node.sporkman, *node.mempool, + /*mn_activeman=*/nullptr, *node.mn_sync, /*unit_tests=*/true, /*wipe=*/false); Assert(node.mnhf_manager)->ConnectManagers(node.chainman.get(), node.llmq_ctx->qman.get()); node.chain_helper = std::make_unique(*node.cpoolman, *node.dmnman, *node.mnhf_manager, *node.govman, *(node.llmq_ctx->quorum_block_processor), *node.chainman, chainparams.GetConsensus(), *node.mn_sync, *node.sporkman, *(node.llmq_ctx->clhandler), *(node.llmq_ctx->qman));