merge bitcoin#22778: Reduce resource usage for inbound block-relay-only connections

continuation of 60b5392d in dash#6276

excludes:
- 42e3250497b03478d61cd6bfe6cd904de73d57b1 (Dash does not have FEEFILTER support)
This commit is contained in:
Kittywhiskers Van Gogh 2024-10-26 08:29:08 +00:00
parent 6e6de54e5e
commit cc694c2e5b
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD

View File

@ -304,10 +304,25 @@ struct Peer {
std::chrono::microseconds m_next_inv_send_time GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0}; std::chrono::microseconds m_next_inv_send_time GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0};
}; };
// in bitcoin: m_tx_relay == nullptr if we're not relaying transactions with this peer /**
// in dash: m_tx_relay should never be nullptr, we don't relay transactions if * (Bitcoin) Initializes a TxRelay struct for this peer. Can be called at most once for a peer.
// `IsBlockOnlyConn() == true` is instead * (Dash) Enables the flag that allows GetTxRelay() to return m_tx_relay */
std::unique_ptr<TxRelay> m_tx_relay{std::make_unique<TxRelay>()}; TxRelay* SetTxRelay()
{
Assume(!m_can_tx_relay);
m_can_tx_relay = true;
return WITH_LOCK(m_tx_relay_mutex, return m_tx_relay.get());
};
TxRelay* GetInvRelay()
{
return WITH_LOCK(m_tx_relay_mutex, return m_tx_relay.get());
}
TxRelay* GetTxRelay()
{
return m_can_tx_relay ? WITH_LOCK(m_tx_relay_mutex, return m_tx_relay.get()) : nullptr;
};
/** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */ /** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
std::vector<CAddress> m_addrs_to_send GUARDED_BY(NetEventsInterface::g_msgproc_mutex); std::vector<CAddress> m_addrs_to_send GUARDED_BY(NetEventsInterface::g_msgproc_mutex);
@ -337,7 +352,7 @@ struct Peer {
* initialized.*/ * initialized.*/
std::atomic_bool m_addr_relay_enabled{false}; std::atomic_bool m_addr_relay_enabled{false};
/** Whether a peer can relay transactions */ /** Whether a peer can relay transactions */
const bool m_can_tx_relay{false}; bool m_can_tx_relay{false};
/** Whether a getaddr request to this peer is outstanding. */ /** Whether a getaddr request to this peer is outstanding. */
bool m_getaddr_sent GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false}; bool m_getaddr_sent GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
/** Guards address sending timers. */ /** Guards address sending timers. */
@ -375,12 +390,22 @@ struct Peer {
/** Time of the last getheaders message to this peer */ /** Time of the last getheaders message to this peer */
std::atomic<std::chrono::seconds> m_last_getheaders_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0s}; std::atomic<std::chrono::seconds> m_last_getheaders_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0s};
explicit Peer(NodeId id, ServiceFlags our_services, bool tx_relay) explicit Peer(NodeId id, ServiceFlags our_services)
: m_id(id) : m_id(id)
, m_our_services{our_services} , m_our_services{our_services}
, m_tx_relay(std::make_unique<TxRelay>())
, m_can_tx_relay{tx_relay}
{} {}
private:
Mutex m_tx_relay_mutex;
/** Transaction relay data.
* (Bitcoin) Will be a nullptr if we're not relaying transactions with this peer
* (e.g. if it's a block-relay-only peer). Users should access this with
* the GetTxRelay() getter.
* (Dash) Always initialized but selectively available through GetTxRelay()
* (non-transaction relay should use GetInvRelay(), which will provide
* unconditional access) */
std::unique_ptr<TxRelay> m_tx_relay GUARDED_BY(m_tx_relay_mutex){std::make_unique<TxRelay>()};
}; };
using PeerRef = std::shared_ptr<Peer>; using PeerRef = std::shared_ptr<Peer>;
@ -1033,11 +1058,11 @@ void PeerManagerImpl::PushAddress(Peer& peer, const CAddress& addr, FastRandomCo
static void AddKnownInv(Peer& peer, const uint256& hash) static void AddKnownInv(Peer& peer, const uint256& hash)
{ {
// Dash always initializes m_tx_relay auto inv_relay = peer.GetInvRelay();
assert(peer.m_tx_relay != nullptr); assert(inv_relay);
LOCK(peer.m_tx_relay->m_tx_inventory_mutex); LOCK(inv_relay->m_tx_inventory_mutex);
peer.m_tx_relay->m_tx_inventory_known_filter.insert(hash); inv_relay->m_tx_inventory_known_filter.insert(hash);
} }
/** Whether this peer can serve us blocks. */ /** Whether this peer can serve us blocks. */
@ -1071,8 +1096,8 @@ static uint16_t GetHeadersLimit(const CNode& pfrom, bool compressed)
static void PushInv(Peer& peer, const CInv& inv) static void PushInv(Peer& peer, const CInv& inv)
{ {
// Dash always initializes m_tx_relay auto inv_relay = peer.GetInvRelay();
assert(peer.m_tx_relay != nullptr); assert(inv_relay);
ASSERT_IF_DEBUG(inv.type != MSG_BLOCK); ASSERT_IF_DEBUG(inv.type != MSG_BLOCK);
if (inv.type == MSG_BLOCK) { if (inv.type == MSG_BLOCK) {
@ -1080,17 +1105,17 @@ static void PushInv(Peer& peer, const CInv& inv)
return; return;
} }
LOCK(peer.m_tx_relay->m_tx_inventory_mutex); LOCK(inv_relay->m_tx_inventory_mutex);
if (peer.m_tx_relay->m_tx_inventory_known_filter.contains(inv.hash)) { if (inv_relay->m_tx_inventory_known_filter.contains(inv.hash)) {
LogPrint(BCLog::NET, "%s -- skipping known inv: %s peer=%d\n", __func__, inv.ToString(), peer.m_id); LogPrint(BCLog::NET, "%s -- skipping known inv: %s peer=%d\n", __func__, inv.ToString(), peer.m_id);
return; return;
} }
LogPrint(BCLog::NET, "%s -- adding new inv: %s peer=%d\n", __func__, inv.ToString(), peer.m_id); LogPrint(BCLog::NET, "%s -- adding new inv: %s peer=%d\n", __func__, inv.ToString(), peer.m_id);
if (inv.type == MSG_TX || inv.type == MSG_DSTX) { if (inv.type == MSG_TX || inv.type == MSG_DSTX) {
peer.m_tx_relay->m_tx_inventory_to_send.insert(inv.hash); inv_relay->m_tx_inventory_to_send.insert(inv.hash);
return; return;
} }
peer.m_tx_relay->vInventoryOtherToSend.push_back(inv); inv_relay->vInventoryOtherToSend.push_back(inv);
} }
std::chrono::microseconds PeerManagerImpl::NextInvToInbounds(std::chrono::microseconds now, std::chrono::microseconds PeerManagerImpl::NextInvToInbounds(std::chrono::microseconds now,
@ -1395,7 +1420,7 @@ void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer)
nProtocolVersion = gArgs.GetArg("-pushversion", PROTOCOL_VERSION); nProtocolVersion = gArgs.GetArg("-pushversion", PROTOCOL_VERSION);
} }
const bool tx_relay = !m_ignore_incoming_txs && peer.m_can_tx_relay && !pnode.IsFeelerConn(); const bool tx_relay = !m_ignore_incoming_txs && !pnode.IsBlockOnlyConn() && !pnode.IsFeelerConn();
m_connman.PushMessage(&pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, nProtocolVersion, my_services, nTime, m_connman.PushMessage(&pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, nProtocolVersion, my_services, nTime,
your_services, addr_you, // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime) your_services, addr_you, // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime)
my_services, CService(), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime) my_services, CService(), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime)
@ -1552,7 +1577,7 @@ void PeerManagerImpl::InitializeNode(CNode& node, ServiceFlags our_services) {
LOCK(cs_main); LOCK(cs_main);
m_node_states.emplace_hint(m_node_states.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(node.IsInboundConn())); m_node_states.emplace_hint(m_node_states.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(node.IsInboundConn()));
} }
PeerRef peer = std::make_shared<Peer>(nodeid, our_services, /*tx_relay=*/!node.IsBlockOnlyConn()); PeerRef peer = std::make_shared<Peer>(nodeid, our_services);
{ {
LOCK(m_peer_mutex); LOCK(m_peer_mutex);
m_peer_map.emplace_hint(m_peer_map.end(), nodeid, peer); m_peer_map.emplace_hint(m_peer_map.end(), nodeid, peer);
@ -1685,8 +1710,8 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c
ping_wait = GetTime<std::chrono::microseconds>() - peer->m_ping_start.load(); ping_wait = GetTime<std::chrono::microseconds>() - peer->m_ping_start.load();
} }
if (peer->m_can_tx_relay) { if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
stats.m_relay_txs = WITH_LOCK(peer->m_tx_relay->m_bloom_filter_mutex, return peer->m_tx_relay->m_relay_txs); stats.m_relay_txs = WITH_LOCK(tx_relay->m_bloom_filter_mutex, return tx_relay->m_relay_txs);
} else { } else {
stats.m_relay_txs = false; stats.m_relay_txs = false;
} }
@ -2211,10 +2236,11 @@ bool PeerManagerImpl::IsInvInFilter(NodeId nodeid, const uint256& hash) const
PeerRef peer = GetPeerRef(nodeid); PeerRef peer = GetPeerRef(nodeid);
if (peer == nullptr) if (peer == nullptr)
return false; return false;
if (!peer->m_can_tx_relay) if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
LOCK(tx_relay->m_tx_inventory_mutex);
return tx_relay->m_tx_inventory_known_filter.contains(hash);
}
return false; return false;
LOCK(peer->m_tx_relay->m_tx_inventory_mutex);
return peer->m_tx_relay->m_tx_inventory_known_filter.contains(hash);
} }
void PeerManagerImpl::PushInventory(NodeId nodeid, const CInv& inv) void PeerManagerImpl::PushInventory(NodeId nodeid, const CInv& inv)
@ -2245,16 +2271,21 @@ void PeerManagerImpl::RelayInvFiltered(CInv &inv, const CTransaction& relatedTx,
m_connman.ForEachNode([&](CNode* pnode) { m_connman.ForEachNode([&](CNode* pnode) {
PeerRef peer = GetPeerRef(pnode->GetId()); PeerRef peer = GetPeerRef(pnode->GetId());
if (peer == nullptr) return; if (peer == nullptr) return;
if (pnode->nVersion < minProtoVersion || !pnode->CanRelay() || !peer->m_tx_relay) return;
auto tx_relay = peer->GetTxRelay();
if (pnode->nVersion < minProtoVersion || !pnode->CanRelay() || tx_relay == nullptr) {
return;
}
{ {
LOCK(peer->m_tx_relay->m_bloom_filter_mutex); LOCK(tx_relay->m_bloom_filter_mutex);
if (!peer->m_tx_relay->m_relay_txs) { if (!tx_relay->m_relay_txs) {
return; return;
} }
if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(relatedTx)) { if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(relatedTx)) {
return; return;
} }
} } // LOCK(tx_relay->m_bloom_filter_mutex)
PushInv(*peer, inv); PushInv(*peer, inv);
}); });
} }
@ -2265,16 +2296,21 @@ void PeerManagerImpl::RelayInvFiltered(CInv &inv, const uint256& relatedTxHash,
m_connman.ForEachNode([&](CNode* pnode) { m_connman.ForEachNode([&](CNode* pnode) {
PeerRef peer = GetPeerRef(pnode->GetId()); PeerRef peer = GetPeerRef(pnode->GetId());
if (peer == nullptr) return; if (peer == nullptr) return;
if (pnode->nVersion < minProtoVersion || !pnode->CanRelay() || !peer->m_tx_relay) return;
auto tx_relay = peer->GetTxRelay();
if (pnode->nVersion < minProtoVersion || !pnode->CanRelay() || tx_relay == nullptr) {
return;
}
{ {
LOCK(peer->m_tx_relay->m_bloom_filter_mutex); LOCK(tx_relay->m_bloom_filter_mutex);
if (!peer->m_tx_relay->m_relay_txs) { if (!tx_relay->m_relay_txs) {
return; return;
} }
if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->contains(relatedTxHash)) { if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->contains(relatedTxHash)) {
return; return;
} }
} } // LOCK(tx_relay->m_bloom_filter_mutex)
PushInv(*peer, inv); PushInv(*peer, inv);
}); });
} }
@ -2284,7 +2320,8 @@ void PeerManagerImpl::RelayTransaction(const uint256& txid)
LOCK(m_peer_mutex); LOCK(m_peer_mutex);
for(auto& it : m_peer_map) { for(auto& it : m_peer_map) {
Peer& peer = *it.second; Peer& peer = *it.second;
if (!peer.m_tx_relay) continue; auto tx_relay = peer.GetTxRelay();
if (!tx_relay) continue;
const CInv inv{m_cj_ctx->dstxman->GetDSTX(txid) ? MSG_DSTX : MSG_TX, txid}; const CInv inv{m_cj_ctx->dstxman->GetDSTX(txid) ? MSG_DSTX : MSG_TX, txid};
PushInv(peer, inv); PushInv(peer, inv);
@ -2418,11 +2455,11 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, Peer& peer, const CInv&
} else if (inv.IsMsgFilteredBlk()) { } else if (inv.IsMsgFilteredBlk()) {
bool sendMerkleBlock = false; bool sendMerkleBlock = false;
CMerkleBlock merkleBlock; CMerkleBlock merkleBlock;
if (peer.m_can_tx_relay) { if (auto tx_relay = peer.GetTxRelay(); tx_relay != nullptr) {
LOCK(peer.m_tx_relay->m_bloom_filter_mutex); LOCK(tx_relay->m_bloom_filter_mutex);
if (peer.m_tx_relay->m_bloom_filter) { if (tx_relay->m_bloom_filter) {
sendMerkleBlock = true; sendMerkleBlock = true;
merkleBlock = CMerkleBlock(*pblock, *peer.m_tx_relay->m_bloom_filter); merkleBlock = CMerkleBlock(*pblock, *tx_relay->m_bloom_filter);
} }
} }
if (sendMerkleBlock) { if (sendMerkleBlock) {
@ -2514,13 +2551,15 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
{ {
AssertLockNotHeld(cs_main); AssertLockNotHeld(cs_main);
auto tx_relay = peer.GetTxRelay();
std::deque<CInv>::iterator it = peer.m_getdata_requests.begin(); std::deque<CInv>::iterator it = peer.m_getdata_requests.begin();
std::vector<CInv> vNotFound; std::vector<CInv> vNotFound;
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
const std::chrono::seconds now = GetTime<std::chrono::seconds>(); const std::chrono::seconds now = GetTime<std::chrono::seconds>();
// Get last mempool request time // Get last mempool request time
const std::chrono::seconds mempool_req = peer.m_can_tx_relay ? peer.m_tx_relay->m_last_mempool_req.load() const std::chrono::seconds mempool_req = tx_relay != nullptr ? tx_relay->m_last_mempool_req.load()
: std::chrono::seconds::min(); : std::chrono::seconds::min();
// Process as many TX items from the front of the getdata queue as // Process as many TX items from the front of the getdata queue as
@ -2541,7 +2580,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
} }
++it; ++it;
if (!peer.m_can_tx_relay && NetMessageViolatesBlocksOnly(inv.GetCommand())) { if (tx_relay == nullptr && NetMessageViolatesBlocksOnly(inv.GetCommand())) {
// Note that if we receive a getdata for non-block messages // Note that if we receive a getdata for non-block messages
// from a block-relay-only outbound peer that violate the policy, // from a block-relay-only outbound peer that violate the policy,
// we skip such getdata messages from this peer // we skip such getdata messages from this peer
@ -2582,7 +2621,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
for (const uint256& parent_txid : parent_ids_to_add) { for (const uint256& parent_txid : parent_ids_to_add) {
// Relaying a transaction with a recent but unconfirmed parent. // Relaying a transaction with a recent but unconfirmed parent.
if (WITH_LOCK(peer.m_tx_relay->m_tx_inventory_mutex, return !peer.m_tx_relay->m_tx_inventory_known_filter.contains(parent_txid))) { if (WITH_LOCK(tx_relay->m_tx_inventory_mutex, return !tx_relay->m_tx_inventory_known_filter.contains(parent_txid))) {
LOCK(cs_main); LOCK(cs_main);
State(pfrom.GetId())->m_recently_announced_invs.insert(parent_txid); State(pfrom.GetId())->m_recently_announced_invs.insert(parent_txid);
} }
@ -3480,10 +3519,16 @@ void PeerManagerImpl::ProcessMessage(
} }
peer->m_starting_height = starting_height; peer->m_starting_height = starting_height;
if (peer->m_can_tx_relay) { // We only initialize the m_tx_relay data structure if:
// - this isn't an outbound block-relay-only connection; and
// - fRelay=true or we're offering NODE_BLOOM to this peer
// (NODE_BLOOM means that the peer may turn on tx relay later)
if (!pfrom.IsBlockOnlyConn() &&
(fRelay || (peer->m_our_services & NODE_BLOOM))) {
auto* const tx_relay = peer->SetTxRelay();
{ {
LOCK(peer->m_tx_relay->m_bloom_filter_mutex); LOCK(tx_relay->m_bloom_filter_mutex);
peer->m_tx_relay->m_relay_txs = fRelay; // set to true after we get the first filter* message tx_relay->m_relay_txs = fRelay; // set to true after we get the first filter* message
} }
if (fRelay) pfrom.m_relays_txs = true; if (fRelay) pfrom.m_relays_txs = true;
} }
@ -4744,9 +4789,9 @@ void PeerManagerImpl::ProcessMessage(
return; return;
} }
if (peer->m_can_tx_relay) { if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
LOCK(peer->m_tx_relay->m_tx_inventory_mutex); LOCK(tx_relay->m_tx_inventory_mutex);
peer->m_tx_relay->m_send_mempool = true; tx_relay->m_send_mempool = true;
} }
return; return;
} }
@ -4837,13 +4882,11 @@ void PeerManagerImpl::ProcessMessage(
{ {
// There is no excuse for sending a too-large filter // There is no excuse for sending a too-large filter
Misbehaving(pfrom.GetId(), 100, "too-large bloom filter"); Misbehaving(pfrom.GetId(), 100, "too-large bloom filter");
} } else if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
else if (peer->m_can_tx_relay)
{ {
{ LOCK(tx_relay->m_bloom_filter_mutex);
LOCK(peer->m_tx_relay->m_bloom_filter_mutex); tx_relay->m_bloom_filter.reset(new CBloomFilter(filter));
peer->m_tx_relay->m_bloom_filter.reset(new CBloomFilter(filter)); tx_relay->m_relay_txs = true;
peer->m_tx_relay->m_relay_txs = true;
} }
pfrom.m_bloom_filter_loaded = true; pfrom.m_bloom_filter_loaded = true;
pfrom.m_relays_txs = true; pfrom.m_relays_txs = true;
@ -4865,10 +4908,10 @@ void PeerManagerImpl::ProcessMessage(
bool bad = false; bool bad = false;
if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) { if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) {
bad = true; bad = true;
} else if (peer->m_can_tx_relay) { } else if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
LOCK(peer->m_tx_relay->m_bloom_filter_mutex); LOCK(tx_relay->m_bloom_filter_mutex);
if (peer->m_tx_relay->m_bloom_filter) { if (tx_relay->m_bloom_filter) {
peer->m_tx_relay->m_bloom_filter->insert(vData); tx_relay->m_bloom_filter->insert(vData);
} else { } else {
bad = true; bad = true;
} }
@ -4885,14 +4928,13 @@ void PeerManagerImpl::ProcessMessage(
pfrom.fDisconnect = true; pfrom.fDisconnect = true;
return; return;
} }
if (!peer->m_can_tx_relay) { auto tx_relay = peer->GetTxRelay();
return; if (!tx_relay) return;
}
{ {
LOCK(peer->m_tx_relay->m_bloom_filter_mutex); LOCK(tx_relay->m_bloom_filter_mutex);
peer->m_tx_relay->m_bloom_filter = nullptr; tx_relay->m_bloom_filter = nullptr;
peer->m_tx_relay->m_relay_txs = true; tx_relay->m_relay_txs = true;
} }
pfrom.m_bloom_filter_loaded = false; pfrom.m_bloom_filter_loaded = false;
pfrom.m_relays_txs = true; pfrom.m_relays_txs = true;
@ -5753,9 +5795,9 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
LOCK(peer->m_block_inv_mutex); LOCK(peer->m_block_inv_mutex);
size_t reserve = INVENTORY_BROADCAST_MAX_PER_1MB_BLOCK * MaxBlockSize() / 1000000; size_t reserve = INVENTORY_BROADCAST_MAX_PER_1MB_BLOCK * MaxBlockSize() / 1000000;
if (peer->m_can_tx_relay) { if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
LOCK(peer->m_tx_relay->m_tx_inventory_mutex); LOCK(tx_relay->m_tx_inventory_mutex);
reserve = std::min<size_t>(peer->m_tx_relay->m_tx_inventory_to_send.size(), reserve); reserve = std::min<size_t>(tx_relay->m_tx_inventory_to_send.size(), reserve);
} }
reserve = std::max<size_t>(reserve, peer->m_blocks_for_inv_relay.size()); reserve = std::max<size_t>(reserve, peer->m_blocks_for_inv_relay.size());
reserve = std::min<size_t>(reserve, MAX_INV_SZ); reserve = std::min<size_t>(reserve, MAX_INV_SZ);
@ -5772,9 +5814,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
peer->m_blocks_for_inv_relay.clear(); peer->m_blocks_for_inv_relay.clear();
} }
auto queueAndMaybePushInv = [this, pto, peer, &vInv, &msgMaker](const CInv& invIn) EXCLUSIVE_LOCKS_REQUIRED(peer->m_tx_relay->m_tx_inventory_mutex) { auto queueAndMaybePushInv = [this, pto, peer, &vInv, &msgMaker](const CInv& invIn) {
AssertLockHeld(peer->m_tx_relay->m_tx_inventory_mutex);
peer->m_tx_relay->m_tx_inventory_known_filter.insert(invIn.hash);
LogPrint(BCLog::NET, "SendMessages -- queued inv: %s index=%d peer=%d\n", invIn.ToString(), vInv.size(), pto->GetId()); LogPrint(BCLog::NET, "SendMessages -- queued inv: %s index=%d peer=%d\n", invIn.ToString(), vInv.size(), pto->GetId());
// Responses to MEMPOOL requests bypass the m_recently_announced_invs filter. // Responses to MEMPOOL requests bypass the m_recently_announced_invs filter.
vInv.push_back(invIn); vInv.push_back(invIn);
@ -5785,19 +5825,19 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
} }
}; };
if (peer->m_can_tx_relay) { if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
LOCK(peer->m_tx_relay->m_tx_inventory_mutex); LOCK(tx_relay->m_tx_inventory_mutex);
// Check whether periodic sends should happen // Check whether periodic sends should happen
// Note: If this node is running in a Masternode mode, it makes no sense to delay outgoing txes // Note: If this node is running in a Masternode mode, it makes no sense to delay outgoing txes
// because we never produce any txes ourselves i.e. no privacy is lost in this case. // because we never produce any txes ourselves i.e. no privacy is lost in this case.
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan) || is_masternode; bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan) || is_masternode;
if (peer->m_tx_relay->m_next_inv_send_time < current_time) { if (tx_relay->m_next_inv_send_time < current_time) {
fSendTrickle = true; fSendTrickle = true;
if (pto->IsInboundConn()) { if (pto->IsInboundConn()) {
peer->m_tx_relay->m_next_inv_send_time = NextInvToInbounds(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL); tx_relay->m_next_inv_send_time = NextInvToInbounds(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL);
} else { } else {
// Use half the delay for Masternode outbound peers, as there is less privacy concern for them. // Use half the delay for Masternode outbound peers, as there is less privacy concern for them.
peer->m_tx_relay->m_next_inv_send_time = pto->GetVerifiedProRegTxHash().IsNull() ? tx_relay->m_next_inv_send_time = pto->GetVerifiedProRegTxHash().IsNull() ?
GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL) : GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL) :
GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL / 2); GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL / 2);
} }
@ -5805,49 +5845,53 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// Time to send but the peer has requested we not relay transactions. // Time to send but the peer has requested we not relay transactions.
if (fSendTrickle) { if (fSendTrickle) {
LOCK(peer->m_tx_relay->m_bloom_filter_mutex); LOCK(tx_relay->m_bloom_filter_mutex);
if (!peer->m_tx_relay->m_relay_txs) peer->m_tx_relay->m_tx_inventory_to_send.clear(); if (!tx_relay->m_relay_txs) tx_relay->m_tx_inventory_to_send.clear();
} }
// Respond to BIP35 mempool requests // Respond to BIP35 mempool requests
if (fSendTrickle && peer->m_tx_relay->m_send_mempool) { if (fSendTrickle && tx_relay->m_send_mempool) {
auto vtxinfo = m_mempool.infoAll(); auto vtxinfo = m_mempool.infoAll();
peer->m_tx_relay->m_send_mempool = false; tx_relay->m_send_mempool = false;
LOCK(peer->m_tx_relay->m_bloom_filter_mutex); LOCK(tx_relay->m_bloom_filter_mutex);
// Send invs for txes and corresponding IS-locks // Send invs for txes and corresponding IS-locks
for (const auto& txinfo : vtxinfo) { for (const auto& txinfo : vtxinfo) {
const uint256& hash = txinfo.tx->GetHash(); const uint256& hash = txinfo.tx->GetHash();
peer->m_tx_relay->m_tx_inventory_to_send.erase(hash); tx_relay->m_tx_inventory_to_send.erase(hash);
if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue; if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
int nInvType = m_cj_ctx->dstxman->GetDSTX(hash) ? MSG_DSTX : MSG_TX; int nInvType = m_cj_ctx->dstxman->GetDSTX(hash) ? MSG_DSTX : MSG_TX;
tx_relay->m_tx_inventory_known_filter.insert(hash);
queueAndMaybePushInv(CInv(nInvType, hash)); queueAndMaybePushInv(CInv(nInvType, hash));
const auto islock = m_llmq_ctx->isman->GetInstantSendLockByTxid(hash); const auto islock = m_llmq_ctx->isman->GetInstantSendLockByTxid(hash);
if (islock == nullptr) continue; if (islock == nullptr) continue;
if (pto->nVersion < ISDLOCK_PROTO_VERSION) continue; if (pto->nVersion < ISDLOCK_PROTO_VERSION) continue;
queueAndMaybePushInv(CInv(MSG_ISDLOCK, ::SerializeHash(*islock))); uint256 isLockHash{::SerializeHash(*islock)};
tx_relay->m_tx_inventory_known_filter.insert(isLockHash);
queueAndMaybePushInv(CInv(MSG_ISDLOCK, isLockHash));
} }
// Send an inv for the best ChainLock we have // Send an inv for the best ChainLock we have
const auto& clsig = m_llmq_ctx->clhandler->GetBestChainLock(); const auto& clsig = m_llmq_ctx->clhandler->GetBestChainLock();
if (!clsig.IsNull()) { if (!clsig.IsNull()) {
uint256 chainlockHash = ::SerializeHash(clsig); uint256 chainlockHash{::SerializeHash(clsig)};
tx_relay->m_tx_inventory_known_filter.insert(chainlockHash);
queueAndMaybePushInv(CInv(MSG_CLSIG, chainlockHash)); queueAndMaybePushInv(CInv(MSG_CLSIG, chainlockHash));
} }
peer->m_tx_relay->m_last_mempool_req = std::chrono::duration_cast<std::chrono::seconds>(current_time); tx_relay->m_last_mempool_req = std::chrono::duration_cast<std::chrono::seconds>(current_time);
} }
// Determine transactions to relay // Determine transactions to relay
if (fSendTrickle) { if (fSendTrickle) {
LOCK(peer->m_tx_relay->m_bloom_filter_mutex); LOCK(tx_relay->m_bloom_filter_mutex);
// Produce a vector with all candidates for sending // Produce a vector with all candidates for sending
std::vector<std::set<uint256>::iterator> vInvTx; std::vector<std::set<uint256>::iterator> vInvTx;
vInvTx.reserve(peer->m_tx_relay->m_tx_inventory_to_send.size()); vInvTx.reserve(tx_relay->m_tx_inventory_to_send.size());
for (std::set<uint256>::iterator it = peer->m_tx_relay->m_tx_inventory_to_send.begin(); it != peer->m_tx_relay->m_tx_inventory_to_send.end(); it++) { for (std::set<uint256>::iterator it = tx_relay->m_tx_inventory_to_send.begin(); it != tx_relay->m_tx_inventory_to_send.end(); it++) {
vInvTx.push_back(it); vInvTx.push_back(it);
} }
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons. // Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
@ -5857,7 +5901,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// No reason to drain out at many times the network's capacity, // No reason to drain out at many times the network's capacity,
// especially since we have many peers and some will draw much shorter delays. // especially since we have many peers and some will draw much shorter delays.
unsigned int nRelayedTransactions = 0; unsigned int nRelayedTransactions = 0;
size_t broadcast_max{INVENTORY_BROADCAST_MAX_PER_1MB_BLOCK * MaxBlockSize() / 1000000 + (peer->m_tx_relay->m_tx_inventory_to_send.size()/1000)*5}; size_t broadcast_max{INVENTORY_BROADCAST_MAX_PER_1MB_BLOCK * MaxBlockSize() / 1000000 + (tx_relay->m_tx_inventory_to_send.size()/1000)*5};
broadcast_max = std::min<size_t>(1000, broadcast_max); broadcast_max = std::min<size_t>(1000, broadcast_max);
while (!vInvTx.empty() && nRelayedTransactions < broadcast_max) { while (!vInvTx.empty() && nRelayedTransactions < broadcast_max) {
@ -5867,9 +5911,9 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
vInvTx.pop_back(); vInvTx.pop_back();
uint256 hash = *it; uint256 hash = *it;
// Remove it from the to-be-sent set // Remove it from the to-be-sent set
peer->m_tx_relay->m_tx_inventory_to_send.erase(it); tx_relay->m_tx_inventory_to_send.erase(it);
// Check if not in the filter already // Check if not in the filter already
if (peer->m_tx_relay->m_tx_inventory_known_filter.contains(hash)) { if (tx_relay->m_tx_inventory_known_filter.contains(hash)) {
continue; continue;
} }
// Not in the mempool anymore? don't bother sending it. // Not in the mempool anymore? don't bother sending it.
@ -5877,7 +5921,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
if (!txinfo.tx) { if (!txinfo.tx) {
continue; continue;
} }
if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue; if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
// Send // Send
State(pto->GetId())->m_recently_announced_invs.insert(hash); State(pto->GetId())->m_recently_announced_invs.insert(hash);
nRelayedTransactions++; nRelayedTransactions++;
@ -5895,29 +5939,33 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
} }
} }
int nInvType = m_cj_ctx->dstxman->GetDSTX(hash) ? MSG_DSTX : MSG_TX; int nInvType = m_cj_ctx->dstxman->GetDSTX(hash) ? MSG_DSTX : MSG_TX;
tx_relay->m_tx_inventory_known_filter.insert(hash);
queueAndMaybePushInv(CInv(nInvType, hash)); queueAndMaybePushInv(CInv(nInvType, hash));
} }
} }
} }
{ {
auto inv_relay = peer->GetInvRelay();
// Send non-tx/non-block inventory items // Send non-tx/non-block inventory items
LOCK2(peer->m_tx_relay->m_tx_inventory_mutex, peer->m_tx_relay->m_bloom_filter_mutex); LOCK2(inv_relay->m_tx_inventory_mutex, inv_relay->m_bloom_filter_mutex);
bool fSendIS = peer->m_tx_relay->m_relay_txs && !pto->IsBlockRelayOnly(); bool fSendIS = inv_relay->m_relay_txs && !pto->IsBlockRelayOnly();
for (const auto& inv : peer->m_tx_relay->vInventoryOtherToSend) { for (const auto& inv : inv_relay->vInventoryOtherToSend) {
if (!peer->m_tx_relay->m_relay_txs && NetMessageViolatesBlocksOnly(inv.GetCommand())) { if (!inv_relay->m_relay_txs && NetMessageViolatesBlocksOnly(inv.GetCommand())) {
continue; continue;
} }
if (peer->m_tx_relay->m_tx_inventory_known_filter.contains(inv.hash)) { if (inv_relay->m_tx_inventory_known_filter.contains(inv.hash)) {
continue; continue;
} }
if (!fSendIS && inv.type == MSG_ISDLOCK) { if (!fSendIS && inv.type == MSG_ISDLOCK) {
continue; continue;
} }
inv_relay->m_tx_inventory_known_filter.insert(inv.hash);
queueAndMaybePushInv(inv); queueAndMaybePushInv(inv);
} }
peer->m_tx_relay->vInventoryOtherToSend.clear(); inv_relay->vInventoryOtherToSend.clear();
} }
if (!vInv.empty()) if (!vInv.empty())
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));