mirror of
https://github.com/dashpay/dash.git
synced 2024-12-26 20:42:59 +01:00
net: add a new message queue for the message processor
This separates the storage of messages from the net and queued messages for processing, allowing the locks to be split.
This commit is contained in:
parent
c5a8b1b946
commit
4d712e366c
12
src/net.cpp
12
src/net.cpp
@ -1239,8 +1239,18 @@ void CConnman::ThreadSocketHandler()
|
|||||||
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
|
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
|
||||||
pnode->CloseSocketDisconnect();
|
pnode->CloseSocketDisconnect();
|
||||||
RecordBytesRecv(nBytes);
|
RecordBytesRecv(nBytes);
|
||||||
if (notify)
|
if (notify) {
|
||||||
|
auto it(pnode->vRecvMsg.begin());
|
||||||
|
for (; it != pnode->vRecvMsg.end(); ++it) {
|
||||||
|
if (!it->complete())
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
LOCK(pnode->cs_vProcessMsg);
|
||||||
|
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
|
||||||
|
}
|
||||||
WakeMessageHandler();
|
WakeMessageHandler();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (nBytes == 0)
|
else if (nBytes == 0)
|
||||||
{
|
{
|
||||||
|
@ -608,6 +608,9 @@ public:
|
|||||||
std::deque<std::vector<unsigned char>> vSendMsg;
|
std::deque<std::vector<unsigned char>> vSendMsg;
|
||||||
CCriticalSection cs_vSend;
|
CCriticalSection cs_vSend;
|
||||||
|
|
||||||
|
CCriticalSection cs_vProcessMsg;
|
||||||
|
std::list<CNetMessage> vProcessMsg;
|
||||||
|
|
||||||
std::deque<CInv> vRecvGetData;
|
std::deque<CInv> vRecvGetData;
|
||||||
std::list<CNetMessage> vRecvMsg;
|
std::list<CNetMessage> vRecvMsg;
|
||||||
CCriticalSection cs_vRecvMsg;
|
CCriticalSection cs_vRecvMsg;
|
||||||
|
@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
|
|||||||
if (pfrom->nSendSize >= nMaxSendBufferSize)
|
if (pfrom->nSendSize >= nMaxSendBufferSize)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
auto it = pfrom->vRecvMsg.begin();
|
std::list<CNetMessage> msgs;
|
||||||
if (it == pfrom->vRecvMsg.end())
|
{
|
||||||
return false;
|
LOCK(pfrom->cs_vProcessMsg);
|
||||||
|
if (pfrom->vProcessMsg.empty())
|
||||||
// end, if an incomplete message is found
|
return false;
|
||||||
if (!it->complete())
|
// Just take one message
|
||||||
return false;
|
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
|
||||||
|
fMoreWork = !pfrom->vProcessMsg.empty();
|
||||||
// get next message
|
}
|
||||||
CNetMessage msg = std::move(*it);
|
CNetMessage& msg(msgs.front());
|
||||||
|
|
||||||
// at this point, any failure means we can delete the current message
|
|
||||||
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
|
|
||||||
|
|
||||||
fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
|
|
||||||
|
|
||||||
msg.SetVersion(pfrom->GetRecvVersion());
|
msg.SetVersion(pfrom->GetRecvVersion());
|
||||||
// Scan for message start
|
// Scan for message start
|
||||||
|
Loading…
Reference in New Issue
Block a user