refactor: add cs_mapSocketToNode and cs_sendable_receivable_nodes to minimize cs_vNodes contention (and document an undocumented lock requirement)

pasta 2024-03-25 11:03:01 -05:00
parent 4301ab9dfb
commit 0b8fe48fbe
GPG Key ID: 52527BEDABE87984
2 changed files with 54 additions and 42 deletions
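The pattern running through the whole diff is to peel members off the coarse cs_vNodes lock and give each small group its own dedicated mutex: cs_mapSocketToNode for the socket-to-node map, cs_sendable_receivable_nodes for the receivable/sendable maps. The hot socket-handling paths then no longer have to serialize on cs_vNodes just to touch one of those maps. A minimal standalone sketch of the idea, using plain std::mutex and std::scoped_lock instead of Dash's LOCK/WITH_LOCK/GUARDED_BY machinery, with illustrative class and member names:

#include <mutex>
#include <unordered_map>
#include <vector>

using NodeId = long long;
struct Node {}; // stand-in for CNode

class ConnectionManager
{
public:
    void AddNode(Node* node)
    {
        // The main node list keeps its own (coarse) mutex.
        std::scoped_lock lock(m_nodes_mutex);
        m_nodes.push_back(node);
    }

    // Before the split these helpers would have had to take the node-list
    // mutex as well; now they only contend with each other.
    void MarkReceivable(NodeId id, Node* node)
    {
        std::scoped_lock lock(m_sendable_receivable_mutex);
        m_receivable_nodes.emplace(id, node);
    }

    void ForgetNode(NodeId id)
    {
        std::scoped_lock lock(m_sendable_receivable_mutex);
        m_receivable_nodes.erase(id);
        m_sendable_nodes.erase(id);
    }

private:
    std::mutex m_nodes_mutex;               // guards only the node list
    std::vector<Node*> m_nodes;
    std::mutex m_sendable_receivable_mutex; // guards only the two maps below
    std::unordered_map<NodeId, Node*> m_receivable_nodes;
    std::unordered_map<NodeId, Node*> m_sendable_nodes;
};

int main()
{
    ConnectionManager connman;
    Node node;
    connman.AddNode(&node);
    connman.MarkReceivable(1, &node);
    connman.ForgetNode(1);
    return 0;
}

The usual trade-off of lock splitting applies: shorter critical sections and less contention, in exchange for having to be deliberate about lock ordering wherever two of the mutexes are taken together, which is what the LOCK2 call sites in the diff do.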

src/net.cpp

@@ -545,7 +545,7 @@ void CNode::CloseSocketDisconnect(CConnman* connman)
AssertLockHeld(connman->cs_vNodes);
fDisconnect = true;
- LOCK(cs_hSocket);
+ LOCK2(connman->cs_mapSocketToNode, cs_hSocket);
if (hSocket == INVALID_SOCKET) {
return;
}
@@ -554,8 +554,11 @@ void CNode::CloseSocketDisconnect(CConnman* connman)
fCanSendData = false;
connman->mapSocketToNode.erase(hSocket);
- connman->mapReceivableNodes.erase(GetId());
- connman->mapSendableNodes.erase(GetId());
+ {
+ LOCK(connman->cs_sendable_receivable_nodes);
+ connman->mapReceivableNodes.erase(GetId());
+ connman->mapSendableNodes.erase(GetId());
+ }
{
LOCK(connman->cs_mapNodesWithDataToSend);
if (connman->mapNodesWithDataToSend.erase(GetId()) != 0) {
@@ -1287,7 +1290,7 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket,
{
LOCK(cs_vNodes);
vNodes.push_back(pnode);
- mapSocketToNode.emplace(hSocket, pnode);
+ WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.emplace(hSocket, pnode));
RegisterEvents(pnode);
WakeSelect();
}
@@ -1784,23 +1787,25 @@ void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_s
void CConnman::SocketHandler()
{
- bool fOnlyPoll = false;
- {
+ bool fOnlyPoll = [this]() {
// check if we have work to do and thus should avoid waiting for events
- LOCK2(cs_vNodes, cs_mapNodesWithDataToSend);
+ LOCK2(cs_vNodes, cs_sendable_receivable_nodes);
if (!mapReceivableNodes.empty()) {
- fOnlyPoll = true;
- } else if (!mapSendableNodes.empty() && !mapNodesWithDataToSend.empty()) {
- // we must check if at least one of the nodes with pending messages is also sendable, as otherwise a single
- // node would be able to make the network thread busy with polling
- for (auto& p : mapNodesWithDataToSend) {
- if (mapSendableNodes.count(p.first)) {
- fOnlyPoll = true;
- break;
+ return true;
+ } else if (!mapSendableNodes.empty()) {
+ if (LOCK(cs_mapNodesWithDataToSend); !mapNodesWithDataToSend.empty()) {
+ // we must check if at least one of the nodes with pending messages is also sendable, as otherwise a single
+ // node would be able to make the network thread busy with polling
+ for (auto& p : mapNodesWithDataToSend) {
+ if (mapSendableNodes.count(p.first)) {
+ return true;
+ break;
+ }
}
}
}
- }
+ return false;
+ }();
std::set<SOCKET> recv_set, send_set, error_set;
SocketEvents(recv_set, send_set, error_set, fOnlyPoll);
@@ -1835,7 +1840,7 @@ void CConnman::SocketHandler()
std::vector<CNode*> vReceivableNodes;
std::vector<CNode*> vSendableNodes;
{
- LOCK(cs_vNodes);
+ LOCK(cs_mapSocketToNode);
for (auto hSocket : error_set) {
auto it = mapSocketToNode.find(hSocket);
if (it == mapSocketToNode.end()) {
@@ -1855,6 +1860,7 @@ void CConnman::SocketHandler()
continue;
}
+ LOCK(cs_sendable_receivable_nodes);
auto jt = mapReceivableNodes.emplace(it->second->GetId(), it->second);
assert(jt.first->second == it->second);
it->second->fHasRecvData = true;
@@ -1865,6 +1871,7 @@ void CConnman::SocketHandler()
continue;
}
+ LOCK(cs_sendable_receivable_nodes);
auto jt = mapSendableNodes.emplace(it->second->GetId(), it->second);
assert(jt.first->second == it->second);
it->second->fCanSendData = true;
@@ -1872,24 +1879,28 @@ void CConnman::SocketHandler()
// collect nodes that have a receivable socket
// also clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore
- vReceivableNodes.reserve(mapReceivableNodes.size());
- for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
- if (!it->second->fHasRecvData) {
- it = mapReceivableNodes.erase(it);
- } else {
- // Implement the following logic:
- // * If there is data to send, try sending data. As this only
- // happens when optimistic write failed, we choose to first drain the
- // write buffer in this case before receiving more. This avoids
- // needlessly queueing received data, if the remote peer is not themselves
- // receiving data. This means properly utilizing TCP flow control signalling.
- // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
- // receiving data (which should succeed as the socket signalled as receivable).
- if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) {
- it->second->AddRef();
- vReceivableNodes.emplace_back(it->second);
+ {
+ LOCK(cs_sendable_receivable_nodes);
+ vReceivableNodes.reserve(mapReceivableNodes.size());
+ for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
+ if (!it->second->fHasRecvData) {
+ it = mapReceivableNodes.erase(it);
+ } else {
+ // Implement the following logic:
+ // * If there is data to send, try sending data. As this only
+ // happens when optimistic write failed, we choose to first drain the
+ // write buffer in this case before receiving more. This avoids
+ // needlessly queueing received data, if the remote peer is not themselves
+ // receiving data. This means properly utilizing TCP flow control signalling.
+ // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
+ // receiving data (which should succeed as the socket signalled as receivable).
+ if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) {
+ it->second->AddRef();
+ vReceivableNodes.emplace_back(it->second);
+ }
+ ++it;
}
- ++it;
}
}
@@ -1953,7 +1964,7 @@ void CConnman::SocketHandler()
}
{
- LOCK(cs_vNodes);
+ LOCK(cs_sendable_receivable_nodes);
// remove nodes from mapSendableNodes, so that the next iteration knows that there is no work to do
// (even if there are pending messages to be sent)
for (auto it = mapSendableNodes.begin(); it != mapSendableNodes.end(); ) {
@@ -2886,7 +2897,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
pnode->m_masternode_probe_connection = true;
{
- LOCK2(cs_vNodes, pnode->cs_hSocket);
+ LOCK2(cs_mapSocketToNode, pnode->cs_hSocket);
mapSocketToNode.emplace(pnode->hSocket, pnode);
}
@@ -3501,9 +3512,9 @@ void CConnman::StopNodes()
for (CNode* pnode : vNodesDisconnected) {
DeleteNode(pnode);
}
- mapSocketToNode.clear();
+ WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.clear());
{
- LOCK(cs_vNodes);
+ LOCK(cs_sendable_receivable_nodes);
mapReceivableNodes.clear();
}
{
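One incidental improvement in the SocketHandler hunk above: fOnlyPoll is now computed by an immediately invoked lambda, so the locks are scoped to exactly the check and the result is returned directly instead of being accumulated in a mutable flag, while the inner cs_mapNodesWithDataToSend lock is taken with a C++17 if-with-initializer so it is held only on the branch that needs it. A standalone sketch of those two constructs, with hypothetical names and std::scoped_lock standing in for the LOCK/LOCK2 macros (it mirrors the shape of the check, not Dash's actual data structures):

#include <mutex>
#include <unordered_map>

std::mutex g_pending_mutex;
std::mutex g_ready_mutex;
std::unordered_map<int, int> g_pending; // nodes with queued data (illustrative)
std::unordered_map<int, int> g_ready;   // nodes whose socket is writable (illustrative)

bool HasImmediateWork()
{
    // Immediately invoked lambda: the lock lives only for the duration of the
    // check, and "return true" exits the whole computation directly.
    const bool has_work = []() {
        std::scoped_lock pending_lock(g_pending_mutex);
        if (g_pending.empty()) {
            return false;
        }
        // C++17 if-with-initializer: the second lock is scoped to this branch,
        // mirroring "if (LOCK(cs_mapNodesWithDataToSend); !mapNodesWithDataToSend.empty())".
        if (std::scoped_lock ready_lock(g_ready_mutex); !g_ready.empty()) {
            for (const auto& entry : g_pending) {
                if (g_ready.count(entry.first)) {
                    return true;
                }
            }
        }
        return false;
    }();
    return has_work;
}

int main()
{
    return HasImmediateWork() ? 0 : 1;
}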

src/net.h

@@ -1432,7 +1432,8 @@ private:
std::set<uint256> masternodePendingProbes GUARDED_BY(cs_vPendingMasternodes);
std::vector<CNode*> vNodes GUARDED_BY(cs_vNodes);
std::list<CNode*> vNodesDisconnected;
- std::unordered_map<SOCKET, CNode*> mapSocketToNode;
+ mutable Mutex cs_mapSocketToNode;
+ std::unordered_map<SOCKET, CNode*> mapSocketToNode GUARDED_BY(cs_mapSocketToNode);
mutable RecursiveMutex cs_vNodes;
std::atomic<NodeId> nLastNodeId{0};
unsigned int nPrevNodeCount{0};
@@ -1542,9 +1543,9 @@ private:
int epollfd{-1};
#endif
- /** Protected by cs_vNodes */
- std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_vNodes);
- std::unordered_map<NodeId, CNode*> mapSendableNodes GUARDED_BY(cs_vNodes);
+ Mutex cs_sendable_receivable_nodes;
+ std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes);
+ std::unordered_map<NodeId, CNode*> mapSendableNodes GUARDED_BY(cs_sendable_receivable_nodes);
/** Protected by cs_mapNodesWithDataToSend */
std::unordered_map<NodeId, CNode*> mapNodesWithDataToSend GUARDED_BY(cs_mapNodesWithDataToSend);
mutable RecursiveMutex cs_mapNodesWithDataToSend;
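The GUARDED_BY annotations added above are what turn the new locking rules into checkable documentation: with Clang's thread-safety analysis (-Wthread-safety), touching an annotated member without holding its mutex becomes a compile-time warning rather than a silent data race, in the same spirit as the AssertLockHeld(connman->cs_vNodes) requirement visible in CNode::CloseSocketDisconnect. A minimal self-contained illustration of the mechanism, using bare Clang attributes where the project wraps them in GUARDED_BY/EXCLUSIVE_LOCKS_REQUIRED macros, with illustrative names throughout:

// Compile with: clang++ -std=c++17 -Wthread-safety tsa_sketch.cpp
#include <mutex>
#include <unordered_map>

#define CAPABILITY(x)     __attribute__((capability(x)))
#define SCOPED_CAPABILITY __attribute__((scoped_lockable))
#define GUARDED_BY(x)     __attribute__((guarded_by(x)))
#define REQUIRES(x)       __attribute__((exclusive_locks_required(x)))
#define ACQUIRE(...)      __attribute__((acquire_capability(__VA_ARGS__)))
#define RELEASE(...)      __attribute__((release_capability(__VA_ARGS__)))

// A std::mutex wrapper the analysis understands; the project's Mutex and
// RecursiveMutex types carry equivalent annotations.
class CAPABILITY("mutex") AnnotatedMutex
{
public:
    void lock() ACQUIRE() { m_mutex.lock(); }
    void unlock() RELEASE() { m_mutex.unlock(); }

private:
    std::mutex m_mutex;
};

// RAII lock the analysis understands (LOCK expands to something similar).
class SCOPED_CAPABILITY ScopedLock
{
public:
    explicit ScopedLock(AnnotatedMutex& mutex) ACQUIRE(mutex) : m_mutex(mutex) { m_mutex.lock(); }
    ~ScopedLock() RELEASE() { m_mutex.unlock(); }

private:
    AnnotatedMutex& m_mutex;
};

class Example
{
public:
    // The lock requirement is now part of the signature: callers must already
    // hold cs_map, and clang warns at compile time if they do not.
    void EraseLocked(int id) REQUIRES(cs_map) { m_map.erase(id); }

    void Erase(int id)
    {
        ScopedLock lock(cs_map); // removing this line makes -Wthread-safety warn
        m_map.erase(id);
    }

private:
    AnnotatedMutex cs_map;
    std::unordered_map<int, int> m_map GUARDED_BY(cs_map);
};

int main()
{
    Example example;
    example.Erase(1);
    return 0;
}

The annotations and the run-time assertion are complementary: GUARDED_BY catches unlocked accesses the compiler can see, while AssertLockHeld makes a caller-must-hold-the-lock contract fail loudly in debug builds when the analysis cannot follow the call chain.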