Merge #15834: Fix transaction relay bugs introduced in #14897 and expire transactions from peer in-flight map

308b76732f Fix bug around transaction requests (Suhas Daftuar)
f635a3ba11 Expire old entries from the in-flight tx map (Suhas Daftuar)
e32e08407e Remove NOTFOUND transactions from in-flight data structures (Suhas Daftuar)
23163b7593 Add an explicit memory bound to m_tx_process_time (Suhas Daftuar)
218697b645 Improve NOTFOUND comment (Suhas Daftuar)

Pull request description:

  #14897 introduced several bugs that could lead to a node no longer requesting transactions from one or more of its peers.  Credit to ajtowns for originally reporting many of these bugs along with an originally proposed fix in #15776.

  This PR does a few things:

  - Fix a bug in NOTFOUND processing, where the in-flight map for a peer was keeping transactions it shouldn't

  - Eliminate the possibility of a memory attack on the CNodeState `m_tx_process_time` data structure by explicitly bounding its size

  - Remove entries from a peer's in-flight map after 10 minutes, so that we should always eventually resume transaction requests even if there are other bugs like the NOTFOUND one

  - Fix a bug relating to the coordination of request times when multiple peers announce the same transaction

  The expiry mechanism added here is something we'll likely want to remove in the future, but is belt-and-suspenders for now to try to ensure we don't have other bugs that could lead to transaction relay failing due to some unforeseen conditions.

ACKs for commit 308b76:
  ajtowns:
    utACK 308b76732f97020c86977e29c854e8e27262cf7c
  morcos:
    light ACK 308b767
  laanwj:
    Code review ACK 308b76732f97020c86977e29c854e8e27262cf7c
  jonatack:
    Light ACK 308b76732f97020c86977e29c854e8e27262cf7c.
  jamesob:
    ACK 308b76732f
  MarcoFalke:
    ACK 308b76732f97020c86977e29c854e8e27262cf7c (Tested two of the three bugs this pull fixes, see comment above)
  jamesob:
    Concept ACK 308b76732f
  MarcoFalke:
    ACK 308b76732f

Tree-SHA512: 8865dca5294447859d95655e8699085643db60c22f0719e76e961651a1398251bc932494b68932e33f68d4f6084579ab3bed7d0e7dd4ac6c362590eaf9414eda
This commit is contained in:
MarcoFalke 2019-06-12 12:32:52 -04:00 committed by Alexander Block
parent 8c0ff34ccd
commit 74eabc23e5

View File

