From 4d712e366ca7fffaf96394ef01c9246482c0d92e Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:28 -0500 Subject: [PATCH] net: add a new message queue for the message processor This separates the storage of messages from the net and queued messages for processing, allowing the locks to be split. --- src/net.cpp | 12 +++++++++++- src/net.h | 3 +++ src/net_processing.cpp | 25 ++++++++++--------------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 947f016798..df2109e3f7 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1239,8 +1239,18 @@ void CConnman::ThreadSocketHandler() if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); - if (notify) + if (notify) { + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + } + { + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + } WakeMessageHandler(); + } } else if (nBytes == 0) { diff --git a/src/net.h b/src/net.h index 4fc41bddac..21864e73d1 100644 --- a/src/net.h +++ b/src/net.h @@ -608,6 +608,9 @@ public: std::deque> vSendMsg; CCriticalSection cs_vSend; + CCriticalSection cs_vProcessMsg; + std::list vProcessMsg; + std::deque vRecvGetData; std::list vRecvMsg; CCriticalSection cs_vRecvMsg; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 605e142e8d..9963a872e8 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (pfrom->nSendSize >= nMaxSendBufferSize) return false; - auto it = pfrom->vRecvMsg.begin(); - if (it == pfrom->vRecvMsg.end()) - return false; - - // end, if an incomplete message is found - if (!it->complete()) - return false; - - // get next message - CNetMessage msg = std::move(*it); - - // at this point, any failure means we can delete the current message - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin()); - - fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete(); + std::list msgs; + { + LOCK(pfrom->cs_vProcessMsg); + if (pfrom->vProcessMsg.empty()) + return false; + // Just take one message + msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); + fMoreWork = !pfrom->vProcessMsg.empty(); + } + CNetMessage& msg(msgs.front()); msg.SetVersion(pfrom->GetRecvVersion()); // Scan for message start