diff --git a/src/net.cpp b/src/net.cpp index df2109e3f7..70c04d7a0e 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1165,9 +1165,7 @@ void CConnman::ThreadSocketHandler() } { TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv && ( - pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() || - pnode->GetTotalRecvSize() <= GetReceiveFloodSize())) + if (lockRecv && !pnode->fPauseRecv) FD_SET(pnode->hSocket, &fdsetRecv); } } @@ -1240,14 +1238,18 @@ void CConnman::ThreadSocketHandler() pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); if (notify) { + size_t nSizeAdded = 0; auto it(pnode->vRecvMsg.begin()); for (; it != pnode->vRecvMsg.end(); ++it) { if (!it->complete()) break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; } { LOCK(pnode->cs_vProcessMsg); pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; } WakeMessageHandler(); } @@ -2592,6 +2594,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn minFeeFilter = 0; lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; + fPauseRecv = false; + nProcessQueueSize = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; diff --git a/src/net.h b/src/net.h index 21864e73d1..0eb430a8bf 100644 --- a/src/net.h +++ b/src/net.h @@ -610,6 +610,7 @@ public: CCriticalSection cs_vProcessMsg; std::list vProcessMsg; + size_t nProcessQueueSize; std::deque vRecvGetData; std::list vRecvMsg; @@ -650,6 +651,7 @@ public: const NodeId id; const uint64_t nKeyedNetGroup; + std::atomic_bool fPauseRecv; protected: mapMsgCmdSize mapSendBytesPerMsgCmd; @@ -743,15 +745,6 @@ public: return nRefCount; } - // requires LOCK(cs_vRecvMsg) - unsigned int GetTotalRecvSize() - { - unsigned int total = 0; - BOOST_FOREACH(const CNetMessage &msg, vRecvMsg) - total += msg.vRecv.size() + 24; - return total; - } - // requires LOCK(cs_vRecvMsg) bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 9963a872e8..93b6e2ec01 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2475,6 +2475,8 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru return false; // Just take one message msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); + pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE; + pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize(); fMoreWork = !pfrom->vProcessMsg.empty(); } CNetMessage& msg(msgs.front());