Merge pull request #2704 from codablock/pr_llmq_optimizations1

Optimize LLMQs sending of sig shares
This commit is contained in:
Alexander Block 2019-02-16 15:49:19 +01:00 committed by GitHub
commit 01940616f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 6 deletions

View File

@ -896,6 +896,8 @@ void CSigSharesManager::SendMessages()
CollectSigSharesToAnnounce(sigSharesToAnnounce); CollectSigSharesToAnnounce(sigSharesToAnnounce);
} }
bool didSend = false;
g_connman->ForEachNode([&](CNode* pnode) { g_connman->ForEachNode([&](CNode* pnode) {
CNetMsgMaker msgMaker(pnode->GetSendVersion()); CNetMsgMaker msgMaker(pnode->GetSendVersion());
@ -905,7 +907,8 @@ void CSigSharesManager::SendMessages()
assert(p.second.CountSet() != 0); assert(p.second.CountSet() != 0);
LogPrint("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES inv={%s}, node=%d\n", LogPrint("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES inv={%s}, node=%d\n",
p.second.ToString(), pnode->id); p.second.ToString(), pnode->id);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second)); g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second), false);
didSend = true;
} }
} }
@ -915,7 +918,8 @@ void CSigSharesManager::SendMessages()
assert(!p.second.sigShares.empty()); assert(!p.second.sigShares.empty());
LogPrint("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES inv={%s}, node=%d\n", LogPrint("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES inv={%s}, node=%d\n",
p.second.ToInv().ToString(), pnode->id); p.second.ToInv().ToString(), pnode->id);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second)); g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second), false);
didSend = true;
} }
} }
@ -925,12 +929,17 @@ void CSigSharesManager::SendMessages()
assert(p.second.CountSet() != 0); assert(p.second.CountSet() != 0);
LogPrint("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV inv={%s}, node=%d\n", LogPrint("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV inv={%s}, node=%d\n",
p.second.ToString(), pnode->id); p.second.ToString(), pnode->id);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second)); g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second), false);
didSend = true;
} }
} }
return true; return true;
}); });
if (didSend) {
g_connman->WakeSelect();
}
} }
void CSigSharesManager::Cleanup() void CSigSharesManager::Cleanup()

View File

