mirror of
https://github.com/dashpay/dash.git
synced 2024-12-24 11:32:46 +01:00
Merge #6425: refactor: move CInstantSendManager::AskNodesForLockedTx into PeerManager
bb5d70c8d6
refactor: convert IsInvInFilter to accept a const Peer&, introduce const version of GetTxRelay (pasta) 30fc76c397
fix: simplify logic in AskPeersForTransactions and remove erroneous negative EXCLUSIVE_LOCKS_REQUIRED (pasta) 090ae9237e
refactor: move CInstantSendManager::AskNodesForLockedTx into PeerManager (pasta) Pull request description: ## Issue being fixed or feature implemented Instantsend manager currently relies on CConnman, which is not needed. The function AskNodesForLockedTx is all networking logic anyhow, and there's no reason why this logic would be contained to instantsend processing. Move it into net_processing instead. ## What was done? **This does change the logic!** We no longer prioritize asking MNs. This is probably fine? I don't specifically recall why we wanted to ask MNs besides potentially that they may be higher performing or better connected? We can potentially restore this logic once we bring masternode connection logic into Peer. Does also change logic, by short-circuiting once peersToAsk is full. This commit has the added benefit of reducing contention on m_nodes_mutex due to no longer calling connman.ForEachNode not once but twice. This may slightly increase contention on m_peer_mutex; but that should be an ok tradeoff for not only removing dependencies, but also reducing contention on a much more contested RecursiveMutex. ## How Has This Been Tested? Built, local tests ## Breaking Changes None ## Checklist: _Go over all the following points, and put an `x` in all the boxes that apply._ - [ ] I have performed a self-review of my own code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have added or updated relevant unit/integration/functional/e2e tests - [ ] I have made corresponding changes to the documentation - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_ ACKs for top commit: kwvg: ACK bb5d70c8d6
UdjinM6: utACK bb5d70c8d6
Tree-SHA512: c06e4f80a1d19c109fb12b626028b3655c315f203ef569e1af976b8530f4b36172d42cf83c91080ecfddbe740e394f379adc3f98ae4f4e3811e1c8614ea4a7f4
This commit is contained in:
commit
89ca1ae104
@ -46,9 +46,9 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
|
||||
isman{[&]() -> llmq::CInstantSendManager* const {
|
||||
assert(llmq::quorumInstantSendManager == nullptr);
|
||||
llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler,
|
||||
chainman.ActiveChainstate(),
|
||||
connman, *qman, *sigman, *shareman,
|
||||
sporkman, mempool, mn_sync, peerman,
|
||||
chainman.ActiveChainstate(), *qman,
|
||||
*sigman, *shareman, sporkman,
|
||||
mempool, mn_sync, peerman,
|
||||
is_masternode, unit_tests, wipe);
|
||||
return llmq::quorumInstantSendManager.get();
|
||||
}()},
|
||||
|
@ -1047,7 +1047,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
|
||||
// bump mempool counter to make sure newly locked txes are picked up by getblocktemplate
|
||||
mempool.AddTransactionsUpdated(1);
|
||||
} else {
|
||||
AskNodesForLockedTx(islock->txid, connman, *m_peerman, m_is_masternode);
|
||||
m_peerman->AskPeersForTransaction(islock->txid, m_is_masternode);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1321,7 +1321,7 @@ void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, con
|
||||
for (const auto& p : toDelete) {
|
||||
RemoveConflictedTx(*p.second);
|
||||
}
|
||||
AskNodesForLockedTx(islock.txid, connman, *m_peerman, m_is_masternode);
|
||||
m_peerman->AskPeersForTransaction(islock.txid, m_is_masternode);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1426,46 +1426,6 @@ void CInstantSendManager::RemoveConflictingLock(const uint256& islockHash, const
|
||||
}
|
||||
}
|
||||
|
||||
void CInstantSendManager::AskNodesForLockedTx(const uint256& txid, const CConnman& connman, PeerManager& peerman,
|
||||
bool is_masternode)
|
||||
{
|
||||
std::vector<CNode*> nodesToAskFor;
|
||||
nodesToAskFor.reserve(4);
|
||||
|
||||
auto maybe_add_to_nodesToAskFor = [&peerman, &nodesToAskFor, &txid](CNode* pnode) {
|
||||
if (nodesToAskFor.size() >= 4) {
|
||||
return;
|
||||
}
|
||||
if (peerman.IsInvInFilter(pnode->GetId(), txid)) {
|
||||
pnode->AddRef();
|
||||
nodesToAskFor.emplace_back(pnode);
|
||||
}
|
||||
};
|
||||
|
||||
connman.ForEachNode([&](CNode* pnode) {
|
||||
// Check masternodes first
|
||||
if (pnode->m_masternode_connection) maybe_add_to_nodesToAskFor(pnode);
|
||||
});
|
||||
connman.ForEachNode([&](CNode* pnode) {
|
||||
// Check non-masternodes next
|
||||
if (!pnode->m_masternode_connection) maybe_add_to_nodesToAskFor(pnode);
|
||||
});
|
||||
{
|
||||
LOCK(cs_main);
|
||||
for (const CNode* pnode : nodesToAskFor) {
|
||||
LogPrintf("CInstantSendManager::%s -- txid=%s: asking other peer %d for correct TX\n", __func__,
|
||||
txid.ToString(), pnode->GetId());
|
||||
|
||||
CInv inv(MSG_TX, txid);
|
||||
peerman.RequestObject(pnode->GetId(), inv, GetTime<std::chrono::microseconds>(), is_masternode,
|
||||
/* fForce = */ true);
|
||||
}
|
||||
}
|
||||
for (CNode* pnode : nodesToAskFor) {
|
||||
pnode->Release();
|
||||
}
|
||||
}
|
||||
|
||||
void CInstantSendManager::ProcessPendingRetryLockTxs()
|
||||
{
|
||||
const auto retryTxs = WITH_LOCK(cs_pendingRetry, return pendingRetryTxs);
|
||||
|
@ -200,7 +200,6 @@ private:
|
||||
|
||||
CChainLocksHandler& clhandler;
|
||||
CChainState& m_chainstate;
|
||||
CConnman& connman;
|
||||
CQuorumManager& qman;
|
||||
CSigningManager& sigman;
|
||||
CSigSharesManager& shareman;
|
||||
@ -255,13 +254,21 @@ private:
|
||||
std::unordered_set<uint256, StaticSaltedHasher> pendingRetryTxs GUARDED_BY(cs_pendingRetry);
|
||||
|
||||
public:
|
||||
explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CConnman& _connman,
|
||||
CQuorumManager& _qman, CSigningManager& _sigman, CSigSharesManager& _shareman,
|
||||
CSporkManager& sporkman, CTxMemPool& _mempool, const CMasternodeSync& mn_sync,
|
||||
const std::unique_ptr<PeerManager>& peerman, bool is_masternode, bool unitTests, bool fWipe) :
|
||||
explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CQuorumManager& _qman,
|
||||
CSigningManager& _sigman, CSigSharesManager& _shareman, CSporkManager& sporkman,
|
||||
CTxMemPool& _mempool, const CMasternodeSync& mn_sync,
|
||||
const std::unique_ptr<PeerManager>& peerman, bool is_masternode, bool unitTests,
|
||||
bool fWipe) :
|
||||
db(unitTests, fWipe),
|
||||
clhandler(_clhandler), m_chainstate(chainstate), connman(_connman), qman(_qman), sigman(_sigman),
|
||||
shareman(_shareman), spork_manager(sporkman), mempool(_mempool), m_mn_sync(mn_sync), m_peerman(peerman),
|
||||
clhandler(_clhandler),
|
||||
m_chainstate(chainstate),
|
||||
qman(_qman),
|
||||
sigman(_sigman),
|
||||
shareman(_shareman),
|
||||
spork_manager(sporkman),
|
||||
mempool(_mempool),
|
||||
m_mn_sync(mn_sync),
|
||||
m_peerman(peerman),
|
||||
m_is_masternode{is_masternode}
|
||||
{
|
||||
workInterrupt.reset();
|
||||
@ -315,7 +322,6 @@ private:
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);
|
||||
void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
|
||||
static void AskNodesForLockedTx(const uint256& txid, const CConnman& connman, PeerManager& peerman, bool is_masternode);
|
||||
void ProcessPendingRetryLockTxs()
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);
|
||||
|
||||
|
@ -329,6 +329,11 @@ struct Peer {
|
||||
LOCK(m_tx_relay_mutex);
|
||||
return m_can_tx_relay ? m_tx_relay.get() : nullptr;
|
||||
};
|
||||
const TxRelay* GetTxRelay() const LOCKS_EXCLUDED(m_tx_relay_mutex)
|
||||
{
|
||||
LOCK(m_tx_relay_mutex);
|
||||
return m_can_tx_relay ? m_tx_relay.get() : nullptr;
|
||||
};
|
||||
|
||||
/** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
|
||||
std::vector<CAddress> m_addrs_to_send GUARDED_BY(NetEventsInterface::g_msgproc_mutex);
|
||||
@ -409,7 +414,7 @@ struct Peer {
|
||||
{}
|
||||
|
||||
private:
|
||||
Mutex m_tx_relay_mutex;
|
||||
mutable Mutex m_tx_relay_mutex;
|
||||
|
||||
/** Transaction relay data.
|
||||
* (Bitcoin) Transaction relay data. May be a nullptr.
|
||||
@ -633,7 +638,7 @@ public:
|
||||
bool is_masternode, bool fForce = false) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
|
||||
size_t GetRequestedObjectCount(NodeId nodeid) const override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
|
||||
bool IsInvInFilter(NodeId nodeid, const uint256& hash) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
|
||||
void AskPeersForTransaction(const uint256& txid, bool is_masternode) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
private:
|
||||
/** Helpers to process result of external handlers of message */
|
||||
void ProcessPeerMsgRet(const PeerMsgRet& ret, CNode& pfrom) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
@ -648,6 +653,11 @@ private:
|
||||
/** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */
|
||||
void ReattemptInitialBroadcast(CScheduler& scheduler) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
|
||||
/**
|
||||
* Private implementation of IsInvInFilter which does not call GetPeerRef; to be prefered when the PeerRef is available.
|
||||
*/
|
||||
bool IsInvInFilter(const Peer& peer, const uint256& hash) const;
|
||||
|
||||
/** Get a shared pointer to the Peer object.
|
||||
* May return an empty shared_ptr if the Peer object can't be found. */
|
||||
PeerRef GetPeerRef(NodeId id) const EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
@ -2256,12 +2266,47 @@ void PeerManagerImpl::SendPings()
|
||||
for(auto& it : m_peer_map) it.second->m_ping_queued = true;
|
||||
}
|
||||
|
||||
void PeerManagerImpl::AskPeersForTransaction(const uint256& txid, bool is_masternode)
|
||||
{
|
||||
std::vector<PeerRef> peersToAsk;
|
||||
peersToAsk.reserve(4);
|
||||
|
||||
{
|
||||
LOCK(m_peer_mutex);
|
||||
// TODO consider prioritizing MNs again, once that flag is moved into Peer
|
||||
for (const auto& [_, peer] : m_peer_map) {
|
||||
if (peersToAsk.size() >= 4) {
|
||||
break;
|
||||
}
|
||||
if (IsInvInFilter(*peer, txid)) {
|
||||
peersToAsk.emplace_back(peer);
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
CInv inv(MSG_TX, txid);
|
||||
LOCK(cs_main);
|
||||
for (PeerRef& peer : peersToAsk) {
|
||||
LogPrintf("PeerManagerImpl::%s -- txid=%s: asking other peer %d for correct TX\n", __func__,
|
||||
txid.ToString(), peer->m_id);
|
||||
|
||||
RequestObject(peer->m_id, inv, GetTime<std::chrono::microseconds>(), is_masternode,
|
||||
/*fForce=*/true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool PeerManagerImpl::IsInvInFilter(NodeId nodeid, const uint256& hash) const
|
||||
{
|
||||
PeerRef peer = GetPeerRef(nodeid);
|
||||
if (peer == nullptr)
|
||||
return false;
|
||||
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
|
||||
return IsInvInFilter(*peer, hash);
|
||||
}
|
||||
|
||||
bool PeerManagerImpl::IsInvInFilter(const Peer& peer, const uint256& hash) const
|
||||
{
|
||||
if (auto tx_relay = peer.GetTxRelay(); tx_relay != nullptr) {
|
||||
LOCK(tx_relay->m_tx_inventory_mutex);
|
||||
return tx_relay->m_tx_inventory_known_filter.contains(hash);
|
||||
}
|
||||
|
@ -91,6 +91,9 @@ public:
|
||||
/** Is an inventory in the known inventory filter. Used by InstantSend. */
|
||||
virtual bool IsInvInFilter(NodeId nodeid, const uint256& hash) const = 0;
|
||||
|
||||
/** Ask a number of our peers, which have a transaction in their inventory, for the transaction. */
|
||||
virtual void AskPeersForTransaction(const uint256& txid, bool is_masternode) = 0;
|
||||
|
||||
/** Broadcast inventory message to a specific peer. */
|
||||
virtual void PushInventory(NodeId nodeid, const CInv& inv) = 0;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user