partial bitcoin#28331: BIP324 integration

Co-authored-by: UdjinM6 <UdjinM6@users.noreply.github.com>

excludes:
- changes to `src/rpc/util.cpp`
This commit is contained in:
Kittywhiskers Van Gogh 2024-10-09 12:20:57 +00:00
parent f9f8805c72
commit ff92d1adae
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
21 changed files with 548 additions and 90 deletions

View File

@ -41,4 +41,5 @@ Versions and PRs are relevant to Bitcoin's core if not mentioned other.
* [`BIP 158`](https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki): Compact Block Filters for Light Clients can be indexed as of **Dash Core v18.0** ([PR dash#4314](https://github.com/dashpay/dash/pull/4314), [PR #14121](https://github.com/bitcoin/bitcoin/pull/14121)).
* [`BIP 159`](https://github.com/bitcoin/bips/blob/master/bip-0159.mediawiki): The `NODE_NETWORK_LIMITED` service bit is signalled as of **v0.16.0** ([PR 11740](https://github.com/bitcoin/bitcoin/pull/11740)), and such nodes are connected to as of **v0.17.0** ([PR 10387](https://github.com/bitcoin/bitcoin/pull/10387)).
* [`BIP 174`](https://github.com/bitcoin/bips/blob/master/bip-0174.mediawiki): RPCs to operate on Partially Signed Bitcoin Transactions (PSBT) are present as of **v18.0** ([PR 13557](https://github.com/bitcoin/bitcoin/pull/13557)).
* [`BIP 324`](https://github.com/bitcoin/bips/blob/master/bip-0324.mediawiki): The v2 transport protocol specified by BIP324 and the associated `NODE_P2P_V2` service bit are supported as of **v22.0**, but off by default ([PR 28331](https://github.com/bitcoin/bitcoin/pull/28331)).
* [`BIP 339`](https://github.com/bitcoin/bips/blob/master/bip-0339.mediawiki): Relay of transactions by wtxid is supported as of **v0.21.0** ([PR 18044](https://github.com/bitcoin/bitcoin/pull/18044)).

View File

@ -588,6 +588,7 @@ void SetupServerArgs(ArgsManager& argsman)
argsman.AddArg("-i2psam=<ip:port>", "I2P SAM proxy to reach I2P peers and accept I2P connections (default: none)", ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-i2pacceptincoming", strprintf("Whether to accept inbound I2P connections (default: %i). Ignored if -i2psam is not set. Listening for inbound I2P connections is done through the SAM proxy, not by binding to a local address and port.", DEFAULT_I2P_ACCEPT_INCOMING), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-onlynet=<net>", "Make automatic outbound connections only to network <net> (" + Join(GetNetworkNames(), ", ") + "). Inbound and manual connections are not affected by this option. It can be specified multiple times to allow multiple networks.", ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-v2transport", strprintf("Support v2 transport (default: %u)", DEFAULT_V2_TRANSPORT), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-peerblockfilters", strprintf("Serve compact block filters to peers per BIP 157 (default: %u)", DEFAULT_PEERBLOCKFILTERS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-peerbloomfilters", strprintf("Support filtering of blocks and transaction with bloom filters (default: %u)", DEFAULT_PEERBLOOMFILTERS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
argsman.AddArg("-peertimeout=<n>", strprintf("Specify a p2p connection timeout delay in seconds. After connecting to a peer, wait this amount of time before considering disconnection based on inactivity (minimum: 1, default: %d)", DEFAULT_PEER_CONNECT_TIMEOUT), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
@ -1147,6 +1148,11 @@ bool AppInitParameterInteraction(const ArgsManager& args)
}
}
// Signal NODE_P2P_V2 if BIP324 v2 transport is enabled.
if (args.GetBoolArg("-v2transport", DEFAULT_V2_TRANSPORT)) {
nLocalServices = ServiceFlags(nLocalServices | NODE_P2P_V2);
}
// Signal NODE_COMPACT_FILTERS if peerblockfilters and basic filters index are both enabled.
if (args.GetBoolArg("-peerblockfilters", DEFAULT_PEERBLOCKFILTERS)) {
if (g_enabled_filter_types.count(BlockFilterType::BASIC_FILTER) != 1) {

View File

@ -484,7 +484,7 @@ static CAddress GetBindAddress(const Sock& sock)
return addr_bind;
}
CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type)
CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport)
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
assert(conn_type != ConnectionType::INBOUND);
@ -506,11 +506,13 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
/// debug print
if (fLogIPs) {
LogPrint(BCLog::NET, "trying connection %s lastseen=%.1fhrs\n",
LogPrint(BCLog::NET, "trying %s connection %s lastseen=%.1fhrs\n",
use_v2transport ? "v2" : "v1",
pszDest ? pszDest : addrConnect.ToStringAddrPort(),
pszDest ? 0.0 : (double)(GetAdjustedTime() - addrConnect.nTime)/3600.0);
} else {
LogPrint(BCLog::NET, "trying connection lastseen=%.1fhrs\n",
LogPrint(BCLog::NET, "trying %s connection lastseen=%.1fhrs\n",
use_v2transport ? "v2" : "v1",
pszDest ? 0.0 : (double)(GetAdjustedTime() - addrConnect.nTime)/3600.0);
}
@ -632,6 +634,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
CNodeOptions{
.i2p_sam_session = std::move(i2p_transient_session),
.recv_flood_size = nReceiveFloodSize,
.use_v2transport = use_v2transport,
});
pnode->AddRef();
::g_stats_client->inc("peers.connect", 1.0f);
@ -747,6 +750,9 @@ void CNode::CopyStats(CNodeStats& stats)
LOCK(cs_vRecv);
X(mapRecvBytesPerMsgType);
X(nRecvBytes);
Transport::Info info = m_transport->GetInfo();
stats.m_transport_type = info.transport_type;
if (info.session_id) stats.m_session_id = HexStr(*info.session_id);
}
X(m_legacyWhitelisted);
X(m_permission_flags);
@ -822,6 +828,11 @@ V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noex
Reset();
}
Transport::Info V1Transport::GetInfo() const noexcept
{
return {.transport_type = TransportProtocolType::V1, .session_id = {}};
}
int V1Transport::readHeader(Span<const uint8_t> msg_bytes)
{
AssertLockHeld(m_recv_mutex);
@ -1632,8 +1643,15 @@ void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept
LOCK(m_send_mutex);
if (m_send_state == SendState::V1) return m_v1_fallback.MarkBytesSent(bytes_sent);
if (m_send_state == SendState::AWAITING_KEY && m_send_pos == 0 && bytes_sent > 0) {
LogPrint(BCLog::NET, "start sending v2 handshake to peer=%d\n", m_nodeid);
}
m_send_pos += bytes_sent;
Assume(m_send_pos <= m_send_buffer.size());
if (m_send_pos >= CMessageHeader::HEADER_SIZE) {
m_sent_v1_header_worth = true;
}
// Wipe the buffer when everything is sent.
if (m_send_pos == m_send_buffer.size()) {
m_send_pos = 0;
@ -1641,6 +1659,23 @@ void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept
}
}
bool V2Transport::ShouldReconnectV1() const noexcept
{
AssertLockNotHeld(m_send_mutex);
AssertLockNotHeld(m_recv_mutex);
// Only outgoing connections need reconnection.
if (!m_initiating) return false;
LOCK(m_recv_mutex);
// We only reconnect in the very first state and when the receive buffer is empty. Together
// these conditions imply nothing has been received so far.
if (m_recv_state != RecvState::KEY) return false;
if (!m_recv_buffer.empty()) return false;
// Check if we've sent enough for the other side to disconnect us (if it was V1).
LOCK(m_send_mutex);
return m_sent_v1_header_worth;
}
size_t V2Transport::GetSendMemoryUsage() const noexcept
{
AssertLockNotHeld(m_send_mutex);
@ -1650,6 +1685,27 @@ size_t V2Transport::GetSendMemoryUsage() const noexcept
return sizeof(m_send_buffer) + memusage::DynamicUsage(m_send_buffer);
}
Transport::Info V2Transport::GetInfo() const noexcept
{
AssertLockNotHeld(m_recv_mutex);
LOCK(m_recv_mutex);
if (m_recv_state == RecvState::V1) return m_v1_fallback.GetInfo();
Transport::Info info;
// Do not report v2 and session ID until the version packet has been received
// and verified (confirming that the other side very likely has the same keys as us).
if (m_recv_state != RecvState::KEY_MAYBE_V1 && m_recv_state != RecvState::KEY &&
m_recv_state != RecvState::GARB_GARBTERM && m_recv_state != RecvState::VERSION) {
info.transport_type = TransportProtocolType::V2;
info.session_id = uint256(MakeUCharSpan(m_cipher.GetSessionID()));
} else {
info.transport_type = TransportProtocolType::DETECTING;
}
return info;
}
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{
auto it = node.vSendMsg.begin();
@ -1699,7 +1755,9 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
// Notify transport that bytes have been processed.
node.m_transport->MarkBytesSent(nBytes);
// Update statistics per message type.
node.AccountForSentBytes(msg_type, nBytes);
if (!msg_type.empty()) { // don't report v2 handshake bytes for now
node.AccountForSentBytes(msg_type, nBytes);
}
nSentSize += nBytes;
if ((size_t)nBytes != data.size()) {
// could not send full message; stop sending more
@ -1934,6 +1992,10 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
}
const bool inbound_onion = std::find(m_onion_binds.begin(), m_onion_binds.end(), addr_bind) != m_onion_binds.end();
// The V2Transport transparently falls back to V1 behavior when an incoming V1 connection is
// detected, so use it whenever we signal NODE_P2P_V2.
const bool use_v2transport(nodeServices & NODE_P2P_V2);
CNode* pnode = new CNode(id,
std::move(sock),
addr,
@ -1947,6 +2009,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
.permission_flags = permission_flags,
.prefer_evict = discouraged,
.recv_flood_size = nReceiveFloodSize,
.use_v2transport = use_v2transport,
});
pnode->AddRef();
// If this flag is present, the user probably expect that RPC and QT report it as whitelisted (backward compatibility)
@ -2016,12 +2079,19 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
CSemaphoreGrant grant(*semOutbound, true);
if (!grant) return false;
OpenNetworkConnection(CAddress(), false, &grant, address.c_str(), conn_type);
OpenNetworkConnection(CAddress(), false, std::move(grant), address.c_str(), conn_type, /*use_v2transport=*/false);
return true;
}
void CConnman::DisconnectNodes()
{
AssertLockNotHeld(m_nodes_mutex);
AssertLockNotHeld(m_reconnections_mutex);
// Use a temporary variable to accumulate desired reconnections, so we don't need
// m_reconnections_mutex while holding m_nodes_mutex.
decltype(m_reconnections) reconnections_to_add;
{
LOCK(m_nodes_mutex);
@ -2084,6 +2154,19 @@ void CConnman::DisconnectNodes()
// remove from m_nodes
it = m_nodes.erase(it);
// Add to reconnection list if appropriate. We don't reconnect right here, because
// the creation of a connection is a blocking operation (up to several seconds),
// and we don't want to hold up the socket handler thread for that long.
if (pnode->m_transport->ShouldReconnectV1()) {
reconnections_to_add.push_back({
.addr_connect = pnode->addr,
.grant = std::move(pnode->grantOutbound),
.destination = pnode->m_dest,
.conn_type = pnode->m_conn_type,
.use_v2transport = false});
LogPrint(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", pnode->GetId());
}
// release outbound grant (if any)
pnode->grantOutbound.Release();
@ -2113,6 +2196,11 @@ void CConnman::DisconnectNodes()
}
}
}
{
// Move entries from reconnections_to_add to m_reconnections.
LOCK(m_reconnections_mutex);
m_reconnections.splice(m_reconnections.end(), std::move(reconnections_to_add));
}
}
void CConnman::NotifyNumConnectionsChanged(CMasternodeSync& mn_sync)
@ -2936,9 +3024,9 @@ void CConnman::ProcessAddrFetch()
m_addr_fetches.pop_front();
}
CAddress addr;
CSemaphoreGrant grant(*semOutbound, true);
CSemaphoreGrant grant(*semOutbound, /*fTry=*/true);
if (grant) {
OpenNetworkConnection(addr, false, &grant, strDest.c_str(), ConnectionType::ADDR_FETCH);
OpenNetworkConnection(addr, false, std::move(grant), strDest.c_str(), ConnectionType::ADDR_FETCH, /*use_v2transport=*/false);
}
}
@ -3007,6 +3095,7 @@ std::unordered_set<Network> CConnman::GetReachableEmptyNetworks() const
void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDeterministicMNManager& dmnman)
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
AssertLockNotHeld(m_reconnections_mutex);
FastRandomContext rng;
// Connect to specific addresses
if (!connect.empty())
@ -3016,7 +3105,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
for (const std::string& strAddr : connect)
{
CAddress addr(CService(), NODE_NONE);
OpenNetworkConnection(addr, false, nullptr, strAddr.c_str(), ConnectionType::MANUAL);
OpenNetworkConnection(addr, false, {}, strAddr.c_str(), ConnectionType::MANUAL, /*use_v2transport=*/false);
for (int i = 0; i < 10 && i < nLoop; i++)
{
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
@ -3049,6 +3138,8 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
PerformReconnections();
CSemaphoreGrant grant(*semOutbound);
if (interruptNet)
return;
@ -3069,7 +3160,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
// Perform cheap checks before locking a mutex.
else if (!dnsseed && !use_seednodes) {
LOCK(m_added_nodes_mutex);
if (m_added_nodes.empty()) {
if (m_added_node_params.empty()) {
add_fixed_seeds_now = true;
LogPrintf("Adding fixed seeds as -dnsseed=0 (or IPv4/IPv6 connections are disabled via -onlynet) and neither -addnode nor -seednode are provided\n");
}
@ -3339,7 +3430,9 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
// Don't record addrman failure attempts when node is offline. This can be identified since all local
// network connections (if any) belong in the same netgroup, and the size of `outbound_ipv46_peer_netgroups` would only be 1.
const bool count_failures{((int)outbound_ipv46_peer_netgroups.size() + outbound_privacy_network_peers) >= std::min(nMaxConnections - 1, 2)};
OpenNetworkConnection(addrConnect, count_failures, &grant, /*strDest=*/nullptr, conn_type);
// Use BIP324 transport when both us and them have NODE_V2_P2P set.
const bool use_v2transport(addrConnect.nServices & GetLocalServices() & NODE_P2P_V2);
OpenNetworkConnection(addrConnect, count_failures, std::move(grant), /*strDest=*/nullptr, conn_type, use_v2transport);
}
}
}
@ -3361,11 +3454,11 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
{
std::vector<AddedNodeInfo> ret;
std::list<std::string> lAddresses(0);
std::list<AddedNodeParams> lAddresses(0);
{
LOCK(m_added_nodes_mutex);
ret.reserve(m_added_nodes.size());
std::copy(m_added_nodes.cbegin(), m_added_nodes.cend(), std::back_inserter(lAddresses));
ret.reserve(m_added_node_params.size());
std::copy(m_added_node_params.cbegin(), m_added_node_params.cend(), std::back_inserter(lAddresses));
}
@ -3385,9 +3478,9 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
}
}
for (const std::string& strAddNode : lAddresses) {
CService service(LookupNumeric(strAddNode, Params().GetDefaultPort(strAddNode)));
AddedNodeInfo addedNode{strAddNode, CService(), false, false};
for (const auto& addr : lAddresses) {
CService service(LookupNumeric(addr.m_added_node, Params().GetDefaultPort(addr.m_added_node)));
AddedNodeInfo addedNode{addr, CService(), false, false};
if (service.IsValid()) {
// strAddNode is an IP:port
auto it = mapConnected.find(service);
@ -3398,7 +3491,7 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
}
} else {
// strAddNode is a name
auto it = mapConnectedByName.find(strAddNode);
auto it = mapConnectedByName.find(addr.m_added_node);
if (it != mapConnectedByName.end()) {
addedNode.resolvedAddress = it->second.second;
addedNode.fConnected = true;
@ -3414,6 +3507,7 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
void CConnman::ThreadOpenAddedConnections()
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
AssertLockNotHeld(m_reconnections_mutex);
while (true)
{
CSemaphoreGrant grant(*semAddnode);
@ -3421,21 +3515,23 @@ void CConnman::ThreadOpenAddedConnections()
bool tried = false;
for (const AddedNodeInfo& info : vInfo) {
if (!info.fConnected) {
if (!grant.TryAcquire()) {
if (!grant) {
// If we've used up our semaphore and need a new one, let's not wait here since while we are waiting
// the addednodeinfo state might change.
break;
}
tried = true;
CAddress addr(CService(), NODE_NONE);
OpenNetworkConnection(addr, false, &grant, info.strAddedNode.c_str(), ConnectionType::MANUAL);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
OpenNetworkConnection(addr, false, std::move(grant), info.m_params.m_added_node.c_str(), ConnectionType::MANUAL, info.m_params.m_use_v2transport);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return;
grant = CSemaphoreGrant(*semAddnode, /*fTry=*/true);
}
}
// Retry every 60 seconds if a connection was attempted, otherwise two seconds
if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2)))
return;
// See if any reconnections are desired.
PerformReconnections();
}
}
@ -3614,7 +3710,9 @@ void CConnman::ThreadOpenMasternodeConnections(CDeterministicMNManager& dmnman,
}
// if successful, this moves the passed grant to the constructed node
void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, ConnectionType conn_type, MasternodeConn masternode_connection, MasternodeProbeConn masternode_probe_connection)
void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound,
const char *pszDest, ConnectionType conn_type, bool use_v2transport,
MasternodeConn masternode_connection, MasternodeProbeConn masternode_probe_connection)
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
assert(conn_type != ConnectionType::INBOUND);
@ -3661,7 +3759,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
return;
LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- connecting to %s\n", __func__, getIpStr());
CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type);
CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport);
if (!pnode) {
LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- ConnectNode failed for %s\n", __func__, getIpStr());
@ -3673,8 +3771,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
LogPrint(BCLog::NET_NETCONN, "CConnman::%s -- successfully connected to %s, sock=%d, peer=%d\n", __func__, getIpStr(), pnode->m_sock->Get(), pnode->GetId());
}
if (grantOutbound)
grantOutbound->MoveTo(pnode->grantOutbound);
pnode->grantOutbound = std::move(grant_outbound);
if (masternode_connection == MasternodeConn::IsConnection)
pnode->m_masternode_connection = true;
@ -3705,7 +3802,8 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
}
void CConnman::OpenMasternodeConnection(const CAddress &addrConnect, MasternodeProbeConn probe) {
OpenNetworkConnection(addrConnect, false, nullptr, nullptr, ConnectionType::OUTBOUND_FULL_RELAY, MasternodeConn::IsConnection, probe);
OpenNetworkConnection(addrConnect, false, {}, /*strDest=*/nullptr, ConnectionType::OUTBOUND_FULL_RELAY,
/*use_v2transport=*/false, MasternodeConn::IsConnection, probe);
}
Mutex NetEventsInterface::g_msgproc_mutex;
@ -4313,23 +4411,23 @@ std::vector<CAddress> CConnman::GetAddresses(CNode& requestor, size_t max_addres
return cache_entry.m_addrs_response_cache;
}
bool CConnman::AddNode(const std::string& strNode)
bool CConnman::AddNode(const AddedNodeParams& add)
{
LOCK(m_added_nodes_mutex);
for (const std::string& it : m_added_nodes) {
if (strNode == it) return false;
for (const auto& it : m_added_node_params) {
if (add.m_added_node == it.m_added_node) return false;
}
m_added_nodes.push_back(strNode);
m_added_node_params.push_back(add);
return true;
}
bool CConnman::RemoveAddedNode(const std::string& strNode)
{
LOCK(m_added_nodes_mutex);
for(std::vector<std::string>::iterator it = m_added_nodes.begin(); it != m_added_nodes.end(); ++it) {
if (strNode == *it) {
m_added_nodes.erase(it);
for (auto it = m_added_node_params.begin(); it != m_added_node_params.end(); ++it) {
if (strNode == it->m_added_node) {
m_added_node_params.erase(it);
return true;
}
}
@ -4677,6 +4775,15 @@ ServiceFlags CConnman::GetLocalServices() const
return nLocalServices;
}
static std::unique_ptr<Transport> MakeTransport(NodeId id, bool use_v2transport, bool inbound) noexcept
{
if (use_v2transport) {
return std::make_unique<V2Transport>(id, /*initiating=*/!inbound, SER_NETWORK, INIT_PROTO_VERSION);
} else {
return std::make_unique<V1Transport>(id, SER_NETWORK, INIT_PROTO_VERSION);
}
}
CNode::CNode(NodeId idIn,
std::shared_ptr<Sock> sock,
const CAddress& addrIn,
@ -4687,13 +4794,14 @@ CNode::CNode(NodeId idIn,
ConnectionType conn_type_in,
bool inbound_onion,
CNodeOptions&& node_opts)
: m_transport{std::make_unique<V1Transport>(idIn, SER_NETWORK, INIT_PROTO_VERSION)},
: m_transport{MakeTransport(idIn, node_opts.use_v2transport, conn_type_in == ConnectionType::INBOUND)},
m_permission_flags{node_opts.permission_flags},
m_sock{sock},
m_connected{GetTime<std::chrono::seconds>()},
addr{addrIn},
addrBind{addrBindIn},
m_addr_name{addrNameIn.empty() ? addr.ToStringAddrPort() : addrNameIn},
m_dest(addrNameIn),
m_inbound_onion{inbound_onion},
m_prefer_evict{node_opts.prefer_evict},
nKeyedNetGroup{nKeyedNetGroupIn},
@ -4872,6 +4980,33 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& address) const
return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize();
}
void CConnman::PerformReconnections()
{
AssertLockNotHeld(m_reconnections_mutex);
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
while (true) {
// Move first element of m_reconnections to todo (avoiding an allocation inside the lock).
decltype(m_reconnections) todo;
{
LOCK(m_reconnections_mutex);
if (m_reconnections.empty()) break;
todo.splice(todo.end(), m_reconnections, m_reconnections.begin());
}
auto& item = *todo.begin();
OpenNetworkConnection(item.addr_connect,
// We only reconnect if the first attempt to connect succeeded at
// connection time, but then failed after the CNode object was
// created. Since we already know connecting is possible, do not
// count failure to reconnect.
/*fCountFailure=*/false,
std::move(item.grant),
item.destination.empty() ? nullptr : item.destination.c_str(),
item.conn_type,
item.use_v2transport);
}
}
void CaptureMessageToFile(const CAddress& addr,
const std::string& msg_type,
Span<const unsigned char> data,

View File

@ -114,6 +114,8 @@ static const bool DEFAULT_FIXEDSEEDS = true;
static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000;
static constexpr bool DEFAULT_V2_TRANSPORT{false};
#if defined USE_KQUEUE
#define DEFAULT_SOCKETEVENTS "kqueue"
#elif defined USE_EPOLL
@ -126,9 +128,13 @@ static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000;
typedef int64_t NodeId;
struct AddedNodeInfo
{
std::string strAddedNode;
struct AddedNodeParams {
std::string m_added_node;
bool m_use_v2transport;
};
struct AddedNodeInfo {
AddedNodeParams m_params;
CService resolvedAddress;
bool fConnected;
bool fInbound;
@ -262,6 +268,10 @@ public:
uint256 verifiedPubKeyHash;
bool m_masternode_connection;
ConnectionType m_conn_type;
/** Transport protocol type. */
TransportProtocolType m_transport_type;
/** BIP324 session id string in hex, if any. */
std::string m_session_id;
};
@ -298,6 +308,15 @@ class Transport {
public:
virtual ~Transport() {}
struct Info
{
TransportProtocolType transport_type;
std::optional<uint256> session_id;
};
/** Retrieve information about this transport. */
virtual Info GetInfo() const noexcept = 0;
// 1. Receiver side functions, for decoding bytes received on the wire into transport protocol
// agnostic CNetMessage (message type & payload) objects.
@ -391,6 +410,11 @@ public:
/** Return the memory usage of this transport attributable to buffered data to send. */
virtual size_t GetSendMemoryUsage() const noexcept = 0;
// 3. Miscellaneous functions.
/** Whether upon disconnections, a reconnect with V1 is warranted. */
virtual bool ShouldReconnectV1() const noexcept = 0;
};
class V1Transport final : public Transport
@ -451,6 +475,8 @@ public:
return WITH_LOCK(m_recv_mutex, return CompleteInternal());
}
Info GetInfo() const noexcept override;
bool ReceivedBytes(Span<const uint8_t>& msg_bytes) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex)
{
AssertLockNotHeld(m_recv_mutex);
@ -470,6 +496,7 @@ public:
BytesToSend GetBytesToSend(bool have_next_message) const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
bool ShouldReconnectV1() const noexcept override { return false; }
};
class V2Transport final : public Transport
@ -638,6 +665,8 @@ private:
std::string m_send_type GUARDED_BY(m_send_mutex);
/** Current sender state. */
SendState m_send_state GUARDED_BY(m_send_mutex);
/** Whether we've sent at least 24 bytes (which would trigger disconnect for V1 peers). */
bool m_sent_v1_header_worth GUARDED_BY(m_send_mutex) {false};
/** Change the receive state. */
void SetReceiveState(RecvState recv_state) noexcept EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);
@ -683,6 +712,10 @@ public:
BytesToSend GetBytesToSend(bool have_next_message) const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
// Miscellaneous functions.
bool ShouldReconnectV1() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex, !m_send_mutex);
Info GetInfo() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex);
};
struct CNodeOptions
@ -691,6 +724,7 @@ struct CNodeOptions
std::unique_ptr<i2p::sam::Session> i2p_sam_session = nullptr;
bool prefer_evict = false;
size_t recv_flood_size{DEFAULT_MAXRECEIVEBUFFER * 1000};
bool use_v2transport = false;
};
/** Information about a peer */
@ -739,6 +773,8 @@ public:
// Bind address of our side of the connection
const CAddress addrBind;
const std::string m_addr_name;
/** The pszDest argument provided to ConnectNode(). Only used for reconnections. */
const std::string m_dest;
//! Whether this peer is an inbound onion, i.e. connected via our Tor onion service.
const bool m_inbound_onion;
std::atomic<int> nNumWarningsSkipped{0};
@ -1197,7 +1233,11 @@ public:
vWhitelistedRange = connOptions.vWhitelistedRange;
{
LOCK(m_added_nodes_mutex);
m_added_nodes = connOptions.m_added_nodes;
for (const std::string& added_node : connOptions.m_added_nodes) {
// -addnode cli arg does not currently have a way to signal BIP324 support
m_added_node_params.push_back({added_node, false});
}
}
socketEventsMode = connOptions.socketEventsMode;
m_onion_binds = connOptions.onion_binds;
@ -1237,8 +1277,8 @@ public:
IsConnection,
};
void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound,
const char* strDest, ConnectionType conn_type,
void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound,
const char* strDest, ConnectionType conn_type, bool use_v2transport,
MasternodeConn masternode_connection = MasternodeConn::IsNotConnection,
MasternodeProbeConn masternode_probe_connection = MasternodeProbeConn::IsNotConnection)
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex, !mutexMsgProc);
@ -1425,7 +1465,7 @@ public:
// Count the number of block-relay-only peers we have over our limit.
int GetExtraBlockRelayCount() const;
bool AddNode(const std::string& node) EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex);
bool AddNode(const AddedNodeParams& add) EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex);
bool RemoveAddedNode(const std::string& node) EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex);
std::vector<AddedNodeInfo> GetAddedNodeInfo() const EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex);
@ -1541,12 +1581,12 @@ private:
bool InitBinds(const Options& options);
void ThreadOpenAddedConnections()
EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex, !mutexMsgProc);
EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex, !mutexMsgProc);
void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
void ProcessAddrFetch()
EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex, !mutexMsgProc);
void ThreadOpenConnections(const std::vector<std::string> connect, CDeterministicMNManager& dmnman)
EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !mutexMsgProc);
EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex, !mutexMsgProc);
void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void ThreadI2PAcceptIncoming(CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void AcceptConnection(const ListenSocket& hListenSocket, CMasternodeSync& mn_sync)
@ -1566,7 +1606,7 @@ private:
const CAddress& addr,
CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void DisconnectNodes();
void DisconnectNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_nodes_mutex);
void NotifyNumConnectionsChanged(CMasternodeSync& mn_sync);
void CalculateNumConnectionsChangedStats();
/** Return true if the peer is inactive and should be disconnected. */
@ -1648,7 +1688,8 @@ private:
void SocketHandlerListening(const std::set<SOCKET>& recv_set, CMasternodeSync& mn_sync)
EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void ThreadSocketHandler(CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
void ThreadSocketHandler(CMasternodeSync& mn_sync)
EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc, !m_nodes_mutex, !m_reconnections_mutex);
void ThreadDNSAddressSeed() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_nodes_mutex);
void ThreadOpenMasternodeConnections(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman,
CMasternodeSync& mn_sync)
@ -1668,8 +1709,7 @@ private:
bool AlreadyConnectedToAddress(const CAddress& addr);
bool AttemptToEvictConnection();
CNode* ConnectNode(CAddress addrConnect, const char *pszDest = nullptr, bool fCountFailure = false, ConnectionType conn_type = ConnectionType::OUTBOUND_FULL_RELAY)
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const;
void DeleteNode(CNode* pnode);
@ -1729,7 +1769,10 @@ private:
const NetGroupManager& m_netgroupman;
std::deque<std::string> m_addr_fetches GUARDED_BY(m_addr_fetches_mutex);
Mutex m_addr_fetches_mutex;
std::vector<std::string> m_added_nodes GUARDED_BY(m_added_nodes_mutex);
// connection string and whether to use v2 p2p
std::vector<AddedNodeParams> m_added_node_params GUARDED_BY(m_added_nodes_mutex);
mutable Mutex m_added_nodes_mutex;
std::vector<CNode*> m_nodes GUARDED_BY(m_nodes_mutex);
std::list<CNode*> m_nodes_disconnected;
@ -1896,6 +1939,29 @@ private:
*/
std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex);
/**
* Mutex protecting m_reconnections.
*/
Mutex m_reconnections_mutex;
/** Struct for entries in m_reconnections. */
struct ReconnectionInfo
{
CAddress addr_connect;
CSemaphoreGrant grant;
std::string destination;
ConnectionType conn_type;
bool use_v2transport;
};
/**
* List of reconnections we have to make.
*/
std::list<ReconnectionInfo> m_reconnections GUARDED_BY(m_reconnections_mutex);
/** Attempt reconnections, if m_reconnections non-empty. */
void PerformReconnections() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc, !m_reconnections_mutex, !m_unused_i2p_sessions_mutex);
/**
* Cap on the size of `m_unused_i2p_sessions`, to ensure it does not
* unexpectedly use too much memory.

View File

@ -3596,11 +3596,14 @@ void PeerManagerImpl::ProcessMessage(
return;
}
if (!pfrom.IsInboundConn()) {
LogPrintf("New outbound peer connected: version: %d, blocks=%d, peer=%d%s (%s)\n",
// Log succesful connections unconditionally for outbound, but not for inbound as those
// can be triggered by an attacker at high rate.
if (!pfrom.IsInboundConn() || LogAcceptCategory(BCLog::NET)) {
LogPrintf("New %s %s peer connected: version: %d, blocks=%d, peer=%d%s\n",
pfrom.ConnectionTypeAsString(),
TransportTypeAsString(pfrom.m_transport->GetInfo().transport_type),
pfrom.nVersion.load(), peer->m_starting_height,
pfrom.GetId(), (fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : ""),
pfrom.ConnectionTypeAsString());
pfrom.GetId(), (fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : ""));
}
if (is_masternode && !pfrom.m_masternode_probe_connection) {

View File

@ -24,3 +24,17 @@ std::string ConnectionTypeAsString(ConnectionType conn_type)
assert(false);
}
std::string TransportTypeAsString(TransportProtocolType transport_type)
{
switch (transport_type) {
case TransportProtocolType::DETECTING:
return "detecting";
case TransportProtocolType::V1:
return "v1";
case TransportProtocolType::V2:
return "v2";
} // no default case, so the compiler can warn about missing cases
assert(false);
}

View File

@ -6,6 +6,7 @@
#define BITCOIN_NODE_CONNECTION_TYPES_H
#include <string>
#include <stdint.h>
/** Different types of connections to a peer. This enum encapsulates the
* information we have available at the time of opening or accepting the
@ -79,4 +80,14 @@ enum class ConnectionType {
/** Convert ConnectionType enum to a string value */
std::string ConnectionTypeAsString(ConnectionType conn_type);
/** Transport layer version */
enum class TransportProtocolType : uint8_t {
DETECTING, //!< Peer could be v1 or v2
V1, //!< Unencrypted, plaintext protocol
V2, //!< BIP324 protocol
};
/** Convert TransportProtocolType enum to a string value */
std::string TransportTypeAsString(TransportProtocolType transport_type);
#endif // BITCOIN_NODE_CONNECTION_TYPES_H

