merge bitcoin#19315: Allow outbound & block-relay-only connections in functional tests

This commit is contained in:
Kittywhiskers Van Gogh 2024-03-28 03:21:36 +00:00
parent b76e029e44
commit 1d4f10a378
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
9 changed files with 365 additions and 53 deletions

View File

@ -1274,6 +1274,27 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket,
RandAddEvent((uint32_t)id); RandAddEvent((uint32_t)id);
} }
bool CConnman::AddConnection(const std::string& address, ConnectionType conn_type)
{
if (conn_type != ConnectionType::OUTBOUND_FULL_RELAY && conn_type != ConnectionType::BLOCK_RELAY) return false;
const int max_connections = conn_type == ConnectionType::OUTBOUND_FULL_RELAY ? m_max_outbound_full_relay : m_max_outbound_block_relay;
// Count existing connections
int existing_connections = WITH_LOCK(cs_vNodes,
return std::count_if(vNodes.begin(), vNodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
// Max connections of specified type already exist
if (existing_connections >= max_connections) return false;
// Max total outbound connections already exist
CSemaphoreGrant grant(*semOutbound, true);
if (!grant) return false;
OpenNetworkConnection(CAddress(), false, &grant, address.c_str(), conn_type);
return true;
}
void CConnman::DisconnectNodes() void CConnman::DisconnectNodes()
{ {
{ {

View File

@ -1227,6 +1227,19 @@ public:
bool RemoveAddedNode(const std::string& node); bool RemoveAddedNode(const std::string& node);
std::vector<AddedNodeInfo> GetAddedNodeInfo(); std::vector<AddedNodeInfo> GetAddedNodeInfo();
/**
* Attempts to open a connection. Currently only used from tests.
*
* @param[in] address Address of node to try connecting to
* @param[in] conn_type ConnectionType::OUTBOUND or ConnectionType::BLOCK_RELAY
* @return bool Returns false if there are no available
* slots for this connection:
* - conn_type not a supported ConnectionType
* - Max total outbound connection capacity filled
* - Max connection capacity for type is filled
*/
bool AddConnection(const std::string& address, ConnectionType conn_type);
bool AddPendingMasternode(const uint256& proTxHash); bool AddPendingMasternode(const uint256& proTxHash);
void SetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes); void SetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes);
void SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes); void SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes);

View File

@ -6,6 +6,7 @@
#include <rpc/server.h> #include <rpc/server.h>
#include <banman.h> #include <banman.h>
#include <chainparams.h>
#include <clientversion.h> #include <clientversion.h>
#include <core_io.h> #include <core_io.h>
#include <net.h> #include <net.h>
@ -324,6 +325,61 @@ static RPCHelpMan addnode()
}; };
} }
static RPCHelpMan addconnection()
{
return RPCHelpMan{"addconnection",
"\nOpen an outbound connection to a specified node. This RPC is for testing only.\n",
{
{"address", RPCArg::Type::STR, RPCArg::Optional::NO, "The IP address and port to attempt connecting to."},
{"connection_type", RPCArg::Type::STR, RPCArg::Optional::NO, "Type of connection to open, either \"outbound-full-relay\" or \"block-relay-only\"."},
},
RPCResult{
RPCResult::Type::OBJ, "", "",
{
{ RPCResult::Type::STR, "address", "Address of newly added connection." },
{ RPCResult::Type::STR, "connection_type", "Type of connection opened." },
}},
RPCExamples{
HelpExampleCli("addconnection", "\"192.168.0.6:8333\" \"outbound-full-relay\"")
+ HelpExampleRpc("addconnection", "\"192.168.0.6:8333\" \"outbound-full-relay\"")
},
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue
{
if (Params().NetworkIDString() != CBaseChainParams::REGTEST) {
throw std::runtime_error("addconnection is for regression testing (-regtest mode) only.");
}
RPCTypeCheck(request.params, {UniValue::VSTR, UniValue::VSTR});
const std::string address = request.params[0].get_str();
const std::string conn_type_in{TrimString(request.params[1].get_str())};
ConnectionType conn_type{};
if (conn_type_in == "outbound-full-relay") {
conn_type = ConnectionType::OUTBOUND_FULL_RELAY;
} else if (conn_type_in == "block-relay-only") {
conn_type = ConnectionType::BLOCK_RELAY;
} else {
throw JSONRPCError(RPC_INVALID_PARAMETER, self.ToString());
}
NodeContext& node = EnsureAnyNodeContext(request.context);
if (!node.connman) {
throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled.");
}
const bool success = node.connman->AddConnection(address, conn_type);
if (!success) {
throw JSONRPCError(RPC_CLIENT_NODE_CAPACITY_REACHED, "Error: Already at capacity for specified connection type.");
}
UniValue info(UniValue::VOBJ);
info.pushKV("address", address);
info.pushKV("connection_type", conn_type_in);
return info;
},
};
}
static RPCHelpMan disconnectnode() static RPCHelpMan disconnectnode()
{ {
return RPCHelpMan{"disconnectnode", return RPCHelpMan{"disconnectnode",
@ -967,6 +1023,8 @@ static const CRPCCommand commands[] =
{ "network", "cleardiscouraged", &cleardiscouraged, {} }, { "network", "cleardiscouraged", &cleardiscouraged, {} },
{ "network", "setnetworkactive", &setnetworkactive, {"state"} }, { "network", "setnetworkactive", &setnetworkactive, {"state"} },
{ "network", "getnodeaddresses", &getnodeaddresses, {"count"} }, { "network", "getnodeaddresses", &getnodeaddresses, {"count"} },
{ "hidden", "addconnection", &addconnection, {"address", "connection_type"} },
{ "hidden", "addpeeraddress", &addpeeraddress, {"address", "port"} }, { "hidden", "addpeeraddress", &addpeeraddress, {"address", "port"} },
}; };
// clang-format on // clang-format on

View File

@ -63,6 +63,7 @@ enum RPCErrorCode
RPC_CLIENT_NODE_NOT_CONNECTED = -29, //!< Node to disconnect not found in connected nodes RPC_CLIENT_NODE_NOT_CONNECTED = -29, //!< Node to disconnect not found in connected nodes
RPC_CLIENT_INVALID_IP_OR_SUBNET = -30, //!< Invalid IP/Subnet RPC_CLIENT_INVALID_IP_OR_SUBNET = -30, //!< Invalid IP/Subnet
RPC_CLIENT_P2P_DISABLED = -31, //!< No valid connection manager instance found RPC_CLIENT_P2P_DISABLED = -31, //!< No valid connection manager instance found
RPC_CLIENT_NODE_CAPACITY_REACHED= -34, //!< Max number of outbound or block-relay connections already open
//! Chain errors //! Chain errors
RPC_CLIENT_MEMPOOL_DISABLED = -33, //!< No mempool instance found RPC_CLIENT_MEMPOOL_DISABLED = -33, //!< No mempool instance found

View File

@ -0,0 +1,97 @@
#!/usr/bin/env python3
# Copyright (c) 2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test add_outbound_p2p_connection test framework functionality"""
from test_framework.p2p import P2PInterface
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal
def check_node_connections(*, node, num_in, num_out):
info = node.getnetworkinfo()
assert_equal(info["connections_in"], num_in)
assert_equal(info["connections_out"], num_out)
class P2PAddConnections(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = False
self.num_nodes = 2
def setup_network(self):
self.setup_nodes()
# Don't connect the nodes
def run_test(self):
self.log.info("Add 8 outbounds to node 0")
for i in range(8):
self.log.info(f"outbound: {i}")
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i, connection_type="outbound-full-relay")
self.log.info("Add 2 block-relay-only connections to node 0")
for i in range(2):
self.log.info(f"block-relay-only: {i}")
# set p2p_idx based on the outbound connections already open to the
# node, so add 8 to account for the previous full-relay connections
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i + 8, connection_type="block-relay-only")
self.log.info("Add 2 block-relay-only connections to node 1")
for i in range(2):
self.log.info(f"block-relay-only: {i}")
self.nodes[1].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i, connection_type="block-relay-only")
self.log.info("Add 5 inbound connections to node 1")
for i in range(5):
self.log.info(f"inbound: {i}")
self.nodes[1].add_p2p_connection(P2PInterface())
self.log.info("Add 8 outbounds to node 1")
for i in range(8):
self.log.info(f"outbound: {i}")
# bump p2p_idx to account for the 2 existing outbounds on node 1
self.nodes[1].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i + 2)
self.log.info("Check the connections opened as expected")
check_node_connections(node=self.nodes[0], num_in=0, num_out=10)
check_node_connections(node=self.nodes[1], num_in=5, num_out=10)
self.log.info("Disconnect p2p connections & try to re-open")
self.nodes[0].disconnect_p2ps()
check_node_connections(node=self.nodes[0], num_in=0, num_out=0)
self.log.info("Add 8 outbounds to node 0")
for i in range(8):
self.log.info(f"outbound: {i}")
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i)
check_node_connections(node=self.nodes[0], num_in=0, num_out=8)
self.log.info("Add 2 block-relay-only connections to node 0")
for i in range(2):
self.log.info(f"block-relay-only: {i}")
# bump p2p_idx to account for the 8 existing outbounds on node 0
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i + 8, connection_type="block-relay-only")
check_node_connections(node=self.nodes[0], num_in=0, num_out=10)
self.log.info("Restart node 0 and try to reconnect to p2ps")
self.restart_node(0)
self.log.info("Add 4 outbounds to node 0")
for i in range(4):
self.log.info(f"outbound: {i}")
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i)
check_node_connections(node=self.nodes[0], num_in=0, num_out=4)
self.log.info("Add 2 block-relay-only connections to node 0")
for i in range(2):
self.log.info(f"block-relay-only: {i}")
# bump p2p_idx to account for the 4 existing outbounds on node 0
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=i + 4, connection_type="block-relay-only")
check_node_connections(node=self.nodes[0], num_in=0, num_out=6)
check_node_connections(node=self.nodes[1], num_in=5, num_out=10)
if __name__ == '__main__':
P2PAddConnections().main()

