Do not send dash-specific requests to masternodes before we are fully connected (#1882)

* Do not send dash-specific requests to masternodes before we are fully connected

Open all masternode connections via CConnman::ThreadOpenMasternodeConnections only. Queue requests and process them after some timeout.

* drop excessive `mnodeman.`

* switch from queues to maps of pending requests

* adjust few strings, add TODO for POOL_STATE_CONNECTING

* fix

* there can be only one pending dsa request per ps client
This commit is contained in:
UdjinM6 2018-02-01 04:10:52 +03:00 committed by GitHub
parent 5a5f618726
commit 1b1a440f4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 221 additions and 96 deletions

View File

@ -740,6 +740,48 @@ std::pair<CService, std::set<uint256> > CMasternodeMan::PopScheduledMnbRequestCo
return std::make_pair(pairFront.first, setResult);
}
void CMasternodeMan::ProcessPendingMnbRequests(CConnman& connman)
{
std::pair<CService, std::set<uint256> > p = PopScheduledMnbRequestConnection();
if (!(p.first == CService() || p.second.empty())) {
if (connman.IsMasternodeOrDisconnectRequested(p.first)) return;
mapPendingMNB.insert(std::make_pair(p.first, std::make_pair(GetTime(), p.second)));
connman.AddPendingMasternode(p.first);
}
std::map<CService, std::pair<int64_t, std::set<uint256> > >::iterator itPendingMNB = mapPendingMNB.begin();
while (itPendingMNB != mapPendingMNB.end()) {
bool fDone = connman.ForNode(itPendingMNB->first, [&](CNode* pnode) {
// compile request vector
std::vector<CInv> vToFetch;
std::set<uint256>& setHashes = itPendingMNB->second.second;
std::set<uint256>::iterator it = setHashes.begin();
while(it != setHashes.end()) {
if(*it != uint256()) {
vToFetch.push_back(CInv(MSG_MASTERNODE_ANNOUNCE, *it));
LogPrint("masternode", "-- asking for mnb %s from addr=%s\n", it->ToString(), pnode->addr.ToString());
}
++it;
}
// ask for data
CNetMsgMaker msgMaker(pnode->GetSendVersion());
connman.PushMessage(pnode, msgMaker.Make(NetMsgType::GETDATA, vToFetch));
return true;
});
int64_t nTimeAdded = itPendingMNB->second.first;
if (fDone || (GetTime() - nTimeAdded > 15)) {
if (!fDone) {
LogPrint("masternode", "CMasternodeMan::%s -- failed to connect to %s\n", __func__, itPendingMNB->first.ToString());
}
mapPendingMNB.erase(itPendingMNB++);
} else {
++itPendingMNB;
}
}
LogPrint("masternode", "%s -- mapPendingMNB size: %d\n", __func__, mapPendingMNB.size());
}
void CMasternodeMan::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman)
{
@ -1049,23 +1091,45 @@ bool CMasternodeMan::SendVerifyRequest(const CAddress& addr, const std::vector<C
return false;
}
connman.OpenMasternodeConnection(addr);
bool fSuccess = connman.ForNode(addr, CConnman::AllNodes, [&](CNode* pnode){
netfulfilledman.AddFulfilledRequest(addr, strprintf("%s", NetMsgType::MNVERIFY)+"-request");
if (connman.IsMasternodeOrDisconnectRequested(addr)) return false;
connman.AddPendingMasternode(addr);
// use random nonce, store it and require node to reply with correct one later
CMasternodeVerification mnv(addr, GetRandInt(999999), nCachedBlockHeight - 1);
mWeAskedForVerification[addr] = mnv;
LOCK(cs_mapPendingMNV);
mapPendingMNV.insert(std::make_pair(addr, std::make_pair(GetTime(), mnv)));
LogPrintf("CMasternodeMan::SendVerifyRequest -- verifying node using nonce %d addr=%s\n", mnv.nonce, addr.ToString());
CNetMsgMaker msgMaker(pnode->GetSendVersion()); // TODO this gives a warning about version not being set (we should wait for VERSION exchange)
connman.PushMessage(pnode, msgMaker.Make(NetMsgType::MNVERIFY, mnv));
return true;
});
if (!fSuccess) {
LogPrintf("CMasternodeMan::SendVerifyRequest -- can't connect to node to verify it, addr=%s\n", addr.ToString());
return false;
}
void CMasternodeMan::ProcessPendingMnvRequests(CConnman& connman)
{
LOCK(cs_mapPendingMNV);
std::map<CService, std::pair<int64_t, CMasternodeVerification> >::iterator itPendingMNV = mapPendingMNV.begin();
while (itPendingMNV != mapPendingMNV.end()) {
bool fDone = connman.ForNode(itPendingMNV->first, [&](CNode* pnode) {
netfulfilledman.AddFulfilledRequest(pnode->addr, strprintf("%s", NetMsgType::MNVERIFY)+"-request");
// use random nonce, store it and require node to reply with correct one later
mWeAskedForVerification[pnode->addr] = itPendingMNV->second.second;
LogPrint("masternode", "-- verifying node using nonce %d addr=%s\n", itPendingMNV->second.second.nonce, pnode->addr.ToString());
CNetMsgMaker msgMaker(pnode->GetSendVersion()); // TODO this gives a warning about version not being set (we should wait for VERSION exchange)
connman.PushMessage(pnode, msgMaker.Make(NetMsgType::MNVERIFY, itPendingMNV->second.second));
return true;
});
int64_t nTimeAdded = itPendingMNV->second.first;
if (fDone || (GetTime() - nTimeAdded > 15)) {
if (!fDone) {
LogPrint("masternode", "CMasternodeMan::%s -- failed to connect to %s\n", __func__, itPendingMNV->first.ToString());
}
mapPendingMNV.erase(itPendingMNV++);
} else {
++itPendingMNV;
}
}
LogPrint("masternode", "%s -- mapPendingMNV size: %d\n", __func__, mapPendingMNV.size());
}
void CMasternodeMan::SendVerifyReply(CNode* pnode, CMasternodeVerification& mnv, CConnman& connman)

View File

@ -63,6 +63,9 @@ private:
std::map<uint256, std::pair< int64_t, std::set<CNetAddr> > > mMnbRecoveryRequests;
std::map<uint256, std::vector<CMasternodeBroadcast> > mMnbRecoveryGoodReplies;
std::list< std::pair<CService, uint256> > listScheduledMnbRequestConnections;
std::map<CService, std::pair<int64_t, std::set<uint256> > > mapPendingMNB;
std::map<CService, std::pair<int64_t, CMasternodeVerification> > mapPendingMNV;
CCriticalSection cs_mapPendingMNV;
/// Set when masternodes are added, cleared when CGovernanceManager is notified
bool fMasternodesAdded;
@ -180,12 +183,14 @@ public:
void ProcessMasternodeConnections(CConnman& connman);
std::pair<CService, std::set<uint256> > PopScheduledMnbRequestConnection();
void ProcessPendingMnbRequests(CConnman& connman);
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
void DoFullVerificationStep(CConnman& connman);
void CheckSameAddr();
bool SendVerifyRequest(const CAddress& addr, const std::vector<CMasternode*>& vSortedByAddr, CConnman& connman);
void ProcessPendingMnvRequests(CConnman& connman);
void SendVerifyReply(CNode* pnode, CMasternodeVerification& mnv, CConnman& connman);
void ProcessVerifyReply(CNode* pnode, CMasternodeVerification& mnv);
void ProcessVerifyBroadcast(CNode* pnode, const CMasternodeVerification& mnv);

View File

@ -1936,7 +1936,7 @@ void CConnman::ThreadOpenAddedConnections()
}
}
void CConnman::ThreadMnbRequestConnections()
void CConnman::ThreadOpenMasternodeConnections()
{
// Connecting to specific addresses, no masternode connections available
if (IsArgSet("-connect") && mapMultiArgs.at("-connect").size() > 0)
@ -1951,35 +1951,23 @@ void CConnman::ThreadMnbRequestConnections()
if (interruptNet)
return;
std::pair<CService, std::set<uint256> > p = mnodeman.PopScheduledMnbRequestConnection();
if(p.first == CService() || p.second.empty()) continue;
// compile request vector
std::vector<CInv> vToFetch;
std::set<uint256>::iterator it = p.second.begin();
while(it != p.second.end()) {
if(*it != uint256()) {
vToFetch.push_back(CInv(MSG_MASTERNODE_ANNOUNCE, *it));
LogPrint("masternode", "ThreadMnbRequestConnections -- asking for mnb %s from addr=%s\n", it->ToString(), p.first.ToString());
LOCK(cs_vPendingMasternodes);
std::vector<CService>::iterator it = vPendingMasternodes.begin();
while (it != vPendingMasternodes.end()) {
if (!IsMasternodeOrDisconnectRequested(*it)) {
OpenMasternodeConnection(CAddress(*it, NODE_NETWORK));
// should be in the list now if connection was opened
ForNode(*it, CConnman::AllNodes, [&](CNode* pnode) {
if (pnode->fDisconnect) {
return false;
}
++it;
}
CAddress addr(p.first, NODE_NETWORK);
OpenMasternodeConnection(addr);
ForNode(addr, CConnman::AllNodes, [&](CNode* pnode) {
if (pnode->fDisconnect) return false;
grant.MoveTo(pnode->grantMasternodeOutbound);
// ask for data
CNetMsgMaker msgMaker(pnode->GetSendVersion());
PushMessage(pnode, msgMaker.Make(NetMsgType::GETDATA, vToFetch));
return true;
});
}
it = vPendingMasternodes.erase(it);
}
}
}
// if successful, this moves the passed grant to the constructed node
@ -2369,7 +2357,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this)));
// Initiate masternode connections
threadMnbRequestConnections = std::thread(&TraceThread<std::function<void()> >, "mnbcon", std::function<void()>(std::bind(&CConnman::ThreadMnbRequestConnections, this)));
threadOpenMasternodeConnections = std::thread(&TraceThread<std::function<void()> >, "mncon", std::function<void()>(std::bind(&CConnman::ThreadOpenMasternodeConnections, this)));
// Process messages
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
@ -2431,8 +2419,8 @@ void CConnman::Stop()
{
if (threadMessageHandler.joinable())
threadMessageHandler.join();
if (threadMnbRequestConnections.joinable())
threadMnbRequestConnections.join();
if (threadOpenMasternodeConnections.joinable())
threadOpenMasternodeConnections.join();
if (threadOpenConnections.joinable())
threadOpenConnections.join();
if (threadOpenAddedConnections.joinable())
@ -2548,6 +2536,18 @@ bool CConnman::RemoveAddedNode(const std::string& strNode)
return false;
}
bool CConnman::AddPendingMasternode(const CService& service)
{
LOCK(cs_vPendingMasternodes);
for(std::vector<CService>::const_iterator it = vPendingMasternodes.begin(); it != vPendingMasternodes.end(); ++it) {
if (service == *it)
return false;
}
vPendingMasternodes.push_back(service);
return true;
}
size_t CConnman::GetNodeCount(NumConnections flags)
{
LOCK(cs_vNodes);

View File

@ -348,6 +348,7 @@ public:
bool AddNode(const std::string& node);
bool RemoveAddedNode(const std::string& node);
bool AddPendingMasternode(const CService& addr);
std::vector<AddedNodeInfo> GetAddedNodeInfo();
size_t GetNodeCount(NumConnections num);
@ -409,7 +410,7 @@ private:
void AcceptConnection(const ListenSocket& hListenSocket);
void ThreadSocketHandler();
void ThreadDNSAddressSeed();
void ThreadMnbRequestConnections();
void ThreadOpenMasternodeConnections();
uint64_t CalculateKeyedNetGroup(const CAddress& ad) const;
@ -475,6 +476,8 @@ private:
CCriticalSection cs_vOneShots;
std::vector<std::string> vAddedNodes;
CCriticalSection cs_vAddedNodes;
std::vector<CService> vPendingMasternodes;
CCriticalSection cs_vPendingMasternodes;
std::vector<CNode*> vNodes;
std::list<CNode*> vNodesDisconnected;
mutable CCriticalSection cs_vNodes;
@ -512,7 +515,7 @@ private:
std::thread threadSocketHandler;
std::thread threadOpenAddedConnections;
std::thread threadOpenConnections;
std::thread threadMnbRequestConnections;
std::thread threadOpenMasternodeConnections;
std::thread threadMessageHandler;
};
extern std::unique_ptr<CConnman> g_connman;

