diff --git a/src/net.cpp b/src/net.cpp index cc3274716..70e40c727 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -462,11 +462,6 @@ void CNode::CloseSocketDisconnect() LogPrint("net", "disconnecting peer=%d\n", id); CloseSocket(hSocket); } - - // in case this fails, we'll empty the recv buffer when the CNode is deleted - TRY_LOCK(cs_vRecvMsg, lockRecv); - if (lockRecv) - vRecvMsg.clear(); } void CConnman::ClearBanned() @@ -673,16 +668,18 @@ void CNode::copyStats(CNodeStats &stats) } #undef X -// requires LOCK(cs_vRecvMsg) bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete) { complete = false; + int64_t nTimeMicros = GetTimeMicros(); + nLastRecv = nTimeMicros / 1000000; + nRecvBytes += nBytes; while (nBytes > 0) { // get current incomplete message, or create a new one if (vRecvMsg.empty() || vRecvMsg.back().complete()) - vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion)); + vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION)); CNetMessage& msg = vRecvMsg.back(); @@ -714,7 +711,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete assert(i != mapRecvBytesPerMsgCmd.end()); i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE; - msg.nTime = GetTimeMicros(); + msg.nTime = nTimeMicros; complete = true; } } @@ -805,7 +802,7 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes) // requires LOCK(cs_vSend) -size_t SocketSendData(CNode *pnode) +size_t CConnman::SocketSendData(CNode *pnode) { std::deque::iterator it = pnode->vSendMsg.begin(); size_t nSentSize = 0; @@ -822,6 +819,7 @@ size_t SocketSendData(CNode *pnode) if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); + pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more @@ -1110,8 +1108,7 @@ void CConnman::ThreadSocketHandler() std::vector vNodesCopy = vNodes; BOOST_FOREACH(CNode* pnode, vNodesCopy) { - if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0)) + if (pnode->fDisconnect) { LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fNetworkNode=%d fInbound=%d fMasternode=%d\n", pnode->id, pnode->addr.ToString(), pnode->GetRefCount(), pnode->fNetworkNode, pnode->fInbound, pnode->fMasternode); @@ -1148,13 +1145,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { TRY_LOCK(pnode->cs_inventory, lockInv); if (lockInv) fDelete = true; - } } } if (fDelete) @@ -1209,15 +1202,10 @@ void CConnman::ThreadSocketHandler() // write buffer in this case before receiving more. This avoids // needlessly queueing received data, if the remote peer is not themselves // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is no (complete) message in the receive buffer, - // or there is space left in the buffer, select() for receiving data. - // * (if neither of the above applies, there is certainly one message - // in the receiver buffer ready to be processed). - // Together, that means that at least one of the following is always possible, - // so we don't deadlock: - // * We send some data. - // * We wait for data to be received (and disconnect after timeout). - // * We process a message in the buffer (message handler thread). + // * Otherwise, if there is space left in the receive buffer, select() for + // receiving data. + // * Hand off all complete messages to the processor, to be handled without + // blocking here. { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { @@ -1228,10 +1216,7 @@ void CConnman::ThreadSocketHandler() } } { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv && ( - pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() || - pnode->GetTotalRecvSize() <= GetReceiveFloodSize())) + if (!pnode->fPauseRecv) FD_SET(pnode->hSocket, &fdsetRecv); } } @@ -1284,8 +1269,6 @@ void CConnman::ThreadSocketHandler() continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) { { // typical socket buffer is 8K-64K @@ -1296,11 +1279,23 @@ void CConnman::ThreadSocketHandler() bool notify = false; if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); - if(notify) - condMsgProc.notify_one(); - pnode->nLastRecv = GetTime(); - pnode->nRecvBytes += nBytes; RecordBytesRecv(nBytes); + if (notify) { + size_t nSizeAdded = 0; + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; + } + { + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; + } + WakeMessageHandler(); + } } else if (nBytes == 0) { @@ -1334,8 +1329,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { size_t nBytes = SocketSendData(pnode); - if (nBytes) + if (nBytes) { RecordBytesSent(nBytes); + } } } @@ -1371,8 +1367,14 @@ void CConnman::ThreadSocketHandler() } } - - +void CConnman::WakeMessageHandler() +{ + { + std::lock_guard lock(mutexMsgProc); + fMsgProcWake = true; + } + condMsgProc.notify_one(); +} @@ -1892,7 +1894,7 @@ void CConnman::ThreadMessageHandler() { std::vector vNodesCopy = CopyNodeVector(); - bool fSleep = true; + bool fMoreWork = false; BOOST_FOREACH(CNode* pnode, vNodesCopy) { @@ -1900,22 +1902,8 @@ void CConnman::ThreadMessageHandler() continue; // Receive messages - { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { - if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc)) - pnode->fDisconnect = true; - - if (pnode->nSendSize < GetSendBufferSize()) - { - if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete())) - { - fSleep = false; - } - } - } - } + bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) return; @@ -1931,10 +1919,11 @@ void CConnman::ThreadMessageHandler() ReleaseNodeVector(vNodesCopy); - if (fSleep) { - std::unique_lock lock(mutexMsgProc); - condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + std::unique_lock lock(mutexMsgProc); + if (!fMoreWork) { + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; }); } + fMsgProcWake = false; } } @@ -2129,7 +2118,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c nMaxFeeler = connOptions.nMaxFeeler; nSendBufferMaxSize = connOptions.nSendBufferMaxSize; - nReceiveFloodSize = connOptions.nSendBufferMaxSize; + nReceiveFloodSize = connOptions.nReceiveFloodSize; SetBestHeight(connOptions.nBestHeight); @@ -2191,6 +2180,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c interruptNet.reset(); flagInterruptMsgProc = false; + { + std::unique_lock lock(mutexMsgProc); + fMsgProcWake = false; + } + // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&TraceThread >, "net", std::function(std::bind(&CConnman::ThreadSocketHandler, this))); @@ -2684,6 +2678,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn vchKeyedNetGroup = CalculateKeyedNetGroup(addr); id = idIn; nLocalServices = nLocalServicesIn; + fPauseRecv = false; + fPauseSend = false; + nProcessQueueSize = 0; GetRandBytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce)); nMyStartingHeight = nMyStartingHeightIn; @@ -2819,6 +2816,9 @@ void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& s pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size(); pnode->nSendSize += strm.size(); + if (pnode->nSendSize > nSendBufferMaxSize) + pnode->fPauseSend = true; + // If write queue empty, attempt "optimistic write" if (optimisticSend == true) nBytesSent = SocketSendData(pnode); diff --git a/src/net.h b/src/net.h index 15a71c7b3..193eba6f2 100644 --- a/src/net.h +++ b/src/net.h @@ -386,6 +386,7 @@ public: int GetBestHeight() const; + unsigned int GetReceiveFloodSize() const; private: struct ListenSocket { SOCKET socket; @@ -403,6 +404,8 @@ private: void ThreadDNSAddressSeed(); void ThreadMnbRequestConnections(); + void WakeMessageHandler(); + CNode* FindNode(const CNetAddr& ip); CNode* FindNode(const CSubNet& subNet); CNode* FindNode(const std::string& addrName); @@ -415,6 +418,7 @@ private: NodeId GetNewNodeId(); + size_t SocketSendData(CNode *pnode); //!check is the banlist has unwritten changes bool BannedSetIsDirty(); //!set the "dirty" flag for the banlist @@ -425,8 +429,6 @@ private: void DumpData(); void DumpBanlist(); - unsigned int GetReceiveFloodSize() const; - CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand); void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand); void EndMessage(CDataStream& strm); @@ -487,6 +489,9 @@ private: std::atomic nBestHeight; CClientUIInterface* clientInterface; + /** flag for waking the message processor. */ + bool fMsgProcWake; + std::condition_variable condMsgProc; std::mutex mutexMsgProc; std::atomic flagInterruptMsgProc; @@ -505,7 +510,6 @@ void Discover(boost::thread_group& threadGroup); void MapPort(bool fUseUPnP); unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); -size_t SocketSendData(CNode *pnode); struct CombinerAll { @@ -666,11 +670,13 @@ public: std::deque vSendMsg; CCriticalSection cs_vSend; + CCriticalSection cs_vProcessMsg; + std::list vProcessMsg; + size_t nProcessQueueSize; + std::deque vRecvGetData; - std::deque vRecvMsg; - CCriticalSection cs_vRecvMsg; uint64_t nRecvBytes; - int nRecvVersion; + std::atomic nRecvVersion; int64_t nLastSend; int64_t nLastRecv; @@ -708,6 +714,9 @@ public: CBloomFilter* pfilter; int nRefCount; NodeId id; + + std::atomic_bool fPauseRecv; + std::atomic_bool fPauseSend; protected: mapMsgCmdSize mapSendBytesPerMsgCmd; @@ -770,6 +779,7 @@ private: ServiceFlags nLocalServices; int nMyStartingHeight; int nSendVersion; + std::list vRecvMsg; // Used only by SocketHandler thread public: NodeId GetId() const { @@ -791,24 +801,15 @@ public: return nRefCount; } - // requires LOCK(cs_vRecvMsg) - unsigned int GetTotalRecvSize() - { - unsigned int total = 0; - BOOST_FOREACH(const CNetMessage &msg, vRecvMsg) - total += msg.vRecv.size() + 24; - return total; - } - - // requires LOCK(cs_vRecvMsg) bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete); - // requires LOCK(cs_vRecvMsg) void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; - BOOST_FOREACH(CNetMessage &msg, vRecvMsg) - msg.SetVersion(nVersionIn); + } + int GetRecvVersion() + { + return nRecvVersion; } void SetSendVersion(int nVersionIn); int GetSendVersion() const; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index edae32103..0e1a83afa 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -782,15 +782,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic& interruptMsgProc) { std::deque::iterator it = pfrom->vRecvGetData.begin(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); - vector vNotFound; LOCK(cs_main); while (it != pfrom->vRecvGetData.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) + if (pfrom->fPauseSend) break; const CInv &inv = *it; @@ -1074,7 +1072,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, { const CChainParams& chainparams = Params(); RandAddSeedPerfmon(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id); @@ -1416,11 +1413,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // Track requests for our stuff GetMainSignals().Inventory(inv.hash); - - if (pfrom->nSendSize > (nMaxSendBufferSize * 2)) { - Misbehaving(pfrom->GetId(), 50); - return error("send buffer size() = %u", pfrom->nSendSize); - } } if (!vToFetch.empty()) @@ -2171,14 +2163,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, return true; } -// requires LOCK(cs_vRecvMsg) bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interruptMsgProc) { const CChainParams& chainparams = Params(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); - //if (fDebug) - // LogPrintf("%s(%u messages)\n", __func__, pfrom->vRecvMsg.size()); - // // Message format // (4) message start @@ -2187,40 +2174,40 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // (4) checksum // (x) data // - bool fOk = true; + bool fMoreWork = false; if (!pfrom->vRecvGetData.empty()) ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); + if (pfrom->fDisconnect) + return false; + // this maintains the order of responses - if (!pfrom->vRecvGetData.empty()) return fOk; + if (!pfrom->vRecvGetData.empty()) return true; - std::deque::iterator it = pfrom->vRecvMsg.begin(); - while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) - break; + if (pfrom->fPauseSend) + return false; - // get next message - CNetMessage& msg = *it; - - //if (fDebug) - // LogPrintf("%s(message %u msgsz, %u bytes, complete:%s)\n", __func__, - // msg.hdr.nMessageSize, msg.vRecv.size(), - // msg.complete() ? "Y" : "N"); - - // end, if an incomplete message is found - if (!msg.complete()) - break; - - // at this point, any failure means we can delete the current message - it++; + std::list msgs; + { + LOCK(pfrom->cs_vProcessMsg); + if (pfrom->vProcessMsg.empty()) + return false; + // Just take one message + msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); + pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE; + pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize(); + fMoreWork = !pfrom->vProcessMsg.empty(); + } + CNetMessage& msg(msgs.front()); + msg.SetVersion(pfrom->GetRecvVersion()); // Scan for message start if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), MESSAGE_START_SIZE) != 0) { LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id); - fOk = false; - break; + pfrom->fDisconnect = true; + return false; } // Read header @@ -2228,7 +2215,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (!hdr.IsValid(chainparams.MessageStart())) { LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id); - continue; + return fMoreWork; } string strCommand = hdr.GetCommand(); @@ -2244,7 +2231,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru SanitizeString(strCommand), nMessageSize, HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); - continue; + return fMoreWork; } // Process message @@ -2253,7 +2240,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru { fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, connman, interruptMsgProc); if (interruptMsgProc) - return true; + return false; + if (!pfrom->vRecvGetData.empty()) + fMoreWork = true; } catch (const std::ios_base::failure& e) { @@ -2282,14 +2271,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (!fRet) LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id); - break; - } - - // In case the connection got shut down, its receive buffer was wiped - if (!pfrom->fDisconnect) - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); - - return fOk; + return fMoreWork; } diff --git a/src/net_processing.h b/src/net_processing.h index db04b6014..afb0f0c94 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -45,6 +45,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru * @param[in] pto The node which we are sending messages to. * @param[in] connman The connection manager for that node. * @param[in] interrupt Interrupt condition for processing threads + * @return True if there is more work to be done */ bool SendMessages(CNode* pto, CConnman& connman, std::atomic& interrupt);