merge bitcoin#24748: functional tests for v2 P2P encryption

This commit is contained in:
Kittywhiskers Van Gogh 2024-10-24 14:23:01 +00:00
parent 32500f2acd
commit 6b2a8b5988
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
16 changed files with 809 additions and 72 deletions

View File

@ -2131,7 +2131,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
RandAddEvent((uint32_t)id); RandAddEvent((uint32_t)id);
} }
bool CConnman::AddConnection(const std::string& address, ConnectionType conn_type) bool CConnman::AddConnection(const std::string& address, ConnectionType conn_type, bool use_v2transport = false)
{ {
AssertLockNotHeld(m_unused_i2p_sessions_mutex); AssertLockNotHeld(m_unused_i2p_sessions_mutex);
std::optional<int> max_connections; std::optional<int> max_connections;
@ -2164,7 +2164,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
CSemaphoreGrant grant(*semOutbound, true); CSemaphoreGrant grant(*semOutbound, true);
if (!grant) return false; if (!grant) return false;
OpenNetworkConnection(CAddress(), false, std::move(grant), address.c_str(), conn_type, /*use_v2transport=*/false); OpenNetworkConnection(CAddress(), false, std::move(grant), address.c_str(), conn_type, /*use_v2transport=*/use_v2transport);
return true; return true;
} }
@ -3783,7 +3783,7 @@ void CConnman::ThreadOpenMasternodeConnections(CDeterministicMNManager& dmnman,
mn_metaman.GetMetaInfo(connectToDmn->proTxHash)->SetLastOutboundAttempt(nANow); mn_metaman.GetMetaInfo(connectToDmn->proTxHash)->SetLastOutboundAttempt(nANow);
OpenMasternodeConnection(CAddress(connectToDmn->pdmnState->addr, NODE_NETWORK), isProbe); OpenMasternodeConnection(CAddress(connectToDmn->pdmnState->addr, NODE_NETWORK), /*use_v2transport=*/GetLocalServices() & NODE_P2P_V2, isProbe);
// should be in the list now if connection was opened // should be in the list now if connection was opened
bool connected = ForNode(connectToDmn->pdmnState->addr, CConnman::AllNodes, [&](CNode* pnode) { bool connected = ForNode(connectToDmn->pdmnState->addr, CConnman::AllNodes, [&](CNode* pnode) {
if (pnode->fDisconnect) { if (pnode->fDisconnect) {
@ -3893,9 +3893,9 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
} }
} }
void CConnman::OpenMasternodeConnection(const CAddress &addrConnect, MasternodeProbeConn probe) { void CConnman::OpenMasternodeConnection(const CAddress &addrConnect, bool use_v2transport, MasternodeProbeConn probe) {
OpenNetworkConnection(addrConnect, false, {}, /*strDest=*/nullptr, ConnectionType::OUTBOUND_FULL_RELAY, OpenNetworkConnection(addrConnect, false, {}, /*strDest=*/nullptr, ConnectionType::OUTBOUND_FULL_RELAY,
/*use_v2transport=*/false, MasternodeConn::IsConnection, probe); use_v2transport, MasternodeConn::IsConnection, probe);
} }
Mutex NetEventsInterface::g_msgproc_mutex; Mutex NetEventsInterface::g_msgproc_mutex;

View File

@ -1283,7 +1283,7 @@ public:
MasternodeConn masternode_connection = MasternodeConn::IsNotConnection, MasternodeConn masternode_connection = MasternodeConn::IsNotConnection,
MasternodeProbeConn masternode_probe_connection = MasternodeProbeConn::IsNotConnection) MasternodeProbeConn masternode_probe_connection = MasternodeProbeConn::IsNotConnection)
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex, !mutexMsgProc); EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex, !mutexMsgProc);
void OpenMasternodeConnection(const CAddress& addrConnect, MasternodeProbeConn probe = MasternodeProbeConn::IsConnection) void OpenMasternodeConnection(const CAddress& addrConnect, bool use_v2transport, MasternodeProbeConn probe = MasternodeProbeConn::IsConnection)
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex, !mutexMsgProc); EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex, !mutexMsgProc);
bool CheckIncomingNonce(uint64_t nonce); bool CheckIncomingNonce(uint64_t nonce);
@ -1476,13 +1476,14 @@ public:
* @param[in] address Address of node to try connecting to * @param[in] address Address of node to try connecting to
* @param[in] conn_type ConnectionType::OUTBOUND, ConnectionType::BLOCK_RELAY, * @param[in] conn_type ConnectionType::OUTBOUND, ConnectionType::BLOCK_RELAY,
* ConnectionType::ADDR_FETCH or ConnectionType::FEELER * ConnectionType::ADDR_FETCH or ConnectionType::FEELER
* @param[in] use_v2transport Set to true if node attempts to connect using BIP 324 v2 transport protocol.
* @return bool Returns false if there are no available * @return bool Returns false if there are no available
* slots for this connection: * slots for this connection:
* - conn_type not a supported ConnectionType * - conn_type not a supported ConnectionType
* - Max total outbound connection capacity filled * - Max total outbound connection capacity filled
* - Max connection capacity for type is filled * - Max connection capacity for type is filled
*/ */
bool AddConnection(const std::string& address, ConnectionType conn_type) bool AddConnection(const std::string& address, ConnectionType conn_type, bool use_v2transport)
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex, !mutexMsgProc); EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex, !mutexMsgProc);
bool AddPendingMasternode(const uint256& proTxHash); bool AddPendingMasternode(const uint256& proTxHash);

View File

@ -233,6 +233,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "sendmsgtopeer", 0, "peer_id" }, { "sendmsgtopeer", 0, "peer_id" },
{ "stop", 0, "wait" }, { "stop", 0, "wait" },
{ "addnode", 2, "v2transport" }, { "addnode", 2, "v2transport" },
{ "addconnection", 2, "v2transport" },
{ "verifychainlock", 2, "blockHeight" }, { "verifychainlock", 2, "blockHeight" },
{ "verifyislock", 3, "maxHeight" }, { "verifyislock", 3, "maxHeight" },
{ "submitchainlock", 2, "blockHeight" }, { "submitchainlock", 2, "blockHeight" },

View File

@ -38,6 +38,7 @@ static RPCHelpMan masternode_connect()
"Connect to given masternode\n", "Connect to given masternode\n",
{ {
{"address", RPCArg::Type::STR, RPCArg::Optional::NO, "The address of the masternode to connect"}, {"address", RPCArg::Type::STR, RPCArg::Optional::NO, "The address of the masternode to connect"},
{"v2transport", RPCArg::Type::BOOL, RPCArg::Default{false}, "Attempt to connect using BIP324 v2 transport protocol"},
}, },
RPCResults{}, RPCResults{},
RPCExamples{""}, RPCExamples{""},
@ -50,12 +51,19 @@ static RPCHelpMan masternode_connect()
throw JSONRPCError(RPC_INTERNAL_ERROR, strprintf("Incorrect masternode address %s", strAddress)); throw JSONRPCError(RPC_INTERNAL_ERROR, strprintf("Incorrect masternode address %s", strAddress));
} }
bool use_v2transport = !request.params[1].isNull() && ParseBoolV(request.params[1], "v2transport");
const NodeContext& node = EnsureAnyNodeContext(request.context); const NodeContext& node = EnsureAnyNodeContext(request.context);
CConnman& connman = EnsureConnman(node); CConnman& connman = EnsureConnman(node);
connman.OpenMasternodeConnection(CAddress(addr.value(), NODE_NETWORK)); if (use_v2transport && !(connman.GetLocalServices() & NODE_P2P_V2)) {
if (!connman.IsConnected(CAddress(addr.value(), NODE_NETWORK), CConnman::AllNodes)) throw JSONRPCError(RPC_INVALID_PARAMETER, "Error: Adding v2transport connections requires -v2transport init flag to be set.");
}
connman.OpenMasternodeConnection(CAddress(addr.value(), NODE_NETWORK), use_v2transport);
if (!connman.IsConnected(CAddress(addr.value(), NODE_NETWORK), CConnman::AllNodes)) {
throw JSONRPCError(RPC_INTERNAL_ERROR, strprintf("Couldn't connect to masternode %s", strAddress)); throw JSONRPCError(RPC_INTERNAL_ERROR, strprintf("Couldn't connect to masternode %s", strAddress));
}
return "successfully connected"; return "successfully connected";
}, },

