partial bitcoin#27981: Fix potential network stalling bug

To allow for the removal of a node from `vReceivableNodes`, the
collection of node pointers has been made into a `std::set`.

Marked as partial because it should be revisited once bitcoin#24356 is
backported.
This commit is contained in:
Kittywhiskers Van Gogh 2024-08-27 18:36:31 +00:00
parent 13f6dc1b27
commit 8c986d6b08
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
2 changed files with 38 additions and 25 deletions

View File

@ -936,7 +936,7 @@ void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vec
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr}; CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
} }
size_t CConnman::SocketSendData(CNode& node) std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{ {
auto it = node.vSendMsg.begin(); auto it = node.vSendMsg.begin();
size_t nSentSize = 0; size_t nSentSize = 0;
@ -994,7 +994,7 @@ size_t CConnman::SocketSendData(CNode& node)
} }
node.vSendMsg.erase(node.vSendMsg.begin(), it); node.vSendMsg.erase(node.vSendMsg.begin(), it);
node.nSendMsgSize = node.vSendMsg.size(); node.nSendMsgSize = node.vSendMsg.size();
return nSentSize; return {nSentSize, !node.vSendMsg.empty()};
} }
static bool ReverseCompareNodeMinPingTime(const NodeEvictionCandidate& a, const NodeEvictionCandidate& b) static bool ReverseCompareNodeMinPingTime(const NodeEvictionCandidate& a, const NodeEvictionCandidate& b)
@ -1711,8 +1711,7 @@ bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
recv_set.insert(hListenSocket.sock->Get()); recv_set.insert(hListenSocket.sock->Get());
} }
for (CNode* pnode : nodes) for (CNode* pnode : nodes) {
{
bool select_recv = !pnode->fHasRecvData; bool select_recv = !pnode->fHasRecvData;
bool select_send = !pnode->fCanSendData; bool select_send = !pnode->fCanSendData;
@ -2027,9 +2026,9 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
if (interruptNet) return; if (interruptNet) return;
std::vector<CNode*> vErrorNodes; std::set<CNode*> vErrorNodes;
std::vector<CNode*> vReceivableNodes; std::set<CNode*> vReceivableNodes;
std::vector<CNode*> vSendableNodes; std::set<CNode*> vSendableNodes;
{ {
LOCK(cs_mapSocketToNode); LOCK(cs_mapSocketToNode);
for (auto hSocket : error_set) { for (auto hSocket : error_set) {
@ -2038,7 +2037,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
continue; continue;
} }
it->second->AddRef(); it->second->AddRef();
vErrorNodes.emplace_back(it->second); vErrorNodes.emplace(it->second);
} }
for (auto hSocket : recv_set) { for (auto hSocket : recv_set) {
if (error_set.count(hSocket)) { if (error_set.count(hSocket)) {
@ -2073,7 +2072,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
{ {
LOCK(cs_sendable_receivable_nodes); LOCK(cs_sendable_receivable_nodes);
vReceivableNodes.reserve(mapReceivableNodes.size());
for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) { for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
if (!it->second->fHasRecvData) { if (!it->second->fHasRecvData) {
it = mapReceivableNodes.erase(it); it = mapReceivableNodes.erase(it);
@ -2088,7 +2086,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
// receiving data (which should succeed as the socket signalled as receivable). // receiving data (which should succeed as the socket signalled as receivable).
if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) { if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) {
it->second->AddRef(); it->second->AddRef();
vReceivableNodes.emplace_back(it->second); vReceivableNodes.emplace(it->second);
} }
++it; ++it;
} }
@ -2099,7 +2097,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
// also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration // also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration
// but don't have any in this iteration // but don't have any in this iteration
LOCK(cs_mapNodesWithDataToSend); LOCK(cs_mapNodesWithDataToSend);
vSendableNodes.reserve(mapNodesWithDataToSend.size());
for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) { for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) {
if (it->second->nSendMsgSize == 0) { if (it->second->nSendMsgSize == 0) {
// See comment in PushMessage // See comment in PushMessage
@ -2108,13 +2105,36 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
} else { } else {
if (it->second->fCanSendData) { if (it->second->fCanSendData) {
it->second->AddRef(); it->second->AddRef();
vSendableNodes.emplace_back(it->second); vSendableNodes.emplace(it->second);
} }
++it; ++it;
} }
} }
} }
for (CNode* pnode : vSendableNodes) {
if (interruptNet) {
break;
}
// Send data
auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
if (bytes_sent) {
RecordBytesSent(bytes_sent);
// If both receiving and (non-optimistic) sending were possible, we first attempt
// sending. If that succeeds, but does not fully drain the send queue, do not
// attempt to receive. This avoids needlessly queueing data if the remote peer
// is slow at receiving data, by means of TCP flow control. We only do this when
// sending actually succeeded to make sure progress is always made; otherwise a
// deadlock would be possible when both sides have data to send, but neither is
// receiving.
if (data_left && vReceivableNodes.erase(pnode)) {
pnode->Release();
}
}
}
for (CNode* pnode : vErrorNodes) for (CNode* pnode : vErrorNodes)
{ {
if (interruptNet) { if (interruptNet) {
@ -2136,16 +2156,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
SocketRecvData(pnode); SocketRecvData(pnode);
} }
for (CNode* pnode : vSendableNodes) {
if (interruptNet) {
break;
}
// Send data
size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
if (bytes_sent) RecordBytesSent(bytes_sent);
}
for (auto& node : vErrorNodes) { for (auto& node : vErrorNodes) {
node->Release(); node->Release();
} }
@ -4183,7 +4193,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{ {
LOCK(pnode->cs_vSend); LOCK(pnode->cs_vSend);
bool hasPendingData = !pnode->vSendMsg.empty(); bool optimisticSend(pnode->vSendMsg.empty());
//log total amount of bytes per message type //log total amount of bytes per message type
pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize; pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize;
@ -4206,7 +4216,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
} }
// wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending)
if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load())) if (optimisticSend && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()))
m_wakeup_pipe->Write(); m_wakeup_pipe->Write();
} }
} }

View File

@ -1387,8 +1387,11 @@ private:
NodeId GetNewNodeId(); NodeId GetNewNodeId();
size_t SocketSendData(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend); /** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */
std::pair<size_t, bool> SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
size_t SocketRecvData(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); size_t SocketRecvData(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void DumpAddresses(); void DumpAddresses();
// Network stats // Network stats