@ -1210,6 +1210,18 @@ void CConnman::ThreadSocketHandler()
SOCKET hSocketMax = 0; SOCKET hSocketMax = 0;
bool have_fds = false; bool have_fds = false;
#ifndef WIN32
// We add a pipe to the read set so that the select() call can be woken up from the outside
// This is done when data is available for sending and at the same time optimistic sending was disabled
// when pushing the data.
// This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
// timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
// run on Linux and friends.
FD_SET(wakeupPipe[0], &fdsetRecv);
hSocketMax = std::max(hSocketMax, (SOCKET)wakeupPipe[0]);
have_fds = true;
#endif
BOOST_FOREACH(const ListenSocket& hListenSocket, vhListenSocket) { BOOST_FOREACH(const ListenSocket& hListenSocket, vhListenSocket) {
FD_SET(hListenSocket.socket, &fdsetRecv); FD_SET(hListenSocket.socket, &fdsetRecv);
hSocketMax = std::max(hSocketMax, hListenSocket.socket); hSocketMax = std::max(hSocketMax, hListenSocket.socket);
@ -1276,6 +1288,20 @@ void CConnman::ThreadSocketHandler()
return; return;
} }
#ifndef WIN32
// drain the wakeup pipe
if (FD_ISSET(wakeupPipe[0], &fdsetRecv)) {
LogPrint("net", "woke up select()\n");
char buf[128];
while (true) {
int r = read(wakeupPipe[0], buf, sizeof(buf));
if (r <= 0) {
break;
}
}
}
#endif
// //
// Accept new connections // Accept new connections
// //
@ -1426,6 +1452,21 @@ void CConnman::WakeMessageHandler()
condMsgProc.notify_one(); condMsgProc.notify_one();
} }
void CConnman::WakeSelect()
{
#ifndef WIN32
if (wakeupPipe[1] == -1) {
return;
}
LogPrint("net", "waking up select()\n");
char buf[1];
if (write(wakeupPipe[1], buf, 1) != 1) {
LogPrint("net", "write to wakeupPipe failed\n");
}
#endif
}
@ -2387,6 +2428,22 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
fMsgProcWake = false; fMsgProcWake = false;
} }
#ifndef WIN32
if (pipe(wakeupPipe) != 0) {
wakeupPipe[0] = wakeupPipe[1] = -1;
LogPrint("net", "pipe() for wakeupPipe failed\n");
} else {
int fFlags = fcntl(wakeupPipe[0], F_GETFL, 0);
if (fcntl(wakeupPipe[0], F_SETFL, fFlags | O_NONBLOCK) == -1) {
LogPrint("net", "fcntl for O_NONBLOCK on wakeupPipe failed\n");
}
fFlags = fcntl(wakeupPipe[1], F_GETFL, 0);
if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) {
LogPrint("net", "fcntl for O_NONBLOCK on wakeupPipe failed\n");
}
}
#endif
// Send and receive from sockets, accept connections // Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this))); threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
@ -2512,6 +2569,12 @@ void CConnman::Stop()
semAddnode = NULL; semAddnode = NULL;
delete semMasternodeOutbound; delete semMasternodeOutbound;
semMasternodeOutbound = NULL; semMasternodeOutbound = NULL;
#ifndef WIN32
if (wakeupPipe[0] != -1) close(wakeupPipe[0]);
if (wakeupPipe[1] != -1) close(wakeupPipe[1]);
wakeupPipe[0] = wakeupPipe[1] = -1;
#endif
} }
void CConnman::DeleteNode(CNode* pnode) void CConnman::DeleteNode(CNode* pnode)
@ -3054,7 +3117,7 @@ bool CConnman::NodeFullyConnected(const CNode* pnode)
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
} }
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend)
{ {
size_t nMessageSize = msg.data.size(); size_t nMessageSize = msg.data.size();
size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE; size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE;
@ -3071,7 +3134,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0; size_t nBytesSent = 0;
{ {
LOCK(pnode->cs_vSend); LOCK(pnode->cs_vSend);
bool optimisticSend(pnode->vSendMsg.empty()); bool optimisticSend(allowOptimisticSend && pnode->vSendMsg.empty());
//log total amount of bytes per command //log total amount of bytes per command
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;

View File

@ -201,7 +201,7 @@ public:
bool IsMasternodeOrDisconnectRequested(const CService& addr); bool IsMasternodeOrDisconnectRequested(const CService& addr);
void PushMessage(CNode* pnode, CSerializedNetMsg&& msg); void PushMessage(CNode* pnode, CSerializedNetMsg&& msg, bool allowOptimisticSend = true);
template<typename Condition, typename Callable> template<typename Condition, typename Callable>
bool ForEachNodeContinueIf(const Condition& cond, Callable&& func) bool ForEachNodeContinueIf(const Condition& cond, Callable&& func)
@ -408,6 +408,8 @@ public:
unsigned int GetReceiveFloodSize() const; unsigned int GetReceiveFloodSize() const;
void WakeMessageHandler(); void WakeMessageHandler();
void WakeSelect();
private: private:
struct ListenSocket { struct ListenSocket {
SOCKET socket; SOCKET socket;
@ -525,6 +527,11 @@ private:
CThreadInterrupt interruptNet; CThreadInterrupt interruptNet;
#ifndef WIN32
/** a pipe which is added to select() calls to wakeup before the timeout */
int wakeupPipe[2]{-1,-1};
#endif
std::thread threadDNSAddressSeed; std::thread threadDNSAddressSeed;
std::thread threadSocketHandler; std::thread threadSocketHandler;
std::thread threadOpenAddedConnections; std::thread threadOpenAddedConnections;