mirror of
https://github.com/dashpay/dash.git
synced 2024-12-26 12:32:48 +01:00
partial bitcoin#27981: Fix potential network stalling bug
To allow for the removal of a node from `vReceivableNodes`, the collection of node pointers have been made into an `std::set`. Marking as partial as it should be revisited when bitcoin#24356 is backported.
This commit is contained in:
parent
13f6dc1b27
commit
8c986d6b08
58
src/net.cpp
58
src/net.cpp
@ -936,7 +936,7 @@ void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vec
|
|||||||
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
|
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t CConnman::SocketSendData(CNode& node)
|
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
|
||||||
{
|
{
|
||||||
auto it = node.vSendMsg.begin();
|
auto it = node.vSendMsg.begin();
|
||||||
size_t nSentSize = 0;
|
size_t nSentSize = 0;
|
||||||
@ -994,7 +994,7 @@ size_t CConnman::SocketSendData(CNode& node)
|
|||||||
}
|
}
|
||||||
node.vSendMsg.erase(node.vSendMsg.begin(), it);
|
node.vSendMsg.erase(node.vSendMsg.begin(), it);
|
||||||
node.nSendMsgSize = node.vSendMsg.size();
|
node.nSendMsgSize = node.vSendMsg.size();
|
||||||
return nSentSize;
|
return {nSentSize, !node.vSendMsg.empty()};
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool ReverseCompareNodeMinPingTime(const NodeEvictionCandidate& a, const NodeEvictionCandidate& b)
|
static bool ReverseCompareNodeMinPingTime(const NodeEvictionCandidate& a, const NodeEvictionCandidate& b)
|
||||||
@ -1711,8 +1711,7 @@ bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
|
|||||||
recv_set.insert(hListenSocket.sock->Get());
|
recv_set.insert(hListenSocket.sock->Get());
|
||||||
}
|
}
|
||||||
|
|
||||||
for (CNode* pnode : nodes)
|
for (CNode* pnode : nodes) {
|
||||||
{
|
|
||||||
bool select_recv = !pnode->fHasRecvData;
|
bool select_recv = !pnode->fHasRecvData;
|
||||||
bool select_send = !pnode->fCanSendData;
|
bool select_send = !pnode->fCanSendData;
|
||||||
|
|
||||||
@ -2027,9 +2026,9 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
|||||||
|
|
||||||
if (interruptNet) return;
|
if (interruptNet) return;
|
||||||
|
|
||||||
std::vector<CNode*> vErrorNodes;
|
std::set<CNode*> vErrorNodes;
|
||||||
std::vector<CNode*> vReceivableNodes;
|
std::set<CNode*> vReceivableNodes;
|
||||||
std::vector<CNode*> vSendableNodes;
|
std::set<CNode*> vSendableNodes;
|
||||||
{
|
{
|
||||||
LOCK(cs_mapSocketToNode);
|
LOCK(cs_mapSocketToNode);
|
||||||
for (auto hSocket : error_set) {
|
for (auto hSocket : error_set) {
|
||||||
@ -2038,7 +2037,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
it->second->AddRef();
|
it->second->AddRef();
|
||||||
vErrorNodes.emplace_back(it->second);
|
vErrorNodes.emplace(it->second);
|
||||||
}
|
}
|
||||||
for (auto hSocket : recv_set) {
|
for (auto hSocket : recv_set) {
|
||||||
if (error_set.count(hSocket)) {
|
if (error_set.count(hSocket)) {
|
||||||
@ -2073,7 +2072,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
|||||||
{
|
{
|
||||||
LOCK(cs_sendable_receivable_nodes);
|
LOCK(cs_sendable_receivable_nodes);
|
||||||
|
|
||||||
vReceivableNodes.reserve(mapReceivableNodes.size());
|
|
||||||
for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
|
for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
|
||||||
if (!it->second->fHasRecvData) {
|
if (!it->second->fHasRecvData) {
|
||||||
it = mapReceivableNodes.erase(it);
|
it = mapReceivableNodes.erase(it);
|
||||||
@ -2088,7 +2086,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
|||||||
// receiving data (which should succeed as the socket signalled as receivable).
|
// receiving data (which should succeed as the socket signalled as receivable).
|
||||||
if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) {
|
if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) {
|
||||||
it->second->AddRef();
|
it->second->AddRef();
|
||||||
vReceivableNodes.emplace_back(it->second);
|
vReceivableNodes.emplace(it->second);
|
||||||
}
|
}
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
@ -2099,7 +2097,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
|||||||
// also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration
|
// also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration
|
||||||
// but don't have any in this iteration
|
// but don't have any in this iteration
|
||||||
LOCK(cs_mapNodesWithDataToSend);
|
LOCK(cs_mapNodesWithDataToSend);
|
||||||
vSendableNodes.reserve(mapNodesWithDataToSend.size());
|
|
||||||
for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) {
|
for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) {
|
||||||
if (it->second->nSendMsgSize == 0) {
|
if (it->second->nSendMsgSize == 0) {
|
||||||
// See comment in PushMessage
|
// See comment in PushMessage
|
||||||
@ -2108,13 +2105,36 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
|||||||
} else {
|
} else {
|
||||||
if (it->second->fCanSendData) {
|
if (it->second->fCanSendData) {
|
||||||
it->second->AddRef();
|
it->second->AddRef();
|
||||||
vSendableNodes.emplace_back(it->second);
|
vSendableNodes.emplace(it->second);
|
||||||
}
|
}
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (CNode* pnode : vSendableNodes) {
|
||||||
|
if (interruptNet) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send data
|
||||||
|
auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
|
||||||
|
if (bytes_sent) {
|
||||||
|
RecordBytesSent(bytes_sent);
|
||||||
|
|
||||||
|
// If both receiving and (non-optimistic) sending were possible, we first attempt
|
||||||
|
// sending. If that succeeds, but does not fully drain the send queue, do not
|
||||||
|
// attempt to receive. This avoids needlessly queueing data if the remote peer
|
||||||
|
// is slow at receiving data, by means of TCP flow control. We only do this when
|
||||||
|
// sending actually succeeded to make sure progress is always made; otherwise a
|
||||||
|
// deadlock would be possible when both sides have data to send, but neither is
|
||||||
|
// receiving.
|
||||||
|
if (data_left && vReceivableNodes.erase(pnode)) {
|
||||||
|
pnode->Release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for (CNode* pnode : vErrorNodes)
|
for (CNode* pnode : vErrorNodes)
|
||||||
{
|
{
|
||||||
if (interruptNet) {
|
if (interruptNet) {
|
||||||
@ -2136,16 +2156,6 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
|
|||||||
SocketRecvData(pnode);
|
SocketRecvData(pnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (CNode* pnode : vSendableNodes) {
|
|
||||||
if (interruptNet) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send data
|
|
||||||
size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
|
|
||||||
if (bytes_sent) RecordBytesSent(bytes_sent);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto& node : vErrorNodes) {
|
for (auto& node : vErrorNodes) {
|
||||||
node->Release();
|
node->Release();
|
||||||
}
|
}
|
||||||
@ -4183,7 +4193,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
|
|||||||
|
|
||||||
{
|
{
|
||||||
LOCK(pnode->cs_vSend);
|
LOCK(pnode->cs_vSend);
|
||||||
bool hasPendingData = !pnode->vSendMsg.empty();
|
bool optimisticSend(pnode->vSendMsg.empty());
|
||||||
|
|
||||||
//log total amount of bytes per message type
|
//log total amount of bytes per message type
|
||||||
pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize;
|
pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize;
|
||||||
@ -4206,7 +4216,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// wake up select() call in case there was no pending data before (so it was not selecting this socket for sending)
|
// wake up select() call in case there was no pending data before (so it was not selecting this socket for sending)
|
||||||
if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()))
|
if (optimisticSend && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()))
|
||||||
m_wakeup_pipe->Write();
|
m_wakeup_pipe->Write();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1387,8 +1387,11 @@ private:
|
|||||||
|
|
||||||
NodeId GetNewNodeId();
|
NodeId GetNewNodeId();
|
||||||
|
|
||||||
size_t SocketSendData(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
|
/** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */
|
||||||
|
std::pair<size_t, bool> SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
|
||||||
|
|
||||||
size_t SocketRecvData(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
|
size_t SocketRecvData(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
|
||||||
|
|
||||||
void DumpAddresses();
|
void DumpAddresses();
|
||||||
|
|
||||||
// Network stats
|
// Network stats
|
||||||
|
Loading…
Reference in New Issue
Block a user