diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp
index 31857000d5..2d58262f80 100644
--- a/src/llmq/context.cpp
+++ b/src/llmq/context.cpp
@@ -46,9 +46,9 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
     isman{[&]() -> llmq::CInstantSendManager* const {
         assert(llmq::quorumInstantSendManager == nullptr);
         llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler,
-                                                                                     chainman.ActiveChainstate(),
-                                                                                     connman, *qman, *sigman, *shareman,
-                                                                                     sporkman, mempool, mn_sync, peerman,
+                                                                                     chainman.ActiveChainstate(), *qman,
+                                                                                     *sigman, *shareman, sporkman,
+                                                                                     mempool, mn_sync, peerman,
                                                                                      is_masternode, unit_tests, wipe);
         return llmq::quorumInstantSendManager.get();
     }()},
diff --git a/src/llmq/instantsend.cpp b/src/llmq/instantsend.cpp
index e175009bb8..598986b55b 100644
--- a/src/llmq/instantsend.cpp
+++ b/src/llmq/instantsend.cpp
@@ -1047,7 +1047,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
         // bump mempool counter to make sure newly locked txes are picked up by getblocktemplate
         mempool.AddTransactionsUpdated(1);
     } else {
-        AskNodesForLockedTx(islock->txid, connman, *m_peerman, m_is_masternode);
+        m_peerman->AskPeersForTransaction(islock->txid, m_is_masternode);
     }
 }
 
@@ -1321,7 +1321,7 @@ void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, con
         for (const auto& p : toDelete) {
             RemoveConflictedTx(*p.second);
         }
-        AskNodesForLockedTx(islock.txid, connman, *m_peerman, m_is_masternode);
+        m_peerman->AskPeersForTransaction(islock.txid, m_is_masternode);
     }
 }
 
@@ -1426,46 +1426,6 @@ void CInstantSendManager::RemoveConflictingLock(const uint256& islockHash, const
     }
 }
 
-void CInstantSendManager::AskNodesForLockedTx(const uint256& txid, const CConnman& connman, PeerManager& peerman,
-                                              bool is_masternode)
-{
-    std::vector<CNode*> nodesToAskFor;
-    nodesToAskFor.reserve(4);
-
-    auto maybe_add_to_nodesToAskFor = [&peerman, &nodesToAskFor, &txid](CNode* pnode) {
-        if (nodesToAskFor.size() >= 4) {
-            return;
-        }
-        if (peerman.IsInvInFilter(pnode->GetId(), txid)) {
-            pnode->AddRef();
-            nodesToAskFor.emplace_back(pnode);
-        }
-    };
-
-    connman.ForEachNode([&](CNode* pnode) {
-        // Check masternodes first
-        if (pnode->m_masternode_connection) maybe_add_to_nodesToAskFor(pnode);
-    });
-    connman.ForEachNode([&](CNode* pnode) {
-        // Check non-masternodes next
-        if (!pnode->m_masternode_connection) maybe_add_to_nodesToAskFor(pnode);
-    });
-    {
-        LOCK(cs_main);
-        for (const CNode* pnode : nodesToAskFor) {
-            LogPrintf("CInstantSendManager::%s -- txid=%s: asking other peer %d for correct TX\n", __func__,
-                      txid.ToString(), pnode->GetId());
-
-            CInv inv(MSG_TX, txid);
-            peerman.RequestObject(pnode->GetId(), inv, GetTime<std::chrono::microseconds>(), is_masternode,
-                                  /* fForce = */ true);
-        }
-    }
-    for (CNode* pnode : nodesToAskFor) {
-        pnode->Release();
-    }
-}
-
 void CInstantSendManager::ProcessPendingRetryLockTxs()
 {
     const auto retryTxs = WITH_LOCK(cs_pendingRetry, return pendingRetryTxs);
diff --git a/src/llmq/instantsend.h b/src/llmq/instantsend.h
index c0a27d7494..2b83878e9a 100644
--- a/src/llmq/instantsend.h
+++ b/src/llmq/instantsend.h
@@ -200,7 +200,6 @@ private:
     CChainLocksHandler& clhandler;
     CChainState& m_chainstate;
-    CConnman& connman;
     CQuorumManager& qman;
     CSigningManager& sigman;
     CSigSharesManager& shareman;
@@ -255,13 +254,21 @@ private:
     std::unordered_set<uint256, StaticSaltedHasher> pendingRetryTxs GUARDED_BY(cs_pendingRetry);
 
 public:
-    explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CConnman& _connman,
-                                 CQuorumManager& _qman, CSigningManager& _sigman, CSigSharesManager& _shareman,
-                                 CSporkManager& sporkman, CTxMemPool& _mempool, const CMasternodeSync& mn_sync,
-                                 const std::unique_ptr<PeerManager>& peerman, bool is_masternode, bool unitTests, bool fWipe) :
+    explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CQuorumManager& _qman,
+                                 CSigningManager& _sigman, CSigSharesManager& _shareman, CSporkManager& sporkman,
+                                 CTxMemPool& _mempool, const CMasternodeSync& mn_sync,
+                                 const std::unique_ptr<PeerManager>& peerman, bool is_masternode, bool unitTests,
+                                 bool fWipe) :
         db(unitTests, fWipe),
-        clhandler(_clhandler), m_chainstate(chainstate), connman(_connman), qman(_qman), sigman(_sigman),
-        shareman(_shareman), spork_manager(sporkman), mempool(_mempool), m_mn_sync(mn_sync), m_peerman(peerman),
+        clhandler(_clhandler),
+        m_chainstate(chainstate),
+        qman(_qman),
+        sigman(_sigman),
+        shareman(_shareman),
+        spork_manager(sporkman),
+        mempool(_mempool),
+        m_mn_sync(mn_sync),
+        m_peerman(peerman),
         m_is_masternode{is_masternode}
     {
         workInterrupt.reset();
@@ -315,7 +322,6 @@ private:
         EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);
     void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock)
         EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
-    static void AskNodesForLockedTx(const uint256& txid, const CConnman& connman, PeerManager& peerman, bool is_masternode);
     void ProcessPendingRetryLockTxs()
         EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index d8e44c7b91..c20c508be8 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -329,6 +329,11 @@ struct Peer {
         LOCK(m_tx_relay_mutex);
         return m_can_tx_relay ? m_tx_relay.get() : nullptr;
     };
+    const TxRelay* GetTxRelay() const LOCKS_EXCLUDED(m_tx_relay_mutex)
+    {
+        LOCK(m_tx_relay_mutex);
+        return m_can_tx_relay ? m_tx_relay.get() : nullptr;
+    };
 
     /** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
     std::vector<CAddress> m_addrs_to_send GUARDED_BY(NetEventsInterface::g_msgproc_mutex);
@@ -409,7 +414,7 @@ struct Peer {
         {}
 
 private:
-    Mutex m_tx_relay_mutex;
+    mutable Mutex m_tx_relay_mutex;
 
     /** Transaction relay data.
      * (Bitcoin) Transaction relay data. May be a nullptr.
@@ -633,7 +638,7 @@ public:
                        bool is_masternode, bool fForce = false) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
     size_t GetRequestedObjectCount(NodeId nodeid) const override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
     bool IsInvInFilter(NodeId nodeid, const uint256& hash) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
-
+    void AskPeersForTransaction(const uint256& txid, bool is_masternode) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
 private:
     /** Helpers to process result of external handlers of message */
     void ProcessPeerMsgRet(const PeerMsgRet& ret, CNode& pfrom) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
@@ -648,6 +653,11 @@ private:
     /** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */
     void ReattemptInitialBroadcast(CScheduler& scheduler) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
+    /**
+     * Private implementation of IsInvInFilter which does not call GetPeerRef; to be preferred when the PeerRef is available.
+     */
+    bool IsInvInFilter(const Peer& peer, const uint256& hash) const;
+
     /** Get a shared pointer to the Peer object.
      *  May return an empty shared_ptr if the Peer object can't be found.
      */
     PeerRef GetPeerRef(NodeId id) const EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
@@ -2256,12 +2266,47 @@ void PeerManagerImpl::SendPings()
     for(auto& it : m_peer_map) it.second->m_ping_queued = true;
 }
 
+void PeerManagerImpl::AskPeersForTransaction(const uint256& txid, bool is_masternode)
+{
+    std::vector<PeerRef> peersToAsk;
+    peersToAsk.reserve(4);
+
+    {
+        LOCK(m_peer_mutex);
+        // TODO consider prioritizing MNs again, once that flag is moved into Peer
+        for (const auto& [_, peer] : m_peer_map) {
+            if (peersToAsk.size() >= 4) {
+                break;
+            }
+            if (IsInvInFilter(*peer, txid)) {
+                peersToAsk.emplace_back(peer);
+            }
+        }
+    }
+    {
+        CInv inv(MSG_TX, txid);
+        LOCK(cs_main);
+        for (PeerRef& peer : peersToAsk) {
+            LogPrintf("PeerManagerImpl::%s -- txid=%s: asking other peer %d for correct TX\n", __func__,
+                      txid.ToString(), peer->m_id);
+
+            RequestObject(peer->m_id, inv, GetTime<std::chrono::microseconds>(), is_masternode,
+                          /*fForce=*/true);
+        }
+    }
+}
+
 bool PeerManagerImpl::IsInvInFilter(NodeId nodeid, const uint256& hash) const
 {
     PeerRef peer = GetPeerRef(nodeid);
     if (peer == nullptr) return false;
-    if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
+    return IsInvInFilter(*peer, hash);
+}
+
+bool PeerManagerImpl::IsInvInFilter(const Peer& peer, const uint256& hash) const
+{
+    if (auto tx_relay = peer.GetTxRelay(); tx_relay != nullptr) {
         LOCK(tx_relay->m_tx_inventory_mutex);
         return tx_relay->m_tx_inventory_known_filter.contains(hash);
     }
diff --git a/src/net_processing.h b/src/net_processing.h
index 2a302ee9dc..0bd94f7729 100644
--- a/src/net_processing.h
+++ b/src/net_processing.h
@@ -91,6 +91,9 @@ public:
     /** Is an inventory in the known inventory filter. Used by InstantSend. */
     virtual bool IsInvInFilter(NodeId nodeid, const uint256& hash) const = 0;
 
+    /** Ask a number of our peers, which have a transaction in their inventory, for the transaction. */
+    virtual void AskPeersForTransaction(const uint256& txid, bool is_masternode) = 0;
+
     /** Broadcast inventory message to a specific peer. */
     virtual void PushInventory(NodeId nodeid, const CInv& inv) = 0;
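
With this patch, InstantSend no longer needs a CConnman reference to re-request a locked transaction: peer selection and the RequestObject call are encapsulated behind the new PeerManager::AskPeersForTransaction. A minimal caller-side sketch, assuming only that the caller holds a PeerManager reference; the wrapper name RequestLockedTx below is illustrative and not part of the patch:

    #include <net_processing.h> // PeerManager interface; this patch adds AskPeersForTransaction to it
    #include <uint256.h>

    // Sketch: re-request a transaction that peers have announced (e.g. an islock arrived for a
    // tx we do not have locally). AskPeersForTransaction selects up to four peers whose inventory
    // filter already contains the txid and calls RequestObject for each of them under cs_main.
    void RequestLockedTx(PeerManager& peerman, const uint256& txid, bool is_masternode)
    {
        peerman.AskPeersForTransaction(txid, is_masternode);
    }

This mirrors the two call sites in llmq/instantsend.cpp, which now go through m_peerman instead of walking connections via CConnman::ForEachNode.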