Merge pull request #3420 from codablock/pr_speedups5

Avoid calling SendMessages (and others) for all nodes all the time
This commit is contained in:
Alexander Block 2020-04-17 07:34:23 +02:00 committed by GitHub
commit 1e30054b9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 39 additions and 10 deletions

View File

@ -1660,15 +1660,22 @@ void CConnman::SocketHandler()
} }
} }
InactivityCheck(pnode);
} }
ReleaseNodeVector(vNodesCopy); ReleaseNodeVector(vNodesCopy);
} }
void CConnman::ThreadSocketHandler() void CConnman::ThreadSocketHandler()
{ {
int64_t nLastCleanupNodes = 0;
while (!interruptNet) while (!interruptNet)
{ {
if (GetTimeMillis() - nLastCleanupNodes > 1000) {
ForEachNode(AllNodes, [&](CNode* pnode) {
InactivityCheck(pnode);
});
nLastCleanupNodes = GetTimeMillis();
}
DisconnectNodes(); DisconnectNodes();
NotifyNumConnectionsChanged(); NotifyNumConnectionsChanged();
SocketHandler(); SocketHandler();
@ -2436,11 +2443,20 @@ void CConnman::OpenMasternodeConnection(const CAddress &addrConnect, bool probe)
void CConnman::ThreadMessageHandler() void CConnman::ThreadMessageHandler()
{ {
int64_t nLastForceSendMessages = 0;
while (!flagInterruptMsgProc) while (!flagInterruptMsgProc)
{ {
std::vector<CNode*> vNodesCopy = CopyNodeVector(); std::vector<CNode*> vNodesCopy = CopyNodeVector();
int64_t nNow = GetTimeMillis();
bool fMoreWork = false; bool fMoreWork = false;
bool fForceSendMessages = false;
if (nNow - nLastForceSendMessages >= 100) {
fForceSendMessages = true;
nLastForceSendMessages = nNow;
}
for (CNode* pnode : vNodesCopy) for (CNode* pnode : vNodesCopy)
{ {
@ -2448,12 +2464,13 @@ void CConnman::ThreadMessageHandler()
continue; continue;
// Receive messages // Receive messages
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc); bool fDidWork = false;
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc, fDidWork);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc) if (flagInterruptMsgProc)
return; return;
// Send messages // Send messages
{ if (fDidWork || fForceSendMessages) {
LOCK(pnode->cs_sendProcessing); LOCK(pnode->cs_sendProcessing);
m_msgproc->SendMessages(pnode, flagInterruptMsgProc); m_msgproc->SendMessages(pnode, flagInterruptMsgProc);
} }

View File

@ -633,7 +633,7 @@ struct CombinerAll
class NetEventsInterface class NetEventsInterface
{ {
public: public:
virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) = 0; virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt, bool &fRetDidWork) = 0;
virtual bool SendMessages(CNode* pnode, std::atomic<bool>& interrupt) = 0; virtual bool SendMessages(CNode* pnode, std::atomic<bool>& interrupt) = 0;
virtual void InitializeNode(CNode* pnode) = 0; virtual void InitializeNode(CNode* pnode) = 0;
virtual void FinalizeNode(NodeId id, bool& update_connection_time) = 0; virtual void FinalizeNode(NodeId id, bool& update_connection_time) = 0;

View File

@ -3529,7 +3529,7 @@ static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman* connman)
return false; return false;
} }
bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc) bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc, bool &fRetDidWork)
{ {
const CChainParams& chainparams = Params(); const CChainParams& chainparams = Params();
// //
@ -3541,13 +3541,17 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
// (x) data // (x) data
// //
bool fMoreWork = false; bool fMoreWork = false;
fRetDidWork = false;
if (!pfrom->vRecvGetData.empty()) if (!pfrom->vRecvGetData.empty()) {
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
fRetDidWork = true;
}
if (!pfrom->orphan_work_set.empty()) { if (!pfrom->orphan_work_set.empty()) {
LOCK2(cs_main, g_cs_orphans); LOCK2(cs_main, g_cs_orphans);
ProcessOrphanTx(connman, pfrom->orphan_work_set); ProcessOrphanTx(connman, pfrom->orphan_work_set);
fRetDidWork = true;
} }
if (pfrom->fDisconnect) if (pfrom->fDisconnect)
@ -3571,6 +3575,7 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE; pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE;
pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman->GetReceiveFloodSize(); pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman->GetReceiveFloodSize();
fMoreWork = !pfrom->vProcessMsg.empty(); fMoreWork = !pfrom->vProcessMsg.empty();
fRetDidWork = true;
} }
CNetMessage& msg(msgs.front()); CNetMessage& msg(msgs.front());
@ -3917,13 +3922,20 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptM
// transactions become unconfirmed and spams other nodes. // transactions become unconfirmed and spams other nodes.
if (!fReindex && !fImporting && !IsInitialBlockDownload()) if (!fReindex && !fImporting && !IsInitialBlockDownload())
{ {
GetMainSignals().Broadcast(nTimeBestReceived, connman); static int64_t nLastBroadcastTime = 0;
// HACK: Call this only once every few seconds. SendMessages is called once per peer, which makes this signal very expensive
// The proper solution would be to move this out of here, but this is not worth the effort right now as bitcoin#15632 will later do this.
// Luckily, the Broadcast signal is not used for anything else then CWallet::ResendWalletTransactionsBefore.
if (nNow - nLastBroadcastTime >= 5000000) {
GetMainSignals().Broadcast(nTimeBestReceived, connman);
nLastBroadcastTime = nNow;
}
} }
// //
// Try sending block announcements via headers // Try sending block announcements via headers
// //
{ if (!pto->fMasternode) {
// If we have less than MAX_BLOCKS_TO_ANNOUNCE in our // If we have less than MAX_BLOCKS_TO_ANNOUNCE in our
// list of block hashes we're relaying, and our peer wants // list of block hashes we're relaying, and our peer wants
// headers announcements, then find the first header // headers announcements, then find the first header
@ -4297,7 +4309,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptM
// Message: getdata (blocks) // Message: getdata (blocks)
// //
std::vector<CInv> vGetData; std::vector<CInv> vGetData;
if (!pto->fClient && (fFetch || !IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { if (!pto->fClient && !pto->fMasternode && (fFetch || !IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
std::vector<const CBlockIndex*> vToDownload; std::vector<const CBlockIndex*> vToDownload;
NodeId staller = -1; NodeId staller = -1;
FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller, consensusParams); FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller, consensusParams);

View File

@ -53,7 +53,7 @@ public:
void InitializeNode(CNode* pnode) override; void InitializeNode(CNode* pnode) override;
void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) override; void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) override;
/** Process protocol messages received from a given node */ /** Process protocol messages received from a given node */
bool ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt) override; bool ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt, bool &fRetDidWork) override;
/** /**
* Send queued protocol messages to be sent to a give node. * Send queued protocol messages to be sent to a give node.
* *