diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 41b21bb3d3..e77b203122 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -896,6 +896,8 @@ void CSigSharesManager::SendMessages() CollectSigSharesToAnnounce(sigSharesToAnnounce); } + bool didSend = false; + g_connman->ForEachNode([&](CNode* pnode) { CNetMsgMaker msgMaker(pnode->GetSendVersion()); @@ -905,7 +907,8 @@ void CSigSharesManager::SendMessages() assert(p.second.CountSet() != 0); LogPrint("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES inv={%s}, node=%d\n", p.second.ToString(), pnode->id); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second), false); + didSend = true; } } @@ -915,7 +918,8 @@ void CSigSharesManager::SendMessages() assert(!p.second.sigShares.empty()); LogPrint("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES inv={%s}, node=%d\n", p.second.ToInv().ToString(), pnode->id); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second), false); + didSend = true; } } @@ -925,12 +929,17 @@ void CSigSharesManager::SendMessages() assert(p.second.CountSet() != 0); LogPrint("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV inv={%s}, node=%d\n", p.second.ToString(), pnode->id); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second)); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second), false); + didSend = true; } } return true; }); + + if (didSend) { + g_connman->WakeSelect(); + } } void CSigSharesManager::Cleanup() diff --git a/src/net.cpp b/src/net.cpp index 3c7a6ce9b2..0c1862a8e3 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1210,6 +1210,18 @@ void CConnman::ThreadSocketHandler() SOCKET hSocketMax = 0; bool have_fds = false; +#ifndef WIN32 + // We add a pipe to the read set so that the select() call can be woken up from the outside + // This is done when data is available for sending and at the same time optimistic sending was disabled + // when pushing the data. + // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to + // timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually + // run on Linux and friends. + FD_SET(wakeupPipe[0], &fdsetRecv); + hSocketMax = std::max(hSocketMax, (SOCKET)wakeupPipe[0]); + have_fds = true; +#endif + BOOST_FOREACH(const ListenSocket& hListenSocket, vhListenSocket) { FD_SET(hListenSocket.socket, &fdsetRecv); hSocketMax = std::max(hSocketMax, hListenSocket.socket); @@ -1276,6 +1288,20 @@ void CConnman::ThreadSocketHandler() return; } +#ifndef WIN32 + // drain the wakeup pipe + if (FD_ISSET(wakeupPipe[0], &fdsetRecv)) { + LogPrint("net", "woke up select()\n"); + char buf[128]; + while (true) { + int r = read(wakeupPipe[0], buf, sizeof(buf)); + if (r <= 0) { + break; + } + } + } +#endif + // // Accept new connections // @@ -1426,6 +1452,21 @@ void CConnman::WakeMessageHandler() condMsgProc.notify_one(); } +void CConnman::WakeSelect() +{ +#ifndef WIN32 + if (wakeupPipe[1] == -1) { + return; + } + + LogPrint("net", "waking up select()\n"); + + char buf[1]; + if (write(wakeupPipe[1], buf, 1) != 1) { + LogPrint("net", "write to wakeupPipe failed\n"); + } +#endif +} @@ -2387,6 +2428,22 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c fMsgProcWake = false; } +#ifndef WIN32 + if (pipe(wakeupPipe) != 0) { + wakeupPipe[0] = wakeupPipe[1] = -1; + LogPrint("net", "pipe() for wakeupPipe failed\n"); + } else { + int fFlags = fcntl(wakeupPipe[0], F_GETFL, 0); + if (fcntl(wakeupPipe[0], F_SETFL, fFlags | O_NONBLOCK) == -1) { + LogPrint("net", "fcntl for O_NONBLOCK on wakeupPipe failed\n"); + } + fFlags = fcntl(wakeupPipe[1], F_GETFL, 0); + if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) { + LogPrint("net", "fcntl for O_NONBLOCK on wakeupPipe failed\n"); + } + } +#endif + // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&TraceThread >, "net", std::function(std::bind(&CConnman::ThreadSocketHandler, this))); @@ -2512,6 +2569,12 @@ void CConnman::Stop() semAddnode = NULL; delete semMasternodeOutbound; semMasternodeOutbound = NULL; + +#ifndef WIN32 + if (wakeupPipe[0] != -1) close(wakeupPipe[0]); + if (wakeupPipe[1] != -1) close(wakeupPipe[1]); + wakeupPipe[0] = wakeupPipe[1] = -1; +#endif } void CConnman::DeleteNode(CNode* pnode) @@ -3054,7 +3117,7 @@ bool CConnman::NodeFullyConnected(const CNode* pnode) return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; } -void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) +void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend) { size_t nMessageSize = msg.data.size(); size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE; @@ -3071,7 +3134,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) size_t nBytesSent = 0; { LOCK(pnode->cs_vSend); - bool optimisticSend(pnode->vSendMsg.empty()); + bool optimisticSend(allowOptimisticSend && pnode->vSendMsg.empty()); //log total amount of bytes per command pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; diff --git a/src/net.h b/src/net.h index 9516e3478c..49fc0eb1ed 100644 --- a/src/net.h +++ b/src/net.h @@ -201,7 +201,7 @@ public: bool IsMasternodeOrDisconnectRequested(const CService& addr); - void PushMessage(CNode* pnode, CSerializedNetMsg&& msg); + void PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend = true); template bool ForEachNodeContinueIf(const Condition& cond, Callable&& func) @@ -408,6 +408,8 @@ public: unsigned int GetReceiveFloodSize() const; void WakeMessageHandler(); + void WakeSelect(); + private: struct ListenSocket { SOCKET socket; @@ -525,6 +527,11 @@ private: CThreadInterrupt interruptNet; +#ifndef WIN32 + /** a pipe which is added to select() calls to wakeup before the timeout */ + int wakeupPipe[2]{-1,-1}; +#endif + std::thread threadDNSAddressSeed; std::thread threadSocketHandler; std::thread threadOpenAddedConnections;