merge bitcoin#27324: bitcoin#27257 follow-ups

This commit is contained in:
Kittywhiskers Van Gogh 2024-09-06 08:27:39 +00:00
parent 9023dd25af
commit ab11e0f998
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
5 changed files with 33 additions and 27 deletions

View File

@ -617,7 +617,10 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
pszDest ? pszDest : "", pszDest ? pszDest : "",
conn_type, conn_type,
/*inbound_onion=*/false, /*inbound_onion=*/false,
CNodeOptions{ .i2p_sam_session = std::move(i2p_transient_session) }); CNodeOptions{
.i2p_sam_session = std::move(i2p_transient_session),
.recv_flood_size = nReceiveFloodSize,
});
pnode->AddRef(); pnode->AddRef();
::g_stats_client->inc("peers.connect", 1.0f); ::g_stats_client->inc("peers.connect", 1.0f);
@ -1115,7 +1118,7 @@ bool CConnman::AttemptToEvictConnection()
.m_is_local = node->addr.IsLocal(), .m_is_local = node->addr.IsLocal(),
.m_network = node->ConnectedThroughNetwork(), .m_network = node->ConnectedThroughNetwork(),
.m_noban = node->HasPermission(NetPermissionFlags::NoBan), .m_noban = node->HasPermission(NetPermissionFlags::NoBan),
.m_conn_type = node->GetConnectionType(), .m_conn_type = node->m_conn_type,
}; };
vEvictionCandidates.push_back(candidate); vEvictionCandidates.push_back(candidate);
} }
@ -1279,8 +1282,9 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
ConnectionType::INBOUND, ConnectionType::INBOUND,
inbound_onion, inbound_onion,
CNodeOptions{ CNodeOptions{
.permission_flags = permission_flags, .permission_flags = permission_flags,
.prefer_evict = discouraged, .prefer_evict = discouraged,
.recv_flood_size = nReceiveFloodSize,
}); });
pnode->AddRef(); pnode->AddRef();
// If this flag is present, the user probably expect that RPC and QT report it as whitelisted (backward compatibility) // If this flag is present, the user probably expect that RPC and QT report it as whitelisted (backward compatibility)
@ -1339,7 +1343,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
// Count existing connections // Count existing connections
int existing_connections = WITH_LOCK(m_nodes_mutex, int existing_connections = WITH_LOCK(m_nodes_mutex,
return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->GetConnectionType() == conn_type; });); return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
// Max connections of specified type already exist // Max connections of specified type already exist
if (max_connections != std::nullopt && existing_connections >= max_connections) return false; if (max_connections != std::nullopt && existing_connections >= max_connections) return false;
@ -2088,7 +2092,7 @@ size_t CConnman::SocketRecvData(CNode *pnode)
} }
RecordBytesRecv(nBytes); RecordBytesRecv(nBytes);
if (notify) { if (notify) {
pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize); pnode->MarkReceivedMsgsForProcessing();
WakeMessageHandler(); WakeMessageHandler();
} }
} }
@ -2464,7 +2468,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
if (pnode->IsFullOutboundConn() && pnode->ConnectedThroughNetwork() == Network::NET_ONION) nOutboundOnionRelay++; if (pnode->IsFullOutboundConn() && pnode->ConnectedThroughNetwork() == Network::NET_ONION) nOutboundOnionRelay++;
// Make sure our persistent outbound slots belong to different netgroups. // Make sure our persistent outbound slots belong to different netgroups.
switch (pnode->GetConnectionType()) { switch (pnode->m_conn_type) {
// We currently don't take inbound connections into account. Since they are // We currently don't take inbound connections into account. Since they are
// free to make, an attacker could make them to prevent us from connecting to // free to make, an attacker could make them to prevent us from connecting to
// certain peers. // certain peers.
@ -4015,8 +4019,6 @@ ServiceFlags CConnman::GetLocalServices() const
return nLocalServices; return nLocalServices;
} }
unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; }
CNode::CNode(NodeId idIn, CNode::CNode(NodeId idIn,
std::shared_ptr<Sock> sock, std::shared_ptr<Sock> sock,
const CAddress& addrIn, const CAddress& addrIn,
@ -4037,9 +4039,10 @@ CNode::CNode(NodeId idIn,
m_inbound_onion{inbound_onion}, m_inbound_onion{inbound_onion},
m_prefer_evict{node_opts.prefer_evict}, m_prefer_evict{node_opts.prefer_evict},
nKeyedNetGroup{nKeyedNetGroupIn}, nKeyedNetGroup{nKeyedNetGroupIn},
m_conn_type{conn_type_in},
id{idIn}, id{idIn},
nLocalHostNonce{nLocalHostNonceIn}, nLocalHostNonce{nLocalHostNonceIn},
m_conn_type{conn_type_in}, m_recv_flood_size{node_opts.recv_flood_size},
m_i2p_sam_session{std::move(node_opts.i2p_sam_session)} m_i2p_sam_session{std::move(node_opts.i2p_sam_session)}
{ {
if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND); if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND);
@ -4055,7 +4058,7 @@ CNode::CNode(NodeId idIn,
} }
} }
void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) void CNode::MarkReceivedMsgsForProcessing()
{ {
AssertLockNotHeld(m_msg_process_queue_mutex); AssertLockNotHeld(m_msg_process_queue_mutex);
@ -4069,10 +4072,10 @@ void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
LOCK(m_msg_process_queue_mutex); LOCK(m_msg_process_queue_mutex);
m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg); m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg);
m_msg_process_queue_size += nSizeAdded; m_msg_process_queue_size += nSizeAdded;
fPauseRecv = m_msg_process_queue_size > recv_flood_size; fPauseRecv = m_msg_process_queue_size > m_recv_flood_size;
} }
std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage(size_t recv_flood_size) std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage()
{ {
LOCK(m_msg_process_queue_mutex); LOCK(m_msg_process_queue_mutex);
if (m_msg_process_queue.empty()) return std::nullopt; if (m_msg_process_queue.empty()) return std::nullopt;
@ -4081,7 +4084,7 @@ std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage(size_t recv_flood
// Just take one message // Just take one message
msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin()); msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin());
m_msg_process_queue_size -= msgs.front().m_raw_message_size; m_msg_process_queue_size -= msgs.front().m_raw_message_size;
fPauseRecv = m_msg_process_queue_size > recv_flood_size; fPauseRecv = m_msg_process_queue_size > m_recv_flood_size;
return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty()); return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
} }

