Backport Bitcoin PR#9441: Net: Massive speedup. Net locks overhaul (#1586)
* net: fix typo causing the wrong receive buffer size

  Surprisingly this hasn't been causing me any issues while testing, probably
  because it requires lots of large blocks to be flying around.

  Send/Recv corks need tests!

* net: make vRecvMsg a list so that we can use splice()

* net: make GetReceiveFloodSize public

  This will be needed so that the message processor can cork incoming messages.

* net: only disconnect if fDisconnect has been set

  These conditions are problematic to check without locking, and we shouldn't be
  relying on the refcount to disconnect.

* net: wait until the node is destroyed to delete its recv buffer

  When vRecvMsg becomes a private buffer, it won't make sense to allow other
  threads to mess with it anymore.

* net: set message deserialization version when it's actually time to deserialize

  We'll soon no longer have access to vRecvMsg, and this is more intuitive anyway.

* net: handle message accounting in ReceiveMsgBytes

  This allows locking to be pushed down to only where it's needed. Also reuse
  the current time rather than checking multiple times.

* net: record bytes written before notifying the message processor

* net: Add a simple function for waking the message handler

  This may be used publicly in the future.

* net: remove useless comments

* net: remove redundant max sendbuffer size check

  This is left over from before there was proper accounting. Hitting 2x the
  sendbuffer size should not be possible.

* net: rework the way that the messagehandler sleeps

  In order to sleep accurately, the message handler needs to know if _any_ node
  has more processing that it should do before the entire thread sleeps.

  Rather than returning a value that represents whether ProcessMessages
  encountered a message that should trigger a disconnect, interpret the return
  value as whether or not that node has more work to do.

  Also, use a global fProcessWake value that can be set by other threads, which
  takes precedence (for one cycle) over the messagehandler's decision.

  Note that the previous behavior was to only process one message per loop
  (except in the case of a bad checksum or invalid header). That was changed in
  PR #3180. The only change here in that regard is that the current node now
  falls to the back of the processing queue for the bad checksum/invalid header
  cases.

* net: add a new message queue for the message processor

  This separates the storage of messages from the net and queued messages for
  processing, allowing the locks to be split.

* net: add a flag to indicate when a node's process queue is full

  Messages are dumped very quickly from the socket handler to the processor, so
  it's the depth of the processing queue that's interesting. The socket handler
  checks the process queue's size during the brief message hand-off and pauses
  if necessary, and the processor possibly unpauses each time a message is
  popped off of its queue.

* net: add a flag to indicate when a node's send buffer is full

  Similar to the recv flag, but this one indicates whether or not the net's send
  buffer is full. The socket handler checks the send queue when a new message is
  added and pauses if necessary, and possibly unpauses after each message is
  drained from its buffer.

* net: remove cs_vRecvMsg

  vRecvMsg is now only touched by the socket handler thread. The accounting vars
  (nRecvBytes/nLastRecv/mapRecvBytesPerMsgCmd) are also only used by the socket
  handler thread, with the exception of queries from rpc/gui. These accesses are
  not threadsafe, but they never were. This needs to be addressed separately.
Also, update comment describing data flow
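
The sleep/wake rework described above boils down to a standard condition-variable pattern. The sketch below is a minimal, self-contained illustration of that pattern and not the actual CConnman code: the MessageHandler stand-in and its ProcessOneRound() are invented for the example, but the fMsgProcWake/condMsgProc/mutexMsgProc names, the 100 ms timed wait, and the clear-the-flag-once-per-cycle behaviour mirror what the diff adds in WakeMessageHandler() and ThreadMessageHandler().

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

class MessageHandler {
public:
    // Called by the socket thread after it hands off a complete message.
    void Wake() {
        {
            std::lock_guard<std::mutex> lock(mutexMsgProc);
            fMsgProcWake = true;
        }
        condMsgProc.notify_one();
    }

    // Simplified main loop of the message handler thread.
    void Run() {
        for (int i = 0; i < 3; ++i) {
            bool fMoreWork = ProcessOneRound();
            std::unique_lock<std::mutex> lock(mutexMsgProc);
            if (!fMoreWork) {
                // Sleep until woken or until the 100 ms poll interval expires,
                // whichever comes first.
                condMsgProc.wait_until(lock,
                    std::chrono::steady_clock::now() + std::chrono::milliseconds(100),
                    [this] { return fMsgProcWake; });
            }
            fMsgProcWake = false; // consume the wake-up; it is good for one cycle
        }
    }

private:
    // Stand-in for looping over all nodes and calling ProcessMessages/SendMessages.
    bool ProcessOneRound() {
        std::printf("processing...\n");
        return false; // pretend no node reported more queued work
    }

    std::condition_variable condMsgProc;
    std::mutex mutexMsgProc;
    bool fMsgProcWake{false};
};

int main() {
    MessageHandler handler;
    std::thread t([&] { handler.Run(); });
    handler.Wake(); // e.g. the socket thread just queued a message
    t.join();
}

The predicate form of wait_until is what prevents lost wake-ups: if Wake() runs before the handler reaches the wait, the flag is already true and the wait returns immediately instead of sleeping for the full 100 ms.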
This commit is contained in:
parent b9c67258ba
commit 2472999da0

src/net.cpp (116 lines changed)

@@ -462,11 +462,6 @@ void CNode::CloseSocketDisconnect()
         LogPrint("net", "disconnecting peer=%d\n", id);
         CloseSocket(hSocket);
     }
-
-    // in case this fails, we'll empty the recv buffer when the CNode is deleted
-    TRY_LOCK(cs_vRecvMsg, lockRecv);
-    if (lockRecv)
-        vRecvMsg.clear();
 }

 void CConnman::ClearBanned()
@@ -673,16 +668,18 @@ void CNode::copyStats(CNodeStats &stats)
 }
 #undef X

-// requires LOCK(cs_vRecvMsg)
 bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
 {
     complete = false;
+    int64_t nTimeMicros = GetTimeMicros();
+    nLastRecv = nTimeMicros / 1000000;
+    nRecvBytes += nBytes;
     while (nBytes > 0) {

         // get current incomplete message, or create a new one
         if (vRecvMsg.empty() ||
             vRecvMsg.back().complete())
-            vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion));
+            vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION));

         CNetMessage& msg = vRecvMsg.back();

@@ -714,7 +711,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
             assert(i != mapRecvBytesPerMsgCmd.end());
             i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;

-            msg.nTime = GetTimeMicros();
+            msg.nTime = nTimeMicros;
             complete = true;
         }
     }
@@ -805,7 +802,7 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes)


 // requires LOCK(cs_vSend)
-size_t SocketSendData(CNode *pnode)
+size_t CConnman::SocketSendData(CNode *pnode)
 {
     std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();
     size_t nSentSize = 0;
@@ -822,6 +819,7 @@ size_t SocketSendData(CNode *pnode)
             if (pnode->nSendOffset == data.size()) {
                 pnode->nSendOffset = 0;
                 pnode->nSendSize -= data.size();
+                pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
                 it++;
             } else {
                 // could not send full message; stop sending more
@@ -1110,8 +1108,7 @@ void CConnman::ThreadSocketHandler()
             std::vector<CNode*> vNodesCopy = vNodes;
             BOOST_FOREACH(CNode* pnode, vNodesCopy)
             {
-                if (pnode->fDisconnect ||
-                    (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0))
+                if (pnode->fDisconnect)
                 {
                     LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fNetworkNode=%d fInbound=%d fMasternode=%d\n",
                               pnode->id, pnode->addr.ToString(), pnode->GetRefCount(), pnode->fNetworkNode, pnode->fInbound, pnode->fMasternode);
@@ -1148,13 +1145,9 @@ void CConnman::ThreadSocketHandler()
                     TRY_LOCK(pnode->cs_vSend, lockSend);
                     if (lockSend)
                     {
-                        TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
-                        if (lockRecv)
-                        {
                         TRY_LOCK(pnode->cs_inventory, lockInv);
                         if (lockInv)
                             fDelete = true;
-                        }
                     }
                 }
                 if (fDelete)
@@ -1209,15 +1202,10 @@ void CConnman::ThreadSocketHandler()
             // write buffer in this case before receiving more. This avoids
             // needlessly queueing received data, if the remote peer is not themselves
             // receiving data. This means properly utilizing TCP flow control signalling.
-            // * Otherwise, if there is no (complete) message in the receive buffer,
-            //   or there is space left in the buffer, select() for receiving data.
-            // * (if neither of the above applies, there is certainly one message
-            //   in the receiver buffer ready to be processed).
-            // Together, that means that at least one of the following is always possible,
-            // so we don't deadlock:
-            // * We send some data.
-            // * We wait for data to be received (and disconnect after timeout).
-            // * We process a message in the buffer (message handler thread).
+            // * Otherwise, if there is space left in the receive buffer, select() for
+            //   receiving data.
+            // * Hand off all complete messages to the processor, to be handled without
+            //   blocking here.
             {
                 TRY_LOCK(pnode->cs_vSend, lockSend);
                 if (lockSend) {
@@ -1228,10 +1216,7 @@ void CConnman::ThreadSocketHandler()
                 }
             }
             {
-                TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
-                if (lockRecv && (
-                    pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
-                    pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
+                if (!pnode->fPauseRecv)
                     FD_SET(pnode->hSocket, &fdsetRecv);
             }
         }
@@ -1284,8 +1269,6 @@ void CConnman::ThreadSocketHandler()
                 continue;
             if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
             {
-                TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
-                if (lockRecv)
                 {
                     {
                         // typical socket buffer is 8K-64K
@@ -1296,11 +1279,23 @@ void CConnman::ThreadSocketHandler()
                         bool notify = false;
                         if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
                             pnode->CloseSocketDisconnect();
-                        if(notify)
-                            condMsgProc.notify_one();
-                        pnode->nLastRecv = GetTime();
-                        pnode->nRecvBytes += nBytes;
                         RecordBytesRecv(nBytes);
+                        if (notify) {
+                            size_t nSizeAdded = 0;
+                            auto it(pnode->vRecvMsg.begin());
+                            for (; it != pnode->vRecvMsg.end(); ++it) {
+                                if (!it->complete())
+                                    break;
+                                nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
+                            }
+                            {
+                                LOCK(pnode->cs_vProcessMsg);
+                                pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
+                                pnode->nProcessQueueSize += nSizeAdded;
+                                pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
+                            }
+                            WakeMessageHandler();
+                        }
                     }
                     else if (nBytes == 0)
                     {
@@ -1334,8 +1329,9 @@ void CConnman::ThreadSocketHandler()
                 TRY_LOCK(pnode->cs_vSend, lockSend);
                 if (lockSend) {
                     size_t nBytes = SocketSendData(pnode);
-                    if (nBytes)
+                    if (nBytes) {
                         RecordBytesSent(nBytes);
+                    }
                 }
             }

@@ -1371,8 +1367,14 @@ void CConnman::ThreadSocketHandler()
     }
 }

+void CConnman::WakeMessageHandler()
+{
+    {
+        std::lock_guard<std::mutex> lock(mutexMsgProc);
+        fMsgProcWake = true;
+    }
+    condMsgProc.notify_one();
+}



@@ -1892,7 +1894,7 @@ void CConnman::ThreadMessageHandler()
     {
         std::vector<CNode*> vNodesCopy = CopyNodeVector();

-        bool fSleep = true;
+        bool fMoreWork = false;

         BOOST_FOREACH(CNode* pnode, vNodesCopy)
         {
@@ -1900,22 +1902,8 @@ void CConnman::ThreadMessageHandler()
                 continue;

             // Receive messages
-            {
-                TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
-                if (lockRecv)
-                {
-                    if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
-                        pnode->fDisconnect = true;
-
-                    if (pnode->nSendSize < GetSendBufferSize())
-                    {
-                        if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
-                        {
-                            fSleep = false;
-                        }
-                    }
-                }
-            }
+            bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
+            fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
             if (flagInterruptMsgProc)
                 return;

@@ -1931,10 +1919,11 @@ void CConnman::ThreadMessageHandler()

         ReleaseNodeVector(vNodesCopy);

-        if (fSleep) {
-            std::unique_lock<std::mutex> lock(mutexMsgProc);
-            condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
+        std::unique_lock<std::mutex> lock(mutexMsgProc);
+        if (!fMoreWork) {
+            condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
         }
+        fMsgProcWake = false;
     }
 }

@@ -2129,7 +2118,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions)
     nMaxFeeler = connOptions.nMaxFeeler;

     nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
-    nReceiveFloodSize = connOptions.nSendBufferMaxSize;
+    nReceiveFloodSize = connOptions.nReceiveFloodSize;

     SetBestHeight(connOptions.nBestHeight);

@@ -2191,6 +2180,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions)
     interruptNet.reset();
     flagInterruptMsgProc = false;

+    {
+        std::unique_lock<std::mutex> lock(mutexMsgProc);
+        fMsgProcWake = false;
+    }
+
     // Send and receive from sockets, accept connections
     threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));

@@ -2684,6 +2678,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
     vchKeyedNetGroup = CalculateKeyedNetGroup(addr);
     id = idIn;
     nLocalServices = nLocalServicesIn;
+    fPauseRecv = false;
+    fPauseSend = false;
+    nProcessQueueSize = 0;

     GetRandBytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce));
     nMyStartingHeight = nMyStartingHeightIn;
@@ -2819,6 +2816,9 @@ void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
         pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
         pnode->nSendSize += strm.size();

+        if (pnode->nSendSize > nSendBufferMaxSize)
+            pnode->fPauseSend = true;
+
         // If write queue empty, attempt "optimistic write"
         if (optimisticSend == true)
             nBytesSent = SocketSendData(pnode);
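The heart of the net.cpp changes is the hand-off in ThreadSocketHandler shown above. The sketch below is a simplified, single-threaded illustration of the same mechanics using hypothetical Msg/Node types (no cs_vProcessMsg lock, not the real CNode/CNetMessage): only the leading complete messages are spliced from the socket thread's private vRecvMsg into the shared vProcessMsg, the transferred bytes are added to nProcessQueueSize, and fPauseRecv flips once the queue exceeds the receive-flood limit so the socket handler stops selecting that socket for reading.

#include <cstddef>
#include <iostream>
#include <list>
#include <string>

struct Msg {
    std::string payload;
    bool fComplete;
    bool complete() const { return fComplete; }
    std::size_t size() const { return payload.size() + 24; } // 24 ~ serialized header size
};

struct Node {
    std::list<Msg> vRecvMsg;    // touched only by the socket handler
    std::list<Msg> vProcessMsg; // shared with the processor (lock omitted in this sketch)
    std::size_t nProcessQueueSize = 0;
    bool fPauseRecv = false;
};

// Move every leading complete message to the process queue; splice() relinks
// list nodes in O(1) instead of copying message data.
void HandOff(Node& node, std::size_t nReceiveFloodSize) {
    std::size_t nSizeAdded = 0;
    auto it = node.vRecvMsg.begin();
    for (; it != node.vRecvMsg.end(); ++it) {
        if (!it->complete())
            break;
        nSizeAdded += it->size();
    }
    node.vProcessMsg.splice(node.vProcessMsg.end(), node.vRecvMsg, node.vRecvMsg.begin(), it);
    node.nProcessQueueSize += nSizeAdded;
    node.fPauseRecv = node.nProcessQueueSize > nReceiveFloodSize;
}

int main() {
    Node node;
    node.vRecvMsg = {{"ping", true}, {"inv data", true}, {"partial", false}};
    HandOff(node, /*nReceiveFloodSize=*/16);
    std::cout << "queued=" << node.vProcessMsg.size()
              << " pending=" << node.vRecvMsg.size()
              << " paused=" << node.fPauseRecv << "\n"; // queued=2 pending=1 paused=1
}

Because the incomplete tail stays in vRecvMsg and everything complete moves in one splice, the socket thread only ever holds cs_vProcessMsg for the brief hand-off, which is exactly the lock split the commit message describes.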

src/net.h (39 lines changed)

@@ -386,6 +386,7 @@ public:
     int GetBestHeight() const;


+    unsigned int GetReceiveFloodSize() const;
 private:
     struct ListenSocket {
         SOCKET socket;
@@ -403,6 +404,8 @@ private:
     void ThreadDNSAddressSeed();
     void ThreadMnbRequestConnections();

+    void WakeMessageHandler();
+
     CNode* FindNode(const CNetAddr& ip);
     CNode* FindNode(const CSubNet& subNet);
     CNode* FindNode(const std::string& addrName);
@@ -415,6 +418,7 @@ private:

     NodeId GetNewNodeId();

+    size_t SocketSendData(CNode *pnode);
     //!check is the banlist has unwritten changes
     bool BannedSetIsDirty();
     //!set the "dirty" flag for the banlist
@@ -425,8 +429,6 @@ private:
     void DumpData();
     void DumpBanlist();

-    unsigned int GetReceiveFloodSize() const;
-
     CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand);
     void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand);
     void EndMessage(CDataStream& strm);
@@ -487,6 +489,9 @@ private:
     std::atomic<int> nBestHeight;
     CClientUIInterface* clientInterface;

+    /** flag for waking the message processor. */
+    bool fMsgProcWake;
+
     std::condition_variable condMsgProc;
     std::mutex mutexMsgProc;
     std::atomic<bool> flagInterruptMsgProc;
@@ -505,7 +510,6 @@ void Discover(boost::thread_group& threadGroup);
 void MapPort(bool fUseUPnP);
 unsigned short GetListenPort();
 bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
-size_t SocketSendData(CNode *pnode);

 struct CombinerAll
 {
@@ -666,11 +670,13 @@ public:
     std::deque<CSerializeData> vSendMsg;
     CCriticalSection cs_vSend;

+    CCriticalSection cs_vProcessMsg;
+    std::list<CNetMessage> vProcessMsg;
+    size_t nProcessQueueSize;
+
     std::deque<CInv> vRecvGetData;
-    std::deque<CNetMessage> vRecvMsg;
-    CCriticalSection cs_vRecvMsg;
     uint64_t nRecvBytes;
-    int nRecvVersion;
+    std::atomic<int> nRecvVersion;

     int64_t nLastSend;
     int64_t nLastRecv;
@@ -708,6 +714,9 @@ public:
     CBloomFilter* pfilter;
     int nRefCount;
     NodeId id;
+
+    std::atomic_bool fPauseRecv;
+    std::atomic_bool fPauseSend;
 protected:

     mapMsgCmdSize mapSendBytesPerMsgCmd;
@@ -770,6 +779,7 @@ private:
     ServiceFlags nLocalServices;
     int nMyStartingHeight;
     int nSendVersion;
+    std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
 public:

     NodeId GetId() const {
@@ -791,24 +801,15 @@ public:
         return nRefCount;
     }

-    // requires LOCK(cs_vRecvMsg)
-    unsigned int GetTotalRecvSize()
-    {
-        unsigned int total = 0;
-        BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
-            total += msg.vRecv.size() + 24;
-        return total;
-    }
-
-    // requires LOCK(cs_vRecvMsg)
     bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);

-    // requires LOCK(cs_vRecvMsg)
     void SetRecvVersion(int nVersionIn)
     {
         nRecvVersion = nVersionIn;
-        BOOST_FOREACH(CNetMessage &msg, vRecvMsg)
-            msg.SetVersion(nVersionIn);
+    }
+    int GetRecvVersion()
+    {
+        return nRecvVersion;
     }
     void SetSendVersion(int nVersionIn);
     int GetSendVersion() const;
@@ -782,15 +782,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connman)
 void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
 {
     std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
-    unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();

     vector<CInv> vNotFound;

     LOCK(cs_main);

     while (it != pfrom->vRecvGetData.end()) {
         // Don't bother if send buffer is too full to respond anyway
-        if (pfrom->nSendSize >= nMaxSendBufferSize)
+        if (pfrom->fPauseSend)
             break;

         const CInv &inv = *it;
@@ -1074,7 +1072,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
 {
     const CChainParams& chainparams = Params();
     RandAddSeedPerfmon();
-    unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();

     LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id);

@@ -1416,11 +1413,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,

             // Track requests for our stuff
             GetMainSignals().Inventory(inv.hash);
-
-            if (pfrom->nSendSize > (nMaxSendBufferSize * 2)) {
-                Misbehaving(pfrom->GetId(), 50);
-                return error("send buffer size() = %u", pfrom->nSendSize);
-            }
         }

         if (!vToFetch.empty())
@@ -2171,14 +2163,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
     return true;
 }

-// requires LOCK(cs_vRecvMsg)
 bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
 {
     const CChainParams& chainparams = Params();
-    unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
-    //if (fDebug)
-    //    LogPrintf("%s(%u messages)\n", __func__, pfrom->vRecvMsg.size());
     //
     // Message format
     // (4) message start
@@ -2187,40 +2174,40 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
     // (4) checksum
     // (x) data
     //
-    bool fOk = true;
+    bool fMoreWork = false;

     if (!pfrom->vRecvGetData.empty())
         ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);

+    if (pfrom->fDisconnect)
+        return false;
+
     // this maintains the order of responses
-    if (!pfrom->vRecvGetData.empty()) return fOk;
+    if (!pfrom->vRecvGetData.empty()) return true;

-    std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
-    while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
-        // Don't bother if send buffer is too full to respond anyway
-        if (pfrom->nSendSize >= nMaxSendBufferSize)
-            break;
-
-        // get next message
-        CNetMessage& msg = *it;
-
-        //if (fDebug)
-        //    LogPrintf("%s(message %u msgsz, %u bytes, complete:%s)\n", __func__,
-        //            msg.hdr.nMessageSize, msg.vRecv.size(),
-        //            msg.complete() ? "Y" : "N");
-
-        // end, if an incomplete message is found
-        if (!msg.complete())
-            break;
-
-        // at this point, any failure means we can delete the current message
-        it++;
+    // Don't bother if send buffer is too full to respond anyway
+    if (pfrom->fPauseSend)
+        return false;
+
+    std::list<CNetMessage> msgs;
+    {
+        LOCK(pfrom->cs_vProcessMsg);
+        if (pfrom->vProcessMsg.empty())
+            return false;
+        // Just take one message
+        msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
+        pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE;
+        pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize();
+        fMoreWork = !pfrom->vProcessMsg.empty();
+    }
+    CNetMessage& msg(msgs.front());

+    msg.SetVersion(pfrom->GetRecvVersion());
     // Scan for message start
     if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), MESSAGE_START_SIZE) != 0) {
         LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id);
-        fOk = false;
-        break;
+        pfrom->fDisconnect = true;
+        return false;
     }

     // Read header
@@ -2228,7 +2215,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
     if (!hdr.IsValid(chainparams.MessageStart()))
     {
         LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id);
-        continue;
+        return fMoreWork;
     }
     string strCommand = hdr.GetCommand();

@@ -2244,7 +2231,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
             SanitizeString(strCommand), nMessageSize,
             HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE),
             HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE));
-        continue;
+        return fMoreWork;
     }

     // Process message
@@ -2253,7 +2240,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
     {
         fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, connman, interruptMsgProc);
         if (interruptMsgProc)
-            return true;
+            return false;
+        if (!pfrom->vRecvGetData.empty())
+            fMoreWork = true;
     }
     catch (const std::ios_base::failure& e)
     {
@@ -2282,14 +2271,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
     if (!fRet)
         LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id);

-        break;
-    }
-
-    // In case the connection got shut down, its receive buffer was wiped
-    if (!pfrom->fDisconnect)
-        pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
-
-    return fOk;
+    return fMoreWork;
 }

@@ -45,6 +45,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc);
  * @param[in] pto The node which we are sending messages to.
  * @param[in] connman The connection manager for that node.
  * @param[in] interrupt Interrupt condition for processing threads
+ * @return True if there is more work to be done
  */
 bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt);

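On the send side the same pause-flag idea replaces the scattered nSendSize checks that the hunks above delete from ProcessGetData and ProcessMessage(s). The sketch below uses a hypothetical Peer type (not the real CNode/CConnman API) to show the intended flow: PushMessage sets fPauseSend once the queued bytes exceed the send-buffer limit, draining the queue in SocketSendData re-evaluates the flag, and the processor simply stops queueing replies while the flag is set. That backpressure is also why the old "2x send buffer" Misbehaving(50) penalty could be dropped.

#include <cstddef>
#include <deque>
#include <iostream>
#include <string>
#include <utility>

struct Peer {
    std::deque<std::string> vSendMsg;
    std::size_t nSendSize = 0;
    bool fPauseSend = false;
};

constexpr std::size_t nSendBufferMaxSize = 10; // tiny limit, just for the example

void PushMessage(Peer& peer, std::string msg) {
    peer.nSendSize += msg.size();
    peer.vSendMsg.push_back(std::move(msg));
    if (peer.nSendSize > nSendBufferMaxSize)
        peer.fPauseSend = true; // processor will stop generating replies for this peer
}

void SocketSendData(Peer& peer, std::size_t nBytesSendable) {
    // Pretend the socket accepted whole messages up to nBytesSendable bytes.
    while (!peer.vSendMsg.empty() && peer.vSendMsg.front().size() <= nBytesSendable) {
        const std::size_t nSent = peer.vSendMsg.front().size();
        nBytesSendable -= nSent;
        peer.nSendSize -= nSent;
        peer.vSendMsg.pop_front();
        peer.fPauseSend = peer.nSendSize > nSendBufferMaxSize; // re-evaluate as data drains
    }
}

int main() {
    Peer peer;
    PushMessage(peer, "headers.....");                  // 12 bytes > limit, peer pauses
    std::cout << "paused=" << peer.fPauseSend << "\n";  // paused=1
    SocketSendData(peer, 64);                           // queue drains, flag clears
    std::cout << "paused=" << peer.fPauseSend << "\n";  // paused=0
}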