From d8797023c821b68982b01e17490ce9b149bcba80 Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Sat, 21 Jan 2017 23:03:55 +0400 Subject: [PATCH] move mnb verification requests to their own thread (#1274) * move mnb verification requests to their own thread, should remove related ui/rpc freezes * vector -> list --- src/masternodeman.cpp | 38 ++++++++++++++++++++++++------------- src/masternodeman.h | 3 +++ src/net.cpp | 44 +++++++++++++++++++++++++++++++++++++++++++ src/net.h | 1 + 4 files changed, 73 insertions(+), 13 deletions(-) diff --git a/src/masternodeman.cpp b/src/masternodeman.cpp index a65ee7bc0..ed13a8184 100644 --- a/src/masternodeman.cpp +++ b/src/masternodeman.cpp @@ -102,6 +102,7 @@ CMasternodeMan::CMasternodeMan() mAskedUsForMasternodeList(), mWeAskedForMasternodeList(), mWeAskedForMasternodeListEntry(), + listScheduledMnbRequestConnections(), nLastIndexRebuildTime(0), indexMasternodes(), indexMasternodesOld(), @@ -160,6 +161,16 @@ void CMasternodeMan::AskForMN(CNode* pnode, const CTxIn &vin) pnode->PushMessage(NetMsgType::DSEG, vin); } +void CMasternodeMan::AskForMnb(CNode* pnode, const uint256 &hash) +{ + if(!pnode || hash == uint256()) return; + + LogPrint("masternode", "CMasternodeMan::AskForMnb -- asking for mnb %s from addr=%s\n", hash.ToString(), pnode->addr.ToString()); + std::vector vToFetch; + vToFetch.push_back(CInv(MSG_MASTERNODE_ANNOUNCE, hash)); + pnode->PushMessage(NetMsgType::GETDATA, vToFetch); +} + void CMasternodeMan::Check() { LOCK(cs); @@ -179,7 +190,7 @@ void CMasternodeMan::CheckAndRemove() { // Need LOCK2 here to ensure consistent locking order because code below locks cs_main - // through GetHeight() signal in ConnectNode and in CheckMnbAndUpdateMasternodeList() + // in CheckMnbAndUpdateMasternodeList() LOCK2(cs_main, cs); Check(); @@ -223,18 +234,9 @@ void CMasternodeMan::CheckAndRemove() if(mWeAskedForMasternodeListEntry.count(it->vin.prevout) && mWeAskedForMasternodeListEntry[it->vin.prevout].count(vecMasternodeRanks[i].second.addr)) continue; // didn't ask recently, ok to ask now CService addr = vecMasternodeRanks[i].second.addr; - CNode* pnode = ConnectNode(CAddress(addr), NULL, true); - if(pnode) { - LogPrint("masternode", "CMasternodeMan::CheckAndRemove -- asking for mnb of %s, addr=%s\n", it->vin.prevout.ToStringShort(), addr.ToString()); - setRequested.insert(addr); - // can't use AskForMN here, inv system is way too smart, request data directly instead - std::vector vToFetch; - vToFetch.push_back(CInv(MSG_MASTERNODE_ANNOUNCE, hash)); - pnode->PushMessage(NetMsgType::GETDATA, vToFetch); - fAskedForMnbRecovery = true; - } else { - LogPrint("masternode", "CMasternodeMan::CheckAndRemove -- can't connect to node to ask for mnb, addr=%s\n", addr.ToString()); - } + setRequested.insert(addr); + listScheduledMnbRequestConnections.push_back(std::make_pair(addr, hash)); + fAskedForMnbRecovery = true; } // wait for mnb recovery replies for MNB_RECOVERY_WAIT_SECONDS seconds mMnbRecoveryRequests[hash] = std::make_pair(GetTime() + MNB_RECOVERY_WAIT_SECONDS, setRequested); @@ -771,6 +773,16 @@ void CMasternodeMan::ProcessMasternodeConnections() } } +std::pair CMasternodeMan::PopScheduledMnbRequestConnection() +{ + LOCK(cs); + if(listScheduledMnbRequestConnections.empty()) return make_pair(CService(), uint256()); + std::pair p = listScheduledMnbRequestConnections.front(); + listScheduledMnbRequestConnections.pop_front(); + return p; +} + + void CMasternodeMan::ProcessMessage(CNode* pfrom, std::string& strCommand, CDataStream& vRecv) { if(fLiteMode) return; // disable all Dash specific functionality diff --git a/src/masternodeman.h b/src/masternodeman.h index ecb4bc976..2d5d91ba3 100644 --- a/src/masternodeman.h +++ b/src/masternodeman.h @@ -129,6 +129,7 @@ private: // these maps are used for masternode recovery from MASTERNODE_NEW_START_REQUIRED state std::map > > mMnbRecoveryRequests; std::map > mMnbRecoveryGoodReplies; + std::list< std::pair > listScheduledMnbRequestConnections; int64_t nLastIndexRebuildTime; @@ -200,6 +201,7 @@ public: /// Ask (source) node for mnb void AskForMN(CNode *pnode, const CTxIn &vin); + void AskForMnb(CNode *pnode, const uint256 &hash); /// Check all Masternodes void Check(); @@ -295,6 +297,7 @@ public: CMasternode* GetMasternodeByRank(int nRank, int nBlockHeight, int nMinProtocol=0, bool fOnlyActive=true); void ProcessMasternodeConnections(); + std::pair PopScheduledMnbRequestConnection(); void ProcessMessage(CNode* pfrom, std::string& strCommand, CDataStream& vRecv); diff --git a/src/net.cpp b/src/net.cpp index 6ab083322..6e78ff37e 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -24,6 +24,7 @@ #include "darksend.h" #include "instantx.h" +#include "masternodeman.h" #ifdef WIN32 #include @@ -65,6 +66,7 @@ using namespace std; namespace { const int MAX_OUTBOUND_CONNECTIONS = 8; + const int MAX_OUTBOUND_MASTERNODE_CONNECTIONS = 20; struct ListenSocket { SOCKET socket; @@ -111,6 +113,7 @@ NodeId nLastNodeId = 0; CCriticalSection cs_nLastNodeId; static CSemaphore *semOutbound = NULL; +static CSemaphore *semMasternodeOutbound = NULL; boost::condition_variable messageHandlerCondition; // Signals for message handling @@ -1053,6 +1056,7 @@ void ThreadSocketHandler() // release outbound grant (if any) pnode->grantOutbound.Release(); + pnode->grantMasternodeOutbound.Release(); // close socket and cleanup pnode->CloseSocketDisconnect(); @@ -1690,6 +1694,32 @@ void ThreadOpenAddedConnections() } } +void ThreadMnbRequestConnections() +{ + // Connecting to specific addresses, no masternode connections available + if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0) + return; + + int nTick = 0; + while (true) + { + MilliSleep(1000); + nTick++; + + CSemaphoreGrant grant(*semMasternodeOutbound); + boost::this_thread::interruption_point(); + + std::pair p = mnodeman.PopScheduledMnbRequestConnection(); + if(p.first == CService()) continue; + CNode* pnode = ConnectNode(CAddress(p.first), NULL, true); + if(pnode) { + grant.MoveTo(pnode->grantMasternodeOutbound); + if(p.second != uint256()) + mnodeman.AskForMnb(pnode, p.second); + } + } +} + // if successful, this moves the passed grant to the constructed node bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot) { @@ -1967,6 +1997,11 @@ void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler) semOutbound = new CSemaphore(nMaxOutbound); } + if (semMasternodeOutbound == NULL) { + // initialize semaphore + semMasternodeOutbound = new CSemaphore(MAX_OUTBOUND_MASTERNODE_CONNECTIONS); + } + if (pnodeLocalHost == NULL) pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress(CService("127.0.0.1", 0), nLocalServices)); @@ -1993,6 +2028,9 @@ void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler) // Initiate outbound connections threadGroup.create_thread(boost::bind(&TraceThread, "opencon", &ThreadOpenConnections)); + // Initiate masternode connections + threadGroup.create_thread(boost::bind(&TraceThread, "mnbcon", &ThreadMnbRequestConnections)); + // Process messages threadGroup.create_thread(boost::bind(&TraceThread, "msghand", &ThreadMessageHandler)); @@ -2008,6 +2046,10 @@ bool StopNode() for (int i=0; ipost(); + if (semMasternodeOutbound) + for (int i=0; ipost(); + if (fAddressesInitialized) { DumpData(); @@ -2043,6 +2085,8 @@ public: vhListenSocket.clear(); delete semOutbound; semOutbound = NULL; + delete semMasternodeOutbound; + semMasternodeOutbound = NULL; delete pnodeLocalHost; pnodeLocalHost = NULL; diff --git a/src/net.h b/src/net.h index 27f578507..39b76d38f 100644 --- a/src/net.h +++ b/src/net.h @@ -364,6 +364,7 @@ public: // If 'true' this node will be disconnected on CMasternodeMan::ProcessMasternodeConnections() bool fMasternode; CSemaphoreGrant grantOutbound; + CSemaphoreGrant grantMasternodeOutbound; CCriticalSection cs_filter; CBloomFilter* pfilter; int nRefCount;