mirror of
https://github.com/dashpay/dash.git
synced 2024-12-25 12:02:48 +01:00
merge bitcoin#27257: End friendship of CNode, CConnman and ConnmanTestMsg
This commit is contained in:
parent
3465df2689
commit
9023dd25af
64
src/net.cpp
64
src/net.cpp
@ -1034,7 +1034,7 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
|
|||||||
// Notify transport that bytes have been processed.
|
// Notify transport that bytes have been processed.
|
||||||
node.m_transport->MarkBytesSent(nBytes);
|
node.m_transport->MarkBytesSent(nBytes);
|
||||||
// Update statistics per message type.
|
// Update statistics per message type.
|
||||||
node.mapSendBytesPerMsgType[msg_type] += nBytes;
|
node.AccountForSentBytes(msg_type, nBytes);
|
||||||
nSentSize += nBytes;
|
nSentSize += nBytes;
|
||||||
if ((size_t)nBytes != data.size()) {
|
if ((size_t)nBytes != data.size()) {
|
||||||
// could not send full message; stop sending more
|
// could not send full message; stop sending more
|
||||||
@ -1115,7 +1115,7 @@ bool CConnman::AttemptToEvictConnection()
|
|||||||
.m_is_local = node->addr.IsLocal(),
|
.m_is_local = node->addr.IsLocal(),
|
||||||
.m_network = node->ConnectedThroughNetwork(),
|
.m_network = node->ConnectedThroughNetwork(),
|
||||||
.m_noban = node->HasPermission(NetPermissionFlags::NoBan),
|
.m_noban = node->HasPermission(NetPermissionFlags::NoBan),
|
||||||
.m_conn_type = node->m_conn_type,
|
.m_conn_type = node->GetConnectionType(),
|
||||||
};
|
};
|
||||||
vEvictionCandidates.push_back(candidate);
|
vEvictionCandidates.push_back(candidate);
|
||||||
}
|
}
|
||||||
@ -1339,7 +1339,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
|
|||||||
|
|
||||||
// Count existing connections
|
// Count existing connections
|
||||||
int existing_connections = WITH_LOCK(m_nodes_mutex,
|
int existing_connections = WITH_LOCK(m_nodes_mutex,
|
||||||
return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
|
return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->GetConnectionType() == conn_type; }););
|
||||||
|
|
||||||
// Max connections of specified type already exist
|
// Max connections of specified type already exist
|
||||||
if (max_connections != std::nullopt && existing_connections >= max_connections) return false;
|
if (max_connections != std::nullopt && existing_connections >= max_connections) return false;
|
||||||
@ -1494,16 +1494,8 @@ void CConnman::CalculateNumConnectionsChangedStats()
|
|||||||
mapSentBytesMsgStats[NET_MESSAGE_TYPE_OTHER] = 0;
|
mapSentBytesMsgStats[NET_MESSAGE_TYPE_OTHER] = 0;
|
||||||
const NodesSnapshot snap{*this, /* filter = */ CConnman::FullyConnectedOnly};
|
const NodesSnapshot snap{*this, /* filter = */ CConnman::FullyConnectedOnly};
|
||||||
for (auto pnode : snap.Nodes()) {
|
for (auto pnode : snap.Nodes()) {
|
||||||
{
|
WITH_LOCK(pnode->cs_vRecv, pnode->UpdateRecvMapWithStats(mapRecvBytesMsgStats));
|
||||||
LOCK(pnode->cs_vRecv);
|
WITH_LOCK(pnode->cs_vSend, pnode->UpdateSentMapWithStats(mapSentBytesMsgStats));
|
||||||
for (const mapMsgTypeSize::value_type &i : pnode->mapRecvBytesPerMsgType)
|
|
||||||
mapRecvBytesMsgStats[i.first] += i.second;
|
|
||||||
}
|
|
||||||
{
|
|
||||||
LOCK(pnode->cs_vSend);
|
|
||||||
for (const mapMsgTypeSize::value_type &i : pnode->mapSendBytesPerMsgType)
|
|
||||||
mapSentBytesMsgStats[i.first] += i.second;
|
|
||||||
}
|
|
||||||
if (pnode->m_bloom_filter_loaded.load()) {
|
if (pnode->m_bloom_filter_loaded.load()) {
|
||||||
spvNodes++;
|
spvNodes++;
|
||||||
} else {
|
} else {
|
||||||
@ -2096,18 +2088,7 @@ size_t CConnman::SocketRecvData(CNode *pnode)
|
|||||||
}
|
}
|
||||||
RecordBytesRecv(nBytes);
|
RecordBytesRecv(nBytes);
|
||||||
if (notify) {
|
if (notify) {
|
||||||
size_t nSizeAdded = 0;
|
pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize);
|
||||||
for (const auto& msg : pnode->vRecvMsg) {
|
|
||||||
// vRecvMsg contains only completed CNetMessage
|
|
||||||
// the single possible partially deserialized message are held by TransportDeserializer
|
|
||||||
nSizeAdded += msg.m_raw_message_size;
|
|
||||||
}
|
|
||||||
{
|
|
||||||
LOCK(pnode->cs_vProcessMsg);
|
|
||||||
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg);
|
|
||||||
pnode->nProcessQueueSize += nSizeAdded;
|
|
||||||
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
|
|
||||||
}
|
|
||||||
WakeMessageHandler();
|
WakeMessageHandler();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2483,7 +2464,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
|
|||||||
if (pnode->IsFullOutboundConn() && pnode->ConnectedThroughNetwork() == Network::NET_ONION) nOutboundOnionRelay++;
|
if (pnode->IsFullOutboundConn() && pnode->ConnectedThroughNetwork() == Network::NET_ONION) nOutboundOnionRelay++;
|
||||||
|
|
||||||
// Make sure our persistent outbound slots belong to different netgroups.
|
// Make sure our persistent outbound slots belong to different netgroups.
|
||||||
switch (pnode->m_conn_type) {
|
switch (pnode->GetConnectionType()) {
|
||||||
// We currently don't take inbound connections into account. Since they are
|
// We currently don't take inbound connections into account. Since they are
|
||||||
// free to make, an attacker could make them to prevent us from connecting to
|
// free to make, an attacker could make them to prevent us from connecting to
|
||||||
// certain peers.
|
// certain peers.
|
||||||
@ -4074,6 +4055,37 @@ CNode::CNode(NodeId idIn,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
|
||||||
|
{
|
||||||
|
AssertLockNotHeld(m_msg_process_queue_mutex);
|
||||||
|
|
||||||
|
size_t nSizeAdded = 0;
|
||||||
|
for (const auto& msg : vRecvMsg) {
|
||||||
|
// vRecvMsg contains only completed CNetMessage
|
||||||
|
// the single possible partially deserialized message are held by TransportDeserializer
|
||||||
|
nSizeAdded += msg.m_raw_message_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOCK(m_msg_process_queue_mutex);
|
||||||
|
m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg);
|
||||||
|
m_msg_process_queue_size += nSizeAdded;
|
||||||
|
fPauseRecv = m_msg_process_queue_size > recv_flood_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage(size_t recv_flood_size)
|
||||||
|
{
|
||||||
|
LOCK(m_msg_process_queue_mutex);
|
||||||
|
if (m_msg_process_queue.empty()) return std::nullopt;
|
||||||
|
|
||||||
|
std::list<CNetMessage> msgs;
|
||||||
|
// Just take one message
|
||||||
|
msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin());
|
||||||
|
m_msg_process_queue_size -= msgs.front().m_raw_message_size;
|
||||||
|
fPauseRecv = m_msg_process_queue_size > recv_flood_size;
|
||||||
|
|
||||||
|
return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
|
||||||
|
}
|
||||||
|
|
||||||
bool CConnman::NodeFullyConnected(const CNode* pnode)
|
bool CConnman::NodeFullyConnected(const CNode* pnode)
|
||||||
{
|
{
|
||||||
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
|
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
|
||||||
|
54
src/net.h
54
src/net.h
@ -459,9 +459,6 @@ struct CNodeOptions
|
|||||||
/** Information about a peer */
|
/** Information about a peer */
|
||||||
class CNode
|
class CNode
|
||||||
{
|
{
|
||||||
friend class CConnman;
|
|
||||||
friend struct ConnmanTestMsg;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/** Transport serializer/deserializer. The receive side functions are only called under cs_vRecv, while
|
/** Transport serializer/deserializer. The receive side functions are only called under cs_vRecv, while
|
||||||
* the sending side functions are only called under cs_vSend. */
|
* the sending side functions are only called under cs_vSend. */
|
||||||
@ -490,10 +487,6 @@ public:
|
|||||||
Mutex m_sock_mutex;
|
Mutex m_sock_mutex;
|
||||||
Mutex cs_vRecv;
|
Mutex cs_vRecv;
|
||||||
|
|
||||||
RecursiveMutex cs_vProcessMsg;
|
|
||||||
std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg);
|
|
||||||
size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0};
|
|
||||||
|
|
||||||
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
|
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
|
||||||
|
|
||||||
std::atomic<std::chrono::seconds> m_last_send{0s};
|
std::atomic<std::chrono::seconds> m_last_send{0s};
|
||||||
@ -553,6 +546,48 @@ public:
|
|||||||
std::atomic_bool fHasRecvData{false};
|
std::atomic_bool fHasRecvData{false};
|
||||||
std::atomic_bool fCanSendData{false};
|
std::atomic_bool fCanSendData{false};
|
||||||
|
|
||||||
|
const ConnectionType& GetConnectionType() const
|
||||||
|
{
|
||||||
|
return m_conn_type;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Move all messages from the received queue to the processing queue. */
|
||||||
|
void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
|
||||||
|
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
|
||||||
|
|
||||||
|
/** Poll the next message from the processing queue of this connection.
|
||||||
|
*
|
||||||
|
* Returns std::nullopt if the processing queue is empty, or a pair
|
||||||
|
* consisting of the message and a bool that indicates if the processing
|
||||||
|
* queue has more entries. */
|
||||||
|
std::optional<std::pair<CNetMessage, bool>> PollMessage(size_t recv_flood_size)
|
||||||
|
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
|
||||||
|
|
||||||
|
/** Account for the total size of a sent message in the per msg type connection stats. */
|
||||||
|
void AccountForSentBytes(const std::string& msg_type, size_t sent_bytes)
|
||||||
|
EXCLUSIVE_LOCKS_REQUIRED(cs_vSend)
|
||||||
|
{
|
||||||
|
mapSendBytesPerMsgType[msg_type] += sent_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Update a supplied map with bytes sent for each msg type for this node */
|
||||||
|
void UpdateSentMapWithStats(mapMsgTypeSize& map_sentbytes_msg)
|
||||||
|
EXCLUSIVE_LOCKS_REQUIRED(cs_vSend)
|
||||||
|
{
|
||||||
|
for (auto& [msg_type, bytes] : mapSendBytesPerMsgType) {
|
||||||
|
map_sentbytes_msg[msg_type] += bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Update a supplied map with bytes recv for each msg type for this node */
|
||||||
|
void UpdateRecvMapWithStats(mapMsgTypeSize& map_recvbytes_msg)
|
||||||
|
EXCLUSIVE_LOCKS_REQUIRED(cs_vRecv)
|
||||||
|
{
|
||||||
|
for (auto& [msg_type, bytes] : mapRecvBytesPerMsgType) {
|
||||||
|
map_recvbytes_msg[msg_type] += bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get network the peer connected through.
|
* Get network the peer connected through.
|
||||||
*
|
*
|
||||||
@ -564,6 +599,7 @@ public:
|
|||||||
* @return network the peer connected through.
|
* @return network the peer connected through.
|
||||||
*/
|
*/
|
||||||
Network ConnectedThroughNetwork() const;
|
Network ConnectedThroughNetwork() const;
|
||||||
|
|
||||||
bool IsOutboundOrBlockRelayConn() const {
|
bool IsOutboundOrBlockRelayConn() const {
|
||||||
switch (m_conn_type) {
|
switch (m_conn_type) {
|
||||||
case ConnectionType::OUTBOUND_FULL_RELAY:
|
case ConnectionType::OUTBOUND_FULL_RELAY:
|
||||||
@ -794,6 +830,10 @@ private:
|
|||||||
|
|
||||||
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
|
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
|
||||||
|
|
||||||
|
Mutex m_msg_process_queue_mutex;
|
||||||
|
std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex);
|
||||||
|
size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0};
|
||||||
|
|
||||||
// Our address, as reported by the peer
|
// Our address, as reported by the peer
|
||||||
CService addrLocal GUARDED_BY(m_addr_local_mutex);
|
CService addrLocal GUARDED_BY(m_addr_local_mutex);
|
||||||
mutable Mutex m_addr_local_mutex;
|
mutable Mutex m_addr_local_mutex;
|
||||||
|
@ -5013,8 +5013,6 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
|
|||||||
{
|
{
|
||||||
AssertLockHeld(g_msgproc_mutex);
|
AssertLockHeld(g_msgproc_mutex);
|
||||||
|
|
||||||
bool fMoreWork = false;
|
|
||||||
|
|
||||||
PeerRef peer = GetPeerRef(pfrom->GetId());
|
PeerRef peer = GetPeerRef(pfrom->GetId());
|
||||||
if (peer == nullptr) return false;
|
if (peer == nullptr) return false;
|
||||||
|
|
||||||
@ -5050,17 +5048,14 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
|
|||||||
// Don't bother if send buffer is too full to respond anyway
|
// Don't bother if send buffer is too full to respond anyway
|
||||||
if (pfrom->fPauseSend) return false;
|
if (pfrom->fPauseSend) return false;
|
||||||
|
|
||||||
std::list<CNetMessage> msgs;
|
auto poll_result{pfrom->PollMessage(m_connman.GetReceiveFloodSize())};
|
||||||
{
|
if (!poll_result) {
|
||||||
LOCK(pfrom->cs_vProcessMsg);
|
// No message to process
|
||||||
if (pfrom->vProcessMsg.empty()) return false;
|
return false;
|
||||||
// Just take one message
|
|
||||||
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
|
|
||||||
pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size;
|
|
||||||
pfrom->fPauseRecv = pfrom->nProcessQueueSize > m_connman.GetReceiveFloodSize();
|
|
||||||
fMoreWork = !pfrom->vProcessMsg.empty();
|
|
||||||
}
|
}
|
||||||
CNetMessage& msg(msgs.front());
|
|
||||||
|
CNetMessage& msg{poll_result->first};
|
||||||
|
bool fMoreWork = poll_result->second;
|
||||||
|
|
||||||
TRACE6(net, inbound_message,
|
TRACE6(net, inbound_message,
|
||||||
pfrom->GetId(),
|
pfrom->GetId(),
|
||||||
|
@ -68,18 +68,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
|
|||||||
{
|
{
|
||||||
assert(node.ReceiveMsgBytes(msg_bytes, complete));
|
assert(node.ReceiveMsgBytes(msg_bytes, complete));
|
||||||
if (complete) {
|
if (complete) {
|
||||||
size_t nSizeAdded = 0;
|
node.MarkReceivedMsgsForProcessing(nReceiveFloodSize);
|
||||||
for (const auto& msg : node.vRecvMsg) {
|
|
||||||
// vRecvMsg contains only completed CNetMessage
|
|
||||||
// the single possible partially deserialized message are held by TransportDeserializer
|
|
||||||
nSizeAdded += msg.m_raw_message_size;
|
|
||||||
}
|
|
||||||
{
|
|
||||||
LOCK(node.cs_vProcessMsg);
|
|
||||||
node.vProcessMsg.splice(node.vProcessMsg.end(), node.vRecvMsg);
|
|
||||||
node.nProcessQueueSize += nSizeAdded;
|
|
||||||
node.fPauseRecv = node.nProcessQueueSize > nReceiveFloodSize;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user