From 0b8fe48fbe0dc04cc3fe9075767e9c67269687be Mon Sep 17 00:00:00 2001 From: pasta Date: Mon, 25 Mar 2024 11:03:01 -0500 Subject: [PATCH] refactor: add cs_mapSocketToNode and cs_sendable_receivable_nodes to minimize cs_vNode contention (and document an undocumented lock requirement) --- src/net.cpp | 87 ++++++++++++++++++++++++++++++----------------------- src/net.h | 9 +++--- 2 files changed, 54 insertions(+), 42 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 5beddbf35d..1e5f2ed205 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -545,7 +545,7 @@ void CNode::CloseSocketDisconnect(CConnman* connman) AssertLockHeld(connman->cs_vNodes); fDisconnect = true; - LOCK(cs_hSocket); + LOCK2(connman->cs_mapSocketToNode, cs_hSocket); if (hSocket == INVALID_SOCKET) { return; } @@ -554,8 +554,11 @@ void CNode::CloseSocketDisconnect(CConnman* connman) fCanSendData = false; connman->mapSocketToNode.erase(hSocket); - connman->mapReceivableNodes.erase(GetId()); - connman->mapSendableNodes.erase(GetId()); + { + LOCK(connman->cs_sendable_receivable_nodes); + connman->mapReceivableNodes.erase(GetId()); + connman->mapSendableNodes.erase(GetId()); + } { LOCK(connman->cs_mapNodesWithDataToSend); if (connman->mapNodesWithDataToSend.erase(GetId()) != 0) { @@ -1287,7 +1290,7 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket, { LOCK(cs_vNodes); vNodes.push_back(pnode); - mapSocketToNode.emplace(hSocket, pnode); + WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.emplace(hSocket, pnode)); RegisterEvents(pnode); WakeSelect(); } @@ -1784,23 +1787,25 @@ void CConnman::SocketEvents(std::set &recv_set, std::set &send_s void CConnman::SocketHandler() { - bool fOnlyPoll = false; - { + bool fOnlyPoll = [this]() { // check if we have work to do and thus should avoid waiting for events - LOCK2(cs_vNodes, cs_mapNodesWithDataToSend); + LOCK2(cs_vNodes, cs_sendable_receivable_nodes); if (!mapReceivableNodes.empty()) { - fOnlyPoll = true; - } else if (!mapSendableNodes.empty() && !mapNodesWithDataToSend.empty()) { - // we must check if at least one of the nodes with pending messages is also sendable, as otherwise a single - // node would be able to make the network thread busy with polling - for (auto& p : mapNodesWithDataToSend) { - if (mapSendableNodes.count(p.first)) { - fOnlyPoll = true; - break; + return true; + } else if (!mapSendableNodes.empty()) { + if (LOCK(cs_mapNodesWithDataToSend); !mapNodesWithDataToSend.empty()) { + // we must check if at least one of the nodes with pending messages is also sendable, as otherwise a single + // node would be able to make the network thread busy with polling + for (auto& p : mapNodesWithDataToSend) { + if (mapSendableNodes.count(p.first)) { + return true; + break; + } } } } - } + return false; + }(); std::set recv_set, send_set, error_set; SocketEvents(recv_set, send_set, error_set, fOnlyPoll); @@ -1835,7 +1840,7 @@ void CConnman::SocketHandler() std::vector vReceivableNodes; std::vector vSendableNodes; { - LOCK(cs_vNodes); + LOCK(cs_mapSocketToNode); for (auto hSocket : error_set) { auto it = mapSocketToNode.find(hSocket); if (it == mapSocketToNode.end()) { @@ -1855,6 +1860,7 @@ void CConnman::SocketHandler() continue; } + LOCK(cs_sendable_receivable_nodes); auto jt = mapReceivableNodes.emplace(it->second->GetId(), it->second); assert(jt.first->second == it->second); it->second->fHasRecvData = true; @@ -1865,6 +1871,7 @@ void CConnman::SocketHandler() continue; } + LOCK(cs_sendable_receivable_nodes); auto jt = mapSendableNodes.emplace(it->second->GetId(), it->second); assert(jt.first->second == it->second); it->second->fCanSendData = true; @@ -1872,24 +1879,28 @@ void CConnman::SocketHandler() // collect nodes that have a receivable socket // also clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore - vReceivableNodes.reserve(mapReceivableNodes.size()); - for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) { - if (!it->second->fHasRecvData) { - it = mapReceivableNodes.erase(it); - } else { - // Implement the following logic: - // * If there is data to send, try sending data. As this only - // happens when optimistic write failed, we choose to first drain the - // write buffer in this case before receiving more. This avoids - // needlessly queueing received data, if the remote peer is not themselves - // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try - // receiving data (which should succeed as the socket signalled as receivable). - if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) { - it->second->AddRef(); - vReceivableNodes.emplace_back(it->second); + { + LOCK(cs_sendable_receivable_nodes); + + vReceivableNodes.reserve(mapReceivableNodes.size()); + for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) { + if (!it->second->fHasRecvData) { + it = mapReceivableNodes.erase(it); + } else { + // Implement the following logic: + // * If there is data to send, try sending data. As this only + // happens when optimistic write failed, we choose to first drain the + // write buffer in this case before receiving more. This avoids + // needlessly queueing received data, if the remote peer is not themselves + // receiving data. This means properly utilizing TCP flow control signalling. + // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try + // receiving data (which should succeed as the socket signalled as receivable). + if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) { + it->second->AddRef(); + vReceivableNodes.emplace_back(it->second); + } + ++it; } - ++it; } } @@ -1953,7 +1964,7 @@ void CConnman::SocketHandler() } { - LOCK(cs_vNodes); + LOCK(cs_sendable_receivable_nodes); // remove nodes from mapSendableNodes, so that the next iteration knows that there is no work to do // (even if there are pending messages to be sent) for (auto it = mapSendableNodes.begin(); it != mapSendableNodes.end(); ) { @@ -2886,7 +2897,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai pnode->m_masternode_probe_connection = true; { - LOCK2(cs_vNodes, pnode->cs_hSocket); + LOCK2(cs_mapSocketToNode, pnode->cs_hSocket); mapSocketToNode.emplace(pnode->hSocket, pnode); } @@ -3501,9 +3512,9 @@ void CConnman::StopNodes() for (CNode* pnode : vNodesDisconnected) { DeleteNode(pnode); } - mapSocketToNode.clear(); + WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.clear()); { - LOCK(cs_vNodes); + LOCK(cs_sendable_receivable_nodes); mapReceivableNodes.clear(); } { diff --git a/src/net.h b/src/net.h index 2851e10959..a5a1bde747 100644 --- a/src/net.h +++ b/src/net.h @@ -1432,7 +1432,8 @@ private: std::set masternodePendingProbes GUARDED_BY(cs_vPendingMasternodes); std::vector vNodes GUARDED_BY(cs_vNodes); std::list vNodesDisconnected; - std::unordered_map mapSocketToNode; + mutable Mutex cs_mapSocketToNode; + std::unordered_map mapSocketToNode GUARDED_BY(cs_mapSocketToNode); mutable RecursiveMutex cs_vNodes; std::atomic nLastNodeId{0}; unsigned int nPrevNodeCount{0}; @@ -1542,9 +1543,9 @@ private: int epollfd{-1}; #endif - /** Protected by cs_vNodes */ - std::unordered_map mapReceivableNodes GUARDED_BY(cs_vNodes); - std::unordered_map mapSendableNodes GUARDED_BY(cs_vNodes); + Mutex cs_sendable_receivable_nodes; + std::unordered_map mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes); + std::unordered_map mapSendableNodes GUARDED_BY(cs_sendable_receivable_nodes); /** Protected by cs_mapNodesWithDataToSend */ std::unordered_map mapNodesWithDataToSend GUARDED_BY(cs_mapNodesWithDataToSend); mutable RecursiveMutex cs_mapNodesWithDataToSend;