View File

@ -371,6 +371,7 @@ static RPCHelpMan addconnection()
{ {
{"address", RPCArg::Type::STR, RPCArg::Optional::NO, "The IP address and port to attempt connecting to."}, {"address", RPCArg::Type::STR, RPCArg::Optional::NO, "The IP address and port to attempt connecting to."},
{"connection_type", RPCArg::Type::STR, RPCArg::Optional::NO, "Type of connection to open (\"outbound-full-relay\", \"block-relay-only\", \"addr-fetch\" or \"feeler\")."}, {"connection_type", RPCArg::Type::STR, RPCArg::Optional::NO, "Type of connection to open (\"outbound-full-relay\", \"block-relay-only\", \"addr-fetch\" or \"feeler\")."},
{"v2transport", RPCArg::Type::BOOL, RPCArg::Default{false}, "Attempt to connect using BIP324 v2 transport protocol"},
}, },
RPCResult{ RPCResult{
RPCResult::Type::OBJ, "", "", RPCResult::Type::OBJ, "", "",
@ -379,8 +380,8 @@ static RPCHelpMan addconnection()
{ RPCResult::Type::STR, "connection_type", "Type of connection opened." }, { RPCResult::Type::STR, "connection_type", "Type of connection opened." },
}}, }},
RPCExamples{ RPCExamples{
HelpExampleCli("addconnection", "\"192.168.0.6:8333\" \"outbound-full-relay\"") HelpExampleCli("addconnection", "\"192.168.0.6:8333\" \"outbound-full-relay\" true")
+ HelpExampleRpc("addconnection", "\"192.168.0.6:8333\" \"outbound-full-relay\"") + HelpExampleRpc("addconnection", "\"192.168.0.6:8333\" \"outbound-full-relay\" true")
}, },
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue [&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue
{ {
@ -403,11 +404,16 @@ static RPCHelpMan addconnection()
} else { } else {
throw JSONRPCError(RPC_INVALID_PARAMETER, self.ToString()); throw JSONRPCError(RPC_INVALID_PARAMETER, self.ToString());
} }
bool use_v2transport = !request.params[2].isNull() && request.params[2].get_bool();
NodeContext& node = EnsureAnyNodeContext(request.context); NodeContext& node = EnsureAnyNodeContext(request.context);
CConnman& connman = EnsureConnman(node); CConnman& connman = EnsureConnman(node);
const bool success = connman.AddConnection(address, conn_type); if (use_v2transport && !(connman.GetLocalServices() & NODE_P2P_V2)) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Error: Adding v2transport connections requires -v2transport init flag to be set.");
}
const bool success = connman.AddConnection(address, conn_type, use_v2transport);
if (!success) { if (!success) {
throw JSONRPCError(RPC_CLIENT_NODE_CAPACITY_REACHED, "Error: Already at capacity for specified connection type."); throw JSONRPCError(RPC_CLIENT_NODE_CAPACITY_REACHED, "Error: Already at capacity for specified connection type.");
} }

View File

@ -8,8 +8,7 @@ import os
import re import re
import struct import struct
from test_framework.messages import ser_uint256, hash256 from test_framework.messages import ser_uint256, hash256, MAGIC_BYTES
from test_framework.p2p import MAGIC_BYTES
from test_framework.test_framework import BitcoinTestFramework from test_framework.test_framework import BitcoinTestFramework
from test_framework.test_node import ErrorMatch from test_framework.test_node import ErrorMatch
from test_framework.util import assert_equal from test_framework.util import assert_equal

View File

@ -12,7 +12,7 @@
import os import os
from test_framework.test_framework import BitcoinTestFramework from test_framework.test_framework import BitcoinTestFramework
from test_framework.p2p import MAGIC_BYTES from test_framework.messages import MAGIC_BYTES
from test_framework.util import assert_equal from test_framework.util import assert_equal

View File