View File

@ -2,10 +2,13 @@
# Copyright (c) 2019 The Bitcoin Core developers # Copyright (c) 2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying # Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php. # file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test p2p blocksonly""" """Test p2p blocksonly mode & block-relay-only connections."""
from test_framework.messages import msg_tx, tx_from_hex import time
from test_framework.p2p import P2PInterface
from test_framework.blocktools import create_transaction
from test_framework.messages import msg_tx
from test_framework.p2p import P2PInterface, P2PTxInvStore
from test_framework.test_framework import BitcoinTestFramework from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal from test_framework.util import assert_equal
@ -15,49 +18,30 @@ class P2PBlocksOnly(BitcoinTestFramework):
self.num_nodes = 1 self.num_nodes = 1
self.extra_args = [["-blocksonly"]] self.extra_args = [["-blocksonly"]]
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self): def run_test(self):
block_relay_peer = self.nodes[0].add_p2p_connection(P2PInterface()) self.blocksonly_mode_tests()
self.blocks_relay_conn_tests()
self.log.info('Check that txs from p2p are rejected and result in disconnect') def blocksonly_mode_tests(self):
prevtx = self.nodes[0].getblock(self.nodes[0].getblockhash(1), 2)['tx'][0] self.log.info("Tests with node running in -blocksonly mode")
rawtx = self.nodes[0].createrawtransaction(
inputs=[{
'txid': prevtx['txid'],
'vout': 0
}],
outputs=[{
self.nodes[0].get_deterministic_priv_key()[0]: 500 - 0.00125
}],
)
sigtx = self.nodes[0].signrawtransactionwithkey(
hexstring=rawtx,
privkeys=[self.nodes[0].get_deterministic_priv_key()[1]],
prevtxs=[{
'txid': prevtx['txid'],
'vout': 0,
'scriptPubKey': prevtx['vout'][0]['scriptPubKey']['hex'],
}],
)['hex']
assert_equal(self.nodes[0].getnetworkinfo()['localrelay'], False) assert_equal(self.nodes[0].getnetworkinfo()['localrelay'], False)
with self.nodes[0].assert_debug_log(['tx sent in violation of protocol peer=0']):
block_relay_peer.send_message(msg_tx(tx_from_hex(sigtx)))
block_relay_peer.wait_for_disconnect()
assert_equal(self.nodes[0].getmempoolinfo()['size'], 0)
# Remove the disconnected peer and add a new one. self.nodes[0].add_p2p_connection(P2PInterface())
del self.nodes[0].p2ps[0] tx, txid, tx_hex = self.check_p2p_tx_violation()
tx_relay_peer = self.nodes[0].add_p2p_connection(P2PInterface())
self.log.info('Check that txs from rpc are not rejected and relayed to other peers') self.log.info('Check that txs from rpc are not rejected and relayed to other peers')
tx_relay_peer = self.nodes[0].add_p2p_connection(P2PInterface())
assert_equal(self.nodes[0].getpeerinfo()[0]['relaytxes'], True) assert_equal(self.nodes[0].getpeerinfo()[0]['relaytxes'], True)
txid = self.nodes[0].testmempoolaccept([sigtx])[0]['txid'] assert_equal(self.nodes[0].testmempoolaccept([tx_hex])[0]['allowed'], True)
with self.nodes[0].assert_debug_log(['received getdata for: tx {} peer=1'.format(txid)]): with self.nodes[0].assert_debug_log(['received getdata for: tx {} peer=1'.format(txid)]):
self.nodes[0].sendrawtransaction(sigtx) self.nodes[0].sendrawtransaction(tx_hex)
self.bump_mocktime(60) self.bump_mocktime(60)
tx_relay_peer.wait_for_tx(txid) tx_relay_peer.wait_for_tx(txid)
assert_equal(self.nodes[0].getmempoolinfo()['size'], 1) assert_equal(self.nodes[0].getmempoolinfo()['size'], 1)
self.log.info('Check that txs from peers with relay-permission are not rejected and relayed to others')
self.log.info("Restarting node 0 with relay permission and blocksonly") self.log.info("Restarting node 0 with relay permission and blocksonly")
self.restart_node(0, ["-persistmempool=0", "-whitelist=relay@127.0.0.1", "-blocksonly"]) self.restart_node(0, ["-persistmempool=0", "-whitelist=relay@127.0.0.1", "-blocksonly"])
assert_equal(self.nodes[0].getrawmempool(), []) assert_equal(self.nodes[0].getrawmempool(), [])
@ -67,8 +51,7 @@ class P2PBlocksOnly(BitcoinTestFramework):
assert_equal(peer_1_info['permissions'], ['relay']) assert_equal(peer_1_info['permissions'], ['relay'])
peer_2_info = self.nodes[0].getpeerinfo()[1] peer_2_info = self.nodes[0].getpeerinfo()[1]
assert_equal(peer_2_info['permissions'], ['relay']) assert_equal(peer_2_info['permissions'], ['relay'])
assert_equal(self.nodes[0].testmempoolaccept([sigtx])[0]['allowed'], True) assert_equal(self.nodes[0].testmempoolaccept([tx_hex])[0]['allowed'], True)
txid = self.nodes[0].testmempoolaccept([sigtx])[0]['txid']
self.log.info('Check that the tx from first_peer with relay-permission is relayed to others (ie.second_peer)') self.log.info('Check that the tx from first_peer with relay-permission is relayed to others (ie.second_peer)')
with self.nodes[0].assert_debug_log(["received getdata"]): with self.nodes[0].assert_debug_log(["received getdata"]):
@ -78,7 +61,7 @@ class P2PBlocksOnly(BitcoinTestFramework):
# But if, for some reason, first_peer decides to relay transactions to us anyway, we should relay them to # But if, for some reason, first_peer decides to relay transactions to us anyway, we should relay them to
# second_peer since we gave relay permission to first_peer. # second_peer since we gave relay permission to first_peer.
# See https://github.com/bitcoin/bitcoin/issues/19943 for details. # See https://github.com/bitcoin/bitcoin/issues/19943 for details.
first_peer.send_message(msg_tx(tx_from_hex(sigtx))) first_peer.send_message(msg_tx(tx))
self.log.info('Check that the peer with relay-permission is still connected after sending the transaction') self.log.info('Check that the peer with relay-permission is still connected after sending the transaction')
assert_equal(first_peer.is_connected, True) assert_equal(first_peer.is_connected, True)
self.bump_mocktime(60) self.bump_mocktime(60)
@ -86,6 +69,51 @@ class P2PBlocksOnly(BitcoinTestFramework):
assert_equal(self.nodes[0].getmempoolinfo()['size'], 1) assert_equal(self.nodes[0].getmempoolinfo()['size'], 1)
self.log.info("Relay-permission peer's transaction is accepted and relayed") self.log.info("Relay-permission peer's transaction is accepted and relayed")
self.nodes[0].disconnect_p2ps()
self.nodes[0].generate(1)
def blocks_relay_conn_tests(self):
self.log.info('Tests with node in normal mode with block-relay-only connections')
self.restart_node(0, ["-noblocksonly"]) # disables blocks only mode
assert_equal(self.nodes[0].getnetworkinfo()['localrelay'], True)
# Ensure we disconnect if a block-relay-only connection sends us a transaction
self.nodes[0].add_outbound_p2p_connection(P2PInterface(), p2p_idx=0, connection_type="block-relay-only")
assert_equal(self.nodes[0].getpeerinfo()[0]['relaytxes'], False)
_, txid, tx_hex = self.check_p2p_tx_violation(index=2)
self.log.info("Check that txs from RPC are not sent to blockrelay connection")
conn = self.nodes[0].add_outbound_p2p_connection(P2PTxInvStore(), p2p_idx=1, connection_type="block-relay-only")
self.nodes[0].sendrawtransaction(tx_hex)
# Bump time forward to ensure nNextInvSend timer pops
self.nodes[0].setmocktime(int(time.time()) + 60)
# Calling sync_with_ping twice requires that the node calls
# `ProcessMessage` twice, and thus ensures `SendMessages` must have
# been called at least once
conn.sync_with_ping()
conn.sync_with_ping()
assert(int(txid, 16) not in conn.get_invs())
def check_p2p_tx_violation(self, index=1):
self.log.info('Check that txs from P2P are rejected and result in disconnect')
input_txid = self.nodes[0].getblock(self.nodes[0].getblockhash(index), 2)['tx'][0]['txid']
tx = create_transaction(self.nodes[0], input_txid, self.nodes[0].getnewaddress(), amount=(500 - 0.001))
txid = tx.rehash()
tx_hex = tx.serialize().hex()
with self.nodes[0].assert_debug_log(['tx sent in violation of protocol peer=0']):
self.nodes[0].p2ps[0].send_message(msg_tx(tx))
self.nodes[0].p2ps[0].wait_for_disconnect()
assert_equal(self.nodes[0].getmempoolinfo()['size'], 0)
# Remove the disconnected peer
del self.nodes[0].p2ps[0]
return tx, txid, tx_hex
if __name__ == '__main__': if __name__ == '__main__':
P2PBlocksOnly().main() P2PBlocksOnly().main()

