From a36f8f2a1a85c20afde0f334d07306ffb171e5d3 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Mon, 16 Sep 2024 12:29:26 +0000 Subject: [PATCH] merge bitcoin#25880: Make stalling timeout adaptive during IBD We need to disable mocktime so that the node doesn't resort to direct fetching. We also need to delay DIP3's activation so that blocks don't get rejected for not having a valid DIP3-compliant coinbase. --- src/net_processing.cpp | 34 +++++- test/functional/p2p_ibd_stalling.py | 167 ++++++++++++++++++++++++++++ test/functional/test_runner.py | 1 + 3 files changed, 197 insertions(+), 5 deletions(-) create mode 100755 test/functional/p2p_ibd_stalling.py diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 3f660baa25..abfcc2ed1c 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -124,8 +124,11 @@ static constexpr std::chrono::minutes PING_INTERVAL{2}; static const unsigned int MAX_LOCATOR_SZ = 101; /** Number of blocks that can be requested at any given time from a single peer. */ static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 16; -/** Time during which a peer must stall block download progress before being disconnected. */ -static constexpr auto BLOCK_STALLING_TIMEOUT = 2s; +/** Default time during which a peer must stall block download progress before being disconnected. + * the actual timeout is increased temporarily if peers are disconnected for hitting the timeout */ +static constexpr auto BLOCK_STALLING_TIMEOUT_DEFAULT{2s}; +/** Maximum timeout for stalling block download. */ +static constexpr auto BLOCK_STALLING_TIMEOUT_MAX{64s}; /** Maximum depth of blocks we're willing to serve as compact blocks to peers * when requested. For older blocks, a regular BLOCK response will be sent. */ static const int MAX_CMPCTBLOCK_DEPTH = 5; @@ -835,6 +838,9 @@ private: /** Number of preferable block download peers. */ int m_num_preferred_download_peers GUARDED_BY(cs_main){0}; + /** Stalling timeout for blocks in IBD */ + std::atomic m_block_stalling_timeout{BLOCK_STALLING_TIMEOUT_DEFAULT}; + bool AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_recent_confirmed_transactions_mutex); @@ -1896,8 +1902,9 @@ void PeerManagerImpl::StartScheduledTasks(CScheduler& scheduler) } /** - * Evict orphan txn pool entries (EraseOrphanTx) based on a newly connected - * block. Also save the time of the last tip update. + * Evict orphan txn pool entries based on a newly connected + * block. Also save the time of the last tip update and + * possibly reduce dynamic block stalling timeout. */ void PeerManagerImpl::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) { @@ -1918,6 +1925,16 @@ void PeerManagerImpl::BlockConnected(const std::shared_ptr& pblock m_recent_confirmed_transactions.insert(ptx->GetHash()); } } + + // In case the dynamic timeout was doubled once or more, reduce it slowly back to its default value + auto stalling_timeout = m_block_stalling_timeout.load(); + Assume(stalling_timeout >= BLOCK_STALLING_TIMEOUT_DEFAULT); + if (stalling_timeout != BLOCK_STALLING_TIMEOUT_DEFAULT) { + const auto new_timeout = std::max(std::chrono::duration_cast(stalling_timeout * 0.85), BLOCK_STALLING_TIMEOUT_DEFAULT); + if (m_block_stalling_timeout.compare_exchange_strong(stalling_timeout, new_timeout)) { + LogPrint(BCLog::NET, "Decreased stalling timeout to %d seconds\n", new_timeout.count()); + } + } } void PeerManagerImpl::BlockDisconnected(const std::shared_ptr &block, const CBlockIndex* pindex) @@ -5855,12 +5872,19 @@ bool PeerManagerImpl::SendMessages(CNode* pto) m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); // Detect whether we're stalling - if (state.m_stalling_since.count() && state.m_stalling_since < current_time - BLOCK_STALLING_TIMEOUT) { + auto stalling_timeout = m_block_stalling_timeout.load(); + if (state.m_stalling_since.count() && state.m_stalling_since < current_time - stalling_timeout) { // Stalling only triggers when the block download window cannot move. During normal steady state, // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection // should only happen during initial block download. LogPrintf("Peer=%d is stalling block download, disconnecting\n", pto->GetId()); pto->fDisconnect = true; + // Increase timeout for the next peer so that we don't disconnect multiple peers if our own + // bandwidth is insufficient. + const auto new_timeout = std::min(2 * stalling_timeout, BLOCK_STALLING_TIMEOUT_MAX); + if (stalling_timeout != new_timeout && m_block_stalling_timeout.compare_exchange_strong(stalling_timeout, new_timeout)) { + LogPrint(BCLog::NET, "Increased stalling timeout temporarily to %d seconds\n", m_block_stalling_timeout.load().count()); + } return true; } // In case there is a block that has been in flight from this peer for block_interval * (1 + 0.5 * N) diff --git a/test/functional/p2p_ibd_stalling.py b/test/functional/p2p_ibd_stalling.py new file mode 100755 index 0000000000..8692a001d2 --- /dev/null +++ b/test/functional/p2p_ibd_stalling.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +# Copyright (c) 2022- The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +""" +Test stalling logic during IBD +""" + +import time + +from test_framework.blocktools import ( + create_block, + create_coinbase +) +from test_framework.messages import ( + MSG_BLOCK, + MSG_TYPE_MASK, + NODE_NETWORK, + NODE_BLOOM, +) +from test_framework.p2p import ( + CBlockHeader, + msg_block, + msg_headers, + P2PDataStore, +) +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import ( + assert_equal, +) + + +class P2PStaller(P2PDataStore): + def __init__(self, stall_block): + self.stall_block = stall_block + super().__init__() + + def on_getdata(self, message): + for inv in message.inv: + self.getdata_requests.append(inv.hash) + if (inv.type & MSG_TYPE_MASK) == MSG_BLOCK: + if (inv.hash != self.stall_block): + self.send_message(msg_block(self.block_store[inv.hash])) + + def on_getheaders(self, message): + pass + +class P2PIBDStallingTest(BitcoinTestFramework): + def set_test_params(self): + self.disable_mocktime = True + self.extra_args = [["-dip3params=2000:2000"]] + self.setup_clean_chain = True + self.num_nodes = 1 + + def run_test(self): + NUM_BLOCKS = 1025 + NUM_PEERS = 4 + node = self.nodes[0] + tip = int(node.getbestblockhash(), 16) + blocks = [] + height = 1 + block_time = node.getblock(node.getbestblockhash())['time'] + 1 + self.log.info("Prepare blocks without sending them to the node") + block_dict = {} + for _ in range(NUM_BLOCKS): + blocks.append(create_block(tip, create_coinbase(height), block_time)) + blocks[-1].solve() + tip = blocks[-1].sha256 + block_time += 1 + height += 1 + block_dict[blocks[-1].sha256] = blocks[-1] + stall_block = blocks[0].sha256 + + headers_message = msg_headers() + headers_message.headers = [CBlockHeader(b) for b in blocks[:NUM_BLOCKS-1]] + peers = [] + + self.log.info("Check that a staller does not get disconnected if the 1024 block lookahead buffer is filled") + for id in range(NUM_PEERS): + peers.append(node.add_outbound_p2p_connection(P2PStaller(stall_block), services = NODE_NETWORK | NODE_BLOOM, p2p_idx=id, connection_type="outbound-full-relay")) + peers[-1].block_store = block_dict + peers[-1].send_message(headers_message) + + # Need to wait until 1023 blocks are received - the magic total bytes number is a workaround in lack of an rpc + # returning the number of downloaded (but not connected) blocks. + self.wait_until(lambda: self.total_bytes_recv_for_blocks() == 172761) + + self.all_sync_send_with_ping(peers) + # If there was a peer marked for stalling, it would get disconnected + self.mocktime = int(time.time()) + 3 + node.setmocktime(self.mocktime) + self.all_sync_send_with_ping(peers) + assert_equal(node.num_test_p2p_connections(), NUM_PEERS) + + self.log.info("Check that increasing the window beyond 1024 blocks triggers stalling logic") + headers_message.headers = [CBlockHeader(b) for b in blocks] + with node.assert_debug_log(expected_msgs=['Stall started']): + for p in peers: + p.send_message(headers_message) + self.all_sync_send_with_ping(peers) + + self.log.info("Check that the stalling peer is disconnected after 2 seconds") + self.mocktime += 3 + node.setmocktime(self.mocktime) + peers[0].wait_for_disconnect() + assert_equal(node.num_test_p2p_connections(), NUM_PEERS - 1) + self.wait_until(lambda: self.is_block_requested(peers, stall_block)) + # Make sure that SendMessages() is invoked, which assigns the missing block + # to another peer and starts the stalling logic for them + self.all_sync_send_with_ping(peers) + + self.log.info("Check that the stalling timeout gets doubled to 4 seconds for the next staller") + # No disconnect after just 3 seconds + self.mocktime += 3 + node.setmocktime(self.mocktime) + self.all_sync_send_with_ping(peers) + assert_equal(node.num_test_p2p_connections(), NUM_PEERS - 1) + + self.mocktime += 2 + node.setmocktime(self.mocktime) + self.wait_until(lambda: node.num_test_p2p_connections() == NUM_PEERS - 2) + self.wait_until(lambda: self.is_block_requested(peers, stall_block)) + self.all_sync_send_with_ping(peers) + + self.log.info("Check that the stalling timeout gets doubled to 8 seconds for the next staller") + # No disconnect after just 7 seconds + self.mocktime += 7 + node.setmocktime(self.mocktime) + self.all_sync_send_with_ping(peers) + assert_equal(node.num_test_p2p_connections(), NUM_PEERS - 2) + + self.mocktime += 2 + node.setmocktime(self.mocktime) + self.wait_until(lambda: node.num_test_p2p_connections() == NUM_PEERS - 3) + self.wait_until(lambda: self.is_block_requested(peers, stall_block)) + self.all_sync_send_with_ping(peers) + + self.log.info("Provide the withheld block and check that stalling timeout gets reduced back to 2 seconds") + with node.assert_debug_log(expected_msgs=['Decreased stalling timeout to 2 seconds']): + for p in peers: + if p.is_connected and (stall_block in p.getdata_requests): + p.send_message(msg_block(block_dict[stall_block])) + + self.log.info("Check that all outstanding blocks get connected") + self.wait_until(lambda: node.getblockcount() == NUM_BLOCKS) + + def total_bytes_recv_for_blocks(self): + total = 0 + for info in self.nodes[0].getpeerinfo(): + if ("block" in info["bytesrecv_per_msg"].keys()): + total += info["bytesrecv_per_msg"]["block"] + return total + + def all_sync_send_with_ping(self, peers): + for p in peers: + if p.is_connected: + p.sync_send_with_ping() + + def is_block_requested(self, peers, hash): + for p in peers: + if p.is_connected and (hash in p.getdata_requests): + return True + return False + + +if __name__ == '__main__': + P2PIBDStallingTest().main() diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index 6cd2368496..f91be1fb41 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -254,6 +254,7 @@ BASE_SCRIPTS = [ 'wallet_importprunedfunds.py --descriptors', 'p2p_leak_tx.py', 'p2p_eviction.py', + 'p2p_ibd_stalling.py', 'rpc_signmessage.py', 'rpc_generateblock.py', 'rpc_generate.py',