Change governance sync process (#1265)

* On gov sync first sync objs, then ask for votes on per-obj basis from different peers.

This should help to sync obj list initially and split the load among many peers. Also adds ability to catch up votes later after the sync.

* ask for all objects, do this in cycles

* Fix Sync() code, better readability

* ask multiple nodes at once when possible, perf boost for large numper of objs

* Addressed comments: pass reference, more peer version check
This commit is contained in:
UdjinM6 2017-01-18 00:02:38 +04:00 committed by GitHub
parent f995a264e2
commit f1ee9d9c71
4 changed files with 105 additions and 23 deletions

View File

@ -321,6 +321,12 @@ public:
// AFTER DESERIALIZATION OCCURS, CACHED VARIABLES MUST BE CALCULATED MANUALLY // AFTER DESERIALIZATION OCCURS, CACHED VARIABLES MUST BE CALCULATED MANUALLY
} }
CGovernanceObject& operator=(CGovernanceObject from)
{
swap(*this, from);
return *this;
}
private: private:
// FUNCTIONS FOR DEALING WITH DATA STRING // FUNCTIONS FOR DEALING WITH DATA STRING
void LoadData(); void LoadData();

View File

@ -141,6 +141,11 @@ void CGovernanceManager::ProcessMessage(CNode* pfrom, std::string& strCommand, C
return; return;
} }
if(!masternodeSync.IsMasternodeListSynced()) {
LogPrint("gobject", "MNGOVERNANCEOBJECT -- masternode list not synced\n");
return;
}
CGovernanceObject govobj; CGovernanceObject govobj;
vRecv >> govobj; vRecv >> govobj;
@ -654,28 +659,15 @@ void CGovernanceManager::Sync(CNode* pfrom, uint256 nProp)
{ {
LOCK2(cs_main, cs); LOCK2(cs_main, cs);
fRateChecksEnabled = false;
if(nProp == uint256()) {
// all valid objects, no votes
for(object_m_it it = mapObjects.begin(); it != mapObjects.end(); ++it) { for(object_m_it it = mapObjects.begin(); it != mapObjects.end(); ++it) {
uint256 h = it->first;
CGovernanceObject& govobj = it->second; CGovernanceObject& govobj = it->second;
std::string strHash = it->first.ToString();
if((nProp != uint256()) && (h != nProp)) {
continue;
}
std::string strHash = h.ToString();
LogPrint("gobject", "CGovernanceManager::Sync -- attempting to sync govobj: %s, peer=%d\n", strHash, pfrom->id); LogPrint("gobject", "CGovernanceManager::Sync -- attempting to sync govobj: %s, peer=%d\n", strHash, pfrom->id);
std::string strError;
bool fIsValid = govobj.IsValidLocally(strError, true);
if(!fIsValid) {
LogPrintf("CGovernanceManager::Sync -- not syncing invalid govobj: %s, strError = %s, fCachedValid = %d, peer=%d\n",
strHash, strError, govobj.IsSetCachedValid(), pfrom->id);
continue;
}
if(!govobj.IsSetCachedValid()) { if(!govobj.IsSetCachedValid()) {
LogPrintf("CGovernanceManager::Sync -- invalid flag cached, not syncing govobj: %s, fCachedValid = %d, peer=%d\n", LogPrintf("CGovernanceManager::Sync -- invalid flag cached, not syncing govobj: %s, fCachedValid = %d, peer=%d\n",
strHash, govobj.IsSetCachedValid(), pfrom->id); strHash, govobj.IsSetCachedValid(), pfrom->id);
@ -684,7 +676,30 @@ void CGovernanceManager::Sync(CNode* pfrom, uint256 nProp)
// Push the inventory budget proposal message over to the other client // Push the inventory budget proposal message over to the other client
LogPrint("gobject", "CGovernanceManager::Sync -- syncing govobj: %s, peer=%d\n", strHash, pfrom->id); LogPrint("gobject", "CGovernanceManager::Sync -- syncing govobj: %s, peer=%d\n", strHash, pfrom->id);
pfrom->PushInventory(CInv(MSG_GOVERNANCE_OBJECT, h)); pfrom->PushInventory(CInv(MSG_GOVERNANCE_OBJECT, it->first));
++nObjCount;
}
} else {
// single valid object and its valid votes
object_m_it it = mapObjects.find(nProp);
if(it == mapObjects.end()) {
LogPrint("gobject", "CGovernanceManager::Sync -- no matching object for hash %s, peer=%d\n", nProp.ToString(), pfrom->id);
return;
}
CGovernanceObject& govobj = it->second;
std::string strHash = it->first.ToString();
LogPrint("gobject", "CGovernanceManager::Sync -- attempting to sync govobj: %s, peer=%d\n", strHash, pfrom->id);
if(!govobj.IsSetCachedValid()) {
LogPrintf("CGovernanceManager::Sync -- invalid flag cached, not syncing govobj: %s, fCachedValid = %d, peer=%d\n",
strHash, govobj.IsSetCachedValid(), pfrom->id);
return;
}
// Push the inventory budget proposal message over to the other client
LogPrint("gobject", "CGovernanceManager::Sync -- syncing govobj: %s, peer=%d\n", strHash, pfrom->id);
pfrom->PushInventory(CInv(MSG_GOVERNANCE_OBJECT, it->first));
++nObjCount; ++nObjCount;
std::vector<CGovernanceVote> vecVotes = govobj.GetVoteFile().GetVotes(); std::vector<CGovernanceVote> vecVotes = govobj.GetVoteFile().GetVotes();
@ -696,7 +711,6 @@ void CGovernanceManager::Sync(CNode* pfrom, uint256 nProp)
++nVoteCount; ++nVoteCount;
} }
} }
fRateChecksEnabled = true;
} }
pfrom->PushMessage(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ, nObjCount); pfrom->PushMessage(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ, nObjCount);
@ -920,6 +934,54 @@ void CGovernanceManager::RequestGovernanceObject(CNode* pfrom, const uint256& nH
pfrom->PushMessage(NetMsgType::MNGOVERNANCESYNC, nHash); pfrom->PushMessage(NetMsgType::MNGOVERNANCESYNC, nHash);
} }
void CGovernanceManager::RequestGovernanceObjectVotes(CNode* pnode)
{
if(pnode->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) return;
std::vector<CNode*> vNodesCopy;
vNodesCopy.push_back(pnode);
RequestGovernanceObjectVotes(vNodesCopy);
}
void CGovernanceManager::RequestGovernanceObjectVotes(const std::vector<CNode*>& vNodesCopy)
{
static std::map<uint256, int64_t> mapAskedRecently;
LOCK2(cs_main, cs);
std::vector<CGovernanceObject*> vpGovObjsTmp;
std::vector<CGovernanceObject*> vpGovObjsTriggersTmp;
int64_t nNow = GetTime();
for(object_m_it it = mapObjects.begin(); it != mapObjects.end(); ++it) {
if(mapAskedRecently.count(it->first) && mapAskedRecently[it->first] > nNow) continue;
if(it->second.nObjectType == GOVERNANCE_OBJECT_TRIGGER)
vpGovObjsTriggersTmp.push_back(&(it->second));
else
vpGovObjsTmp.push_back(&(it->second));
}
BOOST_FOREACH(CNode* pnode, vNodesCopy) {
// only use reqular peers, don't try to ask from temporary nodes we connected to -
// they stay connected for a short period of time and it's possible that we won't get everything we should
if(pnode->fMasternode) continue;
// only use up to date peers
if(pnode->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) continue;
// stop early to prevent setAskFor overflow
if(pnode->setAskFor.size() > SETASKFOR_MAX_SZ/2) continue;
uint256 nHashGovobj;
// ask for triggers first
if(vpGovObjsTriggersTmp.size()) {
int r = GetRandInt(vpGovObjsTriggersTmp.size());
nHashGovobj = vpGovObjsTriggersTmp[r]->GetHash();
vpGovObjsTriggersTmp.erase(vpGovObjsTriggersTmp.begin() + r);
} else {
if(vpGovObjsTmp.empty()) return;
int r = GetRandInt(vpGovObjsTmp.size());
nHashGovobj = vpGovObjsTmp[r]->GetHash();
vpGovObjsTmp.erase(vpGovObjsTmp.begin() + r);
}
LogPrintf("CGovernanceManager::RequestGovernanceObjectVotes -- Requesting votes for %s, peer=%d\n", nHashGovobj.ToString(), pnode->id);
RequestGovernanceObject(pnode, nHashGovobj);
mapAskedRecently[nHashGovobj] = nNow + mapObjects.size() * 60; // ask again after full cycle
}
}
bool CGovernanceManager::AcceptObjectMessage(const uint256& nHash) bool CGovernanceManager::AcceptObjectMessage(const uint256& nHash)
{ {
LOCK(cs); LOCK(cs);

View File

@ -375,6 +375,9 @@ public:
void InitOnLoad(); void InitOnLoad();
void RequestGovernanceObjectVotes(CNode* pnode);
void RequestGovernanceObjectVotes(const std::vector<CNode*>& vNodesCopy);
private: private:
void RequestGovernanceObject(CNode* pfrom, const uint256& nHash); void RequestGovernanceObject(CNode* pfrom, const uint256& nHash);

View File

@ -276,7 +276,15 @@ void CMasternodeSync::ProcessTick()
LogPrintf("CMasternodeSync::ProcessTick -- WARNING: not enough data, restarting sync\n"); LogPrintf("CMasternodeSync::ProcessTick -- WARNING: not enough data, restarting sync\n");
Reset(); Reset();
} else { } else {
//if syncing is complete and we have masternodes, return std::vector<CNode*> vNodesCopy;
{
LOCK(cs_vNodes);
vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
pnode->AddRef();
}
governance.RequestGovernanceObjectVotes(vNodesCopy);
ReleaseNodes(vNodesCopy);
return; return;
} }
} }
@ -475,8 +483,11 @@ void CMasternodeSync::ProcessTick()
// } // }
// } // }
// only request once from each peer // only request obj sync once from each peer, then request votes on per-obj basis
if(netfulfilledman.HasFulfilledRequest(pnode->addr, "governance-sync")) continue; if(netfulfilledman.HasFulfilledRequest(pnode->addr, "governance-sync")) {
governance.RequestGovernanceObjectVotes(pnode);
continue;
}
netfulfilledman.AddFulfilledRequest(pnode->addr, "governance-sync"); netfulfilledman.AddFulfilledRequest(pnode->addr, "governance-sync");
if (pnode->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) continue; if (pnode->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) continue;