View File

@ -82,7 +82,11 @@ from test_framework.messages import (
NODE_NETWORK, NODE_NETWORK,
sha256, sha256,
) )
from test_framework.util import wait_until_helper from test_framework.util import (
MAX_NODES,
p2p_port,
wait_until_helper,
)
logger = logging.getLogger("TestFramework.p2p") logger = logging.getLogger("TestFramework.p2p")
@ -168,7 +172,7 @@ class P2PConnection(asyncio.Protocol):
def is_connected(self): def is_connected(self):
return self._transport is not None return self._transport is not None
def peer_connect(self, dstaddr, dstport, *, net, timeout_factor, uacomment=None): def peer_connect_helper(self, dstaddr, dstport, net, timeout_factor, uacomment):
assert not self.is_connected assert not self.is_connected
self.timeout_factor = timeout_factor self.timeout_factor = timeout_factor
self.dstaddr = dstaddr self.dstaddr = dstaddr
@ -190,12 +194,19 @@ class P2PConnection(asyncio.Protocol):
else: else:
self.strSubVer = MY_SUBVERSION % b"" self.strSubVer = MY_SUBVERSION % b""
logger.debug('Connecting to Dash Node: %s:%d' % (self.dstaddr, self.dstport)) def peer_connect(self, dstaddr, dstport, *, net, timeout_factor, uacomment=None):
self.peer_connect_helper(dstaddr, dstport, net, timeout_factor, uacomment)
loop = NetworkThread.network_event_loop loop = NetworkThread.network_event_loop
conn_gen_unsafe = loop.create_connection(lambda: self, host=self.dstaddr, port=self.dstport) logger.debug('Connecting to Dash Node: %s:%d' % (self.dstaddr, self.dstport))
conn_gen = lambda: loop.call_soon_threadsafe(loop.create_task, conn_gen_unsafe) coroutine = loop.create_connection(lambda: self, host=self.dstaddr, port=self.dstport)
return conn_gen return lambda: loop.call_soon_threadsafe(loop.create_task, coroutine)
def peer_accept_connection(self, connect_id, connect_cb=lambda: None, *, net, timeout_factor, uacomment=None):
self.peer_connect_helper('0', 0, net, timeout_factor, uacomment)
logger.debug('Listening for Dash Node with id: {}'.format(connect_id))
return lambda: NetworkThread.listen(self, connect_cb, idx=connect_id)
def peer_disconnect(self): def peer_disconnect(self):
# Connection could have already been closed by other end. # Connection could have already been closed by other end.
@ -354,19 +365,28 @@ class P2PInterface(P2PConnection):
self.support_addrv2 = support_addrv2 self.support_addrv2 = support_addrv2
def peer_connect_send_version(self, services):
# Send a version msg
vt = msg_version()
vt.nServices = services
vt.addrTo.ip = self.dstaddr
vt.addrTo.port = self.dstport
vt.addrFrom.ip = "0.0.0.0"
vt.addrFrom.port = 0
vt.strSubVer = self.strSubVer
self.on_connection_send_msg = vt # Will be sent soon after connection_made
def peer_connect(self, *args, services=NODE_NETWORK | NODE_HEADERS_COMPRESSED, send_version=True, **kwargs): def peer_connect(self, *args, services=NODE_NETWORK | NODE_HEADERS_COMPRESSED, send_version=True, **kwargs):
create_conn = super().peer_connect(*args, **kwargs) create_conn = super().peer_connect(*args, **kwargs)
if send_version: if send_version:
# Send a version msg self.peer_connect_send_version(services)
vt = msg_version()
vt.nServices = services return create_conn
vt.addrTo.ip = self.dstaddr
vt.addrTo.port = self.dstport def peer_accept_connection(self, *args, services=NODE_NETWORK | NODE_HEADERS_COMPRESSED, **kwargs):
vt.addrFrom.ip = "0.0.0.0" create_conn = super().peer_accept_connection(*args, **kwargs)
vt.addrFrom.port = 0 self.peer_connect_send_version(services)
vt.strSubVer = self.strSubVer
self.on_connection_send_msg = vt # Will be sent soon after connection_made
return create_conn return create_conn
@ -465,6 +485,10 @@ class P2PInterface(P2PConnection):
wait_until_helper(test_function, timeout=timeout, lock=p2p_lock, timeout_factor=self.timeout_factor) wait_until_helper(test_function, timeout=timeout, lock=p2p_lock, timeout_factor=self.timeout_factor)
def wait_for_connect(self, timeout=60):
test_function = lambda: self.is_connected
wait_until_helper(test_function, timeout=timeout, lock=p2p_lock)
def wait_for_disconnect(self, timeout=60): def wait_for_disconnect(self, timeout=60):
test_function = lambda: not self.is_connected test_function = lambda: not self.is_connected
self.wait_until(test_function, timeout=timeout, check_connected=False) self.wait_until(test_function, timeout=timeout, check_connected=False)
@ -580,6 +604,8 @@ class NetworkThread(threading.Thread):
# There is only one event loop and no more than one thread must be created # There is only one event loop and no more than one thread must be created
assert not self.network_event_loop assert not self.network_event_loop
NetworkThread.listeners = {}
NetworkThread.protos = {}
NetworkThread.network_event_loop = asyncio.new_event_loop() NetworkThread.network_event_loop = asyncio.new_event_loop()
def run(self): def run(self):
@ -595,6 +621,48 @@ class NetworkThread(threading.Thread):
# Safe to remove event loop. # Safe to remove event loop.
NetworkThread.network_event_loop = None NetworkThread.network_event_loop = None
@classmethod
def listen(cls, p2p, callback, port=None, addr=None, idx=1):
""" Ensure a listening server is running on the given port, and run the
protocol specified by `p2p` on the next connection to it. Once ready
for connections, call `callback`."""
if port is None:
assert 0 < idx <= MAX_NODES
port = p2p_port(MAX_NODES - idx)
if addr is None:
addr = '127.0.0.1'
coroutine = cls.create_listen_server(addr, port, callback, p2p)
cls.network_event_loop.call_soon_threadsafe(cls.network_event_loop.create_task, coroutine)
@classmethod
async def create_listen_server(cls, addr, port, callback, proto):
def peer_protocol():
"""Returns a function that does the protocol handling for a new
connection. To allow different connections to have different
behaviors, the protocol function is first put in the cls.protos
dict. When the connection is made, the function removes the
protocol function from that dict, and returns it so the event loop
can start executing it."""
response = cls.protos.get((addr, port))
cls.protos[(addr, port)] = None
return response
if (addr, port) not in cls.listeners:
# When creating a listener on a given (addr, port) we only need to
# do it once. If we want different behaviors for different
# connections, we can accomplish this by providing different
# `proto` functions
listener = await cls.network_event_loop.create_server(peer_protocol, addr, port)
logger.debug("Listening server on %s:%d should be started" % (addr, port))
cls.listeners[(addr, port)] = listener
cls.protos[(addr, port)] = proto
callback(addr, port)
class P2PDataStore(P2PInterface): class P2PDataStore(P2PInterface):
"""A P2P data store class. """A P2P data store class.

