diff --git a/src/init.cpp b/src/init.cpp index f0d143f032..4e5bda3e78 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -2280,7 +2280,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // ********************************************************* Step 10a: schedule Dash-specific tasks - node.llmq_ctx->Start(*node.peerman); + node.llmq_ctx->Start(*node.connman, *node.peerman); node.scheduler->scheduleEvery(std::bind(&CNetFulfilledRequestManager::DoMaintenance, std::ref(*node.netfulfilledman)), std::chrono::minutes{1}); node.scheduler->scheduleEvery(std::bind(&CMasternodeSync::DoMaintenance, std::ref(*node.mn_sync), std::cref(*node.peerman), std::cref(*node.govman)), std::chrono::seconds{1}); diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp index f48aaab4f0..6f857524dc 100644 --- a/src/llmq/context.cpp +++ b/src/llmq/context.cpp @@ -27,9 +27,9 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm bls_worker{std::make_shared()}, dkg_debugman{std::make_unique()}, quorum_block_processor{std::make_unique(chainman.ActiveChainstate(), dmnman, evo_db)}, - qdkgsman{std::make_unique(*bls_worker, chainman.ActiveChainstate(), connman, dmnman, - *dkg_debugman, mn_metaman, *quorum_block_processor, - mn_activeman, sporkman, unit_tests, wipe)}, + qdkgsman{std::make_unique(*bls_worker, chainman.ActiveChainstate(), dmnman, *dkg_debugman, + mn_metaman, *quorum_block_processor, mn_activeman, sporkman, + unit_tests, wipe)}, qman{std::make_unique(*bls_worker, chainman.ActiveChainstate(), connman, dmnman, *qdkgsman, evo_db, *quorum_block_processor, mn_activeman, mn_sync, sporkman, unit_tests, wipe)}, @@ -74,13 +74,13 @@ void LLMQContext::Interrupt() { llmq::quorumInstantSendManager->InterruptWorkerThread(); } -void LLMQContext::Start(PeerManager& peerman) +void LLMQContext::Start(CConnman& connman, PeerManager& peerman) { assert(clhandler == llmq::chainLocksHandler.get()); assert(isman == llmq::quorumInstantSendManager.get()); if (is_masternode) { - qdkgsman->StartThreads(peerman); + qdkgsman->StartThreads(connman, peerman); } qman->Start(); shareman->RegisterAsRecoveredSigsListener(); diff --git a/src/llmq/context.h b/src/llmq/context.h index 9827428592..adde61b89c 100644 --- a/src/llmq/context.h +++ b/src/llmq/context.h @@ -46,7 +46,7 @@ public: ~LLMQContext(); void Interrupt(); - void Start(PeerManager& peerman); + void Start(CConnman& connman, PeerManager& peerman); void Stop(); /** Guaranteed if LLMQContext is initialized then all members are valid too diff --git a/src/llmq/dkgsession.cpp b/src/llmq/dkgsession.cpp index 177b00b1c2..8bf8b7896d 100644 --- a/src/llmq/dkgsession.cpp +++ b/src/llmq/dkgsession.cpp @@ -72,14 +72,12 @@ CDKGMember::CDKGMember(const CDeterministicMNCPtr& _dmn, size_t _idx) : } CDKGSession::CDKGSession(const CBlockIndex* pQuorumBaseBlockIndex, const Consensus::LLMQParams& _params, - CBLSWorker& _blsWorker, CConnman& _connman, CDeterministicMNManager& dmnman, - CDKGSessionManager& _dkgManager, CDKGDebugManager& _dkgDebugManager, - CMasternodeMetaMan& mn_metaman, const CActiveMasternodeManager* const mn_activeman, - const CSporkManager& sporkman) : + CBLSWorker& _blsWorker, CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, + CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, + const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman) : params(_params), blsWorker(_blsWorker), cache(_blsWorker), - connman(_connman), m_dmnman(dmnman), dkgManager(_dkgManager), dkgDebugManager(_dkgDebugManager), @@ -417,7 +415,7 @@ void CDKGSession::VerifyPendingContributions() pendingContributionVerifications.clear(); } -void CDKGSession::VerifyAndComplain(CDKGPendingMessages& pendingMessages, PeerManager& peerman) +void CDKGSession::VerifyAndComplain(CConnman& connman, CDKGPendingMessages& pendingMessages, PeerManager& peerman) { if (!AreWeMember()) { return; @@ -453,12 +451,12 @@ void CDKGSession::VerifyAndComplain(CDKGPendingMessages& pendingMessages, PeerMa logger.Batch("verified contributions. time=%d", t1.count()); logger.Flush(); - VerifyConnectionAndMinProtoVersions(); + VerifyConnectionAndMinProtoVersions(connman); SendComplaint(pendingMessages, peerman); } -void CDKGSession::VerifyConnectionAndMinProtoVersions() const +void CDKGSession::VerifyConnectionAndMinProtoVersions(CConnman& connman) const { assert(m_mn_metaman.IsValid()); diff --git a/src/llmq/dkgsession.h b/src/llmq/dkgsession.h index d8a8e20973..82a1263b92 100644 --- a/src/llmq/dkgsession.h +++ b/src/llmq/dkgsession.h @@ -280,7 +280,6 @@ private: CBLSWorker& blsWorker; CBLSWorkerCache cache; - CConnman& connman; CDeterministicMNManager& m_dmnman; CDKGSessionManager& dkgManager; CDKGDebugManager& dkgDebugManager; @@ -327,9 +326,9 @@ private: public: CDKGSession(const CBlockIndex* pQuorumBaseBlockIndex, const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, - CConnman& _connman, CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, - CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, - const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman); + CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CDKGDebugManager& _dkgDebugManager, + CMasternodeMetaMan& mn_metaman, const CActiveMasternodeManager* const mn_activeman, + const CSporkManager& sporkman); // TODO: remove Init completely bool Init(const uint256& _myProTxHash, int _quorumIndex); @@ -357,8 +356,8 @@ public: void VerifyPendingContributions() EXCLUSIVE_LOCKS_REQUIRED(cs_pending); // Phase 2: complaint - void VerifyAndComplain(CDKGPendingMessages& pendingMessages, PeerManager& peerman); - void VerifyConnectionAndMinProtoVersions() const; + void VerifyAndComplain(CConnman& connman, CDKGPendingMessages& pendingMessages, PeerManager& peerman); + void VerifyConnectionAndMinProtoVersions(CConnman& connman) const; void SendComplaint(CDKGPendingMessages& pendingMessages, PeerManager& peerman); bool PreVerifyMessage(const CDKGComplaint& qc, bool& retBan) const; std::optional ReceiveMessage(const CDKGComplaint& qc); diff --git a/src/llmq/dkgsessionhandler.cpp b/src/llmq/dkgsessionhandler.cpp index 9b4c8b207c..0eee8feb4a 100644 --- a/src/llmq/dkgsessionhandler.cpp +++ b/src/llmq/dkgsessionhandler.cpp @@ -24,15 +24,13 @@ namespace llmq { -CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, - CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, - CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman, - CQuorumBlockProcessor& _quorumBlockProcessor, +CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman, + CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, + CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, const Consensus::LLMQParams& _params, int _quorumIndex) : blsWorker(_blsWorker), m_chainstate(chainstate), - connman(_connman), m_dmnman(dmnman), dkgDebugManager(_dkgDebugManager), dkgManager(_dkgManager), @@ -42,8 +40,8 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai m_sporkman(sporkman), params(_params), quorumIndex(_quorumIndex), - curSession(std::make_unique(nullptr, _params, _blsWorker, _connman, dmnman, _dkgManager, - _dkgDebugManager, m_mn_metaman, m_mn_activeman, sporkman)), + curSession(std::make_unique(nullptr, _params, _blsWorker, dmnman, _dkgManager, _dkgDebugManager, + m_mn_metaman, m_mn_activeman, sporkman)), pendingContributions( (size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages) @@ -165,7 +163,7 @@ void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, PeerManager& peerman } } -void CDKGSessionHandler::StartThread(PeerManager& peerman) +void CDKGSessionHandler::StartThread(CConnman& connman, PeerManager& peerman) { if (phaseHandlerThread.joinable()) { throw std::runtime_error("Tried to start an already started CDKGSessionHandler thread."); @@ -173,7 +171,7 @@ void CDKGSessionHandler::StartThread(PeerManager& peerman) m_thread_name = strprintf("llmq-%d-%d", ToUnderlying(params.type), quorumIndex); phaseHandlerThread = std::thread(&util::TraceThread, m_thread_name.c_str(), - [this, &peerman] { PhaseHandlerThread(peerman); }); + [this, &connman, &peerman] { PhaseHandlerThread(connman, peerman); }); } void CDKGSessionHandler::StopThread() @@ -190,7 +188,7 @@ bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pQuorumBaseBlockIndex) return false; } - curSession = std::make_unique(pQuorumBaseBlockIndex, params, blsWorker, connman, m_dmnman, dkgManager, + curSession = std::make_unique(pQuorumBaseBlockIndex, params, blsWorker, m_dmnman, dkgManager, dkgDebugManager, m_mn_metaman, m_mn_activeman, m_sporkman); if (!curSession->Init(m_mn_activeman->GetProTxHash(), quorumIndex)) { @@ -528,7 +526,7 @@ bool ProcessPendingMessageBatch(CConnman& connman, CDKGSession& session, CDKGPen return true; } -void CDKGSessionHandler::HandleDKGRound(PeerManager& peerman) +void CDKGSessionHandler::HandleDKGRound(CConnman& connman, PeerManager& peerman) { WaitForNextPhase(std::nullopt, QuorumPhase::Initialized); @@ -562,15 +560,17 @@ void CDKGSessionHandler::HandleDKGRound(PeerManager& peerman) // Contribute auto fContributeStart = [this, &peerman]() { curSession->Contribute(pendingContributions, peerman); }; - auto fContributeWait = [this, &peerman] { + auto fContributeWait = [this, &connman, &peerman] { return ProcessPendingMessageBatch(connman, *curSession, pendingContributions, peerman, 8); }; HandlePhase(QuorumPhase::Contribute, QuorumPhase::Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait); // Complain - auto fComplainStart = [this, &peerman]() { curSession->VerifyAndComplain(pendingComplaints, peerman); }; - auto fComplainWait = [this, &peerman] { + auto fComplainStart = [this, &connman, &peerman]() { + curSession->VerifyAndComplain(connman, pendingComplaints, peerman); + }; + auto fComplainWait = [this, &connman, &peerman] { return ProcessPendingMessageBatch(connman, *curSession, pendingComplaints, peerman, 8); }; @@ -578,7 +578,7 @@ void CDKGSessionHandler::HandleDKGRound(PeerManager& peerman) // Justify auto fJustifyStart = [this, &peerman]() { curSession->VerifyAndJustify(pendingJustifications, peerman); }; - auto fJustifyWait = [this, &peerman] { + auto fJustifyWait = [this, &connman, &peerman] { return ProcessPendingMessageBatch(connman, *curSession, pendingJustifications, peerman, 8); }; @@ -586,7 +586,7 @@ void CDKGSessionHandler::HandleDKGRound(PeerManager& peerman) // Commit auto fCommitStart = [this, &peerman]() { curSession->VerifyAndCommit(pendingPrematureCommitments, peerman); }; - auto fCommitWait = [this, &peerman] { + auto fCommitWait = [this, &connman, &peerman] { return ProcessPendingMessageBatch( connman, *curSession, pendingPrematureCommitments, peerman, 8); }; @@ -600,12 +600,12 @@ void CDKGSessionHandler::HandleDKGRound(PeerManager& peerman) } } -void CDKGSessionHandler::PhaseHandlerThread(PeerManager& peerman) +void CDKGSessionHandler::PhaseHandlerThread(CConnman& connman, PeerManager& peerman) { while (!stopRequested) { try { LogPrint(BCLog::LLMQ_DKG, "CDKGSessionHandler::%s -- %s qi[%d] - starting HandleDKGRound\n", __func__, params.name, quorumIndex); - HandleDKGRound(peerman); + HandleDKGRound(connman, peerman); } catch (AbortPhaseException& e) { dkgDebugManager.UpdateLocalSessionStatus(params.type, quorumIndex, [&](CDKGDebugSessionStatus& status) { status.statusBits.aborted = true; diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index 270803cbec..b28ae87e44 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -129,7 +129,6 @@ private: CBLSWorker& blsWorker; CChainState& m_chainstate; - CConnman& connman; CDeterministicMNManager& m_dmnman; CDKGDebugManager& dkgDebugManager; CDKGSessionManager& dkgManager; @@ -156,9 +155,8 @@ private: CDKGPendingMessages pendingPrematureCommitments; public: - CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, - CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, - CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman, + CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman, + CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, const Consensus::LLMQParams& _params, int _quorumIndex); ~CDKGSessionHandler(); @@ -166,7 +164,7 @@ public: void UpdatedBlockTip(const CBlockIndex *pindexNew); void ProcessMessage(const CNode& pfrom, PeerManager& peerman, const std::string& msg_type, CDataStream& vRecv); - void StartThread(PeerManager& peerman); + void StartThread(CConnman& connman, PeerManager& peerman); void StopThread(); bool GetContribution(const uint256& hash, CDKGContribution& ret) const; @@ -191,8 +189,8 @@ private: void WaitForNewQuorum(const uint256& oldQuorumHash) const; void SleepBeforePhase(QuorumPhase curPhase, const uint256& expectedQuorumHash, double randomSleepFactor, const WhileWaitFunc& runWhileWaiting) const; void HandlePhase(QuorumPhase curPhase, QuorumPhase nextPhase, const uint256& expectedQuorumHash, double randomSleepFactor, const StartPhaseFunc& startPhaseFunc, const WhileWaitFunc& runWhileWaiting); - void HandleDKGRound(PeerManager& peerman); - void PhaseHandlerThread(PeerManager& peerman); + void HandleDKGRound(CConnman& connman, PeerManager& peerman); + void PhaseHandlerThread(CConnman& connman, PeerManager& peerman); }; } // namespace llmq diff --git a/src/llmq/dkgsessionmgr.cpp b/src/llmq/dkgsessionmgr.cpp index 7eedacaca5..d6fb0715c1 100644 --- a/src/llmq/dkgsessionmgr.cpp +++ b/src/llmq/dkgsessionmgr.cpp @@ -29,15 +29,14 @@ static const std::string DB_VVEC = "qdkg_V"; static const std::string DB_SKCONTRIB = "qdkg_S"; static const std::string DB_ENC_CONTRIB = "qdkg_E"; -CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, - CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, - CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, +CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman, + CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, + CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, bool unitTests, bool fWipe) : db(std::make_unique(unitTests ? "" : (gArgs.GetDataDirNet() / "llmq/dkgdb"), 1 << 20, unitTests, fWipe)), blsWorker(_blsWorker), m_chainstate(chainstate), - connman(_connman), m_dmnman(dmnman), dkgDebugManager(_dkgDebugManager), quorumBlockProcessor(_quorumBlockProcessor), @@ -53,8 +52,8 @@ CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chai auto session_count = (params.useRotation) ? params.signingActiveQuorumCount : 1; for (const auto i : irange::range(session_count)) { dkgSessionHandlers.emplace(std::piecewise_construct, std::forward_as_tuple(params.type, i), - std::forward_as_tuple(blsWorker, m_chainstate, connman, dmnman, dkgDebugManager, - *this, mn_metaman, quorumBlockProcessor, mn_activeman, + std::forward_as_tuple(blsWorker, m_chainstate, dmnman, dkgDebugManager, *this, + mn_metaman, quorumBlockProcessor, mn_activeman, spork_manager, params, i)); } } @@ -62,10 +61,10 @@ CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chai CDKGSessionManager::~CDKGSessionManager() = default; -void CDKGSessionManager::StartThreads(PeerManager& peerman) +void CDKGSessionManager::StartThreads(CConnman& connman, PeerManager& peerman) { for (auto& it : dkgSessionHandlers) { - it.second.StartThread(peerman); + it.second.StartThread(connman, peerman); } } diff --git a/src/llmq/dkgsessionmgr.h b/src/llmq/dkgsessionmgr.h index 1c8ac70962..eaa5dc79de 100644 --- a/src/llmq/dkgsessionmgr.h +++ b/src/llmq/dkgsessionmgr.h @@ -42,7 +42,6 @@ private: CBLSWorker& blsWorker; CChainState& m_chainstate; - CConnman& connman; CDeterministicMNManager& m_dmnman; CDKGDebugManager& dkgDebugManager; CQuorumBlockProcessor& quorumBlockProcessor; @@ -71,13 +70,13 @@ private: mutable std::map contributionsCache GUARDED_BY(contributionsCacheCs); public: - CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, - CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, + CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman, + CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, bool unitTests, bool fWipe); ~CDKGSessionManager(); - void StartThreads(PeerManager& peerman); + void StartThreads(CConnman& connman, PeerManager& peerman); void StopThreads(); void UpdatedBlockTip(const CBlockIndex *pindexNew, bool fInitialDownload);