@ -0,0 +1,87 @@
#!/usr/bin/env python3
# Copyright (c) 2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
import random
from test_framework.test_framework import BitcoinTestFramework
from test_framework.crypto.ellswift import ellswift_create
from test_framework.util import random_bytes
from test_framework.p2p import P2PInterface
from test_framework.v2_p2p import EncryptedP2PState
class TestEncryptedP2PState(EncryptedP2PState):
""" Modify v2 P2P protocol functions for testing that "The responder waits until one byte is received which does
not match the 16 bytes consisting of the network magic followed by "version\x00\x00\x00\x00\x00"." (see BIP 324)
- if `send_net_magic` is True, send first 4 bytes of ellswift (match network magic) else send remaining 60 bytes
- `can_data_be_received` is a variable used to assert if data is received on recvbuf.
- v2 TestNode shouldn't respond back if we send V1_PREFIX and data shouldn't be received on recvbuf.
This state is represented using `can_data_be_received` = False.
- v2 TestNode responds back when mismatch from V1_PREFIX happens and data can be received on recvbuf.
This state is represented using `can_data_be_received` = True.
"""
def __init__(self):
super().__init__(initiating=True, net='regtest')
self.send_net_magic = True
self.can_data_be_received = False
def initiate_v2_handshake(self, garbage_len=random.randrange(4096)):
"""Initiator begins the v2 handshake by sending its ellswift bytes and garbage.
Here, the 64 bytes ellswift is assumed to have it's 4 bytes match network magic bytes. It is sent in 2 phases:
1. when `send_network_magic` = True, send first 4 bytes of ellswift (matches network magic bytes)
2. when `send_network_magic` = False, send remaining 60 bytes of ellswift
"""
if self.send_net_magic:
self.privkey_ours, self.ellswift_ours = ellswift_create()
self.sent_garbage = random_bytes(garbage_len)
self.send_net_magic = False
return b"\xfc\xc1\xb7\xdc"
else:
self.can_data_be_received = True
return self.ellswift_ours[4:] + self.sent_garbage
class PeerEarlyKey(P2PInterface):
"""Custom implementation of P2PInterface which uses modified v2 P2P protocol functions for testing purposes."""
def __init__(self):
super().__init__()
self.v2_state = None
def connection_made(self, transport):
"""64 bytes ellswift is sent in 2 parts during `initial_v2_handshake()`"""
self.v2_state = TestEncryptedP2PState()
super().connection_made(transport)
def data_received(self, t):
# check that data can be received on recvbuf only when mismatch from V1_PREFIX happens (send_net_magic = False)
assert self.v2_state.can_data_be_received and not self.v2_state.send_net_magic
class P2PEarlyKey(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.disable_mocktime = True
self.extra_args = [["-v2transport=1", "-peertimeout=3"]]
def run_test(self):
self.log.info('Sending ellswift bytes in parts to ensure that response from responder is received only when')
self.log.info('ellswift bytes have a mismatch from the 16 bytes(network magic followed by "version\\x00\\x00\\x00\\x00\\x00")')
node0 = self.nodes[0]
self.log.info('Sending first 4 bytes of ellswift which match network magic')
self.log.info('If a response is received, assertion failure would happen in our custom data_received() function')
# send happens in `initiate_v2_handshake()` in `connection_made()`
peer1 = node0.add_p2p_connection(PeerEarlyKey(), wait_for_verack=False, send_version=False, supports_v2_p2p=True)
self.log.info('Sending remaining ellswift and garbage which are different from V1_PREFIX. Since a response is')
self.log.info('expected now, our custom data_received() function wouldn\'t result in assertion failure')
ellswift_and_garbage_data = peer1.v2_state.initiate_v2_handshake()
peer1.send_raw_message(ellswift_and_garbage_data)
peer1.wait_for_disconnect(timeout=5)
self.log.info('successful disconnection when MITM happens in the key exchange phase')
if __name__ == '__main__':
P2PEarlyKey().main()

View File

@ -0,0 +1,134 @@
#!/usr/bin/env python3
# Copyright (c) 2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
Test encrypted v2 p2p proposed in BIP 324
"""
from test_framework.blocktools import (
create_block,
create_coinbase,
)
from test_framework.p2p import (
P2PDataStore,
P2PInterface,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
assert_greater_than,
check_node_connections,
)
from test_framework.crypto.chacha20 import REKEY_INTERVAL
class P2PEncrypted(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.extra_args = [["-v2transport=1"], ["-v2transport=1"]]
def setup_network(self):
self.setup_nodes()
def generate_blocks(self, node, number):
test_blocks = []
last_block = node.getbestblockhash()
tip = int(last_block, 16)
tipheight = node.getblockcount()
last_block_time = node.getblock(last_block)['time']
for _ in range(number):
# Create some blocks
block = create_block(tip, create_coinbase(tipheight + 1), last_block_time + 1)
block.solve()
test_blocks.append(block)
tip = block.sha256
tipheight += 1
last_block_time += 1
return test_blocks
def create_test_block(self, txs):
block = create_block(self.tip, create_coinbase(self.tipheight + 1), self.last_block_time + 600, txlist=txs)
block.solve()
return block
def run_test(self):
node0, node1 = self.nodes[0], self.nodes[1]
self.log.info("Check inbound connection to v2 TestNode from v2 P2PConnection is v2")
peer1 = node0.add_p2p_connection(P2PInterface(), wait_for_verack=True, supports_v2_p2p=True)
assert peer1.supports_v2_p2p
assert_equal(node0.getpeerinfo()[-1]["transport_protocol_type"], "v2")
self.log.info("Check inbound connection to v2 TestNode from v1 P2PConnection is v1")
peer2 = node0.add_p2p_connection(P2PInterface(), wait_for_verack=True, supports_v2_p2p=False)
assert not peer2.supports_v2_p2p
assert_equal(node0.getpeerinfo()[-1]["transport_protocol_type"], "v1")
self.log.info("Check outbound connection from v2 TestNode to v1 P2PConnection advertised as v1 is v1")
peer3 = node0.add_outbound_p2p_connection(P2PInterface(), p2p_idx=0, supports_v2_p2p=False, advertise_v2_p2p=False)
assert not peer3.supports_v2_p2p
assert_equal(node0.getpeerinfo()[-1]["transport_protocol_type"], "v1")
# v2 TestNode performs downgrading here
self.log.info("Check outbound connection from v2 TestNode to v1 P2PConnection advertised as v2 is v1")
peer4 = node0.add_outbound_p2p_connection(P2PInterface(), p2p_idx=1, supports_v2_p2p=False, advertise_v2_p2p=True)
assert not peer4.supports_v2_p2p
assert_equal(node0.getpeerinfo()[-1]["transport_protocol_type"], "v1")
self.log.info("Check outbound connection from v2 TestNode to v2 P2PConnection advertised as v2 is v2")
peer5 = node0.add_outbound_p2p_connection(P2PInterface(), p2p_idx=2, supports_v2_p2p=True, advertise_v2_p2p=True)
assert peer5.supports_v2_p2p
assert_equal(node0.getpeerinfo()[-1]["transport_protocol_type"], "v2")
self.log.info("Check if version is sent and verack is received in inbound/outbound connections")
assert_equal(len(node0.getpeerinfo()), 5) # check if above 5 connections are present in node0's getpeerinfo()
for peer in node0.getpeerinfo():
assert_greater_than(peer['bytessent_per_msg']['version'], 0)
assert_greater_than(peer['bytesrecv_per_msg']['verack'], 0)
self.log.info("Testing whether blocks propagate - check if tips sync when number of blocks >= REKEY_INTERVAL")
# tests whether rekeying (which happens every REKEY_INTERVAL packets) works correctly
test_blocks = self.generate_blocks(node0, REKEY_INTERVAL+1)
for i in range(2):
peer6 = node0.add_p2p_connection(P2PDataStore(), supports_v2_p2p=True)
assert peer6.supports_v2_p2p
assert_equal(node0.getpeerinfo()[-1]["transport_protocol_type"], "v2")
# Consider: node0 <-- peer6. node0 and node1 aren't connected here.
# Construct the following topology: node1 <--> node0 <-- peer6
# and test that blocks produced by peer6 will be received by node1 if sent normally
# and won't be received by node1 if sent as decoy messages
# First, check whether blocks produced be peer6 are received by node0 if sent normally
# and not received by node0 if sent as decoy messages.
if i:
# check that node0 receives blocks produced by peer6
self.log.info("Check if blocks produced by node0's p2p connection is received by node0")
peer6.send_blocks_and_test(test_blocks, node0, success=True) # node0's tip advances
else:
# check that node0 doesn't receive blocks produced by peer6 since they are sent as decoy messages
self.log.info("Check if blocks produced by node0's p2p connection sent as decoys aren't received by node0")
peer6.send_blocks_and_test(test_blocks, node0, success=False, is_decoy=True) # node0's tip doesn't advance
# Then, connect node0 and node1 using v2 and check whether the blocks are received by node1
self.connect_nodes(0, 1, peer_advertises_v2=True)
self.log.info("Wait for node1 to receive all the blocks from node0")
self.sync_all()
self.log.info("Make sure node0 and node1 have same block tips")
assert_equal(node0.getbestblockhash(), node1.getbestblockhash())
self.disconnect_nodes(0, 1)
self.log.info("Check the connections opened as expected")
check_node_connections(node=node0, num_in=4, num_out=3)
self.log.info("Check inbound connection to v1 TestNode from v2 P2PConnection is v1")
self.restart_node(0, ["-v2transport=0"])
peer1 = node0.add_p2p_connection(P2PInterface(), wait_for_verack=True, supports_v2_p2p=True)
assert not peer1.supports_v2_p2p
assert_equal(node0.getpeerinfo()[-1]["transport_protocol_type"], "v1")
check_node_connections(node=node0, num_in=1, num_out=0)
if __name__ == '__main__':
P2PEncrypted().main()

View File

@ -7,8 +7,7 @@ Test v2 transport
""" """
import socket import socket
from test_framework.messages import NODE_P2P_V2 from test_framework.messages import MAGIC_BYTES, NODE_P2P_V2
from test_framework.p2p import MAGIC_BYTES
from test_framework.test_framework import BitcoinTestFramework from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import ( from test_framework.util import (
assert_equal, assert_equal,

View File

@ -253,7 +253,10 @@ class NetTest(DashTestFramework):
def test_service_flags(self): def test_service_flags(self):
self.log.info("Test service flags") self.log.info("Test service flags")
self.nodes[0].add_p2p_connection(P2PInterface(), services=(1 << 4) | (1 << 63)) self.nodes[0].add_p2p_connection(P2PInterface(), services=(1 << 4) | (1 << 63))
assert_equal(['UNKNOWN[2^4]', 'UNKNOWN[2^63]'], self.nodes[0].getpeerinfo()[-1]['servicesnames']) if self.options.v2transport:
assert_equal(['UNKNOWN[2^4]', 'P2P_V2', 'UNKNOWN[2^63]'], self.nodes[0].getpeerinfo()[-1]['servicesnames'])
else:
assert_equal(['UNKNOWN[2^4]', 'UNKNOWN[2^63]'], self.nodes[0].getpeerinfo()[-1]['servicesnames'])
self.nodes[0].disconnect_p2ps() self.nodes[0].disconnect_p2ps()
def test_getnodeaddresses(self): def test_getnodeaddresses(self):

View File

@ -63,6 +63,13 @@ MSG_TYPE_MASK = 0xffffffff >> 2
FILTER_TYPE_BASIC = 0 FILTER_TYPE_BASIC = 0
MAGIC_BYTES = {
"mainnet": b"\xbf\x0c\x6b\xbd", # mainnet
"testnet3": b"\xce\xe2\xca\xff", # testnet3
"regtest": b"\xfc\xc1\xb7\xdc", # regtest
"devnet": b"\xe2\xca\xff\xce", # devnet
}
def sha256(s): def sha256(s):
return hashlib.sha256(s).digest() return hashlib.sha256(s).digest()

View File

@ -79,6 +79,7 @@ from test_framework.messages import (
MSG_TX, MSG_TX,
MSG_TYPE_MASK, MSG_TYPE_MASK,
NODE_NETWORK, NODE_NETWORK,
MAGIC_BYTES,
sha256, sha256,
) )
from test_framework.util import ( from test_framework.util import (
@ -86,6 +87,11 @@ from test_framework.util import (
p2p_port, p2p_port,
wait_until_helper, wait_until_helper,
) )
from test_framework.v2_p2p import (
EncryptedP2PState,
MSGTYPE_TO_SHORTID,
SHORTID,
)
logger = logging.getLogger("TestFramework.p2p") logger = logging.getLogger("TestFramework.p2p")
@ -155,13 +161,6 @@ MESSAGEMAP = {
b"spork": None, b"spork": None,
} }
MAGIC_BYTES = {
"mainnet": b"\xbf\x0c\x6b\xbd", # mainnet
"testnet3": b"\xce\xe2\xca\xff", # testnet3
"regtest": b"\xfc\xc1\xb7\xdc", # regtest
"devnet": b"\xe2\xca\xff\xce", # devnet
}
class P2PConnection(asyncio.Protocol): class P2PConnection(asyncio.Protocol):
"""A low-level connection object to a node's P2P interface. """A low-level connection object to a node's P2P interface.
@ -180,11 +179,20 @@ class P2PConnection(asyncio.Protocol):
# The underlying transport of the connection. # The underlying transport of the connection.
# Should only call methods on this from the NetworkThread, c.f. call_soon_threadsafe # Should only call methods on this from the NetworkThread, c.f. call_soon_threadsafe
self._transport = None self._transport = None
# This lock is acquired before sending messages over the socket. There's an implied lock order and
# p2p_lock must not be acquired after _send_lock as it could result in deadlocks.
self._send_lock = threading.Lock()
self.v2_state = None # EncryptedP2PState object needed for v2 p2p connections
self.reconnect = False # set if reconnection needs to happen
@property @property
def is_connected(self): def is_connected(self):
return self._transport is not None return self._transport is not None
@property
def supports_v2_p2p(self):
return self.v2_state is not None
def peer_connect_helper(self, dstaddr, dstport, net, timeout_factor, uacomment): def peer_connect_helper(self, dstaddr, dstport, net, timeout_factor, uacomment):
assert not self.is_connected assert not self.is_connected
self.timeout_factor = timeout_factor self.timeout_factor = timeout_factor
@ -207,16 +215,21 @@ class P2PConnection(asyncio.Protocol):
else: else:
self.strSubVer = P2P_SUBVERSION % "" self.strSubVer = P2P_SUBVERSION % ""
def peer_connect(self, dstaddr, dstport, *, net, timeout_factor, uacomment=None): def peer_connect(self, dstaddr, dstport, *, net, timeout_factor, supports_v2_p2p, uacomment=None):
self.peer_connect_helper(dstaddr, dstport, net, timeout_factor, uacomment) self.peer_connect_helper(dstaddr, dstport, net, timeout_factor, uacomment)
if supports_v2_p2p:
self.v2_state = EncryptedP2PState(initiating=True, net=net)
loop = NetworkThread.network_event_loop loop = NetworkThread.network_event_loop
logger.debug('Connecting to Dash Node: %s:%d' % (self.dstaddr, self.dstport)) logger.debug('Connecting to Dash Node: %s:%d' % (self.dstaddr, self.dstport))
coroutine = loop.create_connection(lambda: self, host=self.dstaddr, port=self.dstport) coroutine = loop.create_connection(lambda: self, host=self.dstaddr, port=self.dstport)
return lambda: loop.call_soon_threadsafe(loop.create_task, coroutine) return lambda: loop.call_soon_threadsafe(loop.create_task, coroutine)
def peer_accept_connection(self, connect_id, connect_cb=lambda: None, *, net, timeout_factor, uacomment=None): def peer_accept_connection(self, connect_id, connect_cb=lambda: None, *, net, timeout_factor, supports_v2_p2p, reconnect, uacomment=None):
self.peer_connect_helper('0', 0, net, timeout_factor, uacomment) self.peer_connect_helper('0', 0, net, timeout_factor, uacomment)
self.reconnect = reconnect
if supports_v2_p2p:
self.v2_state = EncryptedP2PState(initiating=False, net=net)
logger.debug('Listening for Dash Node with id: {}'.format(connect_id)) logger.debug('Listening for Dash Node with id: {}'.format(connect_id))
return lambda: NetworkThread.listen(self, connect_cb, idx=connect_id) return lambda: NetworkThread.listen(self, connect_cb, idx=connect_id)
@ -232,14 +245,22 @@ class P2PConnection(asyncio.Protocol):
assert not self._transport assert not self._transport
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport)) logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
self._transport = transport self._transport = transport
if self.on_connection_send_msg: # in an inbound connection to the TestNode with P2PConnection as the initiator, [TestNode <---- P2PConnection]
# send the initial handshake immediately
if self.supports_v2_p2p and self.v2_state.initiating and not self.v2_state.tried_v2_handshake:
send_handshake_bytes = self.v2_state.initiate_v2_handshake()
self.send_raw_message(send_handshake_bytes)
# if v2 connection, send `on_connection_send_msg` after initial v2 handshake.
# if reconnection situation, send `on_connection_send_msg` after version message is received in `on_version()`.
if self.on_connection_send_msg and not self.supports_v2_p2p and not self.reconnect:
self.send_message(self.on_connection_send_msg) self.send_message(self.on_connection_send_msg)
self.on_connection_send_msg = None # Never used again self.on_connection_send_msg = None # Never used again
self.on_open() self.on_open()
def connection_lost(self, exc): def connection_lost(self, exc):
"""asyncio callback when a connection is closed.""" """asyncio callback when a connection is closed."""
if exc: # don't display warning if reconnection needs to be attempted using v1 P2P
if exc and not self.reconnect:
logger.warning("Connection lost to {}:{} due to {}".format(self.dstaddr, self.dstport, exc)) logger.warning("Connection lost to {}:{} due to {}".format(self.dstaddr, self.dstport, exc))
else: else:
logger.debug("Closed connection to: %s:%d" % (self.dstaddr, self.dstport)) logger.debug("Closed connection to: %s:%d" % (self.dstaddr, self.dstport))
@ -247,13 +268,62 @@ class P2PConnection(asyncio.Protocol):
self.recvbuf = b"" self.recvbuf = b""
self.on_close() self.on_close()
# v2 handshake method
def v2_handshake(self):
"""v2 handshake performed before P2P messages are exchanged (see BIP324). P2PConnection is the initiator
(in inbound connections to TestNode) and the responder (in outbound connections from TestNode).
Performed by:
* initiator using `initiate_v2_handshake()`, `complete_handshake()` and `authenticate_handshake()`
* responder using `respond_v2_handshake()`, `complete_handshake()` and `authenticate_handshake()`
`initiate_v2_handshake()` is immediately done by the initiator when the connection is established in
`connection_made()`. The rest of the initial v2 handshake functions are handled here.
"""
if not self.v2_state.peer:
if not self.v2_state.initiating and not self.v2_state.sent_garbage:
# if the responder hasn't sent garbage yet, the responder is still reading ellswift bytes
# reads ellswift bytes till the first mismatch from 12 bytes V1_PREFIX
length, send_handshake_bytes = self.v2_state.respond_v2_handshake(BytesIO(self.recvbuf))
self.recvbuf = self.recvbuf[length:]
if send_handshake_bytes == -1:
self.v2_state = None
return
elif send_handshake_bytes:
self.send_raw_message(send_handshake_bytes)
elif send_handshake_bytes == b"":
return # only after send_handshake_bytes are sent can `complete_handshake()` be done
# `complete_handshake()` reads the remaining ellswift bytes from recvbuf
# and sends response after deriving shared ECDH secret using received ellswift bytes
length, response = self.v2_state.complete_handshake(BytesIO(self.recvbuf))
self.recvbuf = self.recvbuf[length:]
if response:
self.send_raw_message(response)
else:
return # only after response is sent can `authenticate_handshake()` be done
# `self.v2_state.peer` is instantiated only after shared ECDH secret/BIP324 derived keys and ciphers
# is derived in `complete_handshake()`.
# so `authenticate_handshake()` which uses the BIP324 derived ciphers gets called after `complete_handshake()`.
assert self.v2_state.peer
length, is_mac_auth = self.v2_state.authenticate_handshake(self.recvbuf)
if not is_mac_auth:
raise ValueError("invalid v2 mac tag in handshake authentication")
self.recvbuf = self.recvbuf[length:]
if self.v2_state.tried_v2_handshake and self.on_connection_send_msg:
self.send_message(self.on_connection_send_msg)
self.on_connection_send_msg = None
# Socket read methods # Socket read methods
def data_received(self, t): def data_received(self, t):
"""asyncio callback when data is read from the socket.""" """asyncio callback when data is read from the socket."""
if len(t) > 0: if len(t) > 0:
self.recvbuf += t self.recvbuf += t
self._on_data() if self.supports_v2_p2p and not self.v2_state.tried_v2_handshake:
self.v2_handshake()
else:
self._on_data()
def _on_data(self): def _on_data(self):
"""Try to read P2P messages from the recv buffer. """Try to read P2P messages from the recv buffer.
@ -263,23 +333,48 @@ class P2PConnection(asyncio.Protocol):
the on_message callback for processing.""" the on_message callback for processing."""
try: try:
while True: while True:
if len(self.recvbuf) < 4: if self.supports_v2_p2p:
return # v2 P2P messages are read
if self.recvbuf[:4] != self.magic_bytes: msglen, msg = self.v2_state.v2_receive_packet(self.recvbuf)
raise ValueError("magic bytes mismatch: {} != {}".format(repr(self.magic_bytes), repr(self.recvbuf))) if msglen == -1:
if len(self.recvbuf) < 4 + 12 + 4 + 4: raise ValueError("invalid v2 mac tag " + repr(self.recvbuf))
return elif msglen == 0: # need to receive more bytes in recvbuf
msgtype = self.recvbuf[4:4+12].split(b"\x00", 1)[0] return
msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0] self.recvbuf = self.recvbuf[msglen:]
checksum = self.recvbuf[4+12+4:4+12+4+4]
if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen: if msg is None: # ignore decoy messages
return return
msg = self.recvbuf[4+12+4+4:4+12+4+4+msglen] assert msg # application layer messages (which aren't decoy messages) are non-empty
th = sha256(msg) shortid = msg[0] # 1-byte short message type ID
h = sha256(th) if shortid == 0:
if checksum != h[:4]: # next 12 bytes are interpreted as ASCII message type if shortid is b'\x00'
raise ValueError("got bad checksum " + repr(self.recvbuf)) if len(msg) < 13:
self.recvbuf = self.recvbuf[4+12+4+4+msglen:] raise IndexError("msg needs minimum required length of 13 bytes")
msgtype = msg[1:13].rstrip(b'\x00')
msg = msg[13:] # msg is set to be payload
else:
# a 1-byte short message type ID
msgtype = SHORTID.get(shortid, f"unknown-{shortid}")
msg = msg[1:]
else:
# v1 P2P messages are read
if len(self.recvbuf) < 4:
return
if self.recvbuf[:4] != self.magic_bytes:
raise ValueError("magic bytes mismatch: {} != {}".format(repr(self.magic_bytes), repr(self.recvbuf)))
if len(self.recvbuf) < 4 + 12 + 4 + 4:
return
msgtype = self.recvbuf[4:4+12].split(b"\x00", 1)[0]
msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0]
checksum = self.recvbuf[4+12+4:4+12+4+4]
if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen:
return
msg = self.recvbuf[4+12+4+4:4+12+4+4+msglen]
th = sha256(msg)
h = sha256(th)
if checksum != h[:4]:
raise ValueError("got bad checksum " + repr(self.recvbuf))
self.recvbuf = self.recvbuf[4+12+4+4+msglen:]
if msgtype not in MESSAGEMAP: if msgtype not in MESSAGEMAP:
raise ValueError("Received unknown msgtype from %s:%d: '%s' %s" % (self.dstaddr, self.dstport, msgtype, repr(msg))) raise ValueError("Received unknown msgtype from %s:%d: '%s' %s" % (self.dstaddr, self.dstport, msgtype, repr(msg)))
if MESSAGEMAP[msgtype] is None: if MESSAGEMAP[msgtype] is None:
@ -291,7 +386,8 @@ class P2PConnection(asyncio.Protocol):
self._log_message("receive", t) self._log_message("receive", t)
self.on_message(t) self.on_message(t)
except Exception as e: except Exception as e:
logger.exception('Error reading message:', repr(e)) if not self.reconnect:
logger.exception('Error reading message:', repr(e))
raise raise
def on_message(self, message): def on_message(self, message):
@ -300,14 +396,15 @@ class P2PConnection(asyncio.Protocol):
# Socket write methods # Socket write methods
def send_message(self, message): def send_message(self, message, is_decoy=False):
"""Send a P2P message over the socket. """Send a P2P message over the socket.
This method takes a P2P payload, builds the P2P header and adds This method takes a P2P payload, builds the P2P header and adds
the message to the send buffer to be sent over the socket.""" the message to the send buffer to be sent over the socket."""
tmsg = self.build_message(message) with self._send_lock:
self._log_message("send", message) tmsg = self.build_message(message, is_decoy)
return self.send_raw_message(tmsg) self._log_message("send", message)
return self.send_raw_message(tmsg)
def send_raw_message(self, raw_message_bytes): def send_raw_message(self, raw_message_bytes):
if not self.is_connected: if not self.is_connected:
@ -323,19 +420,29 @@ class P2PConnection(asyncio.Protocol):
# Class utility methods # Class utility methods
def build_message(self, message): def build_message(self, message, is_decoy=False):
"""Build a serialized P2P message""" """Build a serialized P2P message"""
msgtype = message.msgtype msgtype = message.msgtype
data = message.serialize() data = message.serialize()
tmsg = self.magic_bytes if self.supports_v2_p2p:
tmsg += msgtype if msgtype in SHORTID.values():
tmsg += b"\x00" * (12 - len(msgtype)) tmsg = MSGTYPE_TO_SHORTID.get(msgtype).to_bytes(1, 'big')
tmsg += struct.pack("<I", len(data)) else:
th = sha256(data) tmsg = b"\x00"
h = sha256(th) tmsg += msgtype
tmsg += h[:4] tmsg += b"\x00" * (12 - len(msgtype))
tmsg += data tmsg += data
return tmsg return self.v2_state.v2_enc_packet(tmsg, ignore=is_decoy)
else:
tmsg = self.magic_bytes
tmsg += msgtype
tmsg += b"\x00" * (12 - len(msgtype))
tmsg += struct.pack("<I", len(data))
th = sha256(data)
h = sha256(th)
tmsg += h[:4]
tmsg += data
return tmsg
def _log_message(self, direction, msg): def _log_message(self, direction, msg):
"""Logs a message being sent or received over the connection.""" """Logs a message being sent or received over the connection."""
@ -486,6 +593,12 @@ class P2PInterface(P2PConnection):
def on_version(self, message): def on_version(self, message):
assert message.nVersion >= MIN_P2P_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format(message.nVersion, MIN_P2P_VERSION_SUPPORTED) assert message.nVersion >= MIN_P2P_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format(message.nVersion, MIN_P2P_VERSION_SUPPORTED)
# reconnection using v1 P2P has happened since version message can be processed, previously unsent version message is sent using v1 P2P here
if self.reconnect:
if self.on_connection_send_msg:
self.send_message(self.on_connection_send_msg)
self.on_connection_send_msg = None
self.reconnect = False
if self.support_addrv2: if self.support_addrv2:
self.send_message(msg_sendaddrv2()) self.send_message(msg_sendaddrv2())
self.send_message(msg_verack()) self.send_message(msg_verack())
@ -511,6 +624,13 @@ class P2PInterface(P2PConnection):
test_function = lambda: not self.is_connected test_function = lambda: not self.is_connected
self.wait_until(test_function, timeout=timeout, check_connected=False) self.wait_until(test_function, timeout=timeout, check_connected=False)
def wait_for_reconnect(self, timeout=60):
def test_function():
if not (self.is_connected and self.last_message.get('version') and self.v2_state is None):
return False
return True
self.wait_until(test_function, timeout=timeout, check_connected=False)
# Message receiving helper methods # Message receiving helper methods
def wait_for_tx(self, txid, timeout=60): def wait_for_tx(self, txid, timeout=60):
@ -661,6 +781,11 @@ class NetworkThread(threading.Thread):
if addr is None: if addr is None:
addr = '127.0.0.1' addr = '127.0.0.1'
def exception_handler(loop, context):
if not p2p.reconnect:
loop.default_exception_handler(context)
cls.network_event_loop.set_exception_handler(exception_handler)
coroutine = cls.create_listen_server(addr, port, callback, p2p) coroutine = cls.create_listen_server(addr, port, callback, p2p)
cls.network_event_loop.call_soon_threadsafe(cls.network_event_loop.create_task, coroutine) cls.network_event_loop.call_soon_threadsafe(cls.network_event_loop.create_task, coroutine)
@ -674,7 +799,9 @@ class NetworkThread(threading.Thread):
protocol function from that dict, and returns it so the event loop protocol function from that dict, and returns it so the event loop
can start executing it.""" can start executing it."""
response = cls.protos.get((addr, port)) response = cls.protos.get((addr, port))
cls.protos[(addr, port)] = None # remove protocol function from dict only when reconnection doesn't need to happen/already happened
if not proto.reconnect:
cls.protos[(addr, port)] = None
return response return response
if (addr, port) not in cls.listeners: if (addr, port) not in cls.listeners:
@ -760,7 +887,7 @@ class P2PDataStore(P2PInterface):
if response is not None: if response is not None:
self.send_message(response) self.send_message(response)
def send_blocks_and_test(self, blocks, node, *, success=True, force_send=False, reject_reason=None, expect_disconnect=False, timeout=60): def send_blocks_and_test(self, blocks, node, *, success=True, force_send=False, reject_reason=None, expect_disconnect=False, timeout=60, is_decoy=False):
"""Send blocks to test node and test whether the tip advances. """Send blocks to test node and test whether the tip advances.
- add all blocks to our block_store - add all blocks to our block_store
@ -779,9 +906,11 @@ class P2PDataStore(P2PInterface):
reject_reason = [reject_reason] if reject_reason else [] reject_reason = [reject_reason] if reject_reason else []
with node.assert_debug_log(expected_msgs=reject_reason): with node.assert_debug_log(expected_msgs=reject_reason):
if is_decoy: # since decoy messages are ignored by the recipient - no need to wait for response
force_send = True
if force_send: if force_send:
for b in blocks: for b in blocks:
self.send_message(msg_block(block=b)) self.send_message(msg_block(block=b), is_decoy)
else: else:
self.send_message(msg_headers([CBlockHeader(block) for block in blocks])) self.send_message(msg_headers([CBlockHeader(block) for block in blocks]))
self.wait_until( self.wait_until(

View File

@ -24,7 +24,8 @@ from pathlib import Path
from .authproxy import JSONRPCException from .authproxy import JSONRPCException
from .descriptors import descsum_create from .descriptors import descsum_create
from .p2p import P2P_SUBVERSION from .messages import NODE_P2P_V2
from .p2p import P2P_SERVICES, P2P_SUBVERSION
from .util import ( from .util import (
MAX_NODES, MAX_NODES,
assert_equal, assert_equal,
@ -631,18 +632,30 @@ class TestNode():
assert_msg += "with expected error " + expected_msg assert_msg += "with expected error " + expected_msg
self._raise_assertion_error(assert_msg) self._raise_assertion_error(assert_msg)
def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, send_version=True, **kwargs): def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, send_version=True, supports_v2_p2p=False, **kwargs):
"""Add an inbound p2p connection to the node. """Add an inbound p2p connection to the node.
This method adds the p2p connection to the self.p2ps list and also This method adds the p2p connection to the self.p2ps list and also
returns the connection to the caller.""" returns the connection to the caller.
When self.use_v2transport is True, TestNode advertises NODE_P2P_V2 service flag
An inbound connection is made from TestNode <------ P2PConnection
- if TestNode doesn't advertise NODE_P2P_V2 service, P2PConnection sends version message and v1 P2P is followed
- if TestNode advertises NODE_P2P_V2 service, (and if P2PConnections supports v2 P2P)
P2PConnection sends ellswift bytes and v2 P2P is followed
"""
if 'dstport' not in kwargs: if 'dstport' not in kwargs:
kwargs['dstport'] = p2p_port(self.index) kwargs['dstport'] = p2p_port(self.index)
if 'dstaddr' not in kwargs: if 'dstaddr' not in kwargs:
kwargs['dstaddr'] = '127.0.0.1' kwargs['dstaddr'] = '127.0.0.1'
p2p_conn.p2p_connected_to_node = True p2p_conn.p2p_connected_to_node = True
p2p_conn.peer_connect(**kwargs, send_version=send_version, net=self.chain, timeout_factor=self.timeout_factor)() if self.use_v2transport:
kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2
supports_v2_p2p = self.use_v2transport and supports_v2_p2p
p2p_conn.peer_connect(**kwargs, send_version=send_version, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p)()
self.p2ps.append(p2p_conn) self.p2ps.append(p2p_conn)
p2p_conn.wait_until(lambda: p2p_conn.is_connected, check_connected=False) p2p_conn.wait_until(lambda: p2p_conn.is_connected, check_connected=False)
if send_version: if send_version:
@ -672,7 +685,7 @@ class TestNode():
return p2p_conn return p2p_conn
def add_outbound_p2p_connection(self, p2p_conn, *, wait_for_verack=True, p2p_idx, connection_type="outbound-full-relay", **kwargs): def add_outbound_p2p_connection(self, p2p_conn, *, wait_for_verack=True, p2p_idx, connection_type="outbound-full-relay", supports_v2_p2p=False, advertise_v2_p2p=False, **kwargs):
"""Add an outbound p2p connection from node. Must be an """Add an outbound p2p connection from node. Must be an
"outbound-full-relay", "block-relay-only", "addr-fetch" or "feeler" connection. "outbound-full-relay", "block-relay-only", "addr-fetch" or "feeler" connection.
@ -682,14 +695,37 @@ class TestNode():
p2p_idx must be different for simultaneously connected peers. When reusing it for the next peer p2p_idx must be different for simultaneously connected peers. When reusing it for the next peer
after disconnecting the previous one, it is necessary to wait for the disconnect to finish to avoid after disconnecting the previous one, it is necessary to wait for the disconnect to finish to avoid
a race condition. a race condition.
Parameters:
supports_v2_p2p: whether p2p_conn supports v2 P2P or not
advertise_v2_p2p: whether p2p_conn is advertised to support v2 P2P or not
An outbound connection is made from TestNode -------> P2PConnection
- if P2PConnection doesn't advertise_v2_p2p, TestNode sends version message and v1 P2P is followed
- if P2PConnection both supports_v2_p2p and advertise_v2_p2p, TestNode sends ellswift bytes and v2 P2P is followed
- if P2PConnection doesn't supports_v2_p2p but advertise_v2_p2p,
TestNode sends ellswift bytes and P2PConnection disconnects,
TestNode reconnects by sending version message and v1 P2P is followed
""" """
def addconnection_callback(address, port): def addconnection_callback(address, port):
self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type)) self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type))
self.addconnection('%s:%d' % (address, port), connection_type) self.addconnection('%s:%d' % (address, port), connection_type, advertise_v2_p2p)
p2p_conn.p2p_connected_to_node = False p2p_conn.p2p_connected_to_node = False
p2p_conn.peer_accept_connection(connect_cb=addconnection_callback, connect_id=p2p_idx + 1, net=self.chain, timeout_factor=self.timeout_factor, **kwargs)() if advertise_v2_p2p:
kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2
assert self.use_v2transport # only a v2 TestNode could make a v2 outbound connection
# if P2PConnection is advertised to support v2 P2P when it doesn't actually support v2 P2P,
# reconnection needs to be attempted using v1 P2P by sending version message
reconnect = advertise_v2_p2p and not supports_v2_p2p
# P2PConnection needs to be advertised to support v2 P2P so that ellswift bytes are sent instead of msg_version
supports_v2_p2p = supports_v2_p2p and advertise_v2_p2p
p2p_conn.peer_accept_connection(connect_cb=addconnection_callback, connect_id=p2p_idx + 1, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p, reconnect=reconnect, **kwargs)()
if reconnect:
p2p_conn.wait_for_reconnect()
if connection_type == "feeler": if connection_type == "feeler":
# feeler connections are closed as soon as the node receives a `version` message # feeler connections are closed as soon as the node receives a `version` message

View File

@ -0,0 +1,325 @@
#!/usr/bin/env python3
# Copyright (c) 2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Class for v2 P2P protocol (see BIP 324)"""
import logging
import random
from .crypto.bip324_cipher import FSChaCha20Poly1305
from .crypto.chacha20 import FSChaCha20
from .crypto.ellswift import ellswift_create, ellswift_ecdh_xonly
from .crypto.hkdf import hkdf_sha256
from .key import TaggedHash
from .messages import MAGIC_BYTES
from .util import random_bytes
logger = logging.getLogger("TestFramework.v2_p2p")
CHACHA20POLY1305_EXPANSION = 16
HEADER_LEN = 1
IGNORE_BIT_POS = 7
LENGTH_FIELD_LEN = 3
MAX_GARBAGE_LEN = 4095
TRANSPORT_VERSION = b''
SHORTID = {
1: b"addr",
2: b"block",
3: b"blocktxn",
4: b"cmpctblock",
5: b"", # Dash does not support "feefilter"
6: b"filteradd",
7: b"filterclear",
8: b"filterload",
9: b"getblocks",
10: b"getblocktxn",
11: b"getdata",
12: b"getheaders",
13: b"headers",
14: b"inv",
15: b"mempool",
16: b"merkleblock",
17: b"notfound",
18: b"ping",
19: b"pong",
20: b"sendcmpct",
21: b"tx",
22: b"getcfilters",
23: b"cfilter",
24: b"getcfheaders",
25: b"cfheaders",
26: b"getcfcheckpt",
27: b"cfcheckpt",
28: b"addrv2",
128: b"spork", # Dash short IDs start from 128 onwards
129: b"getsporks",
130: b"senddsq",
131: b"dsa",
132: b"dsi",
133: b"dsf",
134: b"dss",
135: b"dsc",
136: b"dssu",
137: b"dstx",
138: b"dsq",
139: b"ssc",
140: b"govsync",
141: b"govobj",
142: b"govobjvote",
143: b"getmnlistd",
144: b"mnlistdiff",
145: b"qsendrecsigs",
146: b"qfcommit",
147: b"qcontrib",
148: b"qcomplaint",
149: b"qjustify",
150: b"qpcommit",
151: b"qwatch",
152: b"qsigsesann",
153: b"qsigsinv",
154: b"qgetsigs",
155: b"qbsigs",
156: b"qsigrec",
157: b"qsigshare",
158: b"qgetdata",
159: b"qdata",
160: b"clsig",
161: b"isdlock",
162: b"mnauth",
163: b"getheaders2",
164: b"sendheaders2",
165: b"headers2",
166: b"getqrinfo",
167: b"qrinfo",
}
# Dictionary which contains short message type ID for the P2P message
MSGTYPE_TO_SHORTID = {msgtype: shortid for shortid, msgtype in SHORTID.items()}
class EncryptedP2PState:
"""A class for managing the state when v2 P2P protocol is used. Performs initial v2 handshake and encrypts/decrypts
P2P messages. P2PConnection uses an object of this class.
Args:
initiating (bool): defines whether the P2PConnection is an initiator or responder.
- initiating = True for inbound connections in the test framework [TestNode <------- P2PConnection]
- initiating = False for outbound connections in the test framework [TestNode -------> P2PConnection]
net (string): chain used (regtest, signet etc..)
Methods:
perform an advanced form of diffie-hellman handshake to instantiate the encrypted transport. before exchanging
any P2P messages, 2 nodes perform this handshake in order to determine a shared secret that is unique to both
of them and use it to derive keys to encrypt/decrypt P2P messages.
- initial v2 handshakes is performed by: (see BIP324 section #overall-handshake-pseudocode)
1. initiator using initiate_v2_handshake(), complete_handshake() and authenticate_handshake()
2. responder using respond_v2_handshake(), complete_handshake() and authenticate_handshake()
- initialize_v2_transport() sets various BIP324 derived keys and ciphers.
encrypt/decrypt v2 P2P messages using v2_enc_packet() and v2_receive_packet().
"""
def __init__(self, *, initiating, net):
self.initiating = initiating # True if initiator
self.net = net
self.peer = {} # object with various BIP324 derived keys and ciphers
self.privkey_ours = None
self.ellswift_ours = None
self.sent_garbage = b""
self.received_garbage = b""
self.received_prefix = b"" # received ellswift bytes till the first mismatch from 16 bytes v1_prefix
self.tried_v2_handshake = False # True when the initial handshake is over
# stores length of packet contents to detect whether first 3 bytes (which contains length of packet contents)
# has been decrypted. set to -1 if decryption hasn't been done yet.
self.contents_len = -1
self.found_garbage_terminator = False
@staticmethod
def v2_ecdh(priv, ellswift_theirs, ellswift_ours, initiating):
"""Compute BIP324 shared secret.
Returns:
bytes - BIP324 shared secret
"""
ecdh_point_x32 = ellswift_ecdh_xonly(ellswift_theirs, priv)
if initiating:
# Initiating, place our public key encoding first.
return TaggedHash("bip324_ellswift_xonly_ecdh", ellswift_ours + ellswift_theirs + ecdh_point_x32)
else:
# Responding, place their public key encoding first.
return TaggedHash("bip324_ellswift_xonly_ecdh", ellswift_theirs + ellswift_ours + ecdh_point_x32)
def generate_keypair_and_garbage(self):
"""Generates ellswift keypair and 4095 bytes garbage at max"""
self.privkey_ours, self.ellswift_ours = ellswift_create()
garbage_len = random.randrange(MAX_GARBAGE_LEN + 1)
self.sent_garbage = random_bytes(garbage_len)
logger.debug(f"sending {garbage_len} bytes of garbage data")
return self.ellswift_ours + self.sent_garbage
def initiate_v2_handshake(self):
"""Initiator begins the v2 handshake by sending its ellswift bytes and garbage
Returns:
bytes - bytes to be sent to the peer when starting the v2 handshake as an initiator
"""
return self.generate_keypair_and_garbage()
def respond_v2_handshake(self, response):
"""Responder begins the v2 handshake by sending its ellswift bytes and garbage. However, the responder
sends this after having received at least one byte that mismatches 16-byte v1_prefix.
Returns:
1. int - length of bytes that were consumed so that recvbuf can be updated
2. bytes - bytes to be sent to the peer when starting the v2 handshake as a responder.
- returns b"" if more bytes need to be received before we can respond and start the v2 handshake.
- returns -1 to downgrade the connection to v1 P2P.
"""
v1_prefix = MAGIC_BYTES[self.net] + b'version\x00\x00\x00\x00\x00'
while len(self.received_prefix) < 16:
byte = response.read(1)
# return b"" if we need to receive more bytes
if not byte:
return len(self.received_prefix), b""
self.received_prefix += byte
if self.received_prefix[-1] != v1_prefix[len(self.received_prefix) - 1]:
return len(self.received_prefix), self.generate_keypair_and_garbage()
# return -1 to decide v1 only after all 16 bytes processed
return len(self.received_prefix), -1
def complete_handshake(self, response):
""" Instantiates the encrypted transport and
sends garbage terminator + optional decoy packets + transport version packet.
Done by both initiator and responder.
Returns:
1. int - length of bytes that were consumed. returns 0 if all 64 bytes from ellswift haven't been received yet.
2. bytes - bytes to be sent to the peer when completing the v2 handshake
"""
ellswift_theirs = self.received_prefix + response.read(64 - len(self.received_prefix))
# return b"" if we need to receive more bytes
if len(ellswift_theirs) != 64:
return 0, b""
ecdh_secret = self.v2_ecdh(self.privkey_ours, ellswift_theirs, self.ellswift_ours, self.initiating)
self.initialize_v2_transport(ecdh_secret)
# Send garbage terminator
msg_to_send = self.peer['send_garbage_terminator']
# Optionally send decoy packets after garbage terminator.
aad = self.sent_garbage
for decoy_content_len in [random.randint(1, 100) for _ in range(random.randint(0, 10))]:
msg_to_send += self.v2_enc_packet(decoy_content_len * b'\x00', aad=aad, ignore=True)
aad = b''
# Send version packet.
msg_to_send += self.v2_enc_packet(TRANSPORT_VERSION, aad=aad)
return 64 - len(self.received_prefix), msg_to_send
def authenticate_handshake(self, response):
""" Ensures that the received optional decoy packets and transport version packet are authenticated.
Marks the v2 handshake as complete. Done by both initiator and responder.
Returns:
1. int - length of bytes that were processed so that recvbuf can be updated
2. bool - True if the authentication was successful/more bytes need to be received and False otherwise
"""
processed_length = 0
# Detect garbage terminator in the received bytes
if not self.found_garbage_terminator:
received_garbage = response[:16]
response = response[16:]
processed_length = len(received_garbage)
for i in range(MAX_GARBAGE_LEN + 1):
if received_garbage[-16:] == self.peer['recv_garbage_terminator']:
# Receive, decode, and ignore version packet.
# This includes skipping decoys and authenticating the received garbage.
self.found_garbage_terminator = True
self.received_garbage = received_garbage[:-16]
break
else:
# don't update recvbuf since more bytes need to be received
if len(response) == 0:
return 0, True
received_garbage += response[:1]
processed_length += 1
response = response[1:]
else:
# disconnect since garbage terminator was not seen after 4 KiB of garbage.
return processed_length, False
# Process optional decoy packets and transport version packet
while not self.tried_v2_handshake:
length, contents = self.v2_receive_packet(response, aad=self.received_garbage)
if length == -1:
return processed_length, False
elif length == 0:
return processed_length, True
processed_length += length
self.received_garbage = b""
# decoy packets have contents = None. v2 handshake is complete only when version packet
# (can be empty with contents = b"") with contents != None is received.
if contents is not None:
self.tried_v2_handshake = True
return processed_length, True
response = response[length:]
def initialize_v2_transport(self, ecdh_secret):
"""Sets the peer object with various BIP324 derived keys and ciphers."""
peer = {}
salt = b'bitcoin_v2_shared_secret' + MAGIC_BYTES[self.net]
for name in ('initiator_L', 'initiator_P', 'responder_L', 'responder_P', 'garbage_terminators', 'session_id'):
peer[name] = hkdf_sha256(salt=salt, ikm=ecdh_secret, info=name.encode('utf-8'), length=32)
if self.initiating:
self.peer['send_L'] = FSChaCha20(peer['initiator_L'])
self.peer['send_P'] = FSChaCha20Poly1305(peer['initiator_P'])
self.peer['send_garbage_terminator'] = peer['garbage_terminators'][:16]
self.peer['recv_L'] = FSChaCha20(peer['responder_L'])
self.peer['recv_P'] = FSChaCha20Poly1305(peer['responder_P'])
self.peer['recv_garbage_terminator'] = peer['garbage_terminators'][16:]
else:
self.peer['send_L'] = FSChaCha20(peer['responder_L'])
self.peer['send_P'] = FSChaCha20Poly1305(peer['responder_P'])
self.peer['send_garbage_terminator'] = peer['garbage_terminators'][16:]
self.peer['recv_L'] = FSChaCha20(peer['initiator_L'])
self.peer['recv_P'] = FSChaCha20Poly1305(peer['initiator_P'])
self.peer['recv_garbage_terminator'] = peer['garbage_terminators'][:16]
self.peer['session_id'] = peer['session_id']
def v2_enc_packet(self, contents, aad=b'', ignore=False):
"""Encrypt a BIP324 packet.
Returns:
bytes - encrypted packet contents
"""
assert len(contents) <= 2**24 - 1
header = (ignore << IGNORE_BIT_POS).to_bytes(HEADER_LEN, 'little')
plaintext = header + contents
aead_ciphertext = self.peer['send_P'].encrypt(aad, plaintext)
enc_plaintext_len = self.peer['send_L'].crypt(len(contents).to_bytes(LENGTH_FIELD_LEN, 'little'))
return enc_plaintext_len + aead_ciphertext
def v2_receive_packet(self, response, aad=b''):
"""Decrypt a BIP324 packet
Returns:
1. int - number of bytes consumed (or -1 if error)
2. bytes - contents of decrypted non-decoy packet if any (or None otherwise)
"""
if self.contents_len == -1:
if len(response) < LENGTH_FIELD_LEN:
return 0, None
enc_contents_len = response[:LENGTH_FIELD_LEN]
self.contents_len = int.from_bytes(self.peer['recv_L'].crypt(enc_contents_len), 'little')
response = response[LENGTH_FIELD_LEN:]
if len(response) < HEADER_LEN + self.contents_len + CHACHA20POLY1305_EXPANSION:
return 0, None
aead_ciphertext = response[:HEADER_LEN + self.contents_len + CHACHA20POLY1305_EXPANSION]
plaintext = self.peer['recv_P'].decrypt(aad, aead_ciphertext)
if plaintext is None:
return -1, None # disconnect
header = plaintext[:HEADER_LEN]
length = LENGTH_FIELD_LEN + HEADER_LEN + self.contents_len + CHACHA20POLY1305_EXPANSION
self.contents_len = -1
return length, None if (header[0] & (1 << IGNORE_BIT_POS)) else plaintext[HEADER_LEN:]

View File

@ -243,6 +243,8 @@ BASE_SCRIPTS = [
'p2p_invalid_tx.py', 'p2p_invalid_tx.py',
'p2p_invalid_tx.py --v2transport', 'p2p_invalid_tx.py --v2transport',
'p2p_v2_transport.py', 'p2p_v2_transport.py',
'p2p_v2_encrypted.py',
'p2p_v2_earlykeyresponse.py',
'feature_assumevalid.py', 'feature_assumevalid.py',
'example_test.py', 'example_test.py',
'wallet_txn_doublespend.py --legacy-wallet', 'wallet_txn_doublespend.py --legacy-wallet',