move mnb verification requests to their own thread (#1274)

* move mnb verification requests to their own thread, should remove related ui/rpc freezes

* vector -> list
This commit is contained in:
UdjinM6 2017-01-21 23:03:55 +04:00 committed by GitHub
parent 348c001445
commit d8797023c8
4 changed files with 73 additions and 13 deletions

View File

@ -102,6 +102,7 @@ CMasternodeMan::CMasternodeMan()
mAskedUsForMasternodeList(), mAskedUsForMasternodeList(),
mWeAskedForMasternodeList(), mWeAskedForMasternodeList(),
mWeAskedForMasternodeListEntry(), mWeAskedForMasternodeListEntry(),
listScheduledMnbRequestConnections(),
nLastIndexRebuildTime(0), nLastIndexRebuildTime(0),
indexMasternodes(), indexMasternodes(),
indexMasternodesOld(), indexMasternodesOld(),
@ -160,6 +161,16 @@ void CMasternodeMan::AskForMN(CNode* pnode, const CTxIn &vin)
pnode->PushMessage(NetMsgType::DSEG, vin); pnode->PushMessage(NetMsgType::DSEG, vin);
} }
void CMasternodeMan::AskForMnb(CNode* pnode, const uint256 &hash)
{
if(!pnode || hash == uint256()) return;
LogPrint("masternode", "CMasternodeMan::AskForMnb -- asking for mnb %s from addr=%s\n", hash.ToString(), pnode->addr.ToString());
std::vector<CInv> vToFetch;
vToFetch.push_back(CInv(MSG_MASTERNODE_ANNOUNCE, hash));
pnode->PushMessage(NetMsgType::GETDATA, vToFetch);
}
void CMasternodeMan::Check() void CMasternodeMan::Check()
{ {
LOCK(cs); LOCK(cs);
@ -179,7 +190,7 @@ void CMasternodeMan::CheckAndRemove()
{ {
// Need LOCK2 here to ensure consistent locking order because code below locks cs_main // Need LOCK2 here to ensure consistent locking order because code below locks cs_main
// through GetHeight() signal in ConnectNode and in CheckMnbAndUpdateMasternodeList() // in CheckMnbAndUpdateMasternodeList()
LOCK2(cs_main, cs); LOCK2(cs_main, cs);
Check(); Check();
@ -223,18 +234,9 @@ void CMasternodeMan::CheckAndRemove()
if(mWeAskedForMasternodeListEntry.count(it->vin.prevout) && mWeAskedForMasternodeListEntry[it->vin.prevout].count(vecMasternodeRanks[i].second.addr)) continue; if(mWeAskedForMasternodeListEntry.count(it->vin.prevout) && mWeAskedForMasternodeListEntry[it->vin.prevout].count(vecMasternodeRanks[i].second.addr)) continue;
// didn't ask recently, ok to ask now // didn't ask recently, ok to ask now
CService addr = vecMasternodeRanks[i].second.addr; CService addr = vecMasternodeRanks[i].second.addr;
CNode* pnode = ConnectNode(CAddress(addr), NULL, true); setRequested.insert(addr);
if(pnode) { listScheduledMnbRequestConnections.push_back(std::make_pair(addr, hash));
LogPrint("masternode", "CMasternodeMan::CheckAndRemove -- asking for mnb of %s, addr=%s\n", it->vin.prevout.ToStringShort(), addr.ToString()); fAskedForMnbRecovery = true;
setRequested.insert(addr);
// can't use AskForMN here, inv system is way too smart, request data directly instead
std::vector<CInv> vToFetch;
vToFetch.push_back(CInv(MSG_MASTERNODE_ANNOUNCE, hash));
pnode->PushMessage(NetMsgType::GETDATA, vToFetch);
fAskedForMnbRecovery = true;
} else {
LogPrint("masternode", "CMasternodeMan::CheckAndRemove -- can't connect to node to ask for mnb, addr=%s\n", addr.ToString());
}
} }
// wait for mnb recovery replies for MNB_RECOVERY_WAIT_SECONDS seconds // wait for mnb recovery replies for MNB_RECOVERY_WAIT_SECONDS seconds
mMnbRecoveryRequests[hash] = std::make_pair(GetTime() + MNB_RECOVERY_WAIT_SECONDS, setRequested); mMnbRecoveryRequests[hash] = std::make_pair(GetTime() + MNB_RECOVERY_WAIT_SECONDS, setRequested);
@ -771,6 +773,16 @@ void CMasternodeMan::ProcessMasternodeConnections()
} }
} }
std::pair<CService, uint256> CMasternodeMan::PopScheduledMnbRequestConnection()
{
LOCK(cs);
if(listScheduledMnbRequestConnections.empty()) return make_pair(CService(), uint256());
std::pair<CService, uint256> p = listScheduledMnbRequestConnections.front();
listScheduledMnbRequestConnections.pop_front();
return p;
}
void CMasternodeMan::ProcessMessage(CNode* pfrom, std::string& strCommand, CDataStream& vRecv) void CMasternodeMan::ProcessMessage(CNode* pfrom, std::string& strCommand, CDataStream& vRecv)
{ {
if(fLiteMode) return; // disable all Dash specific functionality if(fLiteMode) return; // disable all Dash specific functionality

View File

@ -129,6 +129,7 @@ private:
// these maps are used for masternode recovery from MASTERNODE_NEW_START_REQUIRED state // these maps are used for masternode recovery from MASTERNODE_NEW_START_REQUIRED state
std::map<uint256, std::pair< int64_t, std::set<CNetAddr> > > mMnbRecoveryRequests; std::map<uint256, std::pair< int64_t, std::set<CNetAddr> > > mMnbRecoveryRequests;
std::map<uint256, std::vector<CMasternodeBroadcast> > mMnbRecoveryGoodReplies; std::map<uint256, std::vector<CMasternodeBroadcast> > mMnbRecoveryGoodReplies;
std::list< std::pair<CService, uint256> > listScheduledMnbRequestConnections;
int64_t nLastIndexRebuildTime; int64_t nLastIndexRebuildTime;
@ -200,6 +201,7 @@ public:
/// Ask (source) node for mnb /// Ask (source) node for mnb
void AskForMN(CNode *pnode, const CTxIn &vin); void AskForMN(CNode *pnode, const CTxIn &vin);
void AskForMnb(CNode *pnode, const uint256 &hash);
/// Check all Masternodes /// Check all Masternodes
void Check(); void Check();
@ -295,6 +297,7 @@ public:
CMasternode* GetMasternodeByRank(int nRank, int nBlockHeight, int nMinProtocol=0, bool fOnlyActive=true); CMasternode* GetMasternodeByRank(int nRank, int nBlockHeight, int nMinProtocol=0, bool fOnlyActive=true);
void ProcessMasternodeConnections(); void ProcessMasternodeConnections();
std::pair<CService, uint256> PopScheduledMnbRequestConnection();
void ProcessMessage(CNode* pfrom, std::string& strCommand, CDataStream& vRecv); void ProcessMessage(CNode* pfrom, std::string& strCommand, CDataStream& vRecv);

View File

@ -24,6 +24,7 @@
#include "darksend.h" #include "darksend.h"
#include "instantx.h" #include "instantx.h"
#include "masternodeman.h"
#ifdef WIN32 #ifdef WIN32
#include <string.h> #include <string.h>
@ -65,6 +66,7 @@ using namespace std;
namespace { namespace {
const int MAX_OUTBOUND_CONNECTIONS = 8; const int MAX_OUTBOUND_CONNECTIONS = 8;
const int MAX_OUTBOUND_MASTERNODE_CONNECTIONS = 20;
struct ListenSocket { struct ListenSocket {
SOCKET socket; SOCKET socket;
@ -111,6 +113,7 @@ NodeId nLastNodeId = 0;
CCriticalSection cs_nLastNodeId; CCriticalSection cs_nLastNodeId;
static CSemaphore *semOutbound = NULL; static CSemaphore *semOutbound = NULL;
static CSemaphore *semMasternodeOutbound = NULL;
boost::condition_variable messageHandlerCondition; boost::condition_variable messageHandlerCondition;
// Signals for message handling // Signals for message handling
@ -1053,6 +1056,7 @@ void ThreadSocketHandler()
// release outbound grant (if any) // release outbound grant (if any)
pnode->grantOutbound.Release(); pnode->grantOutbound.Release();
pnode->grantMasternodeOutbound.Release();
// close socket and cleanup // close socket and cleanup
pnode->CloseSocketDisconnect(); pnode->CloseSocketDisconnect();
@ -1690,6 +1694,32 @@ void ThreadOpenAddedConnections()
} }
} }
void ThreadMnbRequestConnections()
{
// Connecting to specific addresses, no masternode connections available
if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0)
return;
int nTick = 0;
while (true)
{
MilliSleep(1000);
nTick++;
CSemaphoreGrant grant(*semMasternodeOutbound);
boost::this_thread::interruption_point();
std::pair<CService, uint256> p = mnodeman.PopScheduledMnbRequestConnection();
if(p.first == CService()) continue;
CNode* pnode = ConnectNode(CAddress(p.first), NULL, true);
if(pnode) {
grant.MoveTo(pnode->grantMasternodeOutbound);
if(p.second != uint256())
mnodeman.AskForMnb(pnode, p.second);
}
}
}
// if successful, this moves the passed grant to the constructed node // if successful, this moves the passed grant to the constructed node
bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot) bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot)
{ {
@ -1967,6 +1997,11 @@ void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler)
semOutbound = new CSemaphore(nMaxOutbound); semOutbound = new CSemaphore(nMaxOutbound);
} }
if (semMasternodeOutbound == NULL) {
// initialize semaphore
semMasternodeOutbound = new CSemaphore(MAX_OUTBOUND_MASTERNODE_CONNECTIONS);
}
if (pnodeLocalHost == NULL) if (pnodeLocalHost == NULL)
pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress(CService("127.0.0.1", 0), nLocalServices)); pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress(CService("127.0.0.1", 0), nLocalServices));
@ -1993,6 +2028,9 @@ void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler)
// Initiate outbound connections // Initiate outbound connections
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections)); threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections));
// Initiate masternode connections
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "mnbcon", &ThreadMnbRequestConnections));
// Process messages // Process messages
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler)); threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler));
@ -2008,6 +2046,10 @@ bool StopNode()
for (int i=0; i<MAX_OUTBOUND_CONNECTIONS; i++) for (int i=0; i<MAX_OUTBOUND_CONNECTIONS; i++)
semOutbound->post(); semOutbound->post();
if (semMasternodeOutbound)
for (int i=0; i<MAX_OUTBOUND_MASTERNODE_CONNECTIONS; i++)
semMasternodeOutbound->post();
if (fAddressesInitialized) if (fAddressesInitialized)
{ {
DumpData(); DumpData();
@ -2043,6 +2085,8 @@ public:
vhListenSocket.clear(); vhListenSocket.clear();
delete semOutbound; delete semOutbound;
semOutbound = NULL; semOutbound = NULL;
delete semMasternodeOutbound;
semMasternodeOutbound = NULL;
delete pnodeLocalHost; delete pnodeLocalHost;
pnodeLocalHost = NULL; pnodeLocalHost = NULL;

View File

@ -364,6 +364,7 @@ public:
// If 'true' this node will be disconnected on CMasternodeMan::ProcessMasternodeConnections() // If 'true' this node will be disconnected on CMasternodeMan::ProcessMasternodeConnections()
bool fMasternode; bool fMasternode;
CSemaphoreGrant grantOutbound; CSemaphoreGrant grantOutbound;
CSemaphoreGrant grantMasternodeOutbound;
CCriticalSection cs_filter; CCriticalSection cs_filter;
CBloomFilter* pfilter; CBloomFilter* pfilter;
int nRefCount; int nRefCount;