View File

@ -277,6 +277,14 @@ public:
std::string m_type; std::string m_type;
CNetMessage(CDataStream&& recv_in) : m_recv(std::move(recv_in)) {} CNetMessage(CDataStream&& recv_in) : m_recv(std::move(recv_in)) {}
// Only one CNetMessage object will exist for the same message on either
// the receive or processing queue. For performance reasons we therefore
// delete the copy constructor and assignment operator to avoid the
// possibility of copying CNetMessage objects.
CNetMessage(CNetMessage&&) = default;
CNetMessage(const CNetMessage&) = delete;
CNetMessage& operator=(CNetMessage&&) = default;
CNetMessage& operator=(const CNetMessage&) = delete;
void SetVersion(int nVersionIn) void SetVersion(int nVersionIn)
{ {
@ -454,6 +462,7 @@ struct CNodeOptions
NetPermissionFlags permission_flags = NetPermissionFlags::None; NetPermissionFlags permission_flags = NetPermissionFlags::None;
std::unique_ptr<i2p::sam::Session> i2p_sam_session = nullptr; std::unique_ptr<i2p::sam::Session> i2p_sam_session = nullptr;
bool prefer_evict = false; bool prefer_evict = false;
size_t recv_flood_size{DEFAULT_MAXRECEIVEBUFFER * 1000};
}; };
/** Information about a peer */ /** Information about a peer */
@ -546,13 +555,10 @@ public:
std::atomic_bool fHasRecvData{false}; std::atomic_bool fHasRecvData{false};
std::atomic_bool fCanSendData{false}; std::atomic_bool fCanSendData{false};
const ConnectionType& GetConnectionType() const const ConnectionType m_conn_type;
{
return m_conn_type;
}
/** Move all messages from the received queue to the processing queue. */ /** Move all messages from the received queue to the processing queue. */
void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) void MarkReceivedMsgsForProcessing()
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex); EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
/** Poll the next message from the processing queue of this connection. /** Poll the next message from the processing queue of this connection.
@ -560,7 +566,7 @@ public:
* Returns std::nullopt if the processing queue is empty, or a pair * Returns std::nullopt if the processing queue is empty, or a pair
* consisting of the message and a bool that indicates if the processing * consisting of the message and a bool that indicates if the processing
* queue has more entries. */ * queue has more entries. */
std::optional<std::pair<CNetMessage, bool>> PollMessage(size_t recv_flood_size) std::optional<std::pair<CNetMessage, bool>> PollMessage()
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex); EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
/** Account for the total size of a sent message in the per msg type connection stats. */ /** Account for the total size of a sent message in the per msg type connection stats. */
@ -825,10 +831,10 @@ public:
private: private:
const NodeId id; const NodeId id;
const uint64_t nLocalHostNonce; const uint64_t nLocalHostNonce;
const ConnectionType m_conn_type;
std::atomic<int> m_greatest_common_version{INIT_PROTO_VERSION}; std::atomic<int> m_greatest_common_version{INIT_PROTO_VERSION};
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread const size_t m_recv_flood_size;
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
Mutex m_msg_process_queue_mutex; Mutex m_msg_process_queue_mutex;
std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex); std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex);
@ -1257,8 +1263,6 @@ public:
/** Get a unique deterministic randomizer. */ /** Get a unique deterministic randomizer. */
CSipHasher GetDeterministicRandomizer(uint64_t id) const; CSipHasher GetDeterministicRandomizer(uint64_t id) const;
unsigned int GetReceiveFloodSize() const;
void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
/** Return true if we should disconnect the peer for failing an inactivity check. */ /** Return true if we should disconnect the peer for failing an inactivity check. */