View File

@ -338,6 +338,7 @@ static std::string serviceFlagToStr(size_t bit)
case NODE_COMPACT_FILTERS: return "COMPACT_FILTERS";
case NODE_NETWORK_LIMITED: return "NETWORK_LIMITED";
case NODE_HEADERS_COMPRESSED: return "HEADERS_COMPRESSED";
case NODE_P2P_V2: return "P2P_V2";
// Not using default, so we get warned when a case is missing
}

View File

@ -328,6 +328,9 @@ enum ServiceFlags : uint64_t {
// description will be provided
NODE_HEADERS_COMPRESSED = (1 << 11),
// NODE_P2P_V2 means the node supports BIP324 transport
NODE_P2P_V2 = (1 << 12),
// Bits 24-31 are reserved for temporary experiments. Just pick a bit that
// isn't getting used, or one not being used much, and notify the
// bitcoin-development mailing list. Remember that service bits are just

View File

@ -232,6 +232,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "addpeeraddress", 2, "tried"},
{ "sendmsgtopeer", 0, "peer_id" },
{ "stop", 0, "wait" },
{ "addnode", 2, "v2transport" },
{ "verifychainlock", 2, "blockHeight" },
{ "verifyislock", 3, "maxHeight" },
{ "submitchainlock", 2, "blockHeight" },