View File

@ -72,6 +72,7 @@ class TestNode():
""" """
self.index = i self.index = i
self.p2p_conn_index = 1
self.datadir = datadir self.datadir = datadir
self.chain = chain self.chain = chain
self.bitcoinconf = os.path.join(self.datadir, "dash.conf") self.bitcoinconf = os.path.join(self.datadir, "dash.conf")
@ -537,7 +538,7 @@ class TestNode():
self._raise_assertion_error(assert_msg) self._raise_assertion_error(assert_msg)
def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, **kwargs): def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, **kwargs):
"""Add a p2p connection to the node. """Add an inbound p2p connection to the node.
This method adds the p2p connection to the self.p2ps list and also This method adds the p2p connection to the self.p2ps list and also
returns the connection to the caller.""" returns the connection to the caller."""
@ -566,6 +567,29 @@ class TestNode():
return p2p_conn return p2p_conn
def add_outbound_p2p_connection(self, p2p_conn, *, p2p_idx, connection_type="outbound-full-relay", **kwargs):
"""Add an outbound p2p connection from node. Either
full-relay("outbound-full-relay") or
block-relay-only("block-relay-only") connection.
This method adds the p2p connection to the self.p2ps list and returns
the connection to the caller.
"""
def addconnection_callback(address, port):
self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type))
self.addconnection('%s:%d' % (address, port), connection_type)
p2p_conn.peer_accept_connection(connect_cb=addconnection_callback, connect_id=p2p_idx + 1, net=self.chain, timeout_factor=self.timeout_factor, **kwargs)()
p2p_conn.wait_for_connect()
self.p2ps.append(p2p_conn)
p2p_conn.wait_for_verack()
p2p_conn.sync_with_ping()
return p2p_conn
def num_test_p2p_connections(self): def num_test_p2p_connections(self):
"""Return number of test framework p2p connections to the node.""" """Return number of test framework p2p connections to the node."""
return len([peer for peer in self.getpeerinfo() if peer['subver'] == MY_SUBVERSION.decode("utf-8")]) return len([peer for peer in self.getpeerinfo() if peer['subver'] == MY_SUBVERSION.decode("utf-8")])
@ -585,6 +609,7 @@ class TestNode():
wait_until_helper(check_peers, timeout=5) wait_until_helper(check_peers, timeout=5)
del self.p2ps[:] del self.p2ps[:]
wait_until_helper(lambda: self.num_test_p2p_connections() == 0, timeout_factor=self.timeout_factor) wait_until_helper(lambda: self.num_test_p2p_connections() == 0, timeout_factor=self.timeout_factor)

View File

@ -306,6 +306,7 @@ BASE_SCRIPTS = [
'feature_filelock.py', 'feature_filelock.py',
'feature_loadblock.py', 'feature_loadblock.py',
'p2p_dos_header_tree.py', 'p2p_dos_header_tree.py',
'p2p_add_connections.py',
'p2p_blockfilters.py', 'p2p_blockfilters.py',
'p2p_message_capture.py', 'p2p_message_capture.py',
'feature_asmap.py', 'feature_asmap.py',