View File

@ -5048,7 +5048,7 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
// Don't bother if send buffer is too full to respond anyway // Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend) return false; if (pfrom->fPauseSend) return false;
auto poll_result{pfrom->PollMessage(m_connman.GetReceiveFloodSize())}; auto poll_result{pfrom->PollMessage()};
if (!poll_result) { if (!poll_result) {
// No message to process // No message to process
return false; return false;

View File

@ -126,7 +126,6 @@ FUZZ_TARGET_INIT(connman, initialize_connman)
std::vector<CNodeStats> stats; std::vector<CNodeStats> stats;
connman.GetNodeStats(stats); connman.GetNodeStats(stats);
(void)connman.GetOutboundTargetBytesLeft(); (void)connman.GetOutboundTargetBytesLeft();
(void)connman.GetReceiveFloodSize();
(void)connman.GetTotalBytesRecv(); (void)connman.GetTotalBytesRecv();
(void)connman.GetTotalBytesSent(); (void)connman.GetTotalBytesSent();
(void)connman.GetTryNewOutboundPeer(); (void)connman.GetTryNewOutboundPeer();

View File

@ -68,7 +68,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
{ {
assert(node.ReceiveMsgBytes(msg_bytes, complete)); assert(node.ReceiveMsgBytes(msg_bytes, complete));
if (complete) { if (complete) {
node.MarkReceivedMsgsForProcessing(nReceiveFloodSize); node.MarkReceivedMsgsForProcessing();
} }
} }