From 607dbfdeaf7ec053d959c47c125d60c0b7e7216a Mon Sep 17 00:00:00 2001 From: Jeff Garzik Date: Thu, 15 Nov 2012 19:41:12 -0500 Subject: [PATCH 1/6] P2P: parse network datastream into header/data components in socket thread Replaces CNode::vRecv buffer with a vector of CNetMessage's. This simplifies ProcessMessages() and eliminates several redundant data copies. Overview: * socket thread now parses incoming message datastream into header/data components, as encapsulated by CNetMessage * socket thread adds each CNetMessage to a vector inside CNode * message thread (ProcessMessages) iterates through CNode's CNetMessage vector Message parsing is made more strict: * Socket is disconnected, if message larger than MAX_SIZE or if CMessageHeader deserialization fails (latter is impossible?). Previously, code would simply eat garbage data all day long. * Socket is disconnected, if we fail to find pchMessageStart. We do not search through garbage, to find pchMessageStart. Each message must begin precisely after the last message ends. ProcessMessages() always processes a complete message, and is more efficient: * buffer is always precisely sized, using CDataStream::resize(), rather than progressively sized in 64k chunks. More efficient for large messages like "block". * whole-buffer memory copy eliminated (vRecv -> vMsg) * other buffer-shifting memory copies eliminated (vRecv.insert, vRecv.erase) --- src/main.cpp | 65 +++++++++++++++-------------------- src/net.cpp | 96 +++++++++++++++++++++++++++++++++++++++++++++------- src/net.h | 70 +++++++++++++++++++++++++++++++++++--- 3 files changed, 176 insertions(+), 55 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 22baf0f3eb..3406144595 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3168,7 +3168,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) else if (strCommand == "verack") { - pfrom->vRecv.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); + pfrom->SetRecvVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); } @@ -3705,13 +3705,13 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) return true; } +// requires LOCK(cs_vRecvMsg) bool ProcessMessages(CNode* pfrom) { - CDataStream& vRecv = pfrom->vRecv; - if (vRecv.empty()) + if (pfrom->vRecvMsg.empty()) return true; //if (fDebug) - // printf("ProcessMessages(%u bytes)\n", vRecv.size()); + // printf("ProcessMessages(%zu messages)\n", pfrom->vRecvMsg.size()); // // Message format @@ -3722,32 +3722,32 @@ bool ProcessMessages(CNode* pfrom) // (x) data // - loop + unsigned int nMsgPos = 0; + for (; nMsgPos < pfrom->vRecvMsg.size(); nMsgPos++) { // Don't bother if send buffer is too full to respond anyway if (pfrom->vSend.size() >= SendBufferSize()) break; - // Scan for message start - CDataStream::iterator pstart = search(vRecv.begin(), vRecv.end(), BEGIN(pchMessageStart), END(pchMessageStart)); - int nHeaderSize = vRecv.GetSerializeSize(CMessageHeader()); - if (vRecv.end() - pstart < nHeaderSize) - { - if ((int)vRecv.size() > nHeaderSize) - { - printf("\n\nPROCESSMESSAGE MESSAGESTART NOT FOUND\n\n"); - vRecv.erase(vRecv.begin(), vRecv.end() - nHeaderSize); - } + // get next message; end, if an incomplete message is found + CNetMessage& msg = pfrom->vRecvMsg[nMsgPos]; + + //if (fDebug) + // printf("ProcessMessages(message %u msgsz, %zu bytes, complete:%s)\n", + // msg.hdr.nMessageSize, msg.vRecv.size(), + // msg.complete() ? "Y" : "N"); + + if (!msg.complete()) break; + + // Scan for message start + if (memcmp(msg.hdr.pchMessageStart, pchMessageStart, sizeof(pchMessageStart)) != 0) { + printf("\n\nPROCESSMESSAGE: INVALID MESSAGESTART\n\n"); + return false; } - if (pstart - vRecv.begin() > 0) - printf("\n\nPROCESSMESSAGE SKIPPED %"PRIpdd" BYTES\n\n", pstart - vRecv.begin()); - vRecv.erase(vRecv.begin(), pstart); // Read header - vector vHeaderSave(vRecv.begin(), vRecv.begin() + nHeaderSize); - CMessageHeader hdr; - vRecv >> hdr; + CMessageHeader& hdr = msg.hdr; if (!hdr.IsValid()) { printf("\n\nPROCESSMESSAGE: ERRORS IN HEADER %s\n\n\n", hdr.GetCommand().c_str()); @@ -3757,19 +3757,9 @@ bool ProcessMessages(CNode* pfrom) // Message size unsigned int nMessageSize = hdr.nMessageSize; - if (nMessageSize > MAX_SIZE) - { - printf("ProcessMessages(%s, %u bytes) : nMessageSize > MAX_SIZE\n", strCommand.c_str(), nMessageSize); - continue; - } - if (nMessageSize > vRecv.size()) - { - // Rewind and wait for rest of message - vRecv.insert(vRecv.begin(), vHeaderSave.begin(), vHeaderSave.end()); - break; - } // Checksum + CDataStream& vRecv = msg.vRecv; uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize); unsigned int nChecksum = 0; memcpy(&nChecksum, &hash, sizeof(nChecksum)); @@ -3780,17 +3770,13 @@ bool ProcessMessages(CNode* pfrom) continue; } - // Copy message to its own buffer - CDataStream vMsg(vRecv.begin(), vRecv.begin() + nMessageSize, vRecv.nType, vRecv.nVersion); - vRecv.ignore(nMessageSize); - // Process message bool fRet = false; try { { LOCK(cs_main); - fRet = ProcessMessage(pfrom, strCommand, vMsg); + fRet = ProcessMessage(pfrom, strCommand, vRecv); } if (fShutdown) return true; @@ -3822,7 +3808,10 @@ bool ProcessMessages(CNode* pfrom) printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize); } - vRecv.Compact(); + // remove processed messages; one incomplete message may remain + if (nMsgPos > 0) + pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), + pfrom->vRecvMsg.begin() + nMsgPos); return true; } diff --git a/src/net.cpp b/src/net.cpp index 6c8fe3ffc9..0e558228d7 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -536,7 +536,7 @@ void CNode::CloseSocketDisconnect() printf("disconnecting node %s\n", addrName.c_str()); closesocket(hSocket); hSocket = INVALID_SOCKET; - vRecv.clear(); + vRecvMsg.clear(); } } @@ -628,6 +628,78 @@ void CNode::copyStats(CNodeStats &stats) } #undef X +// requires LOCK(cs_vRecvMsg) +bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) +{ + while (nBytes > 0) { + + // get current incomplete message, or create a new one + if (vRecvMsg.size() == 0 || + vRecvMsg.back().complete()) + vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion)); + + CNetMessage& msg = vRecvMsg.back(); + + // absorb network data + int handled; + if (!msg.in_data) + handled = msg.readHeader(pch, nBytes); + else + handled = msg.readData(pch, nBytes); + + if (handled < 0) + return false; + + pch += handled; + nBytes -= handled; + } + + return true; +} + +int CNetMessage::readHeader(const char *pch, unsigned int nBytes) +{ + // copy data to temporary parsing buffer + unsigned int nRemaining = 24 - nHdrPos; + unsigned int nCopy = std::min(nRemaining, nBytes); + + memcpy(&hdrbuf[nHdrPos], pch, nCopy); + nHdrPos += nCopy; + + // if header incomplete, exit + if (nHdrPos < 24) + return nCopy; + + // deserialize to CMessageHeader + try { + hdrbuf >> hdr; + } + catch (std::exception &e) { + return -1; + } + + // reject messages larger than MAX_SIZE + if (hdr.nMessageSize > MAX_SIZE) + return -1; + + // switch state to reading message data + in_data = true; + vRecv.resize(hdr.nMessageSize); + + return nCopy; +} + +int CNetMessage::readData(const char *pch, unsigned int nBytes) +{ + unsigned int nRemaining = hdr.nMessageSize - nDataPos; + unsigned int nCopy = std::min(nRemaining, nBytes); + + memcpy(&vRecv[nDataPos], pch, nCopy); + nDataPos += nCopy; + + return nCopy; +} + @@ -676,7 +748,7 @@ void ThreadSocketHandler2(void* parg) BOOST_FOREACH(CNode* pnode, vNodesCopy) { if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecv.empty() && pnode->vSend.empty())) + (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->vSend.empty())) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -708,7 +780,7 @@ void ThreadSocketHandler2(void* parg) TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { TRY_LOCK(pnode->cs_inventory, lockInv); @@ -873,15 +945,12 @@ void ThreadSocketHandler2(void* parg) continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - CDataStream& vRecv = pnode->vRecv; - unsigned int nPos = vRecv.size(); - - if (nPos > ReceiveBufferSize()) { + if (pnode->GetTotalRecvSize() > ReceiveFloodSize()) { if (!pnode->fDisconnect) - printf("socket recv flood control disconnect (%"PRIszu" bytes)\n", vRecv.size()); + printf("socket recv flood control disconnect (%u bytes)\n", pnode->GetTotalRecvSize()); pnode->CloseSocketDisconnect(); } else { @@ -890,8 +959,8 @@ void ThreadSocketHandler2(void* parg) int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); if (nBytes > 0) { - vRecv.resize(nPos + nBytes); - memcpy(&vRecv[nPos], pchBuf, nBytes); + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes)) + pnode->CloseSocketDisconnect(); pnode->nLastRecv = GetTime(); } else if (nBytes == 0) @@ -1693,9 +1762,10 @@ void ThreadMessageHandler2(void* parg) { // Receive messages { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) - ProcessMessages(pnode); + if (!ProcessMessages(pnode)) + pnode->CloseSocketDisconnect(); } if (fShutdown) return; diff --git a/src/net.h b/src/net.h index 3b46523cd9..78f8e72fb0 100644 --- a/src/net.h +++ b/src/net.h @@ -27,7 +27,7 @@ extern int nBestHeight; -inline unsigned int ReceiveBufferSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); } +inline unsigned int ReceiveFloodSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); } inline unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 1*1000); } void AddOneShot(std::string strDest); @@ -126,6 +126,44 @@ public: +class CNetMessage { +public: + bool in_data; // parsing header (false) or data (true) + + CDataStream hdrbuf; // partially received header + CMessageHeader hdr; // complete header + unsigned int nHdrPos; + + CDataStream vRecv; // received message data + unsigned int nDataPos; + + CNetMessage(int nTypeIn, int nVersionIn) : hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn) { + hdrbuf.resize(24); + in_data = false; + nHdrPos = 0; + nDataPos = 0; + } + + bool complete() const + { + if (!in_data) + return false; + return (hdr.nMessageSize == nDataPos); + } + + void SetVersion(int nVersionIn) + { + hdrbuf.SetVersion(nVersionIn); + vRecv.SetVersion(nVersionIn); + } + + int readHeader(const char *pch, unsigned int nBytes); + int readData(const char *pch, unsigned int nBytes); +}; + + + + /** Information about a peer */ class CNode @@ -135,9 +173,12 @@ public: uint64 nServices; SOCKET hSocket; CDataStream vSend; - CDataStream vRecv; CCriticalSection cs_vSend; - CCriticalSection cs_vRecv; + + std::vector vRecvMsg; + CCriticalSection cs_vRecvMsg; + int nRecvVersion; + int64 nLastSend; int64 nLastRecv; int64 nLastSendEmpty; @@ -191,10 +232,11 @@ public: CCriticalSection cs_inventory; std::multimap mapAskFor; - CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION), vRecv(SER_NETWORK, MIN_PROTO_VERSION) + CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION) { nServices = 0; hSocket = hSocketIn; + nRecvVersion = MIN_PROTO_VERSION; nLastSend = 0; nLastRecv = 0; nLastSendEmpty = GetTime(); @@ -250,6 +292,26 @@ public: return std::max(nRefCount, 0) + (GetTime() < nReleaseTime ? 1 : 0); } + // requires LOCK(cs_vRecvMsg) + unsigned int GetTotalRecvSize() + { + unsigned int total = 0; + for (unsigned int i = 0; i < vRecvMsg.size(); i++) + total += vRecvMsg[i].vRecv.size(); + return total; + } + + // requires LOCK(cs_vRecvMsg) + bool ReceiveMsgBytes(const char *pch, unsigned int nBytes); + + // requires LOCK(cs_vRecvMsg) + void SetRecvVersion(int nVersionIn) + { + nRecvVersion = nVersionIn; + for (unsigned int i = 0; i < vRecvMsg.size(); i++) + vRecvMsg[i].SetVersion(nVersionIn); + } + CNode* AddRef(int64 nTimeout=0) { if (nTimeout != 0) From bc2f5aa72cfb3f456280a6d34c5d425bf24b009c Mon Sep 17 00:00:00 2001 From: Jeff Garzik Date: Thu, 15 Nov 2012 18:04:52 -0500 Subject: [PATCH 2/6] P2P, cosmetic: break out buffer send(2) code into separate function --- src/net.cpp | 47 +++++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 0e558228d7..96719367ce 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -708,6 +708,30 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes) +// requires LOCK(cs_vSend) +void SocketSendData(CNode *pnode) +{ + CDataStream& vSend = pnode->vSend; + if (vSend.empty()) + return; + + int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); + if (nBytes > 0) + { + vSend.erase(vSend.begin(), vSend.begin() + nBytes); + pnode->nLastSend = GetTime(); + } + else if (nBytes < 0) + { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + printf("socket send error %d\n", nErr); + pnode->CloseSocketDisconnect(); + } + } +} void ThreadSocketHandler(void* parg) { @@ -994,28 +1018,7 @@ void ThreadSocketHandler2(void* parg) { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) - { - CDataStream& vSend = pnode->vSend; - if (!vSend.empty()) - { - int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); - if (nBytes > 0) - { - vSend.erase(vSend.begin(), vSend.begin() + nBytes); - pnode->nLastSend = GetTime(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - printf("socket send error %d\n", nErr); - pnode->CloseSocketDisconnect(); - } - } - } - } + SocketSendData(pnode); } // From b9ff2970b9fbb24e2fffc449b4ef478d019633d8 Mon Sep 17 00:00:00 2001 From: Jeff Garzik Date: Thu, 15 Nov 2012 18:20:26 -0500 Subject: [PATCH 3/6] P2P: improve RX/TX flow control 1) "optimistic write": Push each message to kernel socket buffer immediately. 2) If there is write data at select time, that implies send() blocked during optimistic write. Drain write queue, before receiving any more messages. This avoids needlessly queueing received data, if the remote peer is not themselves receiving data. Result: write buffer (and thus memory usage) is kept small, DoS potential is slightly lower, and TCP flow control signalling is properly utilized. The kernel will queue data into the socket buffer, then signal the remote peer to stop sending data, until we resume reading again. --- src/net.cpp | 16 ++++++++++------ src/net.h | 5 +++++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 96719367ce..eafb335642 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -855,14 +855,18 @@ void ThreadSocketHandler2(void* parg) { if (pnode->hSocket == INVALID_SOCKET) continue; - FD_SET(pnode->hSocket, &fdsetRecv); - FD_SET(pnode->hSocket, &fdsetError); - hSocketMax = max(hSocketMax, pnode->hSocket); - have_fds = true; { TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend && !pnode->vSend.empty()) - FD_SET(pnode->hSocket, &fdsetSend); + if (lockSend) { + // do not read, if draining write queue + if (!pnode->vSend.empty()) + FD_SET(pnode->hSocket, &fdsetSend); + else + FD_SET(pnode->hSocket, &fdsetRecv); + FD_SET(pnode->hSocket, &fdsetError); + hSocketMax = max(hSocketMax, pnode->hSocket); + have_fds = true; + } } } } diff --git a/src/net.h b/src/net.h index 78f8e72fb0..03d32526bc 100644 --- a/src/net.h +++ b/src/net.h @@ -42,6 +42,7 @@ unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError=REF(std::string())); void StartNode(void* parg); bool StopNode(); +void SocketSendData(CNode *pnode); enum { @@ -437,6 +438,10 @@ public: printf("(%d bytes)\n", nSize); } + // If write queue empty, attempt "optimistic write" + if (nHeaderStart == 0) + SocketSendData(this); + nHeaderStart = -1; nMessageStart = -1; LEAVE_CRITICAL_SECTION(cs_vSend); From 967f24590b43f0f84148f669d886b40fe45aa978 Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Fri, 1 Mar 2013 01:41:28 +0100 Subject: [PATCH 4/6] Some fixes to CNetMessage processing * Change CNode::vRecvMsg to be a deque instead of a vector (less copying) * Make sure to acquire cs_vRecvMsg in CNode::CloseSocketDisconnect (as it may be called without that lock). --- src/main.cpp | 28 ++++++++++++++-------------- src/net.cpp | 11 +++++++++-- src/net.h | 10 +++++----- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 3406144595..0c80f23df9 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3708,8 +3708,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) // requires LOCK(cs_vRecvMsg) bool ProcessMessages(CNode* pfrom) { - if (pfrom->vRecvMsg.empty()) - return true; //if (fDebug) // printf("ProcessMessages(%zu messages)\n", pfrom->vRecvMsg.size()); @@ -3721,29 +3719,34 @@ bool ProcessMessages(CNode* pfrom) // (4) checksum // (x) data // + bool fOk = true; - unsigned int nMsgPos = 0; - for (; nMsgPos < pfrom->vRecvMsg.size(); nMsgPos++) - { + std::deque::iterator it = pfrom->vRecvMsg.begin(); + while (it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway if (pfrom->vSend.size() >= SendBufferSize()) break; - // get next message; end, if an incomplete message is found - CNetMessage& msg = pfrom->vRecvMsg[nMsgPos]; + // get next message + CNetMessage& msg = *it; //if (fDebug) // printf("ProcessMessages(message %u msgsz, %zu bytes, complete:%s)\n", // msg.hdr.nMessageSize, msg.vRecv.size(), // msg.complete() ? "Y" : "N"); + // end, if an incomplete message is found if (!msg.complete()) break; + // at this point, any failure means we can delete the current message + it++; + // Scan for message start if (memcmp(msg.hdr.pchMessageStart, pchMessageStart, sizeof(pchMessageStart)) != 0) { printf("\n\nPROCESSMESSAGE: INVALID MESSAGESTART\n\n"); - return false; + fOk = false; + break; } // Read header @@ -3779,7 +3782,7 @@ bool ProcessMessages(CNode* pfrom) fRet = ProcessMessage(pfrom, strCommand, vRecv); } if (fShutdown) - return true; + break; } catch (std::ios_base::failure& e) { @@ -3808,11 +3811,8 @@ bool ProcessMessages(CNode* pfrom) printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize); } - // remove processed messages; one incomplete message may remain - if (nMsgPos > 0) - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), - pfrom->vRecvMsg.begin() + nMsgPos); - return true; + pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); + return fOk; } diff --git a/src/net.cpp b/src/net.cpp index eafb335642..1016d5d9f0 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -536,7 +536,11 @@ void CNode::CloseSocketDisconnect() printf("disconnecting node %s\n", addrName.c_str()); closesocket(hSocket); hSocket = INVALID_SOCKET; - vRecvMsg.clear(); + + // in case this fails, we'll empty the recv buffer when the CNode is deleted + TRY_LOCK(cs_vRecvMsg, lockRecv); + if (lockRecv) + vRecvMsg.clear(); } } @@ -634,7 +638,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) while (nBytes > 0) { // get current incomplete message, or create a new one - if (vRecvMsg.size() == 0 || + if (vRecvMsg.empty() || vRecvMsg.back().complete()) vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion)); @@ -1767,6 +1771,9 @@ void ThreadMessageHandler2(void* parg) pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())]; BOOST_FOREACH(CNode* pnode, vNodesCopy) { + if (pnode->fDisconnect) + continue; + // Receive messages { TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); diff --git a/src/net.h b/src/net.h index 03d32526bc..d779265b79 100644 --- a/src/net.h +++ b/src/net.h @@ -176,7 +176,7 @@ public: CDataStream vSend; CCriticalSection cs_vSend; - std::vector vRecvMsg; + std::deque vRecvMsg; CCriticalSection cs_vRecvMsg; int nRecvVersion; @@ -297,8 +297,8 @@ public: unsigned int GetTotalRecvSize() { unsigned int total = 0; - for (unsigned int i = 0; i < vRecvMsg.size(); i++) - total += vRecvMsg[i].vRecv.size(); + BOOST_FOREACH(const CNetMessage &msg, vRecvMsg) + total += msg.vRecv.size() + 24; return total; } @@ -309,8 +309,8 @@ public: void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; - for (unsigned int i = 0; i < vRecvMsg.size(); i++) - vRecvMsg[i].SetVersion(nVersionIn); + BOOST_FOREACH(CNetMessage &msg, vRecvMsg) + msg.SetVersion(nVersionIn); } CNode* AddRef(int64 nTimeout=0) From 41b052ad87633d5a8a989c512c8710b875f2ba88 Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Sun, 24 Mar 2013 16:52:24 +0100 Subject: [PATCH 5/6] Use per-message send buffer, rather than per connection --- src/main.cpp | 13 ++++++---- src/net.cpp | 57 ++++++++++++++++++++++++++++--------------- src/net.h | 65 +++++++++++++++++++++++-------------------------- src/protocol.h | 3 ++- src/serialize.h | 8 +++++- 5 files changed, 85 insertions(+), 61 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 0c80f23df9..77061fabd6 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3104,7 +3104,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) // Change version pfrom->PushMessage("verack"); - pfrom->vSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); + pfrom->ssSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); if (!pfrom->fInbound) { @@ -3722,9 +3722,9 @@ bool ProcessMessages(CNode* pfrom) bool fOk = true; std::deque::iterator it = pfrom->vRecvMsg.begin(); - while (it != pfrom->vRecvMsg.end()) { + while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->vSend.size() >= SendBufferSize()) + if (pfrom->nSendSize >= SendBufferSize()) break; // get next message @@ -3811,7 +3811,10 @@ bool ProcessMessages(CNode* pfrom) printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize); } - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); + // In case the connection got shut down, its receive buffer was wiped + if (!pfrom->fDisconnect) + pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); + return fOk; } @@ -3826,7 +3829,7 @@ bool SendMessages(CNode* pto, bool fSendTrickle) // Keep-alive ping. We send a nonce of zero because we don't use it anywhere // right now. - if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSend.empty()) { + if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSendMsg.empty()) { uint64 nonce = 0; if (pto->nVersion > BIP0031_VERSION) pto->PushMessage("ping", nonce); diff --git a/src/net.cpp b/src/net.cpp index 1016d5d9f0..9ee6cb423c 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -715,26 +715,43 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes) // requires LOCK(cs_vSend) void SocketSendData(CNode *pnode) { - CDataStream& vSend = pnode->vSend; - if (vSend.empty()) - return; + std::deque::iterator it = pnode->vSendMsg.begin(); - int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); - if (nBytes > 0) - { - vSend.erase(vSend.begin(), vSend.begin() + nBytes); - pnode->nLastSend = GetTime(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - printf("socket send error %d\n", nErr); - pnode->CloseSocketDisconnect(); + while (it != pnode->vSendMsg.end()) { + const CSerializeData &data = *it; + assert(data.size() > pnode->nSendOffset); + int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); + if (nBytes > 0) { + pnode->nLastSend = GetTime(); + pnode->nSendOffset += nBytes; + if (pnode->nSendOffset == data.size()) { + pnode->nSendOffset = 0; + pnode->nSendSize -= data.size(); + it++; + } else { + // could not send full message; stop sending more + break; + } + } else { + if (nBytes < 0) { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + printf("socket send error %d\n", nErr); + pnode->CloseSocketDisconnect(); + } + } + // couldn't send anything at all + break; } } + + if (it == pnode->vSendMsg.end()) { + assert(pnode->nSendOffset == 0); + assert(pnode->nSendSize == 0); + } + pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); } void ThreadSocketHandler(void* parg) @@ -776,7 +793,7 @@ void ThreadSocketHandler2(void* parg) BOOST_FOREACH(CNode* pnode, vNodesCopy) { if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->vSend.empty())) + (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty())) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -863,7 +880,7 @@ void ThreadSocketHandler2(void* parg) TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { // do not read, if draining write queue - if (!pnode->vSend.empty()) + if (!pnode->vSendMsg.empty()) FD_SET(pnode->hSocket, &fdsetSend); else FD_SET(pnode->hSocket, &fdsetRecv); @@ -1032,7 +1049,7 @@ void ThreadSocketHandler2(void* parg) // // Inactivity checking // - if (pnode->vSend.empty()) + if (pnode->vSendMsg.empty()) pnode->nLastSendEmpty = GetTime(); if (GetTime() - pnode->nTimeConnected > 60) { diff --git a/src/net.h b/src/net.h index d779265b79..9805e39f18 100644 --- a/src/net.h +++ b/src/net.h @@ -173,7 +173,10 @@ public: // socket uint64 nServices; SOCKET hSocket; - CDataStream vSend; + CDataStream ssSend; + size_t nSendSize; // total size of all vSendMsg entries + size_t nSendOffset; // offset inside the first vSendMsg already sent + std::deque vSendMsg; CCriticalSection cs_vSend; std::deque vRecvMsg; @@ -184,8 +187,6 @@ public: int64 nLastRecv; int64 nLastSendEmpty; int64 nTimeConnected; - int nHeaderStart; - unsigned int nMessageStart; CAddress addr; std::string addrName; CService addrLocal; @@ -233,7 +234,7 @@ public: CCriticalSection cs_inventory; std::multimap mapAskFor; - CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION) + CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : ssSend(SER_NETWORK, MIN_PROTO_VERSION) { nServices = 0; hSocket = hSocketIn; @@ -242,8 +243,6 @@ public: nLastRecv = 0; nLastSendEmpty = GetTime(); nTimeConnected = GetTime(); - nHeaderStart = -1; - nMessageStart = -1; addr = addrIn; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; nVersion = 0; @@ -256,6 +255,8 @@ public: fDisconnect = false; nRefCount = 0; nReleaseTime = 0; + nSendSize = 0; + nSendOffset = 0; hashContinue = 0; pindexLastGetBlocksBegin = 0; hashLastGetBlocksEnd = 0; @@ -387,11 +388,8 @@ public: void BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend) { ENTER_CRITICAL_SECTION(cs_vSend); - if (nHeaderStart != -1) - AbortMessage(); - nHeaderStart = vSend.size(); - vSend << CMessageHeader(pszCommand, 0); - nMessageStart = vSend.size(); + assert(ssSend.size() == 0); + ssSend << CMessageHeader(pszCommand, 0); if (fDebug) printf("sending: %s ", pszCommand); } @@ -399,11 +397,8 @@ public: // TODO: Document the precondition of this function. Is cs_vSend locked? void AbortMessage() UNLOCK_FUNCTION(cs_vSend) { - if (nHeaderStart < 0) - return; - vSend.resize(nHeaderStart); - nHeaderStart = -1; - nMessageStart = -1; + ssSend.clear(); + LEAVE_CRITICAL_SECTION(cs_vSend); if (fDebug) @@ -420,30 +415,32 @@ public: return; } - if (nHeaderStart < 0) + if (ssSend.size() == 0) return; // Set the size - unsigned int nSize = vSend.size() - nMessageStart; - memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::MESSAGE_SIZE_OFFSET, &nSize, sizeof(nSize)); + unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE; + memcpy((char*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], &nSize, sizeof(nSize)); // Set the checksum - uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end()); + uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end()); unsigned int nChecksum = 0; memcpy(&nChecksum, &hash, sizeof(nChecksum)); - assert(nMessageStart - nHeaderStart >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); - memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::CHECKSUM_OFFSET, &nChecksum, sizeof(nChecksum)); + assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); + memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum)); if (fDebug) { printf("(%d bytes)\n", nSize); } + std::deque::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData()); + ssSend.GetAndClear(*it); + nSendSize += (*it).size(); + // If write queue empty, attempt "optimistic write" - if (nHeaderStart == 0) + if (it == vSendMsg.begin()) SocketSendData(this); - nHeaderStart = -1; - nMessageStart = -1; LEAVE_CRITICAL_SECTION(cs_vSend); } @@ -470,7 +467,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1; + ssSend << a1; EndMessage(); } catch (...) @@ -486,7 +483,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2; + ssSend << a1 << a2; EndMessage(); } catch (...) @@ -502,7 +499,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3; + ssSend << a1 << a2 << a3; EndMessage(); } catch (...) @@ -518,7 +515,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4; + ssSend << a1 << a2 << a3 << a4; EndMessage(); } catch (...) @@ -534,7 +531,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5; + ssSend << a1 << a2 << a3 << a4 << a5; EndMessage(); } catch (...) @@ -550,7 +547,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6; + ssSend << a1 << a2 << a3 << a4 << a5 << a6; EndMessage(); } catch (...) @@ -566,7 +563,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; EndMessage(); } catch (...) @@ -582,7 +579,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; EndMessage(); } catch (...) @@ -598,7 +595,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; EndMessage(); } catch (...) diff --git a/src/protocol.h b/src/protocol.h index f5c162054e..4998425070 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -56,7 +56,8 @@ class CMessageHeader CHECKSUM_SIZE=sizeof(int), MESSAGE_SIZE_OFFSET=MESSAGE_START_SIZE+COMMAND_SIZE, - CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE + CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE, + HEADER_SIZE=MESSAGE_START_SIZE+COMMAND_SIZE+MESSAGE_SIZE_SIZE+CHECKSUM_SIZE }; char pchMessageStart[MESSAGE_START_SIZE]; char pchCommand[COMMAND_SIZE]; diff --git a/src/serialize.h b/src/serialize.h index f2626281c1..e3d9939bcc 100644 --- a/src/serialize.h +++ b/src/serialize.h @@ -789,6 +789,7 @@ struct ser_streamplaceholder +typedef std::vector > CSerializeData; /** Double ended buffer combining vector and stream-like interfaces. * @@ -798,7 +799,7 @@ struct ser_streamplaceholder class CDataStream { protected: - typedef std::vector > vector_type; + typedef CSerializeData vector_type; vector_type vch; unsigned int nReadPos; short state; @@ -1095,6 +1096,11 @@ public: ::Unserialize(*this, obj, nType, nVersion); return (*this); } + + void GetAndClear(CSerializeData &data) { + vch.swap(data); + CSerializeData().swap(vch); + } }; From c7f039b674b43b741f20bf7521eb8a68426f4275 Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Fri, 29 Mar 2013 23:49:38 +0100 Subject: [PATCH 6/6] Process getdata invs separately until send buffer overflows There exists a per-message-processed send buffer overflow protection, where processing is halted when the send buffer is larger than the allowed maximum. This protection does not apply to individual items, however, and getdata has the potential for causing large amounts of data to be sent. In case several hundreds of blocks are requested in one getdata, the send buffer can easily grow 50 megabytes above the send buffer limit. This commit breaks up the processing of getdata requests, remembering them inside a CNode when too many are requested at once. --- src/main.cpp | 210 ++++++++++++++++++++++++++++----------------------- src/net.h | 1 + 2 files changed, 117 insertions(+), 94 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 77061fabd6..b29091b4fe 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3029,6 +3029,115 @@ bool static AlreadyHave(const CInv& inv) unsigned char pchMessageStart[4] = { 0xf9, 0xbe, 0xb4, 0xd9 }; +void static ProcessGetData(CNode* pfrom) +{ + std::deque::iterator it = pfrom->vRecvGetData.begin(); + + vector vNotFound; + + while (it != pfrom->vRecvGetData.end()) { + // Don't bother if send buffer is too full to respond anyway + if (pfrom->nSendSize >= SendBufferSize()) + break; + + const CInv &inv = *it; + { + if (fShutdown) + break; + it++; + + if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK) + { + // Send block from disk + map::iterator mi = mapBlockIndex.find(inv.hash); + if (mi != mapBlockIndex.end()) + { + CBlock block; + block.ReadFromDisk((*mi).second); + if (inv.type == MSG_BLOCK) + pfrom->PushMessage("block", block); + else // MSG_FILTERED_BLOCK) + { + LOCK(pfrom->cs_filter); + if (pfrom->pfilter) + { + CMerkleBlock merkleBlock(block, *pfrom->pfilter); + pfrom->PushMessage("merkleblock", merkleBlock); + // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see + // This avoids hurting performance by pointlessly requiring a round-trip + // Note that there is currently no way for a node to request any single transactions we didnt send here - + // they must either disconnect and retry or request the full block. + // Thus, the protocol spec specified allows for us to provide duplicate txn here, + // however we MUST always provide at least what the remote peer needs + typedef std::pair PairType; + BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn) + if (!pfrom->setInventoryKnown.count(CInv(MSG_TX, pair.second))) + pfrom->PushMessage("tx", block.vtx[pair.first]); + } + // else + // no response + } + + // Trigger them to send a getblocks request for the next batch of inventory + if (inv.hash == pfrom->hashContinue) + { + // Bypass PushInventory, this must send even if redundant, + // and we want it right after the last block so they don't + // wait for other stuff first. + vector vInv; + vInv.push_back(CInv(MSG_BLOCK, hashBestChain)); + pfrom->PushMessage("inv", vInv); + pfrom->hashContinue = 0; + } + } + } + else if (inv.IsKnownType()) + { + // Send stream from relay memory + bool pushed = false; + { + LOCK(cs_mapRelay); + map::iterator mi = mapRelay.find(inv); + if (mi != mapRelay.end()) { + pfrom->PushMessage(inv.GetCommand(), (*mi).second); + pushed = true; + } + } + if (!pushed && inv.type == MSG_TX) { + LOCK(mempool.cs); + if (mempool.exists(inv.hash)) { + CTransaction tx = mempool.lookup(inv.hash); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + ss.reserve(1000); + ss << tx; + pfrom->PushMessage("tx", ss); + pushed = true; + } + } + if (!pushed) { + vNotFound.push_back(inv); + } + } + + // Track requests for our stuff. + Inventory(inv.hash); + } + } + + pfrom->vRecvGetData.erase(pfrom->vRecvGetData.begin(), it); + + if (!vNotFound.empty()) { + // Let the peer know that we didn't find what it asked for, so it doesn't + // have to wait around forever. Currently only SPV clients actually care + // about this message: it's needed when they are recursively walking the + // dependencies of relevant unconfirmed transactions. SPV clients want to + // do that because they want to know about (and store and rebroadcast and + // risk analyze) the dependencies of transactions relevant to them, without + // having to download the entire memory pool. + pfrom->PushMessage("notfound", vNotFound); + } +} + bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) { RandAddSeedPerfmon(); @@ -3302,101 +3411,11 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) if (fDebugNet || (vInv.size() != 1)) printf("received getdata (%"PRIszu" invsz)\n", vInv.size()); - vector vNotFound; - BOOST_FOREACH(const CInv& inv, vInv) - { - if (fShutdown) - return true; - if (fDebugNet || (vInv.size() == 1)) - printf("received getdata for: %s\n", inv.ToString().c_str()); + if ((fDebugNet && vInv.size() > 0) || (vInv.size() == 1)) + printf("received getdata for: %s\n", vInv[0].ToString().c_str()); - if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK) - { - // Send block from disk - map::iterator mi = mapBlockIndex.find(inv.hash); - if (mi != mapBlockIndex.end()) - { - CBlock block; - block.ReadFromDisk((*mi).second); - if (inv.type == MSG_BLOCK) - pfrom->PushMessage("block", block); - else // MSG_FILTERED_BLOCK) - { - LOCK(pfrom->cs_filter); - if (pfrom->pfilter) - { - CMerkleBlock merkleBlock(block, *pfrom->pfilter); - pfrom->PushMessage("merkleblock", merkleBlock); - // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see - // This avoids hurting performance by pointlessly requiring a round-trip - // Note that there is currently no way for a node to request any single transactions we didnt send here - - // they must either disconnect and retry or request the full block. - // Thus, the protocol spec specified allows for us to provide duplicate txn here, - // however we MUST always provide at least what the remote peer needs - typedef std::pair PairType; - BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn) - if (!pfrom->setInventoryKnown.count(CInv(MSG_TX, pair.second))) - pfrom->PushMessage("tx", block.vtx[pair.first]); - } - // else - // no response - } - - // Trigger them to send a getblocks request for the next batch of inventory - if (inv.hash == pfrom->hashContinue) - { - // Bypass PushInventory, this must send even if redundant, - // and we want it right after the last block so they don't - // wait for other stuff first. - vector vInv; - vInv.push_back(CInv(MSG_BLOCK, hashBestChain)); - pfrom->PushMessage("inv", vInv); - pfrom->hashContinue = 0; - } - } - } - else if (inv.IsKnownType()) - { - // Send stream from relay memory - bool pushed = false; - { - LOCK(cs_mapRelay); - map::iterator mi = mapRelay.find(inv); - if (mi != mapRelay.end()) { - pfrom->PushMessage(inv.GetCommand(), (*mi).second); - pushed = true; - } - } - if (!pushed && inv.type == MSG_TX) { - LOCK(mempool.cs); - if (mempool.exists(inv.hash)) { - CTransaction tx = mempool.lookup(inv.hash); - CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); - ss.reserve(1000); - ss << tx; - pfrom->PushMessage("tx", ss); - pushed = true; - } - } - if (!pushed) { - vNotFound.push_back(inv); - } - } - - // Track requests for our stuff. - Inventory(inv.hash); - - if (!vNotFound.empty()) { - // Let the peer know that we didn't find what it asked for, so it doesn't - // have to wait around forever. Currently only SPV clients actually care - // about this message: it's needed when they are recursively walking the - // dependencies of relevant unconfirmed transactions. SPV clients want to - // do that because they want to know about (and store and rebroadcast and - // risk analyze) the dependencies of transactions relevant to them, without - // having to download the entire memory pool. - pfrom->PushMessage("notfound", vNotFound); - } - } + pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); + ProcessGetData(pfrom); } @@ -3721,6 +3740,9 @@ bool ProcessMessages(CNode* pfrom) // bool fOk = true; + if (!pfrom->vRecvGetData.empty()) + ProcessGetData(pfrom); + std::deque::iterator it = pfrom->vRecvMsg.begin(); while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway diff --git a/src/net.h b/src/net.h index 9805e39f18..368e4cd4bb 100644 --- a/src/net.h +++ b/src/net.h @@ -179,6 +179,7 @@ public: std::deque vSendMsg; CCriticalSection cs_vSend; + std::deque vRecvGetData; std::deque vRecvMsg; CCriticalSection cs_vRecvMsg; int nRecvVersion;