mirror of
https://github.com/dashpay/dash.git
synced 2024-12-25 12:02:48 +01:00
partial bitcoin#27981: Fix potential network stalling bug
To allow for the removal of a node from `vReceivableNodes`, the collection of node pointers have been made into an `std::set`. Marking as partial as it should be revisited when bitcoin#24356 is backported.
This commit is contained in:
parent
13f6dc1b27
commit
8c986d6b08
58
src/net.cpp
58
src/net.cpp
@ -936,7 +936,7 @@ void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vec
|
||||
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
|
||||
}
|
||||
|
||||
size_t CConnman::SocketSendData(CNode& node)
|
||||
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
|
||||
{
|
||||
auto it = node.vSendMsg.begin();
|
||||
size_t nSentSize = 0;
|
||||
@ -994,7 +994,7 @@ size_t CConnman::SocketSendData(CNode& node)
|
||||
}
|
||||
node.vSendMsg.erase(node.vSendMsg.begin(), it);
|
||||
node.nSendMsgSize = node.vSendMsg.size();
|
||||
return nSentSize;
|
||||
return {nSentSize, !node.vSendMsg.empty()};
|
||||
}
|
||||
|
||||
static bool ReverseCompareNodeMinPingTime(const NodeEvictionCandidate& a, const NodeEvictionCandidate& b)
|
||||
@ -1711,8 +1711,7 @@ bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
|
||||
recv_set.insert(hListenSocket.sock->Get());
|
||||
}
|
||||
|
||||
for (CNode* pnode : nodes)
|
||||
{
|
||||
for (CNode* pnode : nodes) {
|
||||
bool select_recv = !pnode->fHasRecvData;
|
||||
bool select_send = !pnode->fCanSendData;
|
||||
|
||||
@ -2027,9 +2026,9 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
||||
|
||||
if (interruptNet) return;
|
||||
|
||||
std::vector<CNode*> vErrorNodes;
|
||||
std::vector<CNode*> vReceivableNodes;
|
||||
std::vector<CNode*> vSendableNodes;
|
||||
std::set<CNode*> vErrorNodes;
|
||||
std::set<CNode*> vReceivableNodes;
|
||||
std::set<CNode*> vSendableNodes;
|
||||
{
|
||||
LOCK(cs_mapSocketToNode);
|
||||
for (auto hSocket : error_set) {
|
||||
@ -2038,7 +2037,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
||||
continue;
|
||||
}
|
||||
it->second->AddRef();
|
||||
vErrorNodes.emplace_back(it->second);
|
||||
vErrorNodes.emplace(it->second);
|
||||
}
|
||||
for (auto hSocket : recv_set) {
|
||||
if (error_set.count(hSocket)) {
|
||||
@ -2073,7 +2072,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
||||
{
|
||||
LOCK(cs_sendable_receivable_nodes);
|
||||
|
||||
vReceivableNodes.reserve(mapReceivableNodes.size());
|
||||
for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
|
||||
if (!it->second->fHasRecvData) {
|
||||
it = mapReceivableNodes.erase(it);
|
||||
@ -2088,7 +2086,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
||||
// receiving data (which should succeed as the socket signalled as receivable).
|
||||
if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) {
|
||||
it->second->AddRef();
|
||||
vReceivableNodes.emplace_back(it->second);
|
||||
vReceivableNodes.emplace(it->second);
|
||||
}
|
||||
++it;
|
||||
}
|
||||
@ -2099,7 +2097,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
||||
// also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration
|
||||
// but don't have any in this iteration
|
||||
LOCK(cs_mapNodesWithDataToSend);
|
||||
vSendableNodes.reserve(mapNodesWithDataToSend.size());
|
||||
for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) {
|
||||
if (it->second->nSendMsgSize == 0) {
|
||||
// See comment in PushMessage
|
||||
@ -2108,13 +2105,36 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
||||
} else {
|
||||
if (it->second->fCanSendData) {
|
||||
it->second->AddRef();
|
||||
vSendableNodes.emplace_back(it->second);
|
||||
vSendableNodes.emplace(it->second);
|
||||
}
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (CNode* pnode : vSendableNodes) {
|
||||
if (interruptNet) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Send data
|
||||
auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
|
||||
if (bytes_sent) {
|
||||
RecordBytesSent(bytes_sent);
|
||||
|
||||
// If both receiving and (non-optimistic) sending were possible, we first attempt
|
||||
// sending. If that succeeds, but does not fully drain the send queue, do not
|
||||
// attempt to receive. This avoids needlessly queueing data if the remote peer
|
||||
// is slow at receiving data, by means of TCP flow control. We only do this when
|
||||
// sending actually succeeded to make sure progress is always made; otherwise a
|
||||
// deadlock would be possible when both sides have data to send, but neither is
|
||||
// receiving.
|
||||
if (data_left && vReceivableNodes.erase(pnode)) {
|
||||
pnode->Release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (CNode* pnode : vErrorNodes)
|
||||
{
|
||||
if (interruptNet) {
|
||||
@ -2136,16 +2156,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
||||
SocketRecvData(pnode);
|
||||
}
|
||||
|
||||
for (CNode* pnode : vSendableNodes) {
|
||||
if (interruptNet) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Send data
|
||||
size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
|
||||
if (bytes_sent) RecordBytesSent(bytes_sent);
|
||||
}
|
||||
|
||||
for (auto& node : vErrorNodes) {
|
||||
node->Release();
|
||||
}
|
||||
@ -4183,7 +4193,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
|
||||
|
||||
{
|
||||
LOCK(pnode->cs_vSend);
|
||||
bool hasPendingData = !pnode->vSendMsg.empty();
|
||||
bool optimisticSend(pnode->vSendMsg.empty());
|
||||
|
||||
//log total amount of bytes per message type
|
||||
pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize;
|
||||
@ -4206,7 +4216,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
|
||||
}
|
||||
|
||||
// wake up select() call in case there was no pending data before (so it was not selecting this socket for sending)
|
||||
if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()))
|
||||
if (optimisticSend && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()))
|
||||
m_wakeup_pipe->Write();
|
||||
}
|
||||
}
|
||||
|
@ -1387,8 +1387,11 @@ private:
|
||||
|
||||
NodeId GetNewNodeId();
|
||||
|
||||
size_t SocketSendData(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
|
||||
/** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */
|
||||
std::pair<size_t, bool> SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
|
||||
|
||||
size_t SocketRecvData(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
|
||||
|
||||
void DumpAddresses();
|
||||
|
||||
// Network stats
|
||||
|
Loading…
Reference in New Issue
Block a user