From cf2932098827c775063fe6c38e4283293af81e2f Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Fri, 15 Feb 2019 14:08:49 +0100 Subject: [PATCH 1/4] Allow to disable optimistic send in PushMessage() Profiling has shown that optimistic send causes measurable slowdowns when many messages are pushed, even if the sockets are non-blocking. Better to allow disabling of optimistic sending in such cases and let the network thread do the actual socket calls. --- src/net.cpp | 4 ++-- src/net.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 3c7a6ce9b..0a351af74 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -3054,7 +3054,7 @@ bool CConnman::NodeFullyConnected(const CNode* pnode) return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; } -void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) +void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend) { size_t nMessageSize = msg.data.size(); size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE; @@ -3071,7 +3071,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) size_t nBytesSent = 0; { LOCK(pnode->cs_vSend); - bool optimisticSend(pnode->vSendMsg.empty()); + bool optimisticSend(allowOptimisticSend && pnode->vSendMsg.empty()); //log total amount of bytes per command pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; diff --git a/src/net.h b/src/net.h index 9516e3478..cfd5fa932 100644 --- a/src/net.h +++ b/src/net.h @@ -201,7 +201,7 @@ public: bool IsMasternodeOrDisconnectRequested(const CService& addr); - void PushMessage(CNode* pnode, CSerializedNetMsg&& msg); + void PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend = true); template bool ForEachNodeContinueIf(const Condition& cond, Callable&& func) From acb87895f87c8317dc49089f648dfd80baed383d Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Thu, 14 Feb 2019 18:50:18 +0100 Subject: [PATCH 2/4] Implement WakeupSelect() to allow preliminary wakeup after message push This adds the reading side of a pipe to the read-set when calling select(). Writing to the writing side of the pipe then causes select() to wake up immediately. Otherwise it would wait for the timeout of 50ms, even if there is data that could possibly be sent. This is useful when many messages need are pushed with optimistic send being disabled. After all messages have been pushed, WakeSelect() can then wakeup the select() thread and force a re-check for pending data to send. This is currently only implemented for POSIX compliant systems as we assume that heavy-load daemons (like masternodes) are usually run on Linux. --- src/net.cpp | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/net.h | 7 +++++++ 2 files changed, 60 insertions(+) diff --git a/src/net.cpp b/src/net.cpp index 0a351af74..5d52874b6 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1210,6 +1210,18 @@ void CConnman::ThreadSocketHandler() SOCKET hSocketMax = 0; bool have_fds = false; +#ifndef WIN32 + // We add a pipe to the read set so that the select() call can be woken up from the outside + // This is done when data is available for sending and at the same time optimistic sending was disabled + // when pushing the data. + // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to + // timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually + // run on Linux and friends. + FD_SET(wakeupPipe[0], &fdsetRecv); + hSocketMax = std::max(hSocketMax, (SOCKET)wakeupPipe[0]); + have_fds = true; +#endif + BOOST_FOREACH(const ListenSocket& hListenSocket, vhListenSocket) { FD_SET(hListenSocket.socket, &fdsetRecv); hSocketMax = std::max(hSocketMax, hListenSocket.socket); @@ -1276,6 +1288,20 @@ void CConnman::ThreadSocketHandler() return; } +#ifndef WIN32 + // drain the wakeup pipe + if (FD_ISSET(wakeupPipe[0], &fdsetRecv)) { + LogPrint("net", "woke up select()\n"); + char buf[128]; + while (true) { + int r = read(wakeupPipe[0], buf, sizeof(buf)); + if (r <= 0) { + break; + } + } + } +#endif + // // Accept new connections // @@ -1426,6 +1452,21 @@ void CConnman::WakeMessageHandler() condMsgProc.notify_one(); } +void CConnman::WakeSelect() +{ +#ifndef WIN32 + if (wakeupPipe[1] == -1) { + return; + } + + LogPrint("net", "waking up select()\n"); + + char buf[1]; + if (write(wakeupPipe[1], buf, 1) != 1) { + LogPrint("net", "write to wakeupPipe failed\n"); + } +#endif +} @@ -2387,6 +2428,12 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c fMsgProcWake = false; } +#ifndef WIN32 + if (pipe2(wakeupPipe, O_NONBLOCK) != 0) { + wakeupPipe[0] = wakeupPipe[1] = -1; + } +#endif + // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&TraceThread >, "net", std::function(std::bind(&CConnman::ThreadSocketHandler, this))); @@ -2512,6 +2559,12 @@ void CConnman::Stop() semAddnode = NULL; delete semMasternodeOutbound; semMasternodeOutbound = NULL; + +#ifndef WIN32 + if (wakeupPipe[0] != -1) close(wakeupPipe[0]); + if (wakeupPipe[1] != -1) close(wakeupPipe[1]); + wakeupPipe[0] = wakeupPipe[1] = -1; +#endif } void CConnman::DeleteNode(CNode* pnode) diff --git a/src/net.h b/src/net.h index cfd5fa932..49fc0eb1e 100644 --- a/src/net.h +++ b/src/net.h @@ -408,6 +408,8 @@ public: unsigned int GetReceiveFloodSize() const; void WakeMessageHandler(); + void WakeSelect(); + private: struct ListenSocket { SOCKET socket; @@ -525,6 +527,11 @@ private: CThreadInterrupt interruptNet; +#ifndef WIN32 + /** a pipe which is added to select() calls to wakeup before the timeout */ + int wakeupPipe[2]{-1,-1}; +#endif + std::thread threadDNSAddressSeed; std::thread threadSocketHandler; std::thread threadOpenAddedConnections; From c03480d20f08c434cd0a266ad22d61601a59d89b Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Fri, 15 Feb 2019 15:05:32 +0100 Subject: [PATCH 3/4] Disable optimistic sending when pushing sig share related messages And instead let the network thread do the actual sending. --- src/llmq/quorums_signing_shares.cpp | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 41b21bb3d..e77b20312 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -896,6 +896,8 @@ void CSigSharesManager::SendMessages() CollectSigSharesToAnnounce(sigSharesToAnnounce); } + bool didSend = false; + g_connman->ForEachNode([&](CNode* pnode) { CNetMsgMaker msgMaker(pnode->GetSendVersion()); @@ -905,7 +907,8 @@ void CSigSharesManager::SendMessages() assert(p.second.CountSet() != 0); LogPrint("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES inv={%s}, node=%d\n", p.second.ToString(), pnode->id); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second), false); + didSend = true; } } @@ -915,7 +918,8 @@ void CSigSharesManager::SendMessages() assert(!p.second.sigShares.empty()); LogPrint("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES inv={%s}, node=%d\n", p.second.ToInv().ToString(), pnode->id); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second), false); + didSend = true; } } @@ -925,12 +929,17 @@ void CSigSharesManager::SendMessages() assert(p.second.CountSet() != 0); LogPrint("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV inv={%s}, node=%d\n", p.second.ToString(), pnode->id); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second), false); + didSend = true; } } return true; }); + + if (didSend) { + g_connman->WakeSelect(); + } } void CSigSharesManager::Cleanup() From d7bd0954f3dc05976d8cc0947d482e405b80c8d5 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Fri, 15 Feb 2019 16:30:42 +0100 Subject: [PATCH 4/4] Use pipe() together with fcntl instead of pipe2() pipe2 is not supported on MacOS --- src/net.cpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/net.cpp b/src/net.cpp index 5d52874b6..0c1862a8e 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2429,8 +2429,18 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c } #ifndef WIN32 - if (pipe2(wakeupPipe, O_NONBLOCK) != 0) { + if (pipe(wakeupPipe) != 0) { wakeupPipe[0] = wakeupPipe[1] = -1; + LogPrint("net", "pipe() for wakeupPipe failed\n"); + } else { + int fFlags = fcntl(wakeupPipe[0], F_GETFL, 0); + if (fcntl(wakeupPipe[0], F_SETFL, fFlags | O_NONBLOCK) == -1) { + LogPrint("net", "fcntl for O_NONBLOCK on wakeupPipe failed\n"); + } + fFlags = fcntl(wakeupPipe[1], F_GETFL, 0); + if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) { + LogPrint("net", "fcntl for O_NONBLOCK on wakeupPipe failed\n"); + } } #endif