@ -62,11 +62,13 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
/** Maximum number of announced transactions from a peer */ /** Maximum number of announced transactions from a peer */
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
/** How many microseconds to delay requesting transactions from inbound peers */ /** How many microseconds to delay requesting transactions from inbound peers */
static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; // 2 seconds
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */ /** How long to wait (in microseconds) before downloading a transaction from an additional peer */
static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; // 1 minute
/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ /** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; // 2 seconds
/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */
static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL;
static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); "To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ /** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
@ -323,8 +325,11 @@ struct CNodeState {
//! Store all the transactions a peer has recently announced //! Store all the transactions a peer has recently announced
std::set<uint256> m_tx_announced; std::set<uint256> m_tx_announced;
//! Store transactions which were requested by us //! Store transactions which were requested by us, with timestamp
std::set<uint256> m_tx_in_flight; std::map<uint256, int64_t> m_tx_in_flight;
//! Periodically check for stuck getdata requests
int64_t m_check_expiry_timer{0};
}; };
TxDownloadState m_tx_download; TxDownloadState m_tx_download;
@ -674,30 +679,40 @@ void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LO
} }
} }
int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{ {
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(txid)) {
// Too many queued announcements from this peer, or we already have
// this announcement
return;
}
peer_download_state.m_tx_announced.insert(txid);
int64_t process_time; int64_t process_time;
int64_t last_request_time = GetTxRequestTime(txid); int64_t last_request_time = GetTxRequestTime(txid);
// First time requesting this tx // First time requesting this tx
if (last_request_time == 0) { if (last_request_time == 0) {
process_time = nNow; process_time = current_time;
} else { } else {
// Randomize the delay to avoid biasing some peers over others (such as due to // Randomize the delay to avoid biasing some peers over others (such as due to
// fixed ordering of peer processing in ThreadMessageHandler) // fixed ordering of peer processing in ThreadMessageHandler)
process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY); process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY);
} }
// We delay processing announcements from non-preferred (eg inbound) peers // We delay processing announcements from inbound peers
if (!state->fPreferredDownload) process_time += INBOUND_PEER_TX_DELAY; if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY;
return process_time;
}
void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
peer_download_state.m_tx_announced.count(txid)) {
// Too many queued announcements from this peer, or we already have
// this announcement
return;
}
peer_download_state.m_tx_announced.insert(txid);
// Calculate the time to try requesting this transaction. Use
// fPreferredDownload as a proxy for outbound peers.
int64_t process_time = CalculateTxGetDataTime(txid, nNow, !state->fPreferredDownload);
peer_download_state.m_tx_process_time.emplace(process_time, txid); peer_download_state.m_tx_process_time.emplace(process_time, txid);
} }
@ -1580,12 +1595,19 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
if (!vNotFound.empty()) { if (!vNotFound.empty()) {
// Let the peer know that we didn't find what it asked for, so it doesn't // Let the peer know that we didn't find what it asked for, so it doesn't
// have to wait around forever. Currently only SPV clients actually care // have to wait around forever.
// about this message: it's needed when they are recursively walking the // SPV clients care about this message: it's needed when they are
// dependencies of relevant unconfirmed transactions. SPV clients want to // recursively walking the dependencies of relevant unconfirmed
// do that because they want to know about (and store and rebroadcast and // transactions. SPV clients want to do that because they want to know
// risk analyze) the dependencies of transactions relevant to them, without // about (and store and rebroadcast and risk analyze) the dependencies
// having to download the entire memory pool. // of transactions relevant to them, without having to download the
// entire memory pool.
// Also, other nodes can use these messages to automatically request a
// transaction from some other peer that annnounced it, and stop
// waiting for us to respond.
// In normal operation, we often send NOTFOUND messages for parents of
// transactions that we relay; if a peer is missing a parent, they may
// assume we have them and request the parents from us.
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound)); connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound));
} }
} }
@ -3338,8 +3360,27 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
if (strCommand == NetMsgType::NOTFOUND) { if (strCommand == NetMsgType::NOTFOUND) {
// We do not care about the NOTFOUND message, but logging an Unknown Command // Remove the NOTFOUND transactions from the peer
// message would be undesirable as we transmit it ourselves. LOCK(cs_main);
CNodeState *state = State(pfrom->GetId());
std::vector<CInv> vInv;
vRecv >> vInv;
if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
for (CInv &inv : vInv) {
if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) {
// If we receive a NOTFOUND message for a txid we requested, erase
// it from our data structures for this peer.
auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash);
if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) {
// Skip any further work if this is a spurious NOTFOUND
// message.
continue;
}
state->m_tx_download.m_tx_in_flight.erase(in_flight_it);
state->m_tx_download.m_tx_announced.erase(inv.hash);
}
}
}
return true; return true;
} }
@ -4186,9 +4227,33 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptM
// //
// Message: getdata (non-blocks) // Message: getdata (non-blocks)
// //
// For robustness, expire old requests after a long timeout, so that
// we can resume downloading transactions from a peer even if they
// were unresponsive in the past.
// Eventually we should consider disconnecting peers, but this is
// conservative.
if (state.m_tx_download.m_check_expiry_timer <= nNow) {
for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) {
if (it->second <= nNow - TX_EXPIRY_INTERVAL) {
LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId());
state.m_tx_download.m_tx_announced.erase(it->first);
state.m_tx_download.m_tx_in_flight.erase(it++);
} else {
++it;
}
}
// On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
// so that we're not doing this for all peers at the same time.
state.m_tx_download.m_check_expiry_timer = nNow + TX_EXPIRY_INTERVAL/2 + GetRand(TX_EXPIRY_INTERVAL);
}
auto& tx_process_time = state.m_tx_download.m_tx_process_time; auto& tx_process_time = state.m_tx_download.m_tx_process_time;
while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
const uint256& txid = tx_process_time.begin()->second; const uint256 txid = tx_process_time.begin()->second;
// Erase this entry from tx_process_time (it may be added back for
// processing at a later time, see below)
tx_process_time.erase(tx_process_time.begin());
CInv inv(MSG_TX | GetFetchFlags(pto), txid); CInv inv(MSG_TX | GetFetchFlags(pto), txid);
if (!AlreadyHave(inv)) { if (!AlreadyHave(inv)) {
// If this transaction was last requested more than 1 minute ago, // If this transaction was last requested more than 1 minute ago,
@ -4202,20 +4267,20 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptM
vGetData.clear(); vGetData.clear();
} }
UpdateTxRequestTime(inv.hash, nNow); UpdateTxRequestTime(inv.hash, nNow);
state.m_tx_download.m_tx_in_flight.insert(inv.hash); state.m_tx_download.m_tx_in_flight.emplace(inv.hash, nNow);
} else { } else {
// This transaction is in flight from someone else; queue // This transaction is in flight from someone else; queue
// up processing to happen after the download times out // up processing to happen after the download times out
// (with a slight delay for inbound peers, to prefer // (with a slight delay for inbound peers, to prefer
// requests to outbound peers). // requests to outbound peers).
RequestTx(&state, txid, nNow); int64_t next_process_time = CalculateTxGetDataTime(txid, nNow, !state.fPreferredDownload);
tx_process_time.emplace(next_process_time, txid);
} }
} else { } else {
// We have already seen this transaction, no need to download. // We have already seen this transaction, no need to download.
state.m_tx_download.m_tx_announced.erase(inv.hash); state.m_tx_download.m_tx_announced.erase(inv.hash);
state.m_tx_download.m_tx_in_flight.erase(inv.hash); state.m_tx_download.m_tx_in_flight.erase(inv.hash);
} }
tx_process_time.erase(tx_process_time.begin());
} }