View File

@ -217,6 +217,7 @@ void CPrivateSendClient::SetNull()
nEntriesCount = 0;
fLastEntryAccepted = false;
infoMixingMasternode = masternode_info_t();
pendingDsaRequest = CPendingDsaRequest();
CPrivateSendBase::SetNull();
}
@ -880,31 +881,19 @@ bool CPrivateSendClient::JoinExistingQueue(CAmount nBalanceNeedsAnonymized, CCon
continue;
}
LogPrintf("CPrivateSendClient::JoinExistingQueue -- attempt to connect to masternode from queue, addr=%s\n", infoMn.addr.ToString());
// connect to Masternode and submit the queue request
CAddress addr(infoMn.addr, NODE_NETWORK);
connman.OpenMasternodeConnection(addr);
bool fSuccess = connman.ForNode(addr, CConnman::AllNodes, [&](CNode* pnode){
infoMixingMasternode = infoMn;
nSessionDenom = dsq.nDenom;
CDarksendAccept dsa(nSessionDenom, txMyCollateral);
CNetMsgMaker msgMaker(pnode->GetSendVersion()); // TODO this gives a warning about version not being set (we should wait for VERSION exchange)
connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSACCEPT, dsa));
LogPrintf("CPrivateSendClient::JoinExistingQueue -- connected (from queue), sending DSACCEPT: nSessionDenom: %d (%s), addr=%s\n",
nSessionDenom, CPrivateSend::GetDenominationsToString(nSessionDenom), pnode->addr.ToString());
strAutoDenomResult = _("Mixing in progress...");
infoMixingMasternode = infoMn;
pendingDsaRequest = CPendingDsaRequest(infoMn.addr, CDarksendAccept(nSessionDenom, txMyCollateral));
connman.AddPendingMasternode(infoMn.addr);
// TODO: add new state POOL_STATE_CONNECTING and bump MIN_PRIVATESEND_PEER_PROTO_VERSION
SetState(POOL_STATE_QUEUE);
nTimeLastSuccessfulStep = GetTimeMillis();
return true;
});
if (!fSuccess) {
LogPrintf("CPrivateSendClient::JoinExistingQueue -- can't connect, addr=%s\n", infoMn.addr.ToString());
strAutoDenomResult = _("Error connecting to Masternode.");
continue;
}
LogPrintf("CPrivateSendClient::JoinExistingQueue -- pending connection (from queue): nSessionDenom: %d (%s), addr=%s\n",
nSessionDenom, CPrivateSend::GetDenominationsToString(nSessionDenom), infoMn.addr.ToString());
strAutoDenomResult = _("Trying to connect...");
return true;
}
strAutoDenomResult = _("Failed to find mixing queue to join");
return false;
}
@ -949,12 +938,6 @@ bool CPrivateSendClient::StartNewQueue(CAmount nValueMin, CAmount nBalanceNeedsA
}
LogPrintf("CPrivateSendClient::StartNewQueue -- attempt %d connection to Masternode %s\n", nTries, infoMn.addr.ToString());
CAddress addr(infoMn.addr, NODE_NETWORK);
connman.OpenMasternodeConnection(addr);
bool fSuccess = connman.ForNode(addr, CConnman::AllNodes, [&](CNode* pnode){
LogPrintf("CPrivateSendClient::StartNewQueue -- connected, addr=%s\n", infoMn.addr.ToString());
infoMixingMasternode = infoMn;
std::vector<CAmount> vecAmounts;
pwalletMain->ConvertList(vecTxIn, vecAmounts);
@ -962,24 +945,43 @@ bool CPrivateSendClient::StartNewQueue(CAmount nValueMin, CAmount nBalanceNeedsA
while(nSessionDenom == 0) {
nSessionDenom = CPrivateSend::GetDenominationsByAmounts(vecAmounts);
}
CDarksendAccept dsa(nSessionDenom, txMyCollateral);
CNetMsgMaker msgMaker(pnode->GetSendVersion()); // TODO this gives a warning about version not being set (we should wait for VERSION exchange)
connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSACCEPT, dsa));
LogPrintf("CPrivateSendClient::StartNewQueue -- connected, sending DSACCEPT, nSessionDenom: %d (%s)\n",
nSessionDenom, CPrivateSend::GetDenominationsToString(nSessionDenom));
strAutoDenomResult = _("Mixing in progress...");
infoMixingMasternode = infoMn;
connman.AddPendingMasternode(infoMn.addr);
pendingDsaRequest = CPendingDsaRequest(infoMn.addr, CDarksendAccept(nSessionDenom, txMyCollateral));
// TODO: add new state POOL_STATE_CONNECTING and bump MIN_PRIVATESEND_PEER_PROTO_VERSION
SetState(POOL_STATE_QUEUE);
nTimeLastSuccessfulStep = GetTimeMillis();
LogPrintf("CPrivateSendClient::StartNewQueue -- pending connection, nSessionDenom: %d (%s), addr=%s\n",
nSessionDenom, CPrivateSend::GetDenominationsToString(nSessionDenom), infoMn.addr.ToString());
strAutoDenomResult = _("Trying to connect...");
return true;
}
strAutoDenomResult = _("Failed to start a new mixing queue");
return false;
}
void CPrivateSendClient::ProcessPendingDsaRequest(CConnman& connman)
{
if (pendingDsaRequest == CPendingDsaRequest()) return;
bool fDone = connman.ForNode(pendingDsaRequest.GetAddr(), [&](CNode* pnode) {
LogPrint("privatesend", "-- processing dsa queue for addr=%s\n", pnode->addr.ToString());
nTimeLastSuccessfulStep = GetTimeMillis();
// TODO: this vvvv should be here after new state POOL_STATE_CONNECTING is added and MIN_PRIVATESEND_PEER_PROTO_VERSION is bumped
// SetState(POOL_STATE_QUEUE);
strAutoDenomResult = _("Mixing in progress...");
CNetMsgMaker msgMaker(pnode->GetSendVersion());
connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSACCEPT, pendingDsaRequest.GetDSA()));
return true;
});
if (!fSuccess) {
LogPrintf("CPrivateSendClient::StartNewQueue -- can't connect, addr=%s\n", infoMn.addr.ToString());
nTries++;
continue;
if (fDone) {
pendingDsaRequest = CPendingDsaRequest();
} else if (pendingDsaRequest.IsExpired()) {
LogPrint("privatesend", "CPrivateSendClient::%s -- failed to connect to %s\n", __func__, pendingDsaRequest.GetAddr().ToString());
SetNull();
}
return true;
}
return false;
}
bool CPrivateSendClient::SubmitDenominate(CConnman& connman)
@ -1417,6 +1419,7 @@ void ThreadCheckPrivateSendClient(CConnman& connman)
if(masternodeSync.IsBlockchainSynced() && !ShutdownRequested()) {
nTick++;
privateSendClient.CheckTimeout();
privateSendClient.ProcessPendingDsaRequest(connman);
if(nDoAutoNextRun == nTick) {
privateSendClient.DoAutomaticDenominating(connman);
nDoAutoNextRun = nTick + PRIVATESEND_AUTO_TIMEOUT_MIN + GetRandInt(PRIVATESEND_AUTO_TIMEOUT_MAX - PRIVATESEND_AUTO_TIMEOUT_MIN);

View File

@ -29,6 +29,45 @@ static const int PRIVATESEND_KEYS_THRESHOLD_STOP = 50;
// The main object for accessing mixing
extern CPrivateSendClient privateSendClient;
class CPendingDsaRequest
{
private:
static const int TIMEOUT = 15;
CService addr;
CDarksendAccept dsa;
int64_t nTimeCreated;
public:
CPendingDsaRequest():
addr(CService()),
dsa(CDarksendAccept()),
nTimeCreated(0)
{};
CPendingDsaRequest(const CService& addr_, const CDarksendAccept& dsa_):
addr(addr_),
dsa(dsa_)
{ nTimeCreated = GetTime(); }
CService GetAddr() { return addr; }
CDarksendAccept GetDSA() { return dsa; }
bool IsExpired() { return GetTime() - nTimeCreated > TIMEOUT; }
friend bool operator==(const CPendingDsaRequest& a, const CPendingDsaRequest& b)
{
return a.addr == b.addr && a.dsa == b.dsa;
}
friend bool operator!=(const CPendingDsaRequest& a, const CPendingDsaRequest& b)
{
return !(a == b);
}
explicit operator bool() const
{
return *this != CPendingDsaRequest();
}
};
/** Used to keep track of current status of mixing pool
*/
class CPrivateSendClient : public CPrivateSendBase
@ -54,6 +93,7 @@ private:
masternode_info_t infoMixingMasternode;
CMutableTransaction txMyCollateral; // client side collateral
CPendingDsaRequest pendingDsaRequest;
CKeyHolderStorage keyHolderStorage; // storage for keys used in PrepareDenominate
@ -140,6 +180,8 @@ public:
/// Passively run mixing in the background according to the configuration in settings
bool DoAutomaticDenominating(CConnman& connman, bool fDryRun=false);
void ProcessPendingDsaRequest(CConnman& connman);
void CheckTimeout();
void UpdatedBlockTip(const CBlockIndex *pindex);

View File

@ -455,6 +455,9 @@ void ThreadCheckPrivateSend(CConnman& connman)
// make sure to check all masternodes first
mnodeman.Check();
mnodeman.ProcessPendingMnbRequests(connman);
mnodeman.ProcessPendingMnvRequests(connman);
// check if we should activate or ping every few minutes,
// slightly postpone first run to give net thread a chance to connect to some peers
if(nTick % MASTERNODE_MIN_MNP_SECONDS == 15)

View File

@ -121,6 +121,11 @@ public:
READWRITE(nDenom);
READWRITE(txCollateral);
}
friend bool operator==(const CDarksendAccept& a, const CDarksendAccept& b)
{
return a.nDenom == b.nDenom && a.txCollateral == b.txCollateral;
}
};
// A clients transaction in the mixing pool