View File

@ -40,6 +40,12 @@ const std::vector<std::string> CONNECTION_TYPE_DOC{
"feeler (short-lived automatic connection for testing addresses)"
};
const std::vector<std::string> TRANSPORT_TYPE_DOC{
"detecting (peer could be v1 or v2)",
"v1 (plaintext transport protocol)",
"v2 (BIP324 encrypted transport protocol)"
};
static RPCHelpMan getconnectioncount()
{
return RPCHelpMan{"getconnectioncount",
@ -166,6 +172,8 @@ static RPCHelpMan getpeerinfo()
{RPCResult::Type::STR, "connection_type", "Type of connection: \n" + Join(CONNECTION_TYPE_DOC, ",\n") + ".\n"
"Please note this output is unlikely to be stable in upcoming releases as we iterate to\n"
"best capture connection behaviors."},
{RPCResult::Type::STR, "transport_protocol_type", "Type of transport protocol: \n" + Join(TRANSPORT_TYPE_DOC, ",\n") + ".\n"},
{RPCResult::Type::STR, "session_id", "The session ID for this connection, or \"\" if there is none (\"v2\" transport protocol only).\n"},
}},
}}},
RPCExamples{
@ -280,6 +288,8 @@ static RPCHelpMan getpeerinfo()
}
obj.pushKV("bytesrecv_per_msg", recvPerMsgType);
obj.pushKV("connection_type", ConnectionTypeAsString(stats.m_conn_type));
obj.pushKV("transport_protocol_type", TransportTypeAsString(stats.m_transport_type));
obj.pushKV("session_id", stats.m_session_id);
ret.push_back(obj);
}
@ -301,11 +311,12 @@ static RPCHelpMan addnode()
{
{"node", RPCArg::Type::STR, RPCArg::Optional::NO, "The node (see getpeerinfo for nodes)"},
{"command", RPCArg::Type::STR, RPCArg::Optional::NO, "'add' to add a node to the list, 'remove' to remove a node from the list, 'onetry' to try a connection to the node once"},
{"v2transport", RPCArg::Type::BOOL, RPCArg::Default{false}, "Attempt to connect using BIP324 v2 transport protocol (ignored for 'remove' command)"},
},
RPCResult{RPCResult::Type::NONE, "", ""},
RPCExamples{
HelpExampleCli("addnode", "\"192.168.0.6:9999\" \"onetry\"")
+ HelpExampleRpc("addnode", "\"192.168.0.6:9999\", \"onetry\"")
HelpExampleCli("addnode", "\"192.168.0.6:9999\" \"onetry\" true")
+ HelpExampleRpc("addnode", "\"192.168.0.6:9999\", \"onetry\" true")
},
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue
{
@ -321,17 +332,22 @@ static RPCHelpMan addnode()
CConnman& connman = EnsureConnman(node);
std::string strNode = request.params[0].get_str();
bool use_v2transport = request.params[2].isNull() ? false : request.params[2].get_bool();
if (use_v2transport && !(node.connman->GetLocalServices() & NODE_P2P_V2)) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Error: v2transport requested but not enabled (see -v2transport)");
}
if (strCommand == "onetry")
{
CAddress addr;
connman.OpenNetworkConnection(addr, false, nullptr, strNode.c_str(), ConnectionType::MANUAL);
connman.OpenNetworkConnection(addr, false, /*grant_outbound=*/{}, strNode.c_str(), ConnectionType::MANUAL, use_v2transport);
return NullUniValue;
}
if (strCommand == "add")
{
if (!connman.AddNode(strNode)) {
if (!connman.AddNode({strNode, use_v2transport})) {
throw JSONRPCError(RPC_CLIENT_NODE_ALREADY_ADDED, "Error: Node already added");
}
}
@ -492,7 +508,7 @@ static RPCHelpMan getaddednodeinfo()
if (!request.params[0].isNull()) {
bool found = false;
for (const AddedNodeInfo& info : vInfo) {
if (info.strAddedNode == request.params[0].get_str()) {
if (info.m_params.m_added_node == request.params[0].get_str()) {
vInfo.assign(1, info);
found = true;
break;
@ -507,7 +523,7 @@ static RPCHelpMan getaddednodeinfo()
for (const AddedNodeInfo& info : vInfo) {
UniValue obj(UniValue::VOBJ);
obj.pushKV("addednode", info.strAddedNode);
obj.pushKV("addednode", info.m_params.m_added_node);
obj.pushKV("connected", info.fConnected);
UniValue addresses(UniValue::VARR);
if (info.fConnected) {

View File

@ -354,6 +354,10 @@ using ReadLock = SharedLock<typename std::remove_reference<typename std::remove_
#define WITH_LOCK(cs, code) [&]() -> decltype(auto) { LOCK(cs); code; }()
#define WITH_READ_LOCK(cs, code) [&]() -> decltype(auto) { READ_LOCK(cs); code; }()
/** An implementation of a semaphore.
*
* See https://en.wikipedia.org/wiki/Semaphore_(programming)
*/
class CSemaphore
{
private:
@ -362,25 +366,33 @@ private:
int value;
public:
explicit CSemaphore(int init) : value(init) {}
explicit CSemaphore(int init) noexcept : value(init) {}
void wait()
// Disallow default construct, copy, move.
CSemaphore() = delete;
CSemaphore(const CSemaphore&) = delete;
CSemaphore(CSemaphore&&) = delete;
CSemaphore& operator=(const CSemaphore&) = delete;
CSemaphore& operator=(CSemaphore&&) = delete;
void wait() noexcept
{
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, [&]() { return value >= 1; });
value--;
}
bool try_wait()
bool try_wait() noexcept
{
std::lock_guard<std::mutex> lock(mutex);
if (value < 1)
if (value < 1) {
return false;
}
value--;
return true;
}
void post()
void post() noexcept
{
{
std::lock_guard<std::mutex> lock(mutex);
@ -398,45 +410,64 @@ private:
bool fHaveGrant;
public:
void Acquire()
void Acquire() noexcept
{
if (fHaveGrant)
if (fHaveGrant) {
return;
}
sem->wait();
fHaveGrant = true;
}
void Release()
void Release() noexcept
{
if (!fHaveGrant)
if (!fHaveGrant) {
return;
}
sem->post();
fHaveGrant = false;
}
bool TryAcquire()
bool TryAcquire() noexcept
{
if (!fHaveGrant && sem->try_wait())
if (!fHaveGrant && sem->try_wait()) {
fHaveGrant = true;
}
return fHaveGrant;
}
void MoveTo(CSemaphoreGrant& grant)
// Disallow copy.
CSemaphoreGrant(const CSemaphoreGrant&) = delete;
CSemaphoreGrant& operator=(const CSemaphoreGrant&) = delete;
// Allow move.
CSemaphoreGrant(CSemaphoreGrant&& other) noexcept
{
grant.Release();
grant.sem = sem;
grant.fHaveGrant = fHaveGrant;
fHaveGrant = false;
sem = other.sem;
fHaveGrant = other.fHaveGrant;
other.fHaveGrant = false;
other.sem = nullptr;
}
CSemaphoreGrant() : sem(nullptr), fHaveGrant(false) {}
explicit CSemaphoreGrant(CSemaphore& sema, bool fTry = false) : sem(&sema), fHaveGrant(false)
CSemaphoreGrant& operator=(CSemaphoreGrant&& other) noexcept
{
if (fTry)
Release();
sem = other.sem;
fHaveGrant = other.fHaveGrant;
other.fHaveGrant = false;
other.sem = nullptr;
return *this;
}
CSemaphoreGrant() noexcept : sem(nullptr), fHaveGrant(false) {}
explicit CSemaphoreGrant(CSemaphore& sema, bool fTry = false) noexcept : sem(&sema), fHaveGrant(false)
{
if (fTry) {
TryAcquire();
else
} else {
Acquire();
}
}
~CSemaphoreGrant()
@ -444,7 +475,7 @@ public:
Release();
}
operator bool() const
explicit operator bool() const noexcept
{
return fHaveGrant;
}

View File

@ -54,7 +54,7 @@ FUZZ_TARGET_INIT(connman, initialize_connman)
random_string = fuzzed_data_provider.ConsumeRandomLengthString(64);
},
[&] {
connman.AddNode(random_string);
connman.AddNode({random_string, fuzzed_data_provider.ConsumeBool()});
},
[&] {
connman.CheckIncomingNonce(fuzzed_data_provider.ConsumeIntegral<uint64_t>());

View File

@ -323,6 +323,9 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa
// Make sure all expected messages were received.
assert(expected[0].empty());
assert(expected[1].empty());
// Compare session IDs.
assert(transports[0]->GetInfo().session_id == transports[1]->GetInfo().session_id);
}
std::unique_ptr<Transport> MakeV1Transport(NodeId nodeid) noexcept

View File

@ -1328,6 +1328,14 @@ public:
SendPacket(contents);
}
/** Test whether the transport's session ID matches the session ID we expect. */
void CompareSessionIDs() const
{
auto info = m_transport.GetInfo();
BOOST_CHECK(info.session_id);
BOOST_CHECK(uint256(MakeUCharSpan(m_cipher.GetSessionID())) == *info.session_id);
}
/** Introduce a bit error in the data scheduled to be sent. */
void Damage()
{
@ -1353,6 +1361,7 @@ BOOST_AUTO_TEST_CASE(v2transport_test)
BOOST_REQUIRE(ret && ret->empty());
tester.ReceiveGarbage();
tester.ReceiveVersion();
tester.CompareSessionIDs();
auto msg_data_1 = g_insecure_rand_ctx.randbytes<uint8_t>(InsecureRandRange(100000));
auto msg_data_2 = g_insecure_rand_ctx.randbytes<uint8_t>(InsecureRandRange(1000));
tester.SendMessage(uint8_t(4), msg_data_1); // cmpctblock short id
@ -1393,6 +1402,7 @@ BOOST_AUTO_TEST_CASE(v2transport_test)
BOOST_REQUIRE(ret && ret->empty());
tester.ReceiveGarbage();
tester.ReceiveVersion();
tester.CompareSessionIDs();
auto msg_data_1 = g_insecure_rand_ctx.randbytes<uint8_t>(InsecureRandRange(100000));
auto msg_data_2 = g_insecure_rand_ctx.randbytes<uint8_t>(InsecureRandRange(1000));
tester.SendMessage(uint8_t(14), msg_data_1); // inv short id
@ -1446,6 +1456,7 @@ BOOST_AUTO_TEST_CASE(v2transport_test)
BOOST_REQUIRE(ret && ret->empty());
tester.ReceiveGarbage();
tester.ReceiveVersion();
tester.CompareSessionIDs();
for (unsigned d = 0; d < num_decoys_1; ++d) {
auto decoy_data = g_insecure_rand_ctx.randbytes<uint8_t>(InsecureRandRange(1000));
tester.SendPacket(/*content=*/decoy_data, /*aad=*/{}, /*ignore=*/true);
@ -1523,6 +1534,7 @@ BOOST_AUTO_TEST_CASE(v2transport_test)
BOOST_REQUIRE(ret && ret->empty());
tester.ReceiveGarbage();
tester.ReceiveVersion();
tester.CompareSessionIDs();
auto msg_data_1 = g_insecure_rand_ctx.randbytes<uint8_t>(MAX_PROTOCOL_MESSAGE_LENGTH); // test that receiving max size payload works
auto msg_data_2 = g_insecure_rand_ctx.randbytes<uint8_t>(MAX_PROTOCOL_MESSAGE_LENGTH); // test that sending max size payload works
tester.SendMessage(uint8_t(InsecureRandRange(223) + 33), {}); // unknown short id

View File

@ -62,6 +62,7 @@ constexpr ServiceFlags ALL_SERVICE_FLAGS[]{
NODE_COMPACT_FILTERS,
NODE_NETWORK_LIMITED,
NODE_HEADERS_COMPRESSED,
NODE_P2P_V2,
};
constexpr NetPermissionFlags ALL_NET_PERMISSION_FLAGS[]{

View File

@ -0,0 +1,127 @@
#!/usr/bin/env python3
# Copyright (c) 2021-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
Test v2 transport
"""
from test_framework.messages import NODE_P2P_V2
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal
class V2TransportTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 5
self.extra_args = [["-v2transport=1"], ["-v2transport=1"], ["-v2transport=0"], ["-v2transport=0"], ["-v2transport=0"]]
def run_test(self):
sending_handshake = "start sending v2 handshake to peer"
downgrading_to_v1 = "retrying with v1 transport protocol for peer"
self.disconnect_nodes(0, 1)
self.disconnect_nodes(1, 2)
self.disconnect_nodes(2, 3)
self.disconnect_nodes(3, 4)
# verify local services
network_info = self.nodes[2].getnetworkinfo()
assert_equal(int(network_info["localservices"], 16) & NODE_P2P_V2, 0)
assert "P2P_V2" not in network_info["localservicesnames"]
network_info = self.nodes[1].getnetworkinfo()
assert_equal(int(network_info["localservices"], 16) & NODE_P2P_V2, NODE_P2P_V2)
assert "P2P_V2" in network_info["localservicesnames"]
# V2 nodes can sync with V2 nodes
assert_equal(self.nodes[0].getblockcount(), 0)
assert_equal(self.nodes[1].getblockcount(), 0)
with self.nodes[0].assert_debug_log(expected_msgs=[sending_handshake],
unexpected_msgs=[downgrading_to_v1]):
self.connect_nodes(0, 1, peer_advertises_v2=True)
self.generate(self.nodes[0], 5, sync_fun=lambda: self.sync_all(self.nodes[0:2]))
assert_equal(self.nodes[1].getblockcount(), 5)
# verify there is a v2 connection between node 0 and 1
node_0_info = self.nodes[0].getpeerinfo()
node_1_info = self.nodes[0].getpeerinfo()
assert_equal(len(node_0_info), 1)
assert_equal(len(node_1_info), 1)
assert_equal(node_0_info[0]["transport_protocol_type"], "v2")
assert_equal(node_1_info[0]["transport_protocol_type"], "v2")
assert_equal(len(node_0_info[0]["session_id"]), 64)
assert_equal(len(node_1_info[0]["session_id"]), 64)
assert_equal(node_0_info[0]["session_id"], node_1_info[0]["session_id"])
# V1 nodes can sync with each other
assert_equal(self.nodes[2].getblockcount(), 0)
assert_equal(self.nodes[3].getblockcount(), 0)
with self.nodes[2].assert_debug_log(expected_msgs=[],
unexpected_msgs=[sending_handshake, downgrading_to_v1]):
self.connect_nodes(2, 3, peer_advertises_v2=False)
self.generate(self.nodes[2], 8, sync_fun=lambda: self.sync_all(self.nodes[2:4]))
assert_equal(self.nodes[3].getblockcount(), 8)
assert self.nodes[0].getbestblockhash() != self.nodes[2].getbestblockhash()
# verify there is a v1 connection between node 2 and 3
node_2_info = self.nodes[2].getpeerinfo()
node_3_info = self.nodes[3].getpeerinfo()
assert_equal(len(node_2_info), 1)
assert_equal(len(node_3_info), 1)
assert_equal(node_2_info[0]["transport_protocol_type"], "v1")
assert_equal(node_3_info[0]["transport_protocol_type"], "v1")
assert_equal(len(node_2_info[0]["session_id"]), 0)
assert_equal(len(node_3_info[0]["session_id"]), 0)
# V1 nodes can sync with V2 nodes
self.disconnect_nodes(0, 1)
self.disconnect_nodes(2, 3)
with self.nodes[2].assert_debug_log(expected_msgs=[],
unexpected_msgs=[sending_handshake, downgrading_to_v1]):
self.connect_nodes(2, 1, peer_advertises_v2=False) # cannot enable v2 on v1 node
self.sync_all(self.nodes[1:3])
assert_equal(self.nodes[1].getblockcount(), 8)
assert self.nodes[0].getbestblockhash() != self.nodes[1].getbestblockhash()
# verify there is a v1 connection between node 1 and 2
node_1_info = self.nodes[1].getpeerinfo()
node_2_info = self.nodes[2].getpeerinfo()
assert_equal(len(node_1_info), 1)
assert_equal(len(node_2_info), 1)
assert_equal(node_1_info[0]["transport_protocol_type"], "v1")
assert_equal(node_2_info[0]["transport_protocol_type"], "v1")
assert_equal(len(node_1_info[0]["session_id"]), 0)
assert_equal(len(node_2_info[0]["session_id"]), 0)
# V2 nodes can sync with V1 nodes
self.disconnect_nodes(1, 2)
with self.nodes[0].assert_debug_log(expected_msgs=[],
unexpected_msgs=[sending_handshake, downgrading_to_v1]):
self.connect_nodes(0, 3, peer_advertises_v2=False)
self.sync_all([self.nodes[0], self.nodes[3]])
assert_equal(self.nodes[0].getblockcount(), 8)
# verify there is a v1 connection between node 0 and 3
node_0_info = self.nodes[0].getpeerinfo()
node_3_info = self.nodes[3].getpeerinfo()
assert_equal(len(node_0_info), 1)
assert_equal(len(node_3_info), 1)
assert_equal(node_0_info[0]["transport_protocol_type"], "v1")
assert_equal(node_3_info[0]["transport_protocol_type"], "v1")
assert_equal(len(node_0_info[0]["session_id"]), 0)
assert_equal(len(node_3_info[0]["session_id"]), 0)
# V2 node mines another block and everyone gets it
self.connect_nodes(0, 1, peer_advertises_v2=True)
self.connect_nodes(1, 2, peer_advertises_v2=False)
self.generate(self.nodes[1], 1, sync_fun=lambda: self.sync_all(self.nodes[0:4]))
assert_equal(self.nodes[0].getblockcount(), 9) # sync_all() verifies tip hashes match
# V1 node mines another block and everyone gets it
self.generate(self.nodes[3], 2, sync_fun=lambda: self.sync_all(self.nodes[0:4]))
assert_equal(self.nodes[2].getblockcount(), 11) # sync_all() verifies tip hashes match
assert_equal(self.nodes[4].getblockcount(), 0)
# Peer 4 is v1 p2p, but is falsely advertised as v2.
with self.nodes[1].assert_debug_log(expected_msgs=[sending_handshake, downgrading_to_v1]):
self.connect_nodes(1, 4, peer_advertises_v2=True)
self.sync_all()
assert_equal(self.nodes[4].getblockcount(), 11)
if __name__ == '__main__':
V2TransportTest().main()

View File

@ -145,11 +145,13 @@ class NetTest(DashTestFramework):
"relaytxes": False,
"services": "0000000000000000",
"servicesnames": [],
"session_id": "",
"startingheight": -1,
"subver": "",
"synced_blocks": -1,
"synced_headers": -1,
"timeoffset": 0,
"transport_protocol_type": "v1",
"version": 0,
},
)

View File

@ -51,6 +51,7 @@ NODE_BLOOM = (1 << 2)
NODE_COMPACT_FILTERS = (1 << 6)
NODE_NETWORK_LIMITED = (1 << 10)
NODE_HEADERS_COMPRESSED = (1 << 11)
NODE_P2P_V2 = (1 << 12)
MSG_TX = 1
MSG_BLOCK = 2

View File

@ -222,6 +222,8 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
parser.add_argument("--randomseed", type=int,
help="set a random seed for deterministically reproducing a previous test run")
parser.add_argument('--timeout-factor', dest="timeout_factor", type=float, default=1.0, help='adjust test timeouts by a factor. Setting it to 0 disables all timeouts')
parser.add_argument("--v2transport", dest="v2transport", default=False, action="store_true",
help="use BIP324 v2 connections between all nodes by default")
group = parser.add_mutually_exclusive_group()
group.add_argument("--descriptors", action='store_const', const=True,
@ -525,6 +527,9 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
assert_equal(len(binary_cli), num_nodes)
old_num_nodes = len(self.nodes)
for i in range(num_nodes):
args = extra_args[i]
if self.options.v2transport and ("-v2transport=0" not in args and "-v2transport=1" not in args):
args.append("-v2transport=1")
test_node_i = TestNode(
old_num_nodes + i,
get_datadir_path(self.options.tmpdir, old_num_nodes + i),
@ -540,7 +545,7 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
coverage_dir=self.options.coveragedir,
cwd=self.options.tmpdir,
extra_conf=extra_confs[i],
extra_args=extra_args[i],
extra_args=args,
use_cli=self.options.usecli,
start_perf=self.options.perf,
use_valgrind=self.options.valgrind,
@ -692,7 +697,7 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
def wait_for_node_exit(self, i, timeout):
self.nodes[i].process.wait(timeout)
def connect_nodes(self, a, b):
def connect_nodes(self, a, b, *, peer_advertises_v2=None):
# A node cannot connect to itself, bail out early
if (a == b):
return
@ -700,7 +705,16 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
from_connection = self.nodes[a]
to_connection = self.nodes[b]
ip_port = "127.0.0.1:" + str(p2p_port(b))
from_connection.addnode(ip_port, "onetry")
if peer_advertises_v2 is None:
peer_advertises_v2 = self.options.v2transport
if peer_advertises_v2:
from_connection.addnode(node=ip_port, command="onetry", v2transport=True)
else:
# skip the optional third argument (default false) for
# compatibility with older clients
from_connection.addnode(ip_port, "onetry")
# Use subversion as peer id. Test nodes have their node number appended to the user agent string
from_connection_subver = from_connection.getnetworkinfo()['subversion']
@ -721,13 +735,13 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
assert peer is not None, "Error: peer disconnected"
return peer['bytesrecv_per_msg'].pop(msg_type, 0) >= min_bytes_recv
self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'verack', 24))
self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'verack', 24))
self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'verack', 21))
self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'verack', 21))
# The message bytes are counted before processing the message, so make
# sure it was fully processed by waiting for a ping.
self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'pong', 32))
self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'pong', 32))
self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'pong', 29))
self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'pong', 29))
def disconnect_nodes(self, a, b):
# A node cannot disconnect from itself, bail out early
@ -1161,11 +1175,11 @@ class DashTestFramework(BitcoinTestFramework):
# controller node is the only node that has an extra option allowing it to submit sporks
append_config(self.nodes[0].datadir, ["sporkkey=cP4EKFyJsHT39LDqgdcB43Y3YXjNyjb5Fuas1GQSeAtjnZWmZEQK"])
def connect_nodes(self, a, b):
def connect_nodes(self, a, b, *, peer_advertises_v2=None):
for mn2 in self.mninfo:
if mn2.node is not None:
mn2.node.setmnthreadactive(False)
super().connect_nodes(a, b)
super().connect_nodes(a, b, peer_advertises_v2=peer_advertises_v2)
for mn2 in self.mninfo:
if mn2.node is not None:
mn2.node.setmnthreadactive(True)

View File

@ -175,6 +175,7 @@ BASE_SCRIPTS = [
'mempool_reorg.py',
'mempool_persist.py',
'p2p_block_sync.py',
'p2p_block_sync.py --v2transport',
'wallet_multiwallet.py --legacy-wallet',
'wallet_multiwallet.py --descriptors',
'wallet_multiwallet.py --usecli',
@ -204,12 +205,15 @@ BASE_SCRIPTS = [
'wallet_groups.py --legacy-wallet',
'wallet_groups.py --descriptors',
'p2p_compactblocks_hb.py',
'p2p_compactblocks_hb.py --v2transport',
'p2p_disconnect_ban.py',
'p2p_disconnect_ban.py --v2transport',
'feature_addressindex.py',
'feature_timestampindex.py',
'feature_spentindex.py',
'rpc_decodescript.py',
'rpc_blockchain.py',
'rpc_blockchain.py --v2transport',
'rpc_deprecated.py',
'wallet_disable.py --legacy-wallet',
'wallet_disable.py --descriptors',
@ -231,8 +235,11 @@ BASE_SCRIPTS = [
'mining_prioritisetransaction.py',
'p2p_invalid_locator.py',
'p2p_invalid_block.py',
'p2p_invalid_block.py --v2transport',
'p2p_invalid_messages.py',
'p2p_invalid_tx.py',
'p2p_invalid_tx.py --v2transport',
'p2p_v2_transport.py',
'feature_assumevalid.py',
'example_test.py',
'wallet_txn_doublespend.py --legacy-wallet',
@ -257,9 +264,12 @@ BASE_SCRIPTS = [
'wallet_importprunedfunds.py --legacy-wallet',
'wallet_importprunedfunds.py --descriptors',
'p2p_leak_tx.py',
'p2p_leak_tx.py --v2transport',
'p2p_eviction.py',
'p2p_ibd_stalling.py',
'p2p_ibd_stalling.py --v2transport',
'p2p_net_deadlock.py',
'p2p_net_deadlock.py --v2transport',
'rpc_signmessage.py',
'rpc_generateblock.py',
'rpc_generate.py',