From 1b1a440f4f2fbeee1c621d23a770d4de822e9b86 Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Thu, 1 Feb 2018 04:10:52 +0300 Subject: [PATCH] Do not send dash-specific requests to masternodes before we are fully connected (#1882) * Do not send dash-specific requests to masternodes before we are fully connected Open all masternode connections via CConnman::ThreadOpenMasternodeConnections only. Queue requests and process them after some timeout. * drop excessive `mnodeman.` * switch from queues to maps of pending requests * adjust few strings, add TODO for POOL_STATE_CONNECTING * fix * there can be only one pending dsa request per ps client --- src/masternodeman.cpp | 94 ++++++++++++++++++++++++++++------ src/masternodeman.h | 5 ++ src/net.cpp | 60 +++++++++++----------- src/net.h | 7 ++- src/privatesend-client.cpp | 101 +++++++++++++++++++------------------ src/privatesend-client.h | 42 +++++++++++++++ src/privatesend.cpp | 3 ++ src/privatesend.h | 5 ++ 8 files changed, 221 insertions(+), 96 deletions(-) diff --git a/src/masternodeman.cpp b/src/masternodeman.cpp index 8fe758cbdd..51a6eb1263 100644 --- a/src/masternodeman.cpp +++ b/src/masternodeman.cpp @@ -740,6 +740,48 @@ std::pair > CMasternodeMan::PopScheduledMnbRequestCo return std::make_pair(pairFront.first, setResult); } +void CMasternodeMan::ProcessPendingMnbRequests(CConnman& connman) +{ + std::pair > p = PopScheduledMnbRequestConnection(); + if (!(p.first == CService() || p.second.empty())) { + if (connman.IsMasternodeOrDisconnectRequested(p.first)) return; + mapPendingMNB.insert(std::make_pair(p.first, std::make_pair(GetTime(), p.second))); + connman.AddPendingMasternode(p.first); + } + + std::map > >::iterator itPendingMNB = mapPendingMNB.begin(); + while (itPendingMNB != mapPendingMNB.end()) { + bool fDone = connman.ForNode(itPendingMNB->first, [&](CNode* pnode) { + // compile request vector + std::vector vToFetch; + std::set& setHashes = itPendingMNB->second.second; + std::set::iterator it = setHashes.begin(); + while(it != setHashes.end()) { + if(*it != uint256()) { + vToFetch.push_back(CInv(MSG_MASTERNODE_ANNOUNCE, *it)); + LogPrint("masternode", "-- asking for mnb %s from addr=%s\n", it->ToString(), pnode->addr.ToString()); + } + ++it; + } + + // ask for data + CNetMsgMaker msgMaker(pnode->GetSendVersion()); + connman.PushMessage(pnode, msgMaker.Make(NetMsgType::GETDATA, vToFetch)); + return true; + }); + + int64_t nTimeAdded = itPendingMNB->second.first; + if (fDone || (GetTime() - nTimeAdded > 15)) { + if (!fDone) { + LogPrint("masternode", "CMasternodeMan::%s -- failed to connect to %s\n", __func__, itPendingMNB->first.ToString()); + } + mapPendingMNB.erase(itPendingMNB++); + } else { + ++itPendingMNB; + } + } + LogPrint("masternode", "%s -- mapPendingMNB size: %d\n", __func__, mapPendingMNB.size()); +} void CMasternodeMan::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman) { @@ -1049,25 +1091,47 @@ bool CMasternodeMan::SendVerifyRequest(const CAddress& addr, const std::vectorGetSendVersion()); // TODO this gives a warning about version not being set (we should wait for VERSION exchange) - connman.PushMessage(pnode, msgMaker.Make(NetMsgType::MNVERIFY, mnv)); - return true; - }); - if (!fSuccess) { - LogPrintf("CMasternodeMan::SendVerifyRequest -- can't connect to node to verify it, addr=%s\n", addr.ToString()); - return false; - } + if (connman.IsMasternodeOrDisconnectRequested(addr)) return false; + connman.AddPendingMasternode(addr); + // use random nonce, store it and require node to reply with correct one later + CMasternodeVerification mnv(addr, GetRandInt(999999), nCachedBlockHeight - 1); + LOCK(cs_mapPendingMNV); + mapPendingMNV.insert(std::make_pair(addr, std::make_pair(GetTime(), mnv))); + LogPrintf("CMasternodeMan::SendVerifyRequest -- verifying node using nonce %d addr=%s\n", mnv.nonce, addr.ToString()); return true; } +void CMasternodeMan::ProcessPendingMnvRequests(CConnman& connman) +{ + LOCK(cs_mapPendingMNV); + + std::map >::iterator itPendingMNV = mapPendingMNV.begin(); + + while (itPendingMNV != mapPendingMNV.end()) { + bool fDone = connman.ForNode(itPendingMNV->first, [&](CNode* pnode) { + netfulfilledman.AddFulfilledRequest(pnode->addr, strprintf("%s", NetMsgType::MNVERIFY)+"-request"); + // use random nonce, store it and require node to reply with correct one later + mWeAskedForVerification[pnode->addr] = itPendingMNV->second.second; + LogPrint("masternode", "-- verifying node using nonce %d addr=%s\n", itPendingMNV->second.second.nonce, pnode->addr.ToString()); + CNetMsgMaker msgMaker(pnode->GetSendVersion()); // TODO this gives a warning about version not being set (we should wait for VERSION exchange) + connman.PushMessage(pnode, msgMaker.Make(NetMsgType::MNVERIFY, itPendingMNV->second.second)); + return true; + }); + + int64_t nTimeAdded = itPendingMNV->second.first; + if (fDone || (GetTime() - nTimeAdded > 15)) { + if (!fDone) { + LogPrint("masternode", "CMasternodeMan::%s -- failed to connect to %s\n", __func__, itPendingMNV->first.ToString()); + } + mapPendingMNV.erase(itPendingMNV++); + } else { + ++itPendingMNV; + } + } + LogPrint("masternode", "%s -- mapPendingMNV size: %d\n", __func__, mapPendingMNV.size()); +} + void CMasternodeMan::SendVerifyReply(CNode* pnode, CMasternodeVerification& mnv, CConnman& connman) { // only masternodes can sign this, why would someone ask regular node? diff --git a/src/masternodeman.h b/src/masternodeman.h index 273790a74c..3df7829c11 100644 --- a/src/masternodeman.h +++ b/src/masternodeman.h @@ -63,6 +63,9 @@ private: std::map > > mMnbRecoveryRequests; std::map > mMnbRecoveryGoodReplies; std::list< std::pair > listScheduledMnbRequestConnections; + std::map > > mapPendingMNB; + std::map > mapPendingMNV; + CCriticalSection cs_mapPendingMNV; /// Set when masternodes are added, cleared when CGovernanceManager is notified bool fMasternodesAdded; @@ -180,12 +183,14 @@ public: void ProcessMasternodeConnections(CConnman& connman); std::pair > PopScheduledMnbRequestConnection(); + void ProcessPendingMnbRequests(CConnman& connman); void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); void DoFullVerificationStep(CConnman& connman); void CheckSameAddr(); bool SendVerifyRequest(const CAddress& addr, const std::vector& vSortedByAddr, CConnman& connman); + void ProcessPendingMnvRequests(CConnman& connman); void SendVerifyReply(CNode* pnode, CMasternodeVerification& mnv, CConnman& connman); void ProcessVerifyReply(CNode* pnode, CMasternodeVerification& mnv); void ProcessVerifyBroadcast(CNode* pnode, const CMasternodeVerification& mnv); diff --git a/src/net.cpp b/src/net.cpp index 2fb22e397f..9b13e31193 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1936,7 +1936,7 @@ void CConnman::ThreadOpenAddedConnections() } } -void CConnman::ThreadMnbRequestConnections() +void CConnman::ThreadOpenMasternodeConnections() { // Connecting to specific addresses, no masternode connections available if (IsArgSet("-connect") && mapMultiArgs.at("-connect").size() > 0) @@ -1951,34 +1951,22 @@ void CConnman::ThreadMnbRequestConnections() if (interruptNet) return; - std::pair > p = mnodeman.PopScheduledMnbRequestConnection(); - if(p.first == CService() || p.second.empty()) continue; - - // compile request vector - std::vector vToFetch; - std::set::iterator it = p.second.begin(); - while(it != p.second.end()) { - if(*it != uint256()) { - vToFetch.push_back(CInv(MSG_MASTERNODE_ANNOUNCE, *it)); - LogPrint("masternode", "ThreadMnbRequestConnections -- asking for mnb %s from addr=%s\n", it->ToString(), p.first.ToString()); + LOCK(cs_vPendingMasternodes); + std::vector::iterator it = vPendingMasternodes.begin(); + while (it != vPendingMasternodes.end()) { + if (!IsMasternodeOrDisconnectRequested(*it)) { + OpenMasternodeConnection(CAddress(*it, NODE_NETWORK)); + // should be in the list now if connection was opened + ForNode(*it, CConnman::AllNodes, [&](CNode* pnode) { + if (pnode->fDisconnect) { + return false; + } + grant.MoveTo(pnode->grantMasternodeOutbound); + return true; + }); } - ++it; + it = vPendingMasternodes.erase(it); } - - CAddress addr(p.first, NODE_NETWORK); - OpenMasternodeConnection(addr); - - ForNode(addr, CConnman::AllNodes, [&](CNode* pnode) { - if (pnode->fDisconnect) return false; - - grant.MoveTo(pnode->grantMasternodeOutbound); - - // ask for data - CNetMsgMaker msgMaker(pnode->GetSendVersion()); - PushMessage(pnode, msgMaker.Make(NetMsgType::GETDATA, vToFetch)); - - return true; - }); } } @@ -2369,7 +2357,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c threadOpenConnections = std::thread(&TraceThread >, "opencon", std::function(std::bind(&CConnman::ThreadOpenConnections, this))); // Initiate masternode connections - threadMnbRequestConnections = std::thread(&TraceThread >, "mnbcon", std::function(std::bind(&CConnman::ThreadMnbRequestConnections, this))); + threadOpenMasternodeConnections = std::thread(&TraceThread >, "mncon", std::function(std::bind(&CConnman::ThreadOpenMasternodeConnections, this))); // Process messages threadMessageHandler = std::thread(&TraceThread >, "msghand", std::function(std::bind(&CConnman::ThreadMessageHandler, this))); @@ -2431,8 +2419,8 @@ void CConnman::Stop() { if (threadMessageHandler.joinable()) threadMessageHandler.join(); - if (threadMnbRequestConnections.joinable()) - threadMnbRequestConnections.join(); + if (threadOpenMasternodeConnections.joinable()) + threadOpenMasternodeConnections.join(); if (threadOpenConnections.joinable()) threadOpenConnections.join(); if (threadOpenAddedConnections.joinable()) @@ -2548,6 +2536,18 @@ bool CConnman::RemoveAddedNode(const std::string& strNode) return false; } +bool CConnman::AddPendingMasternode(const CService& service) +{ + LOCK(cs_vPendingMasternodes); + for(std::vector::const_iterator it = vPendingMasternodes.begin(); it != vPendingMasternodes.end(); ++it) { + if (service == *it) + return false; + } + + vPendingMasternodes.push_back(service); + return true; +} + size_t CConnman::GetNodeCount(NumConnections flags) { LOCK(cs_vNodes); diff --git a/src/net.h b/src/net.h index fd334c086a..e97a04443c 100644 --- a/src/net.h +++ b/src/net.h @@ -348,6 +348,7 @@ public: bool AddNode(const std::string& node); bool RemoveAddedNode(const std::string& node); + bool AddPendingMasternode(const CService& addr); std::vector GetAddedNodeInfo(); size_t GetNodeCount(NumConnections num); @@ -409,7 +410,7 @@ private: void AcceptConnection(const ListenSocket& hListenSocket); void ThreadSocketHandler(); void ThreadDNSAddressSeed(); - void ThreadMnbRequestConnections(); + void ThreadOpenMasternodeConnections(); uint64_t CalculateKeyedNetGroup(const CAddress& ad) const; @@ -475,6 +476,8 @@ private: CCriticalSection cs_vOneShots; std::vector vAddedNodes; CCriticalSection cs_vAddedNodes; + std::vector vPendingMasternodes; + CCriticalSection cs_vPendingMasternodes; std::vector vNodes; std::list vNodesDisconnected; mutable CCriticalSection cs_vNodes; @@ -512,7 +515,7 @@ private: std::thread threadSocketHandler; std::thread threadOpenAddedConnections; std::thread threadOpenConnections; - std::thread threadMnbRequestConnections; + std::thread threadOpenMasternodeConnections; std::thread threadMessageHandler; }; extern std::unique_ptr g_connman; diff --git a/src/privatesend-client.cpp b/src/privatesend-client.cpp index 0a4d3c71ab..42705fb1ca 100644 --- a/src/privatesend-client.cpp +++ b/src/privatesend-client.cpp @@ -217,6 +217,7 @@ void CPrivateSendClient::SetNull() nEntriesCount = 0; fLastEntryAccepted = false; infoMixingMasternode = masternode_info_t(); + pendingDsaRequest = CPendingDsaRequest(); CPrivateSendBase::SetNull(); } @@ -880,31 +881,19 @@ bool CPrivateSendClient::JoinExistingQueue(CAmount nBalanceNeedsAnonymized, CCon continue; } - LogPrintf("CPrivateSendClient::JoinExistingQueue -- attempt to connect to masternode from queue, addr=%s\n", infoMn.addr.ToString()); - // connect to Masternode and submit the queue request - CAddress addr(infoMn.addr, NODE_NETWORK); - connman.OpenMasternodeConnection(addr); - - bool fSuccess = connman.ForNode(addr, CConnman::AllNodes, [&](CNode* pnode){ - infoMixingMasternode = infoMn; - nSessionDenom = dsq.nDenom; - CDarksendAccept dsa(nSessionDenom, txMyCollateral); - CNetMsgMaker msgMaker(pnode->GetSendVersion()); // TODO this gives a warning about version not being set (we should wait for VERSION exchange) - connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSACCEPT, dsa)); - LogPrintf("CPrivateSendClient::JoinExistingQueue -- connected (from queue), sending DSACCEPT: nSessionDenom: %d (%s), addr=%s\n", - nSessionDenom, CPrivateSend::GetDenominationsToString(nSessionDenom), pnode->addr.ToString()); - strAutoDenomResult = _("Mixing in progress..."); - SetState(POOL_STATE_QUEUE); - nTimeLastSuccessfulStep = GetTimeMillis(); - return true; - }); - if (!fSuccess) { - LogPrintf("CPrivateSendClient::JoinExistingQueue -- can't connect, addr=%s\n", infoMn.addr.ToString()); - strAutoDenomResult = _("Error connecting to Masternode."); - continue; - } + nSessionDenom = dsq.nDenom; + infoMixingMasternode = infoMn; + pendingDsaRequest = CPendingDsaRequest(infoMn.addr, CDarksendAccept(nSessionDenom, txMyCollateral)); + connman.AddPendingMasternode(infoMn.addr); + // TODO: add new state POOL_STATE_CONNECTING and bump MIN_PRIVATESEND_PEER_PROTO_VERSION + SetState(POOL_STATE_QUEUE); + nTimeLastSuccessfulStep = GetTimeMillis(); + LogPrintf("CPrivateSendClient::JoinExistingQueue -- pending connection (from queue): nSessionDenom: %d (%s), addr=%s\n", + nSessionDenom, CPrivateSend::GetDenominationsToString(nSessionDenom), infoMn.addr.ToString()); + strAutoDenomResult = _("Trying to connect..."); return true; } + strAutoDenomResult = _("Failed to find mixing queue to join"); return false; } @@ -949,39 +938,52 @@ bool CPrivateSendClient::StartNewQueue(CAmount nValueMin, CAmount nBalanceNeedsA } LogPrintf("CPrivateSendClient::StartNewQueue -- attempt %d connection to Masternode %s\n", nTries, infoMn.addr.ToString()); - CAddress addr(infoMn.addr, NODE_NETWORK); - connman.OpenMasternodeConnection(addr); - bool fSuccess = connman.ForNode(addr, CConnman::AllNodes, [&](CNode* pnode){ - LogPrintf("CPrivateSendClient::StartNewQueue -- connected, addr=%s\n", infoMn.addr.ToString()); - infoMixingMasternode = infoMn; - - std::vector vecAmounts; - pwalletMain->ConvertList(vecTxIn, vecAmounts); - // try to get a single random denom out of vecAmounts - while(nSessionDenom == 0) { - nSessionDenom = CPrivateSend::GetDenominationsByAmounts(vecAmounts); - } - CDarksendAccept dsa(nSessionDenom, txMyCollateral); - CNetMsgMaker msgMaker(pnode->GetSendVersion()); // TODO this gives a warning about version not being set (we should wait for VERSION exchange) - connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSACCEPT, dsa)); - LogPrintf("CPrivateSendClient::StartNewQueue -- connected, sending DSACCEPT, nSessionDenom: %d (%s)\n", - nSessionDenom, CPrivateSend::GetDenominationsToString(nSessionDenom)); - strAutoDenomResult = _("Mixing in progress..."); - SetState(POOL_STATE_QUEUE); - nTimeLastSuccessfulStep = GetTimeMillis(); - return true; - }); - if (!fSuccess) { - LogPrintf("CPrivateSendClient::StartNewQueue -- can't connect, addr=%s\n", infoMn.addr.ToString()); - nTries++; - continue; + std::vector vecAmounts; + pwalletMain->ConvertList(vecTxIn, vecAmounts); + // try to get a single random denom out of vecAmounts + while(nSessionDenom == 0) { + nSessionDenom = CPrivateSend::GetDenominationsByAmounts(vecAmounts); } + + infoMixingMasternode = infoMn; + connman.AddPendingMasternode(infoMn.addr); + pendingDsaRequest = CPendingDsaRequest(infoMn.addr, CDarksendAccept(nSessionDenom, txMyCollateral)); + // TODO: add new state POOL_STATE_CONNECTING and bump MIN_PRIVATESEND_PEER_PROTO_VERSION + SetState(POOL_STATE_QUEUE); + nTimeLastSuccessfulStep = GetTimeMillis(); + LogPrintf("CPrivateSendClient::StartNewQueue -- pending connection, nSessionDenom: %d (%s), addr=%s\n", + nSessionDenom, CPrivateSend::GetDenominationsToString(nSessionDenom), infoMn.addr.ToString()); + strAutoDenomResult = _("Trying to connect..."); return true; } + strAutoDenomResult = _("Failed to start a new mixing queue"); return false; } +void CPrivateSendClient::ProcessPendingDsaRequest(CConnman& connman) +{ + if (pendingDsaRequest == CPendingDsaRequest()) return; + + bool fDone = connman.ForNode(pendingDsaRequest.GetAddr(), [&](CNode* pnode) { + LogPrint("privatesend", "-- processing dsa queue for addr=%s\n", pnode->addr.ToString()); + nTimeLastSuccessfulStep = GetTimeMillis(); + // TODO: this vvvv should be here after new state POOL_STATE_CONNECTING is added and MIN_PRIVATESEND_PEER_PROTO_VERSION is bumped + // SetState(POOL_STATE_QUEUE); + strAutoDenomResult = _("Mixing in progress..."); + CNetMsgMaker msgMaker(pnode->GetSendVersion()); + connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSACCEPT, pendingDsaRequest.GetDSA())); + return true; + }); + + if (fDone) { + pendingDsaRequest = CPendingDsaRequest(); + } else if (pendingDsaRequest.IsExpired()) { + LogPrint("privatesend", "CPrivateSendClient::%s -- failed to connect to %s\n", __func__, pendingDsaRequest.GetAddr().ToString()); + SetNull(); + } +} + bool CPrivateSendClient::SubmitDenominate(CConnman& connman) { std::string strError; @@ -1417,6 +1419,7 @@ void ThreadCheckPrivateSendClient(CConnman& connman) if(masternodeSync.IsBlockchainSynced() && !ShutdownRequested()) { nTick++; privateSendClient.CheckTimeout(); + privateSendClient.ProcessPendingDsaRequest(connman); if(nDoAutoNextRun == nTick) { privateSendClient.DoAutomaticDenominating(connman); nDoAutoNextRun = nTick + PRIVATESEND_AUTO_TIMEOUT_MIN + GetRandInt(PRIVATESEND_AUTO_TIMEOUT_MAX - PRIVATESEND_AUTO_TIMEOUT_MIN); diff --git a/src/privatesend-client.h b/src/privatesend-client.h index 0a714d80a6..3278940547 100644 --- a/src/privatesend-client.h +++ b/src/privatesend-client.h @@ -29,6 +29,45 @@ static const int PRIVATESEND_KEYS_THRESHOLD_STOP = 50; // The main object for accessing mixing extern CPrivateSendClient privateSendClient; +class CPendingDsaRequest +{ +private: + static const int TIMEOUT = 15; + + CService addr; + CDarksendAccept dsa; + int64_t nTimeCreated; + +public: + CPendingDsaRequest(): + addr(CService()), + dsa(CDarksendAccept()), + nTimeCreated(0) + {}; + + CPendingDsaRequest(const CService& addr_, const CDarksendAccept& dsa_): + addr(addr_), + dsa(dsa_) + { nTimeCreated = GetTime(); } + + CService GetAddr() { return addr; } + CDarksendAccept GetDSA() { return dsa; } + bool IsExpired() { return GetTime() - nTimeCreated > TIMEOUT; } + + friend bool operator==(const CPendingDsaRequest& a, const CPendingDsaRequest& b) + { + return a.addr == b.addr && a.dsa == b.dsa; + } + friend bool operator!=(const CPendingDsaRequest& a, const CPendingDsaRequest& b) + { + return !(a == b); + } + explicit operator bool() const + { + return *this != CPendingDsaRequest(); + } +}; + /** Used to keep track of current status of mixing pool */ class CPrivateSendClient : public CPrivateSendBase @@ -54,6 +93,7 @@ private: masternode_info_t infoMixingMasternode; CMutableTransaction txMyCollateral; // client side collateral + CPendingDsaRequest pendingDsaRequest; CKeyHolderStorage keyHolderStorage; // storage for keys used in PrepareDenominate @@ -140,6 +180,8 @@ public: /// Passively run mixing in the background according to the configuration in settings bool DoAutomaticDenominating(CConnman& connman, bool fDryRun=false); + void ProcessPendingDsaRequest(CConnman& connman); + void CheckTimeout(); void UpdatedBlockTip(const CBlockIndex *pindex); diff --git a/src/privatesend.cpp b/src/privatesend.cpp index f48f5100c1..eff0a47dd1 100644 --- a/src/privatesend.cpp +++ b/src/privatesend.cpp @@ -455,6 +455,9 @@ void ThreadCheckPrivateSend(CConnman& connman) // make sure to check all masternodes first mnodeman.Check(); + mnodeman.ProcessPendingMnbRequests(connman); + mnodeman.ProcessPendingMnvRequests(connman); + // check if we should activate or ping every few minutes, // slightly postpone first run to give net thread a chance to connect to some peers if(nTick % MASTERNODE_MIN_MNP_SECONDS == 15) diff --git a/src/privatesend.h b/src/privatesend.h index eea4e5acc2..938aeefbad 100644 --- a/src/privatesend.h +++ b/src/privatesend.h @@ -121,6 +121,11 @@ public: READWRITE(nDenom); READWRITE(txCollateral); } + + friend bool operator==(const CDarksendAccept& a, const CDarksendAccept& b) + { + return a.nDenom == b.nDenom && a.txCollateral == b.txCollateral; + } }; // A clients transaction in the mixing pool