Merge bitcoin/bitcoin#21775: p2p: Limit m_block_inv_mutex

fac96d026511f22f0202ce3631a38be0e990555f p2p: Limit m_block_inv_mutex (MarcoFalke)

Pull request description:

  Keeping the lock longer than needed is confusing to reviewers and to thread-safety analysis tools. For example, keeping the lock while appending tx-invs, which requires the mempool lock, will tell thread-safety analysis tools an incorrect lock order of `(1) m_block_inv_mutex, (2) pool.cs`.

ACKs for top commit:
  Crypt-iQ:
    crACK fac96d026511f22f0202ce3631a38be0e990555f
  jnewbery:
    utACK fac96d026511f22f0202ce3631a38be0e990555f
  theStack:
    Code-Review ACK fac96d026511f22f0202ce3631a38be0e990555f

Tree-SHA512: fcfac0f1f8b16df7522513abf716b2eed3d2fc9153f231c8cb61f451e342f29c984a5c872deca6bab3e601e5d651874cc229146c9370e46811b4520747a21f2b
This commit is contained in:
MarcoFalke 2021-05-03 11:13:38 +02:00 committed by Konstantin Akimov
parent 23b83109ea
commit 334496ea7e
No known key found for this signature in database
GPG Key ID: 2176C4A5D01EA524

View File

@ -5642,155 +5642,154 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
}
}
peer->m_blocks_for_inv_relay.clear();
}
auto queueAndMaybePushInv = [this, pto, peer, &vInv, &msgMaker](const CInv& invIn) {
AssertLockHeld(peer->m_tx_relay->m_tx_inventory_mutex);
peer->m_tx_relay->m_tx_inventory_known_filter.insert(invIn.hash);
LogPrint(BCLog::NET, "SendMessages -- queued inv: %s index=%d peer=%d\n", invIn.ToString(), vInv.size(), pto->GetId());
// Responses to MEMPOOL requests bypass the m_recently_announced_invs filter.
vInv.push_back(invIn);
if (vInv.size() == MAX_INV_SZ) {
LogPrint(BCLog::NET, "SendMessages -- pushing invs: count=%d peer=%d\n", vInv.size(), pto->GetId());
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
vInv.clear();
auto queueAndMaybePushInv = [this, pto, peer, &vInv, &msgMaker](const CInv& invIn) {
AssertLockHeld(peer->m_tx_relay->m_tx_inventory_mutex);
peer->m_tx_relay->m_tx_inventory_known_filter.insert(invIn.hash);
LogPrint(BCLog::NET, "SendMessages -- queued inv: %s index=%d peer=%d\n", invIn.ToString(), vInv.size(), pto->GetId());
// Responses to MEMPOOL requests bypass the m_recently_announced_invs filter.
vInv.push_back(invIn);
if (vInv.size() == MAX_INV_SZ) {
LogPrint(BCLog::NET, "SendMessages -- pushing invs: count=%d peer=%d\n", vInv.size(), pto->GetId());
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
vInv.clear();
}
};
if (!pto->IsBlockOnlyConn()) {
LOCK(peer->m_tx_relay->m_tx_inventory_mutex);
// Check whether periodic sends should happen
// Note: If this node is running in a Masternode mode, it makes no sense to delay outgoing txes
// because we never produce any txes ourselves i.e. no privacy is lost in this case.
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan) || is_masternode;
if (peer->m_tx_relay->m_next_inv_send_time < current_time) {
fSendTrickle = true;
if (pto->IsInboundConn()) {
peer->m_tx_relay->m_next_inv_send_time = m_connman.PoissonNextSendInbound(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL);
} else {
// Use half the delay for Masternode outbound peers, as there is less privacy concern for them.
peer->m_tx_relay->m_next_inv_send_time = pto->GetVerifiedProRegTxHash().IsNull() ?
PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL) :
PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL / 2);
}
};
}
if (!pto->IsBlockOnlyConn()) {
LOCK(peer->m_tx_relay->m_tx_inventory_mutex);
// Check whether periodic sends should happen
// Note: If this node is running in a Masternode mode, it makes no sense to delay outgoing txes
// because we never produce any txes ourselves i.e. no privacy is lost in this case.
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan) || is_masternode;
if (peer->m_tx_relay->m_next_inv_send_time < current_time) {
fSendTrickle = true;
if (pto->IsInboundConn()) {
peer->m_tx_relay->m_next_inv_send_time = m_connman.PoissonNextSendInbound(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL);
} else {
// Use half the delay for Masternode outbound peers, as there is less privacy concern for them.
peer->m_tx_relay->m_next_inv_send_time = pto->GetVerifiedProRegTxHash().IsNull() ?
PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL) :
PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL / 2);
}
// Time to send but the peer has requested we not relay transactions.
if (fSendTrickle) {
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
if (!peer->m_tx_relay->m_relay_txs) peer->m_tx_relay->m_tx_inventory_to_send.clear();
}
// Respond to BIP35 mempool requests
if (fSendTrickle && peer->m_tx_relay->m_send_mempool) {
auto vtxinfo = m_mempool.infoAll();
peer->m_tx_relay->m_send_mempool = false;
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
// Send invs for txes and corresponding IS-locks
for (const auto& txinfo : vtxinfo) {
const uint256& hash = txinfo.tx->GetHash();
peer->m_tx_relay->m_tx_inventory_to_send.erase(hash);
if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
int nInvType = m_cj_ctx->dstxman->GetDSTX(hash) ? MSG_DSTX : MSG_TX;
queueAndMaybePushInv(CInv(nInvType, hash));
const auto islock = m_llmq_ctx->isman->GetInstantSendLockByTxid(hash);
if (islock == nullptr) continue;
if (pto->nVersion < ISDLOCK_PROTO_VERSION) continue;
queueAndMaybePushInv(CInv(MSG_ISDLOCK, ::SerializeHash(*islock)));
}
// Time to send but the peer has requested we not relay transactions.
if (fSendTrickle) {
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
if (!peer->m_tx_relay->m_relay_txs) peer->m_tx_relay->m_tx_inventory_to_send.clear();
// Send an inv for the best ChainLock we have
const auto& clsig = m_llmq_ctx->clhandler->GetBestChainLock();
if (!clsig.IsNull()) {
uint256 chainlockHash = ::SerializeHash(clsig);
queueAndMaybePushInv(CInv(MSG_CLSIG, chainlockHash));
}
peer->m_tx_relay->m_last_mempool_req = std::chrono::duration_cast<std::chrono::seconds>(current_time);
}
// Respond to BIP35 mempool requests
if (fSendTrickle && peer->m_tx_relay->m_send_mempool) {
auto vtxinfo = m_mempool.infoAll();
peer->m_tx_relay->m_send_mempool = false;
// Determine transactions to relay
if (fSendTrickle) {
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
// Send invs for txes and corresponding IS-locks
for (const auto& txinfo : vtxinfo) {
const uint256& hash = txinfo.tx->GetHash();
peer->m_tx_relay->m_tx_inventory_to_send.erase(hash);
if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
int nInvType = m_cj_ctx->dstxman->GetDSTX(hash) ? MSG_DSTX : MSG_TX;
queueAndMaybePushInv(CInv(nInvType, hash));
const auto islock = m_llmq_ctx->isman->GetInstantSendLockByTxid(hash);
if (islock == nullptr) continue;
if (pto->nVersion < ISDLOCK_PROTO_VERSION) continue;
queueAndMaybePushInv(CInv(MSG_ISDLOCK, ::SerializeHash(*islock)));
}
// Send an inv for the best ChainLock we have
const auto& clsig = m_llmq_ctx->clhandler->GetBestChainLock();
if (!clsig.IsNull()) {
uint256 chainlockHash = ::SerializeHash(clsig);
queueAndMaybePushInv(CInv(MSG_CLSIG, chainlockHash));
}
peer->m_tx_relay->m_last_mempool_req = std::chrono::duration_cast<std::chrono::seconds>(current_time);
// Produce a vector with all candidates for sending
std::vector<std::set<uint256>::iterator> vInvTx;
vInvTx.reserve(peer->m_tx_relay->m_tx_inventory_to_send.size());
for (std::set<uint256>::iterator it = peer->m_tx_relay->m_tx_inventory_to_send.begin(); it != peer->m_tx_relay->m_tx_inventory_to_send.end(); it++) {
vInvTx.push_back(it);
}
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
// A heap is used so that not all items need sorting if only a few are being sent.
CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool);
std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
// No reason to drain out at many times the network's capacity,
// especially since we have many peers and some will draw much shorter delays.
unsigned int nRelayedTransactions = 0;
size_t broadcast_max{INVENTORY_BROADCAST_MAX_PER_1MB_BLOCK * MaxBlockSize() / 1000000 + (peer->m_tx_relay->m_tx_inventory_to_send.size()/1000)*5};
broadcast_max = std::min<size_t>(1000, broadcast_max);
// Determine transactions to relay
if (fSendTrickle) {
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
// Produce a vector with all candidates for sending
std::vector<std::set<uint256>::iterator> vInvTx;
vInvTx.reserve(peer->m_tx_relay->m_tx_inventory_to_send.size());
for (std::set<uint256>::iterator it = peer->m_tx_relay->m_tx_inventory_to_send.begin(); it != peer->m_tx_relay->m_tx_inventory_to_send.end(); it++) {
vInvTx.push_back(it);
while (!vInvTx.empty() && nRelayedTransactions < broadcast_max) {
// Fetch the top element from the heap
std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
std::set<uint256>::iterator it = vInvTx.back();
vInvTx.pop_back();
uint256 hash = *it;
// Remove it from the to-be-sent set
peer->m_tx_relay->m_tx_inventory_to_send.erase(it);
// Check if not in the filter already
if (peer->m_tx_relay->m_tx_inventory_known_filter.contains(hash)) {
continue;
}
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
// A heap is used so that not all items need sorting if only a few are being sent.
CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool);
std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
// No reason to drain out at many times the network's capacity,
// especially since we have many peers and some will draw much shorter delays.
unsigned int nRelayedTransactions = 0;
size_t broadcast_max{INVENTORY_BROADCAST_MAX_PER_1MB_BLOCK * MaxBlockSize() / 1000000 + (peer->m_tx_relay->m_tx_inventory_to_send.size()/1000)*5};
broadcast_max = std::min<size_t>(1000, broadcast_max);
while (!vInvTx.empty() && nRelayedTransactions < broadcast_max) {
// Fetch the top element from the heap
std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
std::set<uint256>::iterator it = vInvTx.back();
vInvTx.pop_back();
uint256 hash = *it;
// Remove it from the to-be-sent set
peer->m_tx_relay->m_tx_inventory_to_send.erase(it);
// Check if not in the filter already
if (peer->m_tx_relay->m_tx_inventory_known_filter.contains(hash)) {
continue;
}
// Not in the mempool anymore? don't bother sending it.
auto txinfo = m_mempool.info(hash);
if (!txinfo.tx) {
continue;
}
if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
// Send
State(pto->GetId())->m_recently_announced_invs.insert(hash);
nRelayedTransactions++;
// Not in the mempool anymore? don't bother sending it.
auto txinfo = m_mempool.info(hash);
if (!txinfo.tx) {
continue;
}
if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
// Send
State(pto->GetId())->m_recently_announced_invs.insert(hash);
nRelayedTransactions++;
{
// Expire old relay messages
while (!g_relay_expiration.empty() && g_relay_expiration.front().first < current_time)
{
// Expire old relay messages
while (!g_relay_expiration.empty() && g_relay_expiration.front().first < current_time)
{
mapRelay.erase(g_relay_expiration.front().second);
g_relay_expiration.pop_front();
}
auto ret = mapRelay.emplace(hash, std::move(txinfo.tx));
if (ret.second) {
g_relay_expiration.emplace_back(current_time + RELAY_TX_CACHE_TIME, ret.first);
}
mapRelay.erase(g_relay_expiration.front().second);
g_relay_expiration.pop_front();
}
auto ret = mapRelay.emplace(hash, std::move(txinfo.tx));
if (ret.second) {
g_relay_expiration.emplace_back(current_time + RELAY_TX_CACHE_TIME, ret.first);
}
int nInvType = m_cj_ctx->dstxman->GetDSTX(hash) ? MSG_DSTX : MSG_TX;
queueAndMaybePushInv(CInv(nInvType, hash));
}
int nInvType = m_cj_ctx->dstxman->GetDSTX(hash) ? MSG_DSTX : MSG_TX;
queueAndMaybePushInv(CInv(nInvType, hash));
}
}
}
{
// Send non-tx/non-block inventory items
LOCK2(peer->m_tx_relay->m_tx_inventory_mutex, peer->m_tx_relay->m_bloom_filter_mutex);
{
// Send non-tx/non-block inventory items
LOCK2(peer->m_tx_relay->m_tx_inventory_mutex, peer->m_tx_relay->m_bloom_filter_mutex);
bool fSendIS = peer->m_tx_relay->m_relay_txs && !pto->IsBlockRelayOnly();
bool fSendIS = peer->m_tx_relay->m_relay_txs && !pto->IsBlockRelayOnly();
for (const auto& inv : peer->m_tx_relay->vInventoryOtherToSend) {
if (!peer->m_tx_relay->m_relay_txs && NetMessageViolatesBlocksOnly(inv.GetCommand())) {
continue;
}
if (peer->m_tx_relay->m_tx_inventory_known_filter.contains(inv.hash)) {
continue;
}
if (!fSendIS && inv.type == MSG_ISDLOCK) {
continue;
}
queueAndMaybePushInv(inv);
for (const auto& inv : peer->m_tx_relay->vInventoryOtherToSend) {
if (!peer->m_tx_relay->m_relay_txs && NetMessageViolatesBlocksOnly(inv.GetCommand())) {
continue;
}
peer->m_tx_relay->vInventoryOtherToSend.clear();
if (peer->m_tx_relay->m_tx_inventory_known_filter.contains(inv.hash)) {
continue;
}
if (!fSendIS && inv.type == MSG_ISDLOCK) {
continue;
}
queueAndMaybePushInv(inv);
}
peer->m_tx_relay->vInventoryOtherToSend.clear();
}
if (!vInv.empty())
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));