refactor: move PeerManager out of CInstantSendManager ctor

This commit is contained in:
Kittywhiskers Van Gogh 2024-12-04 18:19:11 +00:00
parent 82d1aed1d6
commit d9e5cc7c9a
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
5 changed files with 47 additions and 47 deletions

View File

@ -107,7 +107,7 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef&
{ {
assert(m_cj_ctx && m_llmq_ctx); assert(m_cj_ctx && m_llmq_ctx);
m_llmq_ctx->isman->TransactionAddedToMempool(ptx); m_llmq_ctx->isman->TransactionAddedToMempool(m_peerman, ptx);
m_llmq_ctx->clhandler->TransactionAddedToMempool(ptx, nAcceptTime); m_llmq_ctx->clhandler->TransactionAddedToMempool(ptx, nAcceptTime);
m_cj_ctx->dstxman->TransactionAddedToMempool(ptx); m_cj_ctx->dstxman->TransactionAddedToMempool(ptx);
} }

View File

@ -47,8 +47,8 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler, llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler,
chainman.ActiveChainstate(), *qman, chainman.ActiveChainstate(), *qman,
*sigman, *shareman, sporkman, *sigman, *shareman, sporkman,
mempool, mn_sync, peerman, mempool, mn_sync, is_masternode,
is_masternode, unit_tests, wipe); unit_tests, wipe);
return llmq::quorumInstantSendManager.get(); return llmq::quorumInstantSendManager.get();
}()}, }()},
ehfSignalsHandler{std::make_unique<llmq::CEHFSignalsHandler>(chainman, mnhfman, *sigman, *shareman, *qman)} ehfSignalsHandler{std::make_unique<llmq::CEHFSignalsHandler>(chainman, mnhfman, *sigman, *shareman, *qman)}
@ -87,7 +87,7 @@ void LLMQContext::Start(CConnman& connman, PeerManager& peerman)
sigman->StartWorkerThread(peerman); sigman->StartWorkerThread(peerman);
llmq::chainLocksHandler->Start(); llmq::chainLocksHandler->Start();
llmq::quorumInstantSendManager->Start(); llmq::quorumInstantSendManager->Start(peerman);
} }
void LLMQContext::Stop() { void LLMQContext::Stop() {

View File

@ -430,14 +430,14 @@ std::vector<uint256> CInstantSendDb::RemoveChainedInstantSendLocks(const uint256
//////////////// ////////////////
void CInstantSendManager::Start() void CInstantSendManager::Start(PeerManager& peerman)
{ {
// can't start new thread if we have one running already // can't start new thread if we have one running already
if (workThread.joinable()) { if (workThread.joinable()) {
assert(false); assert(false);
} }
workThread = std::thread(&util::TraceThread, "isman", [this] { WorkThreadMain(); }); workThread = std::thread(&util::TraceThread, "isman", [this, &peerman] { WorkThreadMain(peerman); });
sigman.RegisterRecoveredSigsListener(this); sigman.RegisterRecoveredSigsListener(this);
} }
@ -732,21 +732,23 @@ void CInstantSendManager::HandleNewInstantSendLockRecoveredSig(const llmq::CReco
pendingInstantSendLocks.emplace(hash, std::make_pair(-1, islock)); pendingInstantSendLocks.emplace(hash, std::make_pair(-1, islock));
} }
PeerMsgRet CInstantSendManager::ProcessMessage(const CNode& pfrom, std::string_view msg_type, CDataStream& vRecv) PeerMsgRet CInstantSendManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, std::string_view msg_type,
CDataStream& vRecv)
{ {
if (IsInstantSendEnabled() && msg_type == NetMsgType::ISDLOCK) { if (IsInstantSendEnabled() && msg_type == NetMsgType::ISDLOCK) {
const auto islock = std::make_shared<CInstantSendLock>(); const auto islock = std::make_shared<CInstantSendLock>();
vRecv >> *islock; vRecv >> *islock;
return ProcessMessageInstantSendLock(pfrom, islock); return ProcessMessageInstantSendLock(pfrom, peerman, islock);
} }
return {}; return {};
} }
PeerMsgRet CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, const llmq::CInstantSendLockPtr& islock) PeerMsgRet CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, PeerManager& peerman,
const llmq::CInstantSendLockPtr& islock)
{ {
auto hash = ::SerializeHash(*islock); auto hash = ::SerializeHash(*islock);
WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(pfrom.GetId(), CInv(MSG_ISDLOCK, hash))); WITH_LOCK(::cs_main, peerman.EraseObjectRequest(pfrom.GetId(), CInv(MSG_ISDLOCK, hash)));
if (!islock->TriviallyValid()) { if (!islock->TriviallyValid()) {
return tl::unexpected{100}; return tl::unexpected{100};
@ -800,7 +802,7 @@ bool CInstantSendLock::TriviallyValid() const
return true; return true;
} }
bool CInstantSendManager::ProcessPendingInstantSendLocks() bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
{ {
decltype(pendingInstantSendLocks) pend; decltype(pendingInstantSendLocks) pend;
bool fMoreWork{false}; bool fMoreWork{false};
@ -845,7 +847,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks()
auto dkgInterval = llmq_params.dkgInterval; auto dkgInterval = llmq_params.dkgInterval;
// First check against the current active set and don't ban // First check against the current active set and don't ban
auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, 0, pend, false); auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, peerman, /*signOffset=*/0, pend, false);
if (!badISLocks.empty()) { if (!badISLocks.empty()) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__); LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__);
@ -858,13 +860,15 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks()
} }
} }
// Now check against the previous active set and perform banning if this fails // Now check against the previous active set and perform banning if this fails
ProcessPendingInstantSendLocks(llmq_params, dkgInterval, pend, true); ProcessPendingInstantSendLocks(llmq_params, peerman, dkgInterval, pend, true);
} }
return fMoreWork; return fMoreWork;
} }
std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, int signOffset, const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban) std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPendingInstantSendLocks(
const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
{ {
CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, true, 8); CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, true, 8);
std::unordered_map<uint256, CRecoveredSig, StaticSaltedHasher> recSigs; std::unordered_map<uint256, CRecoveredSig, StaticSaltedHasher> recSigs;
@ -936,7 +940,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
for (const auto& nodeId : batchVerifier.badSources) { for (const auto& nodeId : batchVerifier.badSources) {
// Let's not be too harsh, as the peer might simply be unlucky and might have sent us an old lock which // Let's not be too harsh, as the peer might simply be unlucky and might have sent us an old lock which
// does not validate anymore due to changed quorums // does not validate anymore due to changed quorums
Assert(m_peerman)->Misbehaving(nodeId, 20); peerman.Misbehaving(nodeId, 20);
} }
} }
for (const auto& p : pend) { for (const auto& p : pend) {
@ -951,7 +955,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
continue; continue;
} }
ProcessInstantSendLock(nodeId, hash, islock); ProcessInstantSendLock(nodeId, peerman, hash, islock);
// See comment further on top. We pass a reconstructed recovered sig to the signing manager to avoid // See comment further on top. We pass a reconstructed recovered sig to the signing manager to avoid
// double-verification of the sig. // double-verification of the sig.
@ -969,7 +973,8 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
return badISLocks; return badISLocks;
} }
void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLockPtr& islock) void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash,
const CInstantSendLockPtr& islock)
{ {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n", __func__, LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n", __func__,
islock->txid.ToString(), hash.ToString(), from); islock->txid.ToString(), hash.ToString(), from);
@ -1030,28 +1035,28 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
CInv inv(MSG_ISDLOCK, hash); CInv inv(MSG_ISDLOCK, hash);
if (tx != nullptr) { if (tx != nullptr) {
Assert(m_peerman)->RelayInvFiltered(inv, *tx, ISDLOCK_PROTO_VERSION); peerman.RelayInvFiltered(inv, *tx, ISDLOCK_PROTO_VERSION);
} else { } else {
// we don't have the TX yet, so we only filter based on txid. Later when that TX arrives, we will re-announce // we don't have the TX yet, so we only filter based on txid. Later when that TX arrives, we will re-announce
// with the TX taken into account. // with the TX taken into account.
Assert(m_peerman)->RelayInvFiltered(inv, islock->txid, ISDLOCK_PROTO_VERSION); peerman.RelayInvFiltered(inv, islock->txid, ISDLOCK_PROTO_VERSION);
} }
ResolveBlockConflicts(hash, *islock); ResolveBlockConflicts(hash, *islock);
if (tx != nullptr) { if (tx != nullptr) {
RemoveMempoolConflictsForLock(hash, *islock); RemoveMempoolConflictsForLock(peerman, hash, *islock);
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- notify about lock %s for tx %s\n", __func__, LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- notify about lock %s for tx %s\n", __func__,
hash.ToString(), tx->GetHash().ToString()); hash.ToString(), tx->GetHash().ToString());
GetMainSignals().NotifyTransactionLock(tx, islock); GetMainSignals().NotifyTransactionLock(tx, islock);
// bump mempool counter to make sure newly locked txes are picked up by getblocktemplate // bump mempool counter to make sure newly locked txes are picked up by getblocktemplate
mempool.AddTransactionsUpdated(1); mempool.AddTransactionsUpdated(1);
} else { } else {
m_peerman->AskPeersForTransaction(islock->txid, m_is_masternode); peerman.AskPeersForTransaction(islock->txid, m_is_masternode);
} }
} }
void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx) void CInstantSendManager::TransactionAddedToMempool(PeerManager& peerman, const CTransactionRef& tx)
{ {
if (!IsInstantSendEnabled() || !m_mn_sync.IsBlockchainSynced() || tx->vin.empty()) { if (!IsInstantSendEnabled() || !m_mn_sync.IsBlockchainSynced() || tx->vin.empty()) {
return; return;
@ -1080,7 +1085,7 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
// TX is not locked, so make sure it is tracked // TX is not locked, so make sure it is tracked
AddNonLockedTx(tx, nullptr); AddNonLockedTx(tx, nullptr);
} else { } else {
RemoveMempoolConflictsForLock(::SerializeHash(*islock), *islock); RemoveMempoolConflictsForLock(peerman, ::SerializeHash(*islock), *islock);
} }
} }
@ -1292,7 +1297,8 @@ void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex)
} }
} }
void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock) void CInstantSendManager::RemoveMempoolConflictsForLock(PeerManager& peerman, const uint256& hash,
const CInstantSendLock& islock)
{ {
std::unordered_map<uint256, CTransactionRef, StaticSaltedHasher> toDelete; std::unordered_map<uint256, CTransactionRef, StaticSaltedHasher> toDelete;
@ -1321,7 +1327,7 @@ void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, con
for (const auto& p : toDelete) { for (const auto& p : toDelete) {
RemoveConflictedTx(*p.second); RemoveConflictedTx(*p.second);
} }
m_peerman->AskPeersForTransaction(islock.txid, m_is_masternode); peerman.AskPeersForTransaction(islock.txid, m_is_masternode);
} }
} }
@ -1583,10 +1589,10 @@ size_t CInstantSendManager::GetInstantSendLockCount() const
return db.GetInstantSendLockCount(); return db.GetInstantSendLockCount();
} }
void CInstantSendManager::WorkThreadMain() void CInstantSendManager::WorkThreadMain(PeerManager& peerman)
{ {
while (!workInterrupt) { while (!workInterrupt) {
bool fMoreWork = ProcessPendingInstantSendLocks(); bool fMoreWork = ProcessPendingInstantSendLocks(peerman);
ProcessPendingRetryLockTxs(); ProcessPendingRetryLockTxs();
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) { if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {

View File

@ -206,7 +206,6 @@ private:
CSporkManager& spork_manager; CSporkManager& spork_manager;
CTxMemPool& mempool; CTxMemPool& mempool;
const CMasternodeSync& m_mn_sync; const CMasternodeSync& m_mn_sync;
const std::unique_ptr<PeerManager>& m_peerman;
const bool m_is_masternode; const bool m_is_masternode;
@ -256,9 +255,8 @@ private:
public: public:
explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CQuorumManager& _qman, explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CQuorumManager& _qman,
CSigningManager& _sigman, CSigSharesManager& _shareman, CSporkManager& sporkman, CSigningManager& _sigman, CSigSharesManager& _shareman, CSporkManager& sporkman,
CTxMemPool& _mempool, const CMasternodeSync& mn_sync, CTxMemPool& _mempool, const CMasternodeSync& mn_sync, bool is_masternode,
const std::unique_ptr<PeerManager>& peerman, bool is_masternode, bool unitTests, bool unitTests, bool fWipe) :
bool fWipe) :
db(unitTests, fWipe), db(unitTests, fWipe),
clhandler(_clhandler), clhandler(_clhandler),
m_chainstate(chainstate), m_chainstate(chainstate),
@ -268,14 +266,13 @@ public:
spork_manager(sporkman), spork_manager(sporkman),
mempool(_mempool), mempool(_mempool),
m_mn_sync(mn_sync), m_mn_sync(mn_sync),
m_peerman(peerman),
m_is_masternode{is_masternode} m_is_masternode{is_masternode}
{ {
workInterrupt.reset(); workInterrupt.reset();
} }
~CInstantSendManager() = default; ~CInstantSendManager() = default;
void Start(); void Start(PeerManager& peerman);
void Stop(); void Stop();
void InterruptWorkerThread() { workInterrupt(); }; void InterruptWorkerThread() { workInterrupt(); };
@ -295,18 +292,15 @@ private:
const Consensus::Params& params) EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests); const Consensus::Params& params) EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests);
void TrySignInstantSendLock(const CTransaction& tx) EXCLUSIVE_LOCKS_REQUIRED(!cs_creating); void TrySignInstantSendLock(const CTransaction& tx) EXCLUSIVE_LOCKS_REQUIRED(!cs_creating);
PeerMsgRet ProcessMessageInstantSendLock(const CNode& pfrom, const CInstantSendLockPtr& islock); PeerMsgRet ProcessMessageInstantSendLock(const CNode& pfrom, PeerManager& peerman, const CInstantSendLockPtr& islock);
bool ProcessPendingInstantSendLocks() bool ProcessPendingInstantSendLocks(PeerManager& peerman)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
std::unordered_set<uint256, StaticSaltedHasher> ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, std::unordered_set<uint256, StaticSaltedHasher> ProcessPendingInstantSendLocks(
int signOffset, const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
const std::unordered_map<uint256, const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
std::pair<NodeId, CInstantSendLockPtr>,
StaticSaltedHasher>& pend,
bool ban)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLockPtr& islock) void ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash, const CInstantSendLockPtr& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined) void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined)
@ -318,14 +312,14 @@ private:
void TruncateRecoveredSigsForInputs(const CInstantSendLock& islock) void TruncateRecoveredSigsForInputs(const CInstantSendLock& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests); EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests);
void RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock) void RemoveMempoolConflictsForLock(PeerManager& peerman, const uint256& hash, const CInstantSendLock& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingRetry); EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);
void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock) void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void ProcessPendingRetryLockTxs() void ProcessPendingRetryLockTxs()
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingRetry); EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);
void WorkThreadMain() void WorkThreadMain(PeerManager& peerman)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void HandleFullyConfirmedBlock(const CBlockIndex* pindex) void HandleFullyConfirmedBlock(const CBlockIndex* pindex)
@ -339,9 +333,9 @@ public:
[[nodiscard]] MessageProcessingResult HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override [[nodiscard]] MessageProcessingResult HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_pendingLocks); EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_pendingLocks);
PeerMsgRet ProcessMessage(const CNode& pfrom, std::string_view msg_type, CDataStream& vRecv); PeerMsgRet ProcessMessage(const CNode& pfrom, PeerManager& peerman, std::string_view msg_type, CDataStream& vRecv);
void TransactionAddedToMempool(const CTransactionRef& tx) void TransactionAddedToMempool(PeerManager& peerman, const CTransactionRef& tx)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry); EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void TransactionRemovedFromMempool(const CTransactionRef& tx); void TransactionRemovedFromMempool(const CTransactionRef& tx);
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)

View File

@ -5278,7 +5278,7 @@ void PeerManagerImpl::ProcessMessage(
return; // CLSIG return; // CLSIG
} }
ProcessPeerMsgRet(m_llmq_ctx->isman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->isman->ProcessMessage(pfrom, *this, msg_type, vRecv), pfrom);
return; return;
} }