diff --git a/src/net.cpp b/src/net.cpp index 6619998749..579ab7b923 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1998,7 +1998,6 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ switch (conn_type) { case ConnectionType::INBOUND: case ConnectionType::MANUAL: - case ConnectionType::FEELER: return false; case ConnectionType::OUTBOUND_FULL_RELAY: max_connections = m_max_outbound_full_relay; @@ -2009,6 +2008,9 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ // no limit for ADDR_FETCH because -seednode has no limit either case ConnectionType::ADDR_FETCH: break; + // no limit for FEELER connections since they're short-lived + case ConnectionType::FEELER: + break; } // no default case, so the compiler can warn about missing cases // Count existing connections @@ -2253,6 +2255,7 @@ bool CConnman::GenerateSelectSet(const std::vector& nodes, for (CNode* pnode : nodes) { bool select_recv = !pnode->fHasRecvData; bool select_send = !pnode->fCanSendData; + if (!select_recv && !select_send) continue; LOCK(pnode->m_sock_mutex); if (!pnode->m_sock) { @@ -2623,9 +2626,7 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, // receiving data. This means properly utilizing TCP flow control signalling. // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try // receiving data (which should succeed as the socket signalled as receivable). 
- const auto& [to_send, more, _msg_type] = it->second->m_transport->GetBytesToSend(it->second->nSendMsgSize != 0); - const bool queue_is_empty{to_send.empty() && !more}; - if (!it->second->fPauseRecv && !it->second->fDisconnect && queue_is_empty) { + if (!it->second->fPauseRecv && !it->second->fDisconnect && it->second->nSendMsgSize == 0) { it->second->AddRef(); vReceivableNodes.emplace(it->second); } @@ -3301,7 +3302,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe // Require outbound IPv4/IPv6 connections, other than feelers, to be to distinct network groups if (!fFeeler && outbound_ipv46_peer_netgroups.count(m_netgroupman.GetGroup(addr))) { - break; + continue; } // if we selected an invalid address, restart diff --git a/src/net.h b/src/net.h index ab17ca7758..23988dd1d2 100644 --- a/src/net.h +++ b/src/net.h @@ -1433,8 +1433,8 @@ public: * Attempts to open a connection. Currently only used from tests. * * @param[in] address Address of node to try connecting to - * @param[in] conn_type ConnectionType::OUTBOUND or ConnectionType::BLOCK_RELAY - * or ConnectionType::ADDR_FETCH + * @param[in] conn_type ConnectionType::OUTBOUND_FULL_RELAY, ConnectionType::BLOCK_RELAY, + * ConnectionType::ADDR_FETCH or ConnectionType::FEELER * @return bool Returns false if there are no available * slots for this connection: * - conn_type not a supported ConnectionType diff --git a/src/net_processing.cpp b/src/net_processing.cpp index bf7e590975..dd484f51e9 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -272,25 +272,34 @@ struct Peer { struct TxRelay { mutable RecursiveMutex m_bloom_filter_mutex; - // We use m_relay_txs for two purposes - - // a) it allows us to not relay tx invs before receiving the peer's version message - // b) the peer may tell us in its version message that we should not relay tx invs - // unless it loads a bloom filter. + /** Whether the peer wishes to receive transaction announcements. 
+ * + * This is initially set based on the fRelay flag in the received + * `version` message. If initially set to false, it can only be flipped + * to true if we have offered the peer NODE_BLOOM services and it sends + * us a `filterload` or `filterclear` message. See BIP37. */ bool m_relay_txs GUARDED_BY(m_bloom_filter_mutex){false}; + /** A bloom filter for which transactions to announce to the peer. See BIP37. */ std::unique_ptr m_bloom_filter PT_GUARDED_BY(m_bloom_filter_mutex) GUARDED_BY(m_bloom_filter_mutex){nullptr}; mutable RecursiveMutex m_tx_inventory_mutex; - // inventory based relay + /** A filter of all the txids that the peer has announced to + * us or we have announced to the peer. We use this to avoid announcing + * the same txid to a peer that already has the transaction. */ CRollingBloomFilter m_tx_inventory_known_filter GUARDED_BY(m_tx_inventory_mutex){50000, 0.000001}; - // Set of transaction ids we still have to announce. - // They are sorted by the mempool before relay, so the order is not important. + /** Set of transaction ids we still have to announce. We use the + * mempool to sort transactions in dependency order before relay, so + * this does not have to be sorted. */ std::set m_tx_inventory_to_send GUARDED_BY(m_tx_inventory_mutex); - // List of non-tx/non-block inventory items + /** List of non-tx/non-block inventory items */ std::vector vInventoryOtherToSend GUARDED_BY(m_tx_inventory_mutex); - // Used for BIP35 mempool sending, also protected by m_tx_inventory_mutex + /** Whether the peer has requested us to send our complete mempool. Only + * permitted if the peer has NetPermissionFlags::Mempool. See BIP35. */ bool m_send_mempool GUARDED_BY(m_tx_inventory_mutex){false}; - // Last time a "MEMPOOL" request was serviced. + /** The last time a BIP35 `mempool` request was serviced. */ std::atomic m_last_mempool_req{0s}; + /** The next time after which we will send an `inv` message containing + * transaction announcements to this peer. 
*/ std::chrono::microseconds m_next_inv_send_time GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0}; }; @@ -1384,7 +1393,7 @@ void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer) nProtocolVersion = gArgs.GetArg("-pushversion", PROTOCOL_VERSION); } - const bool tx_relay = !m_ignore_incoming_txs && !pnode.IsBlockOnlyConn(); + const bool tx_relay = !m_ignore_incoming_txs && !pnode.IsBlockOnlyConn() && !pnode.IsFeelerConn(); m_connman.PushMessage(&pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, nProtocolVersion, my_services, nTime, your_services, addr_you, // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime) my_services, CService(), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime) @@ -3831,6 +3840,12 @@ void PeerManagerImpl::ProcessMessage( best_block = &inv.hash; } } else { + if (fBlocksOnly && NetMessageViolatesBlocksOnly(inv.GetCommand())) { + LogPrint(BCLog::NET, "%s (%s) inv sent in violation of protocol, disconnecting peer=%d\n", inv.GetCommand(), inv.hash.ToString(), pfrom.GetId()); + pfrom.fDisconnect = true; + return; + } + const bool fAlreadyHave = AlreadyHave(inv); LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? 
"have" : "new", pfrom.GetId()); ::g_stats_client->inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f); @@ -3840,11 +3855,7 @@ void PeerManagerImpl::ProcessMessage( }; AddKnownInv(*peer, inv.hash); - if (fBlocksOnly && NetMessageViolatesBlocksOnly(inv.GetCommand())) { - LogPrint(BCLog::NET, "%s (%s) inv sent in violation of protocol, disconnecting peer=%d\n", inv.GetCommand(), inv.hash.ToString(), pfrom.GetId()); - pfrom.fDisconnect = true; - return; - } else if (!fAlreadyHave) { + if (!fAlreadyHave) { if (fBlocksOnly && inv.type == MSG_ISDLOCK) { if (pfrom.GetCommonVersion() <= ADDRV2_PROTO_VERSION) { // It's ok to receive these invs, we just ignore them @@ -5889,7 +5900,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // Stalling only triggers when the block download window cannot move. During normal steady state, // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection // should only happen during initial block download. - LogPrintf("Peer=%d is stalling block download, disconnecting\n", pto->GetId()); + LogPrintf("Peer=%d%s is stalling block download, disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); pto->fDisconnect = true; // Increase timeout for the next peer so that we don't disconnect multiple peers if our own // bandwidth is insufficient. 
@@ -5908,7 +5919,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) QueuedBlock &queuedBlock = state.vBlocksInFlight.front(); int nOtherPeersWithValidatedDownloads = m_peers_downloading_from - 1; if (current_time > state.m_downloading_since + std::chrono::seconds{consensusParams.nPowTargetSpacing} * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { - LogPrintf("Timeout downloading block %s from peer=%d, disconnecting\n", queuedBlock.pindex->GetBlockHash().ToString(), pto->GetId()); + LogPrintf("Timeout downloading block %s from peer=%d%s, disconnecting\n", queuedBlock.pindex->GetBlockHash().ToString(), pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); pto->fDisconnect = true; return true; } @@ -5924,11 +5935,11 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // disconnect our sync peer for stalling; we have bigger // problems if we can't get any outbound peers. if (!pto->HasPermission(NetPermissionFlags::NoBan)) { - LogPrintf("Timeout downloading headers from peer=%d, disconnecting\n", pto->GetId()); + LogPrintf("Timeout downloading headers from peer=%d%s, disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); pto->fDisconnect = true; return true; } else { - LogPrintf("Timeout downloading headers from noban peer=%d, not disconnecting\n", pto->GetId()); + LogPrintf("Timeout downloading headers from noban peer=%d%s, not disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); // Reset the headers sync state so that we have a // chance to try downloading from a different peer. 
// Note: this will also result in at least one more diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 8c3e96a22c..80fc983824 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -230,6 +230,7 @@ static const CRPCConvertParam vRPCConvertParams[] = { "getnodeaddresses", 0, "count"}, { "addpeeraddress", 1, "port"}, { "addpeeraddress", 2, "tried"}, + { "sendmsgtopeer", 0, "peer_id" }, { "stop", 0, "wait" }, { "verifychainlock", 2, "blockHeight" }, { "verifyislock", 3, "maxHeight" }, diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index a5ae324644..7d5418c829 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -352,7 +352,7 @@ static RPCHelpMan addconnection() "\nOpen an outbound connection to a specified node. This RPC is for testing only.\n", { {"address", RPCArg::Type::STR, RPCArg::Optional::NO, "The IP address and port to attempt connecting to."}, - {"connection_type", RPCArg::Type::STR, RPCArg::Optional::NO, "Type of connection to open (\"outbound-full-relay\", \"block-relay-only\" or \"addr-fetch\")."}, + {"connection_type", RPCArg::Type::STR, RPCArg::Optional::NO, "Type of connection to open (\"outbound-full-relay\", \"block-relay-only\", \"addr-fetch\" or \"feeler\")."}, }, RPCResult{ RPCResult::Type::OBJ, "", "", @@ -380,6 +380,8 @@ static RPCHelpMan addconnection() conn_type = ConnectionType::BLOCK_RELAY; } else if (conn_type_in == "addr-fetch") { conn_type = ConnectionType::ADDR_FETCH; + } else if (conn_type_in == "feeler") { + conn_type = ConnectionType::FEELER; } else { throw JSONRPCError(RPC_INVALID_PARAMETER, self.ToString()); } @@ -1019,6 +1021,53 @@ static RPCHelpMan addpeeraddress() }; } +static RPCHelpMan sendmsgtopeer() +{ + return RPCHelpMan{ + "sendmsgtopeer", + "Send a p2p message to a peer specified by id.\n" + "The message type and body must be provided, the message header will be generated.\n" + "This RPC is for testing only.", + { + {"peer_id", RPCArg::Type::NUM, RPCArg::Optional::NO, "The peer to send the message to."}, + 
{"msg_type", RPCArg::Type::STR, RPCArg::Optional::NO, strprintf("The message type (maximum length %i)", CMessageHeader::COMMAND_SIZE)}, + {"msg", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "The serialized message body to send, in hex, without a message header"}, + }, + RPCResult{RPCResult::Type::NONE, "", ""}, + RPCExamples{ + HelpExampleCli("sendmsgtopeer", "0 \"addr\" \"ffffff\"") + HelpExampleRpc("sendmsgtopeer", "0 \"addr\" \"ffffff\"")}, + [&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue { + const NodeId peer_id{request.params[0].get_int()}; + const std::string& msg_type{request.params[1].get_str()}; + if (msg_type.size() > CMessageHeader::COMMAND_SIZE) { + throw JSONRPCError(RPC_INVALID_PARAMETER, strprintf("Error: msg_type too long, max length is %i", CMessageHeader::COMMAND_SIZE)); + } + const std::string& msg{request.params[2].get_str()}; + if (!msg.empty() && !IsHex(msg)) { + throw JSONRPCError(RPC_INVALID_PARAMETER, "Error parsing input for msg"); + } + + NodeContext& node = EnsureAnyNodeContext(request.context); + CConnman& connman = EnsureConnman(node); + + CSerializedNetMsg msg_ser; + msg_ser.data = ParseHex(msg); + msg_ser.m_type = msg_type; + + bool success = connman.ForNode(peer_id, [&](CNode* node) { + connman.PushMessage(node, std::move(msg_ser)); + return true; + }); + + if (!success) { + throw JSONRPCError(RPC_MISC_ERROR, "Error: Could not send message to peer"); + } + + return NullUniValue; + }, + }; +} + static RPCHelpMan setmnthreadactive() { return RPCHelpMan{"setmnthreadactive", @@ -1068,6 +1117,7 @@ static const CRPCCommand commands[] = { "hidden", &addconnection, }, { "hidden", &addpeeraddress, }, + { "hidden", &sendmsgtopeer }, { "hidden", &setmnthreadactive }, }; // clang-format on diff --git a/src/test/fuzz/rpc.cpp b/src/test/fuzz/rpc.cpp index 7a2e052649..435ebc1b91 100644 --- a/src/test/fuzz/rpc.cpp +++ b/src/test/fuzz/rpc.cpp @@ -149,6 +149,7 @@ const std::vector RPC_COMMANDS_SAFE_FOR_FUZZING{ 
"pruneblockchain", "reconsiderblock", "scantxoutset", + "sendmsgtopeer", // when no peers are connected, no p2p message is sent "sendrawtransaction", "setmnthreadactive", "setmocktime", diff --git a/test/functional/p2p_add_connections.py b/test/functional/p2p_add_connections.py index 8b7ea12d91..c9b3bc335a 100755 --- a/test/functional/p2p_add_connections.py +++ b/test/functional/p2p_add_connections.py @@ -6,8 +6,18 @@ from test_framework.p2p import P2PInterface from test_framework.test_framework import BitcoinTestFramework -from test_framework.util import check_node_connections +from test_framework.util import ( + assert_equal, + check_node_connections, +) +class P2PFeelerReceiver(P2PInterface): + def on_version(self, message): + # The bitcoind node closes feeler connections as soon as a version + # message is received from the test framework. Don't send any responses + # to the node's version message since the connection will already be + # closed. + pass class P2PAddConnections(BitcoinTestFramework): def set_test_params(self): @@ -86,6 +96,16 @@ class P2PAddConnections(BitcoinTestFramework): check_node_connections(node=self.nodes[1], num_in=5, num_out=10) + self.log.info("Add 1 feeler connection to node 0") + feeler_conn = self.nodes[0].add_outbound_p2p_connection(P2PFeelerReceiver(), p2p_idx=6, connection_type="feeler") + + # Feeler connection is closed + assert not feeler_conn.is_connected + + # Verify version message received + assert_equal(feeler_conn.message_count["version"], 1) + # Feeler connections do not request tx relay + assert_equal(feeler_conn.last_message["version"].relay, 0) if __name__ == '__main__': P2PAddConnections().main() diff --git a/test/functional/p2p_disconnect_ban.py b/test/functional/p2p_disconnect_ban.py index eb9314cd82..e51b22b286 100755 --- a/test/functional/p2p_disconnect_ban.py +++ b/test/functional/p2p_disconnect_ban.py @@ -91,7 +91,7 @@ class DisconnectBanTest(BitcoinTestFramework): self.log.info("disconnectnode: successfully 
disconnect node by address") address1 = self.nodes[0].getpeerinfo()[0]['addr'] self.nodes[0].disconnectnode(address=address1) - self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 1, timeout=10) + self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 1, timeout=10) assert not [node for node in self.nodes[0].getpeerinfo() if node['addr'] == address1] self.log.info("disconnectnode: successfully reconnect node") @@ -102,7 +102,7 @@ class DisconnectBanTest(BitcoinTestFramework): self.log.info("disconnectnode: successfully disconnect node by node id") id1 = self.nodes[0].getpeerinfo()[0]['id'] self.nodes[0].disconnectnode(nodeid=id1) - self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 1, timeout=10) + self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 1, timeout=10) assert not [node for node in self.nodes[0].getpeerinfo() if node['id'] == id1] if __name__ == '__main__': diff --git a/test/functional/p2p_net_deadlock.py b/test/functional/p2p_net_deadlock.py new file mode 100755 index 0000000000..cf62d13310 --- /dev/null +++ b/test/functional/p2p_net_deadlock.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# Copyright (c) 2023-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. 
+ +import threading +from test_framework.messages import MAX_PROTOCOL_MESSAGE_LENGTH +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import random_bytes + +class NetDeadlockTest(BitcoinTestFramework): + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 2 + + def run_test(self): + node0 = self.nodes[0] + node1 = self.nodes[1] + + self.log.info("Simultaneously send a large message on both sides") + rand_msg = random_bytes(MAX_PROTOCOL_MESSAGE_LENGTH).hex() + + thread0 = threading.Thread(target=node0.sendmsgtopeer, args=(0, "unknown", rand_msg)) + thread1 = threading.Thread(target=node1.sendmsgtopeer, args=(0, "unknown", rand_msg)) + + thread0.start() + thread1.start() + thread0.join() + thread1.join() + + self.log.info("Check whether a deadlock happened") + self.nodes[0].generate(1) + self.sync_blocks() + + +if __name__ == '__main__': + NetDeadlockTest().main() diff --git a/test/functional/rpc_net.py b/test/functional/rpc_net.py index 1b7c6311bf..30b626ea23 100755 --- a/test/functional/rpc_net.py +++ b/test/functional/rpc_net.py @@ -10,6 +10,7 @@ Tests correspond to code in rpc/net.cpp. 
from test_framework.p2p import P2PInterface import test_framework.messages from test_framework.messages import ( + MAX_PROTOCOL_MESSAGE_LENGTH, NODE_NETWORK, ) @@ -66,6 +67,7 @@ class NetTest(DashTestFramework): self.test_service_flags() self.test_getnodeaddresses() self.test_addpeeraddress() + self.test_sendmsgtopeer() def test_connection_count(self): self.log.info("Test getconnectioncount") @@ -341,6 +343,37 @@ class NetTest(DashTestFramework): addrs = node.getnodeaddresses(count=0) # getnodeaddresses re-runs the addrman checks assert_equal(len(addrs), 2) + def test_sendmsgtopeer(self): + node = self.nodes[0] + + self.restart_node(0) + self.connect_nodes(0, 1) + + self.log.info("Test sendmsgtopeer") + self.log.debug("Send a valid message") + with self.nodes[1].assert_debug_log(expected_msgs=["received: addr"]): + node.sendmsgtopeer(peer_id=0, msg_type="addr", msg="FFFFFF") + + self.log.debug("Test error for sending to non-existing peer") + assert_raises_rpc_error(-1, "Error: Could not send message to peer", node.sendmsgtopeer, peer_id=100, msg_type="addr", msg="FF") + + self.log.debug("Test that zero-length msg_type is allowed") + node.sendmsgtopeer(peer_id=0, msg_type="", msg="FF") + + self.log.debug("Test error for msg_type that is too long") + assert_raises_rpc_error(-8, "Error: msg_type too long, max length is 12", node.sendmsgtopeer, peer_id=0, msg_type="long_msg_type", msg="FF") + + self.log.debug("Test that unknown msg_type is allowed") + node.sendmsgtopeer(peer_id=0, msg_type="unknown", msg="FF") + + self.log.debug("Test that empty msg is allowed") + node.sendmsgtopeer(peer_id=0, msg_type="addr", msg="") + + self.log.debug("Test that oversized messages are allowed, but get us disconnected") + zero_byte_string = b'\x00' * int(MAX_PROTOCOL_MESSAGE_LENGTH + 1) + node.sendmsgtopeer(peer_id=0, msg_type="addr", msg=zero_byte_string.hex()) + self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 0, timeout=10) + if __name__ == '__main__': 
NetTest().main() diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index 0eeed8ba3a..bbd3ee06c9 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -695,42 +695,61 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass): if (a == b): return - def connect_nodes_helper(from_connection, node_num): - ip_port = "127.0.0.1:" + str(p2p_port(node_num)) - from_connection.addnode(ip_port, "onetry") - # poll until version handshake complete to avoid race conditions - # with transaction relaying - # See comments in net_processing: - # * Must have a version message before anything else - # * Must have a verack message before anything else - wait_until_helper(lambda: all(peer['version'] != 0 for peer in from_connection.getpeerinfo())) - wait_until_helper(lambda: all(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in from_connection.getpeerinfo())) + from_connection = self.nodes[a] + to_connection = self.nodes[b] + ip_port = "127.0.0.1:" + str(p2p_port(b)) + from_connection.addnode(ip_port, "onetry") - connect_nodes_helper(self.nodes[a], b) + # Use subversion as peer id. 
Test nodes have their node number appended to the user agent string + from_connection_subver = from_connection.getnetworkinfo()['subversion'] + to_connection_subver = to_connection.getnetworkinfo()['subversion'] + + def find_conn(node, peer_subversion, inbound): + return next(filter(lambda peer: peer['subver'] == peer_subversion and peer['inbound'] == inbound, node.getpeerinfo()), None) + + # poll until version handshake complete to avoid race conditions + # with transaction relaying + # See comments in net_processing: + # * Must have a version message before anything else + # * Must have a verack message before anything else + self.wait_until(lambda: find_conn(from_connection, to_connection_subver, inbound=False) is not None) + self.wait_until(lambda: find_conn(to_connection, from_connection_subver, inbound=True) is not None) + + def check_bytesrecv(peer, msg_type, min_bytes_recv): + assert peer is not None, "Error: peer disconnected" + return peer['bytesrecv_per_msg'].pop(msg_type, 0) >= min_bytes_recv + + self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'verack', 24)) + self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'verack', 24)) + + # The message bytes are counted before processing the message, so make + # sure it was fully processed by waiting for a pong. 
+ self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'pong', 32)) + self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'pong', 32)) def disconnect_nodes(self, a, b): # A node cannot disconnect from itself, bail out early if (a == b): return - def disconnect_nodes_helper(from_connection, node_num): - def get_peer_ids(): + def disconnect_nodes_helper(node_a, node_b): + def get_peer_ids(from_connection, node_num): result = [] for peer in from_connection.getpeerinfo(): if "testnode{}".format(node_num) in peer['subver']: result.append(peer['id']) return result - peer_ids = get_peer_ids() + peer_ids = get_peer_ids(node_a, node_b.index) if not peer_ids: self.log.warning("disconnect_nodes: {} and {} were not connected".format( - from_connection.index, - node_num, + node_a.index, + node_b.index, )) return for peer_id in peer_ids: try: - from_connection.disconnectnode(nodeid=peer_id) + node_a.disconnectnode(nodeid=peer_id) except JSONRPCException as e: # If this node is disconnected between calculating the peer id # and issuing the disconnect, don't worry about it. 
@@ -739,9 +758,10 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass): raise # wait to disconnect - wait_until_helper(lambda: not get_peer_ids(), timeout=5) + self.wait_until(lambda: not get_peer_ids(node_a, node_b.index), timeout=5) + self.wait_until(lambda: not get_peer_ids(node_b, node_a.index), timeout=5) - disconnect_nodes_helper(self.nodes[a], b) + disconnect_nodes_helper(self.nodes[a], self.nodes[b]) def isolate_node(self, node_num, timeout=5): self.nodes[node_num].setnetworkactive(False) diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index dbdeb255de..49c96cd779 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -103,7 +103,7 @@ class TestNode(): "-debug", "-debugexclude=libevent", "-debugexclude=leveldb", - "-uacomment=testnode%d" % i, + "-uacomment=testnode%d" % i, # required for subversion uniqueness across peers ] if self.mocktime != 0: self.args.append(f"-mocktime={mocktime}") @@ -574,7 +574,7 @@ class TestNode(): def add_outbound_p2p_connection(self, p2p_conn, *, p2p_idx, connection_type="outbound-full-relay", **kwargs): """Add an outbound p2p connection from node. Must be an - "outbound-full-relay", "block-relay-only" or "addr-fetch" connection. + "outbound-full-relay", "block-relay-only", "addr-fetch" or "feeler" connection. This method adds the p2p connection to the self.p2ps list and returns the connection to the caller. 
@@ -586,11 +586,16 @@ class TestNode(): p2p_conn.peer_accept_connection(connect_cb=addconnection_callback, connect_id=p2p_idx + 1, net=self.chain, timeout_factor=self.timeout_factor, **kwargs)() - p2p_conn.wait_for_connect() - self.p2ps.append(p2p_conn) + if connection_type == "feeler": + # feeler connections are closed as soon as the node receives a `version` message + p2p_conn.wait_until(lambda: p2p_conn.message_count["version"] == 1, check_connected=False) + p2p_conn.wait_until(lambda: not p2p_conn.is_connected, check_connected=False) + else: + p2p_conn.wait_for_connect() + self.p2ps.append(p2p_conn) - p2p_conn.wait_for_verack() - p2p_conn.sync_with_ping() + p2p_conn.wait_for_verack() + p2p_conn.sync_with_ping() return p2p_conn diff --git a/test/functional/test_framework/util.py b/test/functional/test_framework/util.py index 3577e98a20..d014fd2dda 100644 --- a/test/functional/test_framework/util.py +++ b/test/functional/test_framework/util.py @@ -13,6 +13,7 @@ import inspect import json import logging import os +import random import shutil import re import time @@ -274,6 +275,11 @@ def sha256sum_file(filename): d = f.read(4096) return h.digest() +# TODO: Remove and use random.randbytes(n) instead, available in Python 3.9 +def random_bytes(n): + """Return a random bytes object of length n.""" + return bytes(random.getrandbits(8) for i in range(n)) + # RPC/P2P connection constants and functions ############################################ diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index d973f61e22..1ad6cc54eb 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -259,6 +259,7 @@ BASE_SCRIPTS = [ 'p2p_leak_tx.py', 'p2p_eviction.py', 'p2p_ibd_stalling.py', + 'p2p_net_deadlock.py', 'rpc_signmessage.py', 'rpc_generateblock.py', 'rpc_generate.py',