diff --git a/src/checkqueue.h b/src/checkqueue.h index 68357c2177..e94103f416 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -65,7 +65,7 @@ private: bool m_request_stop GUARDED_BY(m_mutex){false}; /** Internal function that does bulk of the verification work. */ - bool Loop(bool fMaster) + bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; std::vector vChecks; @@ -139,7 +139,7 @@ public: } //! Create a pool of new worker threads. - void StartWorkerThreads(const int threads_num) + void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { { LOCK(m_mutex); @@ -157,13 +157,13 @@ public: } //! Wait until execution finishes, and return whether all evaluations were successful. - bool Wait() + bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { return Loop(true /* master thread */); } //! Add a batch of checks to the queue - void Add(std::vector& vChecks) + void Add(std::vector& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { if (vChecks.empty()) { return; @@ -186,7 +186,7 @@ public: } //! Stop all of the worker threads. - void StopWorkerThreads() + void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { WITH_LOCK(m_mutex, m_request_stop = true); m_worker_cv.notify_all(); diff --git a/src/coinjoin/coinjoin.h b/src/coinjoin/coinjoin.h index bb42917d32..06dd934a83 100644 --- a/src/coinjoin/coinjoin.h +++ b/src/coinjoin/coinjoin.h @@ -368,15 +368,22 @@ public: void AddDSTX(const CCoinJoinBroadcastTx& dstx) EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx); CCoinJoinBroadcastTx GetDSTX(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx); - void UpdatedBlockTip(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler, const CMasternodeSync& mn_sync); - void NotifyChainLock(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler, const CMasternodeSync& mn_sync); + void UpdatedBlockTip(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler, + const CMasternodeSync& mn_sync) + EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx); + void NotifyChainLock(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler, + const CMasternodeSync& mn_sync) + EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx); void TransactionAddedToMempool(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx); - void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx); - void BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex*) EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx); + void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) + EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx); + void BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex*) + EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx); private: - void CheckDSTXes(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler); + void CheckDSTXes(const CBlockIndex* pindex, const llmq::CChainLocksHandler& clhandler) + EXCLUSIVE_LOCKS_REQUIRED(!cs_mapdstx); void UpdateDSTXConfirmedHeight(const CTransactionRef& tx, std::optional nHeight); }; diff --git a/src/governance/governance.cpp b/src/governance/governance.cpp index 023d0603e9..bb1f654cc0 100644 --- a/src/governance/governance.cpp +++ b/src/governance/governance.cpp @@ -1223,11 +1223,11 @@ void CGovernanceManager::RequestGovernanceObject(CNode* pfrom, const uint256& nH int CGovernanceManager::RequestGovernanceObjectVotes(CNode& peer, CConnman& connman) const { - std::array nodeCopy{&peer}; - return RequestGovernanceObjectVotes(nodeCopy, connman); + const std::vector vNodeCopy{&peer}; + return RequestGovernanceObjectVotes(vNodeCopy, connman); } -int CGovernanceManager::RequestGovernanceObjectVotes(Span vNodesCopy, CConnman& connman) const +int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector& vNodesCopy, CConnman& connman) const { static std::map > mapAskedRecently; @@ -1501,7 +1501,7 @@ void CGovernanceManager::UpdatedBlockTip(const CBlockIndex* pindex, CConnman& co void CGovernanceManager::RequestOrphanObjects(CConnman& connman) { - std::vector vNodesCopy = connman.CopyNodeVector(CConnman::FullyConnectedOnly); + const CConnman::NodesSnapshot snap{connman, /* filter = */ CConnman::FullyConnectedOnly}; std::vector vecHashesFiltered; { @@ -1517,15 +1517,13 @@ void CGovernanceManager::RequestOrphanObjects(CConnman& connman) LogPrint(BCLog::GOBJECT, "CGovernanceObject::RequestOrphanObjects -- number objects = %d\n", vecHashesFiltered.size()); for (const uint256& nHash : vecHashesFiltered) { - for (CNode* pnode : vNodesCopy) { + for (CNode* pnode : snap.Nodes()) { if (!pnode->CanRelay()) { continue; } RequestGovernanceObject(pnode, nHash, connman); } } - - connman.ReleaseNodeVector(vNodesCopy); } void CGovernanceManager::CleanOrphanObjects() diff --git a/src/governance/governance.h b/src/governance/governance.h index 0747e5dafa..03e8d01b56 100644 --- a/src/governance/governance.h +++ b/src/governance/governance.h @@ -358,7 +358,7 @@ public: void InitOnLoad(); int RequestGovernanceObjectVotes(CNode& peer, CConnman& connman) const; - int RequestGovernanceObjectVotes(Span vNodesCopy, CConnman& connman) const; + int RequestGovernanceObjectVotes(const std::vector& vNodesCopy, CConnman& connman) const; /* * Trigger Management (formerly CGovernanceTriggerManager) diff --git a/src/httpserver.cpp b/src/httpserver.cpp index 2572df0cf3..a749a02699 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -83,7 +83,7 @@ public: { } /** Enqueue a work item */ - bool Enqueue(WorkItem* item) + bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs) { LOCK(cs); if (!running || queue.size() >= maxDepth) { @@ -94,7 +94,7 @@ public: return true; } /** Thread function */ - void Run() + void Run() EXCLUSIVE_LOCKS_REQUIRED(!cs) { while (true) { std::unique_ptr i; @@ -111,7 +111,7 @@ public: } } /** Interrupt and exit loops */ - void Interrupt() + void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!cs) { LOCK(cs); running = false; diff --git a/src/i2p.h b/src/i2p.h index cb2efedba8..3899dbf81f 100644 --- a/src/i2p.h +++ b/src/i2p.h @@ -84,7 +84,7 @@ public: * to the listening socket and address. * @return true on success */ - bool Listen(Connection& conn); + bool Listen(Connection& conn) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); /** * Wait for and accept a new incoming connection. @@ -103,7 +103,7 @@ public: * it is set to `false`. Only set if `false` is returned. * @return true on success */ - bool Connect(const CService& to, Connection& conn, bool& proxy_error); + bool Connect(const CService& to, Connection& conn, bool& proxy_error) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); private: /** @@ -172,7 +172,7 @@ private: /** * Check the control socket for errors and possibly disconnect. */ - void CheckControlSock(); + void CheckControlSock() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); /** * Generate a new destination with the SAM proxy and set `m_private_key` to it. diff --git a/src/index/blockfilterindex.h b/src/index/blockfilterindex.h index 96c4936f8d..760a712349 100644 --- a/src/index/blockfilterindex.h +++ b/src/index/blockfilterindex.h @@ -63,7 +63,7 @@ public: bool LookupFilter(const CBlockIndex* block_index, BlockFilter& filter_out) const; /** Get a single filter header by block. */ - bool LookupFilterHeader(const CBlockIndex* block_index, uint256& header_out); + bool LookupFilterHeader(const CBlockIndex* block_index, uint256& header_out) EXCLUSIVE_LOCKS_REQUIRED(!m_cs_headers_cache); /** Get a range of filters between two heights on a chain. */ bool LookupFilterRange(int start_height, const CBlockIndex* stop_index, diff --git a/src/llmq/dkgsession.cpp b/src/llmq/dkgsession.cpp index bcbf91fe7e..167be04380 100644 --- a/src/llmq/dkgsession.cpp +++ b/src/llmq/dkgsession.cpp @@ -1336,7 +1336,7 @@ void CDKGSession::RelayInvToParticipants(const CInv& inv) const if (pnode->GetVerifiedProRegTxHash().IsNull()) { logger.Batch("node[%d:%s] not mn", pnode->GetId(), - pnode->GetAddrName()); + pnode->m_addr_name); } else if (relayMembers.count(pnode->GetVerifiedProRegTxHash()) == 0) { ss2 << pnode->GetVerifiedProRegTxHash().ToString().substr(0, 4) << " | "; } diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 548602e622..3667e593bc 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -1092,14 +1092,13 @@ bool CSigSharesManager::SendMessages() return session->sendSessionId; }; - std::vector vNodesCopy = connman.CopyNodeVector(CConnman::FullyConnectedOnly); - + const CConnman::NodesSnapshot snap{connman, /* filter = */ CConnman::FullyConnectedOnly}; { LOCK(cs); CollectSigSharesToRequest(sigSharesToRequest); CollectSigSharesToSend(sigShareBatchesToSend); CollectSigSharesToAnnounce(sigSharesToAnnounce); - CollectSigSharesToSendConcentrated(sigSharesToSend, vNodesCopy); + CollectSigSharesToSendConcentrated(sigSharesToSend, snap.Nodes()); for (auto& [nodeId, sigShareMap] : sigSharesToRequest) { for (auto& [hash, sigShareInv] : sigShareMap) { @@ -1120,7 +1119,7 @@ bool CSigSharesManager::SendMessages() bool didSend = false; - for (auto& pnode : vNodesCopy) { + for (auto& pnode : snap.Nodes()) { CNetMsgMaker msgMaker(pnode->GetCommonVersion()); if (const auto it1 = sigSessionAnnouncements.find(pnode->GetId()); it1 != sigSessionAnnouncements.end()) { @@ -1222,9 +1221,6 @@ bool CSigSharesManager::SendMessages() } } - // looped through all nodes, release them - connman.ReleaseNodeVector(vNodesCopy); - return didSend; } diff --git a/src/masternode/sync.cpp b/src/masternode/sync.cpp index f690693b45..f067fcb800 100644 --- a/src/masternode/sync.cpp +++ b/src/masternode/sync.cpp @@ -140,12 +140,11 @@ void CMasternodeSync::ProcessTick() } nTimeLastProcess = GetTime(); - std::vector vNodesCopy = connman.CopyNodeVector(CConnman::FullyConnectedOnly); + const CConnman::NodesSnapshot snap{connman, /* filter = */ CConnman::FullyConnectedOnly}; // gradually request the rest of the votes after sync finished if(IsSynced()) { - m_govman.RequestGovernanceObjectVotes(vNodesCopy, connman); - connman.ReleaseNodeVector(vNodesCopy); + m_govman.RequestGovernanceObjectVotes(snap.Nodes(), connman); return; } @@ -154,7 +153,7 @@ void CMasternodeSync::ProcessTick() LogPrint(BCLog::MNSYNC, "CMasternodeSync::ProcessTick -- nTick %d nCurrentAsset %d nTriedPeerCount %d nSyncProgress %f\n", nTick, nCurrentAsset, nTriedPeerCount, nSyncProgress); uiInterface.NotifyAdditionalDataSyncProgressChanged(nSyncProgress); - for (auto& pnode : vNodesCopy) + for (auto& pnode : snap.Nodes()) { CNetMsgMaker msgMaker(pnode->GetCommonVersion()); @@ -189,7 +188,7 @@ void CMasternodeSync::ProcessTick() } if (nCurrentAsset == MASTERNODE_SYNC_BLOCKCHAIN) { - int64_t nTimeSyncTimeout = vNodesCopy.size() > 3 ? MASTERNODE_SYNC_TICK_SECONDS : MASTERNODE_SYNC_TIMEOUT_SECONDS; + int64_t nTimeSyncTimeout = snap.Nodes().size() > 3 ? MASTERNODE_SYNC_TICK_SECONDS : MASTERNODE_SYNC_TIMEOUT_SECONDS; if (fReachedBestHeader && (GetTime() - nTimeLastBumped > nTimeSyncTimeout)) { // At this point we know that: // a) there are peers (because we are looping on at least one of them); @@ -205,7 +204,7 @@ void CMasternodeSync::ProcessTick() if (gArgs.GetBoolArg("-syncmempool", DEFAULT_SYNC_MEMPOOL)) { // Now that the blockchain is synced request the mempool from the connected outbound nodes if possible - for (auto pNodeTmp : vNodesCopy) { + for (auto pNodeTmp : snap.Nodes()) { bool fRequestedEarlier = m_netfulfilledman.HasFulfilledRequest(pNodeTmp->addr, "mempool-sync"); if (pNodeTmp->nVersion >= 70216 && !pNodeTmp->IsInboundConn() && !fRequestedEarlier && !pNodeTmp->IsBlockRelayOnly()) { m_netfulfilledman.AddFulfilledRequest(pNodeTmp->addr, "mempool-sync"); @@ -222,7 +221,6 @@ void CMasternodeSync::ProcessTick() if(nCurrentAsset == MASTERNODE_SYNC_GOVERNANCE) { if (!m_govman.IsValid()) { SwitchToNextAsset(); - connman.ReleaseNodeVector(vNodesCopy); return; } LogPrint(BCLog::GOBJECT, "CMasternodeSync::ProcessTick -- nTick %d nCurrentAsset %d nTimeLastBumped %lld GetTime() %lld diff %lld\n", nTick, nCurrentAsset, nTimeLastBumped, GetTime(), GetTime() - nTimeLastBumped); @@ -235,7 +233,6 @@ void CMasternodeSync::ProcessTick() // it's kind of ok to skip this for now, hopefully we'll catch up later? } SwitchToNextAsset(); - connman.ReleaseNodeVector(vNodesCopy); return; } @@ -259,12 +256,11 @@ void CMasternodeSync::ProcessTick() if (nCurrentAsset != MASTERNODE_SYNC_GOVERNANCE) { // looped through all nodes and not syncing governance yet/already, release them - connman.ReleaseNodeVector(vNodesCopy); return; } // request votes on per-obj basis from each node - for (const auto& pnode : vNodesCopy) { + for (const auto& pnode : snap.Nodes()) { if(!m_netfulfilledman.HasFulfilledRequest(pnode->addr, "governance-sync")) { continue; // to early for this node } @@ -291,16 +287,12 @@ void CMasternodeSync::ProcessTick() // reset nTimeNoObjectsLeft to be able to use the same condition on resync nTimeNoObjectsLeft = 0; SwitchToNextAsset(); - connman.ReleaseNodeVector(vNodesCopy); return; } nLastTick = nTick; nLastVotes = m_govman.GetVoteCount(); } } - - // looped through all nodes, release them - connman.ReleaseNodeVector(vNodesCopy); } void CMasternodeSync::SendGovernanceSyncRequest(CNode* pnode) const diff --git a/src/net.cpp b/src/net.cpp index e4680255e2..90957d5f53 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -331,8 +331,8 @@ bool IsLocal(const CService& addr) CNode* CConnman::FindNode(const CNetAddr& ip, bool fExcludeDisconnecting) { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (CNode* pnode : m_nodes) { if (fExcludeDisconnecting && pnode->fDisconnect) { continue; } @@ -345,8 +345,8 @@ CNode* CConnman::FindNode(const CNetAddr& ip, bool fExcludeDisconnecting) CNode* CConnman::FindNode(const CSubNet& subNet, bool fExcludeDisconnecting) { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (CNode* pnode : m_nodes) { if (fExcludeDisconnecting && pnode->fDisconnect) { continue; } @@ -359,12 +359,12 @@ CNode* CConnman::FindNode(const CSubNet& subNet, bool fExcludeDisconnecting) CNode* CConnman::FindNode(const std::string& addrName, bool fExcludeDisconnecting) { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (CNode* pnode : m_nodes) { if (fExcludeDisconnecting && pnode->fDisconnect) { continue; } - if (pnode->GetAddrName() == addrName) { + if (pnode->m_addr_name == addrName) { return pnode; } } @@ -373,8 +373,8 @@ CNode* CConnman::FindNode(const std::string& addrName, bool fExcludeDisconnectin CNode* CConnman::FindNode(const CService& addr, bool fExcludeDisconnecting) { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (CNode* pnode : m_nodes) { if (fExcludeDisconnecting && pnode->fDisconnect) { continue; } @@ -392,8 +392,8 @@ bool CConnman::AlreadyConnectedToAddress(const CAddress& addr) bool CConnman::CheckIncomingNonce(uint64_t nonce) { - LOCK(cs_vNodes); - for (const CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (const CNode* pnode : m_nodes) { if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && pnode->GetLocalNonce() == nonce) return false; } @@ -457,14 +457,10 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo return nullptr; } // It is possible that we already have a connection to the IP/port pszDest resolved to. - // In that case, drop the connection that was just created, and return the existing CNode instead. - // Also store the name we used to connect in that CNode, so that future FindNode() calls to that - // name catch this early. - LOCK(cs_vNodes); + // In that case, drop the connection that was just created. + LOCK(m_nodes_mutex); CNode* pnode = FindNode(static_cast(addrConnect)); - if (pnode) - { - pnode->MaybeSetAddrName(std::string(pszDest)); + if (pnode) { LogPrintf("Failed to open new connection, already connected\n"); return nullptr; } @@ -530,7 +526,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo if (!addr_bind.IsValid()) { addr_bind = GetBindAddress(sock->Get()); } - CNode* pnode = new CNode(id, nLocalServices, sock->Release(), addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, addr_bind, pszDest ? pszDest : "", conn_type); + CNode* pnode = new CNode(id, nLocalServices, sock->Release(), addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, addr_bind, pszDest ? pszDest : "", conn_type, /* inbound_onion */ false); pnode->AddRef(); statsClient.inc("peers.connect", 1.0f); @@ -542,7 +538,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo void CNode::CloseSocketDisconnect(CConnman* connman) { - AssertLockHeld(connman->cs_vNodes); + AssertLockHeld(connman->m_nodes_mutex); fDisconnect = true; LOCK2(connman->cs_mapSocketToNode, cs_hSocket); @@ -608,25 +604,16 @@ std::string ConnectionTypeAsString(ConnectionType conn_type) assert(false); } -std::string CNode::GetAddrName() const { - LOCK(cs_addrName); - return addrName; -} - -void CNode::MaybeSetAddrName(const std::string& addrNameIn) { - LOCK(cs_addrName); - if (addrName.empty()) { - addrName = addrNameIn; - } -} - -CService CNode::GetAddrLocal() const { - LOCK(cs_addrLocal); +CService CNode::GetAddrLocal() const +{ + AssertLockNotHeld(m_addr_local_mutex); + LOCK(m_addr_local_mutex); return addrLocal; } void CNode::SetAddrLocal(const CService& addrLocalIn) { - LOCK(cs_addrLocal); + AssertLockNotHeld(m_addr_local_mutex); + LOCK(m_addr_local_mutex); if (addrLocal.IsValid()) { error("Addr local already set for node: %i. Refusing to change from %s to %s", id, addrLocal.ToString(), addrLocalIn.ToString()); } else { @@ -660,10 +647,10 @@ void CNode::copyStats(CNodeStats &stats, const std::vector &m_asmap) X(nLastBlockTime); X(nTimeConnected); X(nTimeOffset); - stats.addrName = GetAddrName(); + X(m_addr_name); X(nVersion); { - LOCK(cs_SubVer); + LOCK(m_subver_mutex); X(cleanSubVer); } stats.fInbound = IsInboundConn(); @@ -1085,9 +1072,9 @@ bool CConnman::AttemptToEvictConnection() { std::vector vEvictionCandidates; { - LOCK(cs_vNodes); + LOCK(m_nodes_mutex); - for (const CNode* node : vNodes) { + for (const CNode* node : m_nodes) { if (node->HasPermission(NetPermissionFlags::NoBan)) continue; if (!node->IsInboundConn()) @@ -1127,8 +1114,8 @@ bool CConnman::AttemptToEvictConnection() if (!node_id_to_evict) { return false; } - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (CNode* pnode : m_nodes) { if (pnode->GetId() == *node_id_to_evict) { LogPrint(BCLog::NET_NETCONN, "selected %s connection for eviction peer=%d; disconnecting\n", pnode->ConnectionTypeAsString(), pnode->GetId()); pnode->fDisconnect = true; @@ -1185,8 +1172,8 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket, } { - LOCK(cs_vNodes); - for (const CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (const CNode* pnode : m_nodes) { if (pnode->IsInboundConn()) { nInbound++; if (!pnode->GetVerifiedProRegTxHash().IsNull()) { @@ -1286,8 +1273,8 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket, } { - LOCK(cs_vNodes); - vNodes.push_back(pnode); + LOCK(m_nodes_mutex); + m_nodes.push_back(pnode); WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.emplace(hSocket, pnode)); RegisterEvents(pnode); WakeSelect(); @@ -1304,8 +1291,8 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ const int max_connections = conn_type == ConnectionType::OUTBOUND_FULL_RELAY ? m_max_outbound_full_relay : m_max_outbound_block_relay; // Count existing connections - int existing_connections = WITH_LOCK(cs_vNodes, - return std::count_if(vNodes.begin(), vNodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; });); + int existing_connections = WITH_LOCK(m_nodes_mutex, + return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; });); // Max connections of specified type already exist if (existing_connections >= max_connections) return false; @@ -1321,11 +1308,11 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ void CConnman::DisconnectNodes() { { - LOCK(cs_vNodes); + LOCK(m_nodes_mutex); if (!fNetworkActive) { // Disconnect any connected nodes - for (CNode* pnode : vNodes) { + for (CNode* pnode : m_nodes) { if (!pnode->fDisconnect) { LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId()); pnode->fDisconnect = true; @@ -1334,7 +1321,7 @@ void CConnman::DisconnectNodes() } // Disconnect unused nodes - for (auto it = vNodes.begin(); it != vNodes.end(); ) + for (auto it = m_nodes.begin(); it != m_nodes.end(); ) { CNode* pnode = *it; if (pnode->fDisconnect) @@ -1377,8 +1364,8 @@ void CConnman::DisconnectNodes() pnode->GetId(), pnode->GetRefCount(), pnode->IsInboundConn(), pnode->m_masternode_connection, pnode->m_masternode_iqr_connection); } - // remove from vNodes - it = vNodes.erase(it); + // remove from m_nodes + it = m_nodes.erase(it); // release outbound grant (if any) pnode->grantOutbound.Release(); @@ -1388,7 +1375,7 @@ void CConnman::DisconnectNodes() // hold in disconnected pool until all refs are released pnode->Release(); - vNodesDisconnected.push_back(pnode); + m_nodes_disconnected.push_back(pnode); } else { ++it; } @@ -1396,8 +1383,8 @@ void CConnman::DisconnectNodes() } { // Delete disconnected nodes - std::list vNodesDisconnectedCopy = vNodesDisconnected; - for (auto it = vNodesDisconnected.begin(); it != vNodesDisconnected.end(); ) + std::list nodes_disconnected_copy = m_nodes_disconnected; + for (auto it = m_nodes_disconnected.begin(); it != m_nodes_disconnected.end(); ) { CNode* pnode = *it; // wait until threads are done using it @@ -1410,7 +1397,7 @@ void CConnman::DisconnectNodes() } } if (fDelete) { - it = vNodesDisconnected.erase(it); + it = m_nodes_disconnected.erase(it); DeleteNode(pnode); } } @@ -1423,22 +1410,22 @@ void CConnman::DisconnectNodes() void CConnman::NotifyNumConnectionsChanged(CMasternodeSync& mn_sync) { - size_t vNodesSize; + size_t nodes_size; { - LOCK(cs_vNodes); - vNodesSize = vNodes.size(); + LOCK(m_nodes_mutex); + nodes_size = m_nodes.size(); } // If we had zero connections before and new connections now or if we just dropped // to zero connections reset the sync process if its outdated. - if ((vNodesSize > 0 && nPrevNodeCount == 0) || (vNodesSize == 0 && nPrevNodeCount > 0)) { + if ((nodes_size > 0 && nPrevNodeCount == 0) || (nodes_size == 0 && nPrevNodeCount > 0)) { mn_sync.Reset(); } - if(vNodesSize != nPrevNodeCount) { - nPrevNodeCount = vNodesSize; + if(nodes_size != nPrevNodeCount) { + nPrevNodeCount = nodes_size; if(clientInterface) - clientInterface->NotifyNumConnectionsChanged(vNodesSize); + clientInterface->NotifyNumConnectionsChanged(nodes_size); CalculateNumConnectionsChangedStats(); } @@ -1466,8 +1453,8 @@ void CConnman::CalculateNumConnectionsChangedStats() } mapRecvBytesMsgStats[NET_MESSAGE_COMMAND_OTHER] = 0; mapSentBytesMsgStats[NET_MESSAGE_COMMAND_OTHER] = 0; - auto vNodesCopy = CopyNodeVector(CConnman::FullyConnectedOnly); - for (auto pnode : vNodesCopy) { + const NodesSnapshot snap{*this, /* filter = */ CConnman::FullyConnectedOnly}; + for (auto pnode : snap.Nodes()) { { LOCK(pnode->cs_vRecv); for (const mapMsgCmdSize::value_type &i : pnode->mapRecvBytesPerMsgCmd) @@ -1496,7 +1483,6 @@ void CConnman::CalculateNumConnectionsChangedStats() if (last_ping_time > 0) statsClient.timing("peers.ping_us", last_ping_time, 1.0f); } - ReleaseNodeVector(vNodesCopy); for (const std::string &msg : getAllNetMessageTypes()) { statsClient.gauge("bandwidth.message." + msg + ".totalBytesReceived", mapRecvBytesMsgStats[msg], 1.0f); statsClient.gauge("bandwidth.message." + msg + ".totalBytesSent", mapSentBytesMsgStats[msg], 1.0f); @@ -1549,30 +1535,30 @@ bool CConnman::InactivityCheck(const CNode& node) const return false; } -bool CConnman::GenerateSelectSet(std::set &recv_set, std::set &send_set, std::set &error_set) +bool CConnman::GenerateSelectSet(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set) { for (const ListenSocket& hListenSocket : vhListenSocket) { recv_set.insert(hListenSocket.socket); } + for (CNode* pnode : nodes) { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) - { - bool select_recv = !pnode->fHasRecvData; - bool select_send = !pnode->fCanSendData; + bool select_recv = !pnode->fHasRecvData; + bool select_send = !pnode->fCanSendData; - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) - continue; + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + continue; - error_set.insert(pnode->hSocket); - if (select_send) { - send_set.insert(pnode->hSocket); - } - if (select_recv) { - recv_set.insert(pnode->hSocket); - } + error_set.insert(pnode->hSocket); + if (select_send) { + send_set.insert(pnode->hSocket); + } + if (select_recv) { + recv_set.insert(pnode->hSocket); } } @@ -1589,14 +1575,17 @@ bool CConnman::GenerateSelectSet(std::set &recv_set, std::set &s } #ifdef USE_KQUEUE -void CConnman::SocketEventsKqueue(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll) +void CConnman::SocketEventsKqueue(std::set& recv_set, + std::set& send_set, + std::set& error_set, + bool only_poll) { const size_t maxEvents = 64; struct kevent events[maxEvents]; struct timespec timeout; - timeout.tv_sec = fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000; - timeout.tv_nsec = (fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000; + timeout.tv_sec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000; + timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000; wakeupSelectNeeded = true; int n = kevent(kqueuefd, nullptr, 0, events, maxEvents, &timeout); @@ -1624,13 +1613,16 @@ void CConnman::SocketEventsKqueue(std::set &recv_set, std::set & #endif #ifdef USE_EPOLL -void CConnman::SocketEventsEpoll(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll) +void CConnman::SocketEventsEpoll(std::set& recv_set, + std::set& send_set, + std::set& error_set, + bool only_poll) { const size_t maxEvents = 64; epoll_event events[maxEvents]; wakeupSelectNeeded = true; - int n = epoll_wait(epollfd, events, maxEvents, fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS); + int n = epoll_wait(epollfd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); wakeupSelectNeeded = false; for (int i = 0; i < n; i++) { auto& e = events[i]; @@ -1651,11 +1643,15 @@ void CConnman::SocketEventsEpoll(std::set &recv_set, std::set &s #endif #ifdef USE_POLL -void CConnman::SocketEventsPoll(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll) +void CConnman::SocketEventsPoll(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set, + bool only_poll) { std::set recv_select_set, send_select_set, error_select_set; - if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) { - if (!fOnlyPoll) interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); + if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) { + if (!only_poll) interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); return; } @@ -1683,7 +1679,7 @@ void CConnman::SocketEventsPoll(std::set &recv_set, std::set &se } wakeupSelectNeeded = true; - int r = poll(vpollfds.data(), vpollfds.size(), fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS); + int r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); wakeupSelectNeeded = false; if (r < 0) { return; @@ -1699,10 +1695,14 @@ void CConnman::SocketEventsPoll(std::set &recv_set, std::set &se } #endif -void CConnman::SocketEventsSelect(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll) +void CConnman::SocketEventsSelect(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set, + bool only_poll) { std::set recv_select_set, send_select_set, error_select_set; - if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) { + if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) { interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); return; } @@ -1712,7 +1712,7 @@ void CConnman::SocketEventsSelect(std::set &recv_set, std::set & // struct timeval timeout; timeout.tv_sec = 0; - timeout.tv_usec = fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS * 1000; // frequency to poll pnode->vSend + timeout.tv_usec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS * 1000; // frequency to poll pnode->vSend fd_set fdsetRecv; fd_set fdsetSend; @@ -1774,26 +1774,30 @@ void CConnman::SocketEventsSelect(std::set &recv_set, std::set & } } -void CConnman::SocketEvents(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll) +void CConnman::SocketEvents(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set, + bool only_poll) { switch (socketEventsMode) { #ifdef USE_KQUEUE case SOCKETEVENTS_KQUEUE: - SocketEventsKqueue(recv_set, send_set, error_set, fOnlyPoll); + SocketEventsKqueue(recv_set, send_set, error_set, only_poll); break; #endif #ifdef USE_EPOLL case SOCKETEVENTS_EPOLL: - SocketEventsEpoll(recv_set, send_set, error_set, fOnlyPoll); + SocketEventsEpoll(recv_set, send_set, error_set, only_poll); break; #endif #ifdef USE_POLL case SOCKETEVENTS_POLL: - SocketEventsPoll(recv_set, send_set, error_set, fOnlyPoll); + SocketEventsPoll(nodes, recv_set, send_set, error_set, only_poll); break; #endif case SOCKETEVENTS_SELECT: - SocketEventsSelect(recv_set, send_set, error_set, fOnlyPoll); + SocketEventsSelect(nodes, recv_set, send_set, error_set, only_poll); break; default: assert(false); @@ -1802,15 +1806,22 @@ void CConnman::SocketEvents(std::set &recv_set, std::set &send_s void CConnman::SocketHandler(CMasternodeSync& mn_sync) { - bool fOnlyPoll = [this]() { - // check if we have work to do and thus should avoid waiting for events - LOCK2(cs_vNodes, cs_sendable_receivable_nodes); + AssertLockNotHeld(m_total_bytes_sent_mutex); + + std::set recv_set; + std::set send_set; + std::set error_set; + + bool only_poll = [this]() { + // Check if we have work to do and thus should avoid waiting for events + LOCK2(m_nodes_mutex, cs_sendable_receivable_nodes); if (!mapReceivableNodes.empty()) { return true; } else if (!mapSendableNodes.empty()) { if (LOCK(cs_mapNodesWithDataToSend); !mapNodesWithDataToSend.empty()) { - // we must check if at least one of the nodes with pending messages is also sendable, as otherwise a single - // node would be able to make the network thread busy with polling + // We must check if at least one of the nodes with pending messages is also + // sendable, as otherwise a single node would be able to make the network + // thread busy with polling. for (auto& p : mapNodesWithDataToSend) { if (mapSendableNodes.count(p.first)) { return true; @@ -1822,8 +1833,14 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync) return false; }(); - std::set recv_set, send_set, error_set; - SocketEvents(recv_set, send_set, error_set, fOnlyPoll); + { + const NodesSnapshot snap{*this, /* filter = */ CConnman::AllNodes, /* shuffle = */ false}; + + // Check for the readiness of the already connected sockets and the + // listening sockets in one call ("readiness" as in poll(2) or + // select(2)). If none are ready, wait for a short while and return + // empty sets. + SocketEvents(snap.Nodes(), recv_set, send_set, error_set, only_poll); #ifdef USE_WAKEUP_PIPE // drain the wakeup pipe @@ -1838,19 +1855,22 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync) } #endif - if (interruptNet) return; - - // - // Accept new connections - // - for (const ListenSocket& hListenSocket : vhListenSocket) - { - if (recv_set.count(hListenSocket.socket) > 0) - { - AcceptConnection(hListenSocket, mn_sync); - } + // Service (send/receive) each of the already connected nodes. + SocketHandlerConnected(recv_set, send_set, error_set); } + // Accept new connections from listening sockets. + SocketHandlerListening(recv_set, mn_sync); +} + +void CConnman::SocketHandlerConnected(const std::set& recv_set, + const std::set& send_set, + const std::set& error_set) +{ + AssertLockNotHeld(m_total_bytes_sent_mutex); + + if (interruptNet) return; + std::vector vErrorNodes; std::vector vReceivableNodes; std::vector vSendableNodes; @@ -1970,9 +1990,15 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync) if (bytes_sent) RecordBytesSent(bytes_sent); } - ReleaseNodeVector(vErrorNodes); - ReleaseNodeVector(vReceivableNodes); - ReleaseNodeVector(vSendableNodes); + for (auto& node : vErrorNodes) { + node->Release(); + } + for (auto& node : vReceivableNodes) { + node->Release(); + } + for (auto& node : vSendableNodes) { + node->Release(); + } if (interruptNet) { return; @@ -1993,6 +2019,18 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync) } } +void CConnman::SocketHandlerListening(const std::set& recv_set, CMasternodeSync& mn_sync) +{ + for (const ListenSocket& listen_socket : vhListenSocket) { + if (interruptNet) { + return; + } + if (recv_set.count(listen_socket.socket) > 0) { + AcceptConnection(listen_socket, mn_sync); + } + } +} + size_t CConnman::SocketRecvData(CNode *pnode) { // typical socket buffer is 8K-64K @@ -2011,7 +2049,7 @@ size_t CConnman::SocketRecvData(CNode *pnode) { bool notify = false; if (!pnode->ReceiveMsgBytes(Span(pchBuf, nBytes), notify)) { - LOCK(cs_vNodes); + LOCK(m_nodes_mutex); pnode->CloseSocketDisconnect(this); } RecordBytesRecv(nBytes); @@ -2038,7 +2076,7 @@ size_t CConnman::SocketRecvData(CNode *pnode) if (!pnode->fDisconnect) { LogPrint(BCLog::NET, "socket closed for peer=%d\n", pnode->GetId()); } - LOCK(cs_vNodes); + LOCK(m_nodes_mutex); pnode->fOtherSideDisconnected = true; // avoid lingering pnode->CloseSocketDisconnect(this); } @@ -2051,7 +2089,7 @@ size_t CConnman::SocketRecvData(CNode *pnode) if (!pnode->fDisconnect){ LogPrint(BCLog::NET, "socket recv error for peer=%d: %s\n", pnode->GetId(), NetworkErrorString(nErr)); } - LOCK(cs_vNodes); + LOCK(m_nodes_mutex); pnode->fOtherSideDisconnected = true; // avoid lingering pnode->CloseSocketDisconnect(this); } @@ -2064,6 +2102,8 @@ size_t CConnman::SocketRecvData(CNode *pnode) void CConnman::ThreadSocketHandler(CMasternodeSync& mn_sync) { + AssertLockNotHeld(m_total_bytes_sent_mutex); + int64_t nLastCleanupNodes = 0; while (!interruptNet) @@ -2156,8 +2196,8 @@ void CConnman::ThreadDNSAddressSeed() int nRelevant = 0; { - LOCK(cs_vNodes); - for (const CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (const CNode* pnode : m_nodes) { if (pnode->fSuccessfullyConnected && !pnode->IsFullOutboundConn() && !pnode->m_masternode_probe_connection) ++nRelevant; } } @@ -2266,8 +2306,8 @@ int CConnman::GetExtraFullOutboundCount() const { int full_outbound_peers = 0; { - LOCK(cs_vNodes); - for (const CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (const CNode* pnode : m_nodes) { // don't count outbound masternodes if (pnode->m_masternode_connection) { continue; @@ -2284,8 +2324,8 @@ int CConnman::GetExtraBlockRelayCount() const { int block_relay_peers = 0; { - LOCK(cs_vNodes); - for (const CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (const CNode* pnode : m_nodes) { if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsBlockOnlyConn()) { ++block_relay_peers; } @@ -2356,8 +2396,8 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe // Checking !dnsseed is cheaper before locking 2 mutexes. if (!add_fixed_seeds_now && !dnsseed) { - LOCK2(m_addr_fetches_mutex, cs_vAddedNodes); - if (m_addr_fetches.empty() && vAddedNodes.empty()) { + LOCK2(m_addr_fetches_mutex, m_added_nodes_mutex); + if (m_addr_fetches.empty() && m_added_nodes.empty()) { add_fixed_seeds_now = true; LogPrintf("Adding fixed seeds as -dnsseed=0, -addnode is not provided and all -seednode(s) attempted\n"); } @@ -2382,8 +2422,8 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe int nOutboundBlockRelay = 0; std::set > setConnected; if (!Params().AllowMultipleAddressesFromGroup()) { - LOCK(cs_vNodes); - for (const CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (const CNode* pnode : m_nodes) { if (pnode->IsFullOutboundConn() && !pnode->m_masternode_connection) nOutboundFullRelay++; if (pnode->IsBlockOnlyConn()) nOutboundBlockRelay++; @@ -2407,8 +2447,8 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe std::set setConnectedMasternodes; { - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (CNode* pnode : m_nodes) { auto verifiedProRegTxHash = pnode->GetVerifiedProRegTxHash(); if (!verifiedProRegTxHash.IsNull()) { setConnectedMasternodes.emplace(verifiedProRegTxHash); @@ -2598,8 +2638,8 @@ void CConnman::ThreadOpenConnections(const std::vector connect, CDe std::vector CConnman::GetCurrentBlockRelayOnlyConns() const { std::vector ret; - LOCK(cs_vNodes); - for (const CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (const CNode* pnode : m_nodes) { if (pnode->IsBlockRelayOnly()) { ret.push_back(pnode->addr); } @@ -2614,9 +2654,9 @@ std::vector CConnman::GetAddedNodeInfo() const std::list lAddresses(0); { - LOCK(cs_vAddedNodes); - ret.reserve(vAddedNodes.size()); - std::copy(vAddedNodes.cbegin(), vAddedNodes.cend(), std::back_inserter(lAddresses)); + LOCK(m_added_nodes_mutex); + ret.reserve(m_added_nodes.size()); + std::copy(m_added_nodes.cbegin(), m_added_nodes.cend(), std::back_inserter(lAddresses)); } @@ -2624,12 +2664,12 @@ std::vector CConnman::GetAddedNodeInfo() const std::map mapConnected; std::map> mapConnectedByName; { - LOCK(cs_vNodes); - for (const CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (const CNode* pnode : m_nodes) { if (pnode->addr.IsValid()) { mapConnected[pnode->addr] = pnode->IsInboundConn(); } - std::string addrName = pnode->GetAddrName(); + std::string addrName{pnode->m_addr_name}; if (!addrName.empty()) { mapConnectedByName[std::move(addrName)] = std::make_pair(pnode->IsInboundConn(), static_cast(pnode->addr)); } @@ -2805,7 +2845,7 @@ void CConnman::ThreadOpenMasternodeConnections(CDeterministicMNManager& dmnman, auto getConnectToDmn = [&]() -> CDeterministicMNCPtr { // don't hold lock while calling OpenMasternodeConnection as cs_main is locked deep inside - LOCK2(cs_vNodes, cs_vPendingMasternodes); + LOCK2(m_nodes_mutex, cs_vPendingMasternodes); if (!vPendingMasternodes.empty()) { auto dmn = mnList.GetValidMN(vPendingMasternodes.front()); @@ -2938,15 +2978,14 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai m_msgproc->InitializeNode(pnode); { - LOCK(cs_vNodes); - vNodes.push_back(pnode); + LOCK(m_nodes_mutex); + m_nodes.push_back(pnode); RegisterEvents(pnode); WakeSelect(); } } void CConnman::OpenMasternodeConnection(const CAddress &addrConnect, MasternodeProbeConn probe) { - OpenNetworkConnection(addrConnect, false, nullptr, nullptr, ConnectionType::OUTBOUND_FULL_RELAY, MasternodeConn::IsConnection, probe); } @@ -2957,8 +2996,6 @@ void CConnman::ThreadMessageHandler() FastRandomContext rng; while (!flagInterruptMsgProc) { - std::vector vNodesCopy = CopyNodeVector(); - bool fMoreWork = false; bool fSkipSendMessagesForMasternodes = true; @@ -2966,13 +3003,13 @@ void CConnman::ThreadMessageHandler() fSkipSendMessagesForMasternodes = false; nLastSendMessagesTimeMasternodes = GetTimeMillis(); } + // Randomize the order in which we process messages from/to our peers. // This prevents attacks in which an attacker exploits having multiple - // consecutive connections in the vNodes list. - Shuffle(vNodesCopy.begin(), vNodesCopy.end(), rng); + // consecutive connections in the m_nodes list. + const NodesSnapshot snap{*this, /* filter = */ CConnman::AllNodes, /* shuffle = */ true}; - for (CNode* pnode : vNodesCopy) - { + for (CNode* pnode : snap.Nodes()) { if (pnode->fDisconnect) continue; @@ -2991,8 +3028,6 @@ void CConnman::ThreadMessageHandler() return; } - ReleaseNodeVector(vNodesCopy); - WAIT_LOCK(mutexMsgProc, lock); if (!fMoreWork) { condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this]() EXCLUSIVE_LOCKS_REQUIRED(mutexMsgProc) { return fMsgProcWake; }); @@ -3263,6 +3298,7 @@ bool CConnman::InitBinds( bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman, CMasternodeSync& mn_sync, CScheduler& scheduler, const Options& connOptions) { + AssertLockNotHeld(m_total_bytes_sent_mutex); Init(connOptions); #ifdef USE_KQUEUE @@ -3522,10 +3558,10 @@ void CConnman::StopNodes() } { - LOCK(cs_vNodes); + LOCK(m_nodes_mutex); // Close sockets - for (CNode *pnode : vNodes) + for (CNode *pnode : m_nodes) pnode->CloseSocketDisconnect(this); } for (ListenSocket& hListenSocket : vhListenSocket) @@ -3548,11 +3584,11 @@ void CConnman::StopNodes() // clean up some globals (to help leak detection) std::vector nodes; - WITH_LOCK(cs_vNodes, nodes.swap(vNodes)); + WITH_LOCK(m_nodes_mutex, nodes.swap(m_nodes)); for (CNode* pnode : nodes) { DeleteNode(pnode); } - for (CNode* pnode : vNodesDisconnected) { + for (CNode* pnode : m_nodes_disconnected) { DeleteNode(pnode); } WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.clear()); @@ -3564,7 +3600,7 @@ void CConnman::StopNodes() LOCK(cs_mapNodesWithDataToSend); mapNodesWithDataToSend.clear(); } - vNodesDisconnected.clear(); + m_nodes_disconnected.clear(); vhListenSocket.clear(); semOutbound.reset(); semAddnode.reset(); @@ -3667,21 +3703,21 @@ std::vector CConnman::GetAddresses(CNode& requestor, size_t max_addres bool CConnman::AddNode(const std::string& strNode) { - LOCK(cs_vAddedNodes); - for (const std::string& it : vAddedNodes) { + LOCK(m_added_nodes_mutex); + for (const std::string& it : m_added_nodes) { if (strNode == it) return false; } - vAddedNodes.push_back(strNode); + m_added_nodes.push_back(strNode); return true; } bool CConnman::RemoveAddedNode(const std::string& strNode) { - LOCK(cs_vAddedNodes); - for(std::vector::iterator it = vAddedNodes.begin(); it != vAddedNodes.end(); ++it) { + LOCK(m_added_nodes_mutex); + for(std::vector::iterator it = m_added_nodes.begin(); it != m_added_nodes.end(); ++it) { if (strNode == *it) { - vAddedNodes.erase(it); + m_added_nodes.erase(it); return true; } } @@ -3754,7 +3790,7 @@ std::set CConnman::GetMasternodeQuorums(Consensus::LLMQType llmqType) std::set CConnman::GetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const { - LOCK2(cs_vNodes, cs_vPendingMasternodes); + LOCK2(m_nodes_mutex, cs_vPendingMasternodes); auto it = masternodeQuorumNodes.find(std::make_pair(llmqType, quorumHash)); if (it == masternodeQuorumNodes.end()) { return {}; @@ -3762,7 +3798,7 @@ std::set CConnman::GetMasternodeQuorumNodes(Consensus::LLMQType llmqType const auto& proRegTxHashes = it->second; std::set nodes; - for (const auto pnode : vNodes) { + for (const auto pnode : m_nodes) { if (pnode->fDisconnect) { continue; } @@ -3833,10 +3869,10 @@ void CConnman::AddPendingProbeConnections(const std::set &proTxHashes) size_t CConnman::GetNodeCount(ConnectionDirection flags) const { - LOCK(cs_vNodes); + LOCK(m_nodes_mutex); int nNum = 0; - for (const auto& pnode : vNodes) { + for (const auto& pnode : m_nodes) { if (pnode->fDisconnect) { continue; } @@ -3861,9 +3897,9 @@ size_t CConnman::GetMaxOutboundNodeCount() void CConnman::GetNodeStats(std::vector& vstats) const { vstats.clear(); - LOCK(cs_vNodes); - vstats.reserve(vNodes.size()); - for (CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + vstats.reserve(m_nodes.size()); + for (CNode* pnode : m_nodes) { if (pnode->fDisconnect) { continue; } @@ -3874,7 +3910,7 @@ void CConnman::GetNodeStats(std::vector& vstats) const bool CConnman::DisconnectNode(const std::string& strNode) { - LOCK(cs_vNodes); + LOCK(m_nodes_mutex); if (CNode* pnode = FindNode(strNode)) { LogPrint(BCLog::NET_NETCONN, "disconnect by address%s matched peer=%d; disconnecting\n", (fLogIPs ? strprintf("=%s", strNode) : ""), pnode->GetId()); pnode->fDisconnect = true; @@ -3886,8 +3922,8 @@ bool CConnman::DisconnectNode(const std::string& strNode) bool CConnman::DisconnectNode(const CSubNet& subnet) { bool disconnected = false; - LOCK(cs_vNodes); - for (CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for (CNode* pnode : m_nodes) { if (subnet.Match(pnode->addr)) { LogPrint(BCLog::NET_NETCONN, "disconnect by subnet%s matched peer=%d; disconnecting\n", (fLogIPs ? strprintf("=%s", subnet.ToString()) : ""), pnode->GetId()); pnode->fDisconnect = true; @@ -3904,8 +3940,8 @@ bool CConnman::DisconnectNode(const CNetAddr& addr) bool CConnman::DisconnectNode(NodeId id) { - LOCK(cs_vNodes); - for(CNode* pnode : vNodes) { + LOCK(m_nodes_mutex); + for(CNode* pnode : m_nodes) { if (id == pnode->GetId()) { LogPrint(BCLog::NET_NETCONN, "disconnect by id peer=%d; disconnecting\n", pnode->GetId()); pnode->fDisconnect = true; @@ -3917,7 +3953,6 @@ bool CConnman::DisconnectNode(NodeId id) void CConnman::RecordBytesRecv(uint64_t bytes) { - LOCK(cs_totalBytesRecv); nTotalBytesRecv += bytes; statsClient.count("bandwidth.bytesReceived", bytes, 0.1f); statsClient.gauge("bandwidth.totalBytesReceived", nTotalBytesRecv, 0.01f); @@ -3925,7 +3960,9 @@ void CConnman::RecordBytesRecv(uint64_t bytes) void CConnman::RecordBytesSent(uint64_t bytes) { - LOCK(cs_totalBytesSent); + AssertLockNotHeld(m_total_bytes_sent_mutex); + LOCK(m_total_bytes_sent_mutex); + nTotalBytesSent += bytes; statsClient.count("bandwidth.bytesSent", bytes, 0.01f); statsClient.gauge("bandwidth.totalBytesSent", nTotalBytesSent, 0.01f); @@ -3943,7 +3980,8 @@ void CConnman::RecordBytesSent(uint64_t bytes) uint64_t CConnman::GetMaxOutboundTarget() const { - LOCK(cs_totalBytesSent); + AssertLockNotHeld(m_total_bytes_sent_mutex); + LOCK(m_total_bytes_sent_mutex); return nMaxOutboundLimit; } @@ -3954,7 +3992,15 @@ std::chrono::seconds CConnman::GetMaxOutboundTimeframe() const std::chrono::seconds CConnman::GetMaxOutboundTimeLeftInCycle() const { - LOCK(cs_totalBytesSent); + AssertLockNotHeld(m_total_bytes_sent_mutex); + LOCK(m_total_bytes_sent_mutex); + return GetMaxOutboundTimeLeftInCycle_(); +} + +std::chrono::seconds CConnman::GetMaxOutboundTimeLeftInCycle_() const +{ + AssertLockHeld(m_total_bytes_sent_mutex); + if (nMaxOutboundLimit == 0) return 0s; @@ -3968,14 +4014,15 @@ std::chrono::seconds CConnman::GetMaxOutboundTimeLeftInCycle() const bool CConnman::OutboundTargetReached(bool historicalBlockServingLimit) const { - LOCK(cs_totalBytesSent); + AssertLockNotHeld(m_total_bytes_sent_mutex); + LOCK(m_total_bytes_sent_mutex); if (nMaxOutboundLimit == 0) return false; if (historicalBlockServingLimit) { // keep a large enough buffer to at least relay each block once - const std::chrono::seconds timeLeftInCycle = GetMaxOutboundTimeLeftInCycle(); + const std::chrono::seconds timeLeftInCycle = GetMaxOutboundTimeLeftInCycle_(); const uint64_t buffer = timeLeftInCycle / std::chrono::minutes{10} * MaxBlockSize(fDIP0001ActiveAtTip); if (buffer >= nMaxOutboundLimit || nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit - buffer) return true; @@ -3988,7 +4035,8 @@ bool CConnman::OutboundTargetReached(bool historicalBlockServingLimit) const uint64_t CConnman::GetOutboundTargetBytesLeft() const { - LOCK(cs_totalBytesSent); + AssertLockNotHeld(m_total_bytes_sent_mutex); + LOCK(m_total_bytes_sent_mutex); if (nMaxOutboundLimit == 0) return 0; @@ -3997,13 +4045,13 @@ uint64_t CConnman::GetOutboundTargetBytesLeft() const uint64_t CConnman::GetTotalBytesRecv() const { - LOCK(cs_totalBytesRecv); return nTotalBytesRecv; } uint64_t CConnman::GetTotalBytesSent() const { - LOCK(cs_totalBytesSent); + AssertLockNotHeld(m_total_bytes_sent_mutex); + LOCK(m_total_bytes_sent_mutex); return nTotalBytesSent; } @@ -4016,25 +4064,25 @@ unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; } CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, SOCKET hSocketIn, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress& addrBindIn, const std::string& addrNameIn, ConnectionType conn_type_in, bool inbound_onion) : nTimeConnected(GetTimeSeconds()), - addr(addrIn), - addrBind(addrBindIn), - nKeyedNetGroup(nKeyedNetGroupIn), - id(idIn), - nLocalHostNonce(nLocalHostNonceIn), - m_conn_type(conn_type_in), - nLocalServices(nLocalServicesIn), - m_inbound_onion(inbound_onion) + addr(addrIn), + addrBind(addrBindIn), + m_addr_name{addrNameIn.empty() ? addr.ToStringIPPort() : addrNameIn}, + m_inbound_onion(inbound_onion), + nKeyedNetGroup(nKeyedNetGroupIn), + id(idIn), + nLocalHostNonce(nLocalHostNonceIn), + m_conn_type(conn_type_in), + nLocalServices(nLocalServicesIn) { if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND); hSocket = hSocketIn; - addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; for (const std::string &msg : getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; mapRecvBytesPerMsgCmd[NET_MESSAGE_COMMAND_OTHER] = 0; if (fLogIPs) { - LogPrint(BCLog::NET, "Added connection to %s peer=%d\n", addrName, id); + LogPrint(BCLog::NET, "Added connection to %s peer=%d\n", m_addr_name, id); } else { LogPrint(BCLog::NET, "Added connection peer=%d\n", id); } @@ -4055,6 +4103,7 @@ bool CConnman::NodeFullyConnected(const CNode* pnode) void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { + AssertLockNotHeld(m_total_bytes_sent_mutex); size_t nMessageSize = msg.data.size(); LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.m_type), nMessageSize, pnode->GetId()); if (gArgs.GetBoolArg("-capturemessages", false)) { @@ -4084,9 +4133,9 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { LOCK(cs_mapNodesWithDataToSend); - // we're not holding cs_vNodes here, so there is a chance of this node being disconnected shortly before + // we're not holding m_nodes_mutex here, so there is a chance of this node being disconnected shortly before // we get here. Whoever called PushMessage still has a ref to CNode*, but will later Release() it, so we - // might end up having an entry in mapNodesWithDataToSend that is not in vNodes anymore. We need to + // might end up having an entry in mapNodesWithDataToSend that is not in m_nodes anymore. We need to // Add/Release refs when adding/erasing mapNodesWithDataToSend. if (mapNodesWithDataToSend.emplace(pnode->GetId(), pnode).second) { pnode->AddRef(); @@ -4102,8 +4151,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) bool CConnman::ForNode(const CService& addr, std::function cond, std::function func) { CNode* found = nullptr; - LOCK(cs_vNodes); - for (auto&& pnode : vNodes) { + LOCK(m_nodes_mutex); + for (auto&& pnode : m_nodes) { if((CService)pnode->addr == addr) { found = pnode; break; @@ -4115,8 +4164,8 @@ bool CConnman::ForNode(const CService& addr, std::function cond, std::function func) { CNode* found = nullptr; - LOCK(cs_vNodes); - for (auto&& pnode : vNodes) { + LOCK(m_nodes_mutex); + for (auto&& pnode : m_nodes) { if(pnode->GetId() == id) { found = pnode; break; @@ -4148,26 +4197,28 @@ std::chrono::microseconds PoissonNextSend(std::chrono::microseconds now, std::ch return now + std::chrono::duration_cast(unscaled * average_interval + 0.5us); } -std::vector CConnman::CopyNodeVector(std::function cond) +CConnman::NodesSnapshot::NodesSnapshot(const CConnman& connman, std::function filter, + bool shuffle) { - std::vector vecNodesCopy; - LOCK(cs_vNodes); - vecNodesCopy.reserve(vNodes.size()); - for(size_t i = 0; i < vNodes.size(); ++i) { - CNode* pnode = vNodes[i]; - if (!cond(pnode)) + LOCK(connman.m_nodes_mutex); + m_nodes_copy.reserve(connman.m_nodes.size()); + + for (auto& node : connman.m_nodes) { + if (!filter(node)) continue; - pnode->AddRef(); - vecNodesCopy.push_back(pnode); + node->AddRef(); + m_nodes_copy.push_back(node); + } + + if (shuffle) { + Shuffle(m_nodes_copy.begin(), m_nodes_copy.end(), FastRandomContext{}); } - return vecNodesCopy; } -void CConnman::ReleaseNodeVector(const std::vector& vecNodes) +CConnman::NodesSnapshot::~NodesSnapshot() { - for(size_t i = 0; i < vecNodes.size(); ++i) { - CNode* pnode = vecNodes[i]; - pnode->Release(); + for (auto& node : m_nodes_copy) { + node->Release(); } } diff --git a/src/net.h b/src/net.h index 5905760325..a8b6505099 100644 --- a/src/net.h +++ b/src/net.h @@ -281,7 +281,7 @@ public: int64_t nLastBlockTime; int64_t nTimeConnected; int64_t nTimeOffset; - std::string addrName; + std::string m_addr_name; int nVersion; std::string cleanSubVer; bool fInbound; @@ -471,14 +471,17 @@ public: const CAddress addr; // Bind address of our side of the connection const CAddress addrBind; + const std::string m_addr_name; + //! Whether this peer is an inbound onion, i.e. connected via our Tor onion service. + const bool m_inbound_onion; std::atomic nNumWarningsSkipped{0}; std::atomic nVersion{0}; + Mutex m_subver_mutex; /** * cleanSubVer is a sanitized string of the user agent byte array we read * from the wire. This cleaned string can safely be logged or displayed. */ - std::string cleanSubVer GUARDED_BY(cs_SubVer){}; - RecursiveMutex cs_SubVer; // used for both cleanSubVer and strSubVer + std::string cleanSubVer GUARDED_BY(m_subver_mutex){}; bool m_prefer_evict{false}; // This peer is preferred for eviction. bool HasPermission(NetPermissionFlags permission) const { return NetPermissions::HasFlag(m_permissionFlags, permission); @@ -621,7 +624,7 @@ public: bool IsBlockRelayOnly() const; - CNode(NodeId id, ServiceFlags nLocalServicesIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn, ConnectionType conn_type_in, bool inbound_onion = false); + CNode(NodeId id, ServiceFlags nLocalServicesIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn, ConnectionType conn_type_in, bool inbound_onion); ~CNode(); CNode(const CNode&) = delete; CNode& operator=(const CNode&) = delete; @@ -649,7 +652,7 @@ public: * @return True if the peer should stay connected, * False if the peer should be disconnected from. */ - bool ReceiveMsgBytes(Span msg_bytes, bool& complete); + bool ReceiveMsgBytes(Span msg_bytes, bool& complete) EXCLUSIVE_LOCKS_REQUIRED(!cs_vRecv); void SetCommonVersion(int greatest_common_version) { @@ -661,9 +664,9 @@ public: return m_greatest_common_version; } - CService GetAddrLocal() const; + CService GetAddrLocal() const EXCLUSIVE_LOCKS_REQUIRED(!m_addr_local_mutex); //! May not be called more than once - void SetAddrLocal(const CService& addrLocalIn); + void SetAddrLocal(const CService& addrLocalIn) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_local_mutex); CNode* AddRef() { @@ -676,20 +679,15 @@ public: nRefCount--; } - void CloseSocketDisconnect(CConnman* connman); + void CloseSocketDisconnect(CConnman* connman) EXCLUSIVE_LOCKS_REQUIRED(!cs_hSocket); - void copyStats(CNodeStats &stats, const std::vector &m_asmap); + void copyStats(CNodeStats &stats, const std::vector &m_asmap) EXCLUSIVE_LOCKS_REQUIRED(!m_subver_mutex, !m_addr_local_mutex, !cs_vSend, !cs_vRecv); ServiceFlags GetLocalServices() const { return nLocalServices; } - std::string GetAddrName() const; - //! Sets the addrName only if it was not previously set - void MaybeSetAddrName(const std::string& addrNameIn); - - std::string ConnectionTypeAsString() const { return ::ConnectionTypeAsString(m_conn_type); } /** A ping-pong round trip has completed successfully. Update latest and minimum ping times. */ @@ -698,8 +696,6 @@ public: m_min_ping_time = std::min(m_min_ping_time.load(), ping_time); } - /** Whether this peer is an inbound onion, e.g. connected via our Tor onion service. */ - bool IsInboundOnion() const { return m_inbound_onion; } std::string GetLogString() const; bool CanRelay() const { return !m_masternode_connection || m_masternode_iqr_connection; } @@ -769,15 +765,9 @@ private: std::list vRecvMsg; // Used only by SocketHandler thread - mutable RecursiveMutex cs_addrName; - std::string addrName GUARDED_BY(cs_addrName); - // Our address, as reported by the peer - CService addrLocal GUARDED_BY(cs_addrLocal); - mutable RecursiveMutex cs_addrLocal; - - //! Whether this peer is an inbound onion, e.g. connected via our Tor onion service. - const bool m_inbound_onion{false}; + CService addrLocal GUARDED_BY(m_addr_local_mutex); + mutable Mutex m_addr_local_mutex; // Challenge sent in VERSION to be answered with MNAUTH (only happens between MNs) mutable Mutex cs_mnauth; @@ -867,7 +857,10 @@ public: bool m_i2p_accept_incoming; }; - void Init(const Options& connOptions) { + void Init(const Options& connOptions) EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_total_bytes_sent_mutex) + { + AssertLockNotHeld(m_total_bytes_sent_mutex); + nLocalServices = connOptions.nLocalServices; nMaxConnections = connOptions.nMaxConnections; m_max_outbound_full_relay = std::min(connOptions.m_max_outbound_full_relay, connOptions.nMaxConnections); @@ -883,13 +876,13 @@ public: nReceiveFloodSize = connOptions.nReceiveFloodSize; m_peer_connect_timeout = std::chrono::seconds{connOptions.m_peer_connect_timeout}; { - LOCK(cs_totalBytesSent); + LOCK(m_total_bytes_sent_mutex); nMaxOutboundLimit = connOptions.nMaxOutboundLimit; } vWhitelistedRange = connOptions.vWhitelistedRange; { - LOCK(cs_vAddedNodes); - vAddedNodes = connOptions.m_added_nodes; + LOCK(m_added_nodes_mutex); + m_added_nodes = connOptions.m_added_nodes; } socketEventsMode = connOptions.socketEventsMode; m_onion_binds = connOptions.onion_binds; @@ -898,7 +891,8 @@ public: CConnman(uint64_t seed0, uint64_t seed1, CAddrMan& addrman, bool network_active = true); ~CConnman(); bool Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman, CMasternodeSync& mn_sync, - CScheduler& scheduler, const Options& options); + CScheduler& scheduler, const Options& options) + EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !m_added_nodes_mutex, !m_addr_fetches_mutex, !mutexMsgProc); void StopThreads(); void StopNodes(); @@ -908,7 +902,7 @@ public: StopNodes(); }; - void Interrupt(); + void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); bool GetNetworkActive() const { return fNetworkActive; }; bool GetUseAddrmanOutgoing() const { return m_use_addrman_outgoing; }; void SetNetworkActive(bool active, CMasternodeSync* const mn_sync); @@ -924,8 +918,13 @@ public: IsConnection, }; - void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound, const char* strDest, ConnectionType conn_type, MasternodeConn masternode_connection = MasternodeConn::IsNotConnection, MasternodeProbeConn masternode_probe_connection = MasternodeProbeConn::IsNotConnection); - void OpenMasternodeConnection(const CAddress& addrConnect, MasternodeProbeConn probe = MasternodeProbeConn::IsConnection); + void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound, + const char* strDest, ConnectionType conn_type, + MasternodeConn masternode_connection = MasternodeConn::IsNotConnection, + MasternodeProbeConn masternode_probe_connection = MasternodeProbeConn::IsNotConnection) + EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); + void OpenMasternodeConnection(const CAddress& addrConnect, MasternodeProbeConn probe = MasternodeProbeConn::IsConnection) + EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); bool CheckIncomingNonce(uint64_t nonce); struct CFullyConnectedOnly { @@ -966,13 +965,14 @@ public: bool IsMasternodeOrDisconnectRequested(const CService& addr); - void PushMessage(CNode* pnode, CSerializedNetMsg&& msg); + void PushMessage(CNode* pnode, CSerializedNetMsg&& msg) + EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc, !m_total_bytes_sent_mutex); template bool ForEachNodeContinueIf(const Condition& cond, Callable&& func) { - LOCK(cs_vNodes); - for (auto&& node : vNodes) + LOCK(m_nodes_mutex); + for (auto&& node : m_nodes) if (cond(node)) if(!func(node)) return false; @@ -988,8 +988,8 @@ public: template bool ForEachNodeContinueIf(const Condition& cond, Callable&& func) const { - LOCK(cs_vNodes); - for (const auto& node : vNodes) + LOCK(m_nodes_mutex); + for (const auto& node : m_nodes) if (cond(node)) if(!func(node)) return false; @@ -1005,8 +1005,8 @@ public: template void ForEachNode(const Condition& cond, Callable&& func) { - LOCK(cs_vNodes); - for (auto&& node : vNodes) { + LOCK(m_nodes_mutex); + for (auto&& node : m_nodes) { if (cond(node)) func(node); } @@ -1021,8 +1021,8 @@ public: template void ForEachNode(const Condition& cond, Callable&& func) const { - LOCK(cs_vNodes); - for (auto&& node : vNodes) { + LOCK(m_nodes_mutex); + for (auto&& node : m_nodes) { if (cond(node)) func(node); } @@ -1037,8 +1037,8 @@ public: template void ForEachNodeThen(const Condition& cond, Callable&& pre, CallableAfter&& post) { - LOCK(cs_vNodes); - for (auto&& node : vNodes) { + LOCK(m_nodes_mutex); + for (auto&& node : m_nodes) { if (cond(node)) pre(node); } @@ -1054,8 +1054,8 @@ public: template void ForEachNodeThen(const Condition& cond, Callable&& pre, CallableAfter&& post) const { - LOCK(cs_vNodes); - for (auto&& node : vNodes) { + LOCK(m_nodes_mutex); + for (auto&& node : m_nodes) { if (cond(node)) pre(node); } @@ -1068,9 +1068,6 @@ public: ForEachNodeThen(FullyConnectedOnly, pre, post); } - std::vector CopyNodeVector(std::function cond = AllNodes); - void ReleaseNodeVector(const std::vector& vecNodes); - // Addrman functions /** * Return all or many randomly selected addresses, optionally by network. @@ -1109,9 +1106,9 @@ public: // Count the number of block-relay-only peers we have over our limit. int GetExtraBlockRelayCount() const; - bool AddNode(const std::string& node); - bool RemoveAddedNode(const std::string& node); - std::vector GetAddedNodeInfo() const; + bool AddNode(const std::string& node) EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex); + bool RemoveAddedNode(const std::string& node) EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex); + std::vector GetAddedNodeInfo() const EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex); /** * Attempts to open a connection. Currently only used from tests. @@ -1124,7 +1121,7 @@ public: * - Max total outbound connection capacity filled * - Max connection capacity for type is filled */ - bool AddConnection(const std::string& address, ConnectionType conn_type); + bool AddConnection(const std::string& address, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); bool AddPendingMasternode(const uint256& proTxHash); void SetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set& proTxHashes); @@ -1154,32 +1151,30 @@ public: //! that peer during `net_processing.cpp:PushNodeVersion()`. ServiceFlags GetLocalServices() const; - uint64_t GetMaxOutboundTarget() const; + uint64_t GetMaxOutboundTarget() const EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex); std::chrono::seconds GetMaxOutboundTimeframe() const; //! check if the outbound target is reached //! if param historicalBlockServingLimit is set true, the function will //! response true if the limit for serving historical blocks has been reached - bool OutboundTargetReached(bool historicalBlockServingLimit) const; + bool OutboundTargetReached(bool historicalBlockServingLimit) const EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex); //! response the bytes left in the current max outbound cycle //! in case of no limit, it will always response 0 - uint64_t GetOutboundTargetBytesLeft() const; + uint64_t GetOutboundTargetBytesLeft() const EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex); - //! returns the time left in the current max outbound cycle - //! in case of no limit, it will always return 0 - std::chrono::seconds GetMaxOutboundTimeLeftInCycle() const; + std::chrono::seconds GetMaxOutboundTimeLeftInCycle() const EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex); uint64_t GetTotalBytesRecv() const; - uint64_t GetTotalBytesSent() const; + uint64_t GetTotalBytesSent() const EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex); /** Get a unique deterministic randomizer. */ CSipHasher GetDeterministicRandomizer(uint64_t id) const; unsigned int GetReceiveFloodSize() const; - void WakeMessageHandler(); - void WakeSelect(); + void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); + void WakeSelect() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); /** Attempts to obfuscate tx time through exponentially distributed emitting. Works assuming that a single interval is used. @@ -1192,6 +1187,26 @@ public: /** Return true if we should disconnect the peer for failing an inactivity check. */ bool ShouldRunInactivityChecks(const CNode& node, std::chrono::seconds now) const; + /** + * RAII helper to atomically create a copy of `m_nodes` and add a reference + * to each of the nodes. The nodes are released when this object is destroyed. + */ + class NodesSnapshot + { + public: + explicit NodesSnapshot(const CConnman& connman, std::function cond = AllNodes, + bool shuffle = false); + ~NodesSnapshot(); + + const std::vector& Nodes() const + { + return m_nodes_copy; + } + + private: + std::vector m_nodes_copy; + }; + private: struct ListenSocket { public: @@ -1202,6 +1217,10 @@ private: NetPermissionFlags m_permissions; }; + //! returns the time left in the current max outbound cycle + //! in case of no limit, it will always return 0 + std::chrono::seconds GetMaxOutboundTimeLeftInCycle_() const EXCLUSIVE_LOCKS_REQUIRED(m_total_bytes_sent_mutex); + bool BindListenPort(const CService& bindAddr, bilingual_str& strError, NetPermissionFlags permissions); bool Bind(const CService& addr, unsigned int flags, NetPermissionFlags permissions); bool InitBinds( @@ -1209,17 +1228,19 @@ private: const std::vector& whiteBinds, const std::vector& onion_binds); - void ThreadOpenAddedConnections(); - void AddAddrFetch(const std::string& strDest); - void ProcessAddrFetch(); - void ThreadOpenConnections(const std::vector connect, CDeterministicMNManager& dmnman); - void ThreadMessageHandler(); - void ThreadI2PAcceptIncoming(CMasternodeSync& mn_sync); - void AcceptConnection(const ListenSocket& hListenSocket, CMasternodeSync& mn_sync); + void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !mutexMsgProc); + void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex); + void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !mutexMsgProc); + void ThreadOpenConnections(const std::vector connect, CDeterministicMNManager& dmnman) + EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !mutexMsgProc); + void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); + void ThreadI2PAcceptIncoming(CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); + void AcceptConnection(const ListenSocket& hListenSocket, CMasternodeSync& mn_sync) + EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); /** * Create a `CNode` object from a socket that has just been accepted and add the node to - * the `vNodes` member. + * the `m_nodes` member. * @param[in] hSocket Connected socket to communicate with the peer. * @param[in] permissionFlags The peer's permissions. * @param[in] addr_bind The address and port at our side of the connection. @@ -1229,30 +1250,95 @@ private: NetPermissionFlags permissionFlags, const CAddress& addr_bind, const CAddress& addr, - CMasternodeSync& mn_sync); + CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void DisconnectNodes(); void NotifyNumConnectionsChanged(CMasternodeSync& mn_sync); void CalculateNumConnectionsChangedStats(); /** Return true if the peer is inactive and should be disconnected. */ bool InactivityCheck(const CNode& node) const; - bool GenerateSelectSet(std::set &recv_set, std::set &send_set, std::set &error_set); + + /** + * Generate a collection of sockets to check for IO readiness. + * @param[in] nodes Select from these nodes' sockets. + * @param[out] recv_set Sockets to check for read readiness. + * @param[out] send_set Sockets to check for write readiness. + * @param[out] error_set Sockets to check for errors. + * @return true if at least one socket is to be checked (the returned set is not empty) + */ + bool GenerateSelectSet(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set); + + /** + * Check which sockets are ready for IO. + * @param[in] nodes Select from these nodes' sockets (in supported event methods). + * @param[in] only_poll Permit zero timeout polling + * @param[out] recv_set Sockets which are ready for read. + * @param[out] send_set Sockets which are ready for write. + * @param[out] error_set Sockets which have errors. + * This calls `GenerateSelectSet()` to gather a list of sockets to check. + */ + void SocketEvents(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set, + bool only_poll); + #ifdef USE_KQUEUE - void SocketEventsKqueue(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll); + void SocketEventsKqueue(std::set& recv_set, + std::set& send_set, + std::set& error_set, + bool only_poll); #endif #ifdef USE_EPOLL - void SocketEventsEpoll(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll); + void SocketEventsEpoll(std::set& recv_set, + std::set& send_set, + std::set& error_set, + bool only_poll); #endif #ifdef USE_POLL - void SocketEventsPoll(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll); + void SocketEventsPoll(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set, + bool only_poll); #endif - void SocketEventsSelect(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll); - void SocketEvents(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll); - void SocketHandler(CMasternodeSync& mn_sync); - void ThreadSocketHandler(CMasternodeSync& mn_sync); - void ThreadDNSAddressSeed(); + void SocketEventsSelect(const std::vector& nodes, + std::set& recv_set, + std::set& send_set, + std::set& error_set, + bool only_poll); + + /** + * Check connected and listening sockets for IO readiness and process them accordingly. + */ + void SocketHandler(CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc); + + /** + * Do the read/write for connected sockets that are ready for IO. + * @param[in] recv_set Sockets that are ready for read. + * @param[in] send_set Sockets that are ready for send. + * @param[in] error_set Sockets that have an exceptional condition (error). + */ + void SocketHandlerConnected(const std::set& recv_set, + const std::set& send_set, + const std::set& error_set) + EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc); + + /** + * Accept incoming connections, one from each read-ready listening socket. + * @param[in] recv_set Sockets that are ready for read. + */ + void SocketHandlerListening(const std::set& recv_set, CMasternodeSync& mn_sync) + EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); + + void ThreadSocketHandler(CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc); + void ThreadDNSAddressSeed() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_nodes_mutex); void ThreadOpenMasternodeConnections(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman, - CMasternodeSync& mn_sync); + CMasternodeSync& mn_sync) + EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_nodes_mutex, !mutexMsgProc); uint64_t CalculateKeyedNetGroup(const CAddress& ad) const; @@ -1276,12 +1362,12 @@ private: NodeId GetNewNodeId(); size_t SocketSendData(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend); - size_t SocketRecvData(CNode* pnode); + size_t SocketRecvData(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void DumpAddresses(); // Network stats void RecordBytesRecv(uint64_t bytes); - void RecordBytesSent(uint64_t bytes); + void RecordBytesSent(uint64_t bytes) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex); /** * Return vector of current BLOCK_RELAY peers. @@ -1295,15 +1381,14 @@ private: void UnregisterEvents(CNode* pnode); // Network usage totals - mutable RecursiveMutex cs_totalBytesRecv; - mutable RecursiveMutex cs_totalBytesSent; - uint64_t nTotalBytesRecv GUARDED_BY(cs_totalBytesRecv) {0}; - uint64_t nTotalBytesSent GUARDED_BY(cs_totalBytesSent) {0}; + mutable Mutex m_total_bytes_sent_mutex; + std::atomic nTotalBytesRecv{0}; + uint64_t nTotalBytesSent GUARDED_BY(m_total_bytes_sent_mutex) {0}; // outbound limit & stats - uint64_t nMaxOutboundTotalBytesSentInCycle GUARDED_BY(cs_totalBytesSent) {0}; - std::chrono::seconds nMaxOutboundCycleStartTime GUARDED_BY(cs_totalBytesSent) {0}; - uint64_t nMaxOutboundLimit GUARDED_BY(cs_totalBytesSent); + uint64_t nMaxOutboundTotalBytesSentInCycle GUARDED_BY(m_total_bytes_sent_mutex) {0}; + std::chrono::seconds nMaxOutboundCycleStartTime GUARDED_BY(m_total_bytes_sent_mutex) {0}; + uint64_t nMaxOutboundLimit GUARDED_BY(m_total_bytes_sent_mutex); // P2P timeout in seconds std::chrono::seconds m_peer_connect_timeout; @@ -1320,21 +1405,23 @@ private: bool fAddressesInitialized{false}; CAddrMan& addrman; std::deque m_addr_fetches GUARDED_BY(m_addr_fetches_mutex); - RecursiveMutex m_addr_fetches_mutex; - std::vector vAddedNodes GUARDED_BY(cs_vAddedNodes); - mutable RecursiveMutex cs_vAddedNodes; + Mutex m_addr_fetches_mutex; + std::vector m_added_nodes GUARDED_BY(m_added_nodes_mutex); + mutable Mutex m_added_nodes_mutex; + std::vector m_nodes GUARDED_BY(m_nodes_mutex); + std::list m_nodes_disconnected; + mutable RecursiveMutex m_nodes_mutex; + std::atomic nLastNodeId{0}; + unsigned int nPrevNodeCount{0}; + std::vector vPendingMasternodes; mutable RecursiveMutex cs_vPendingMasternodes; std::map, std::set> masternodeQuorumNodes GUARDED_BY(cs_vPendingMasternodes); std::map, std::set> masternodeQuorumRelayMembers GUARDED_BY(cs_vPendingMasternodes); std::set masternodePendingProbes GUARDED_BY(cs_vPendingMasternodes); - std::vector vNodes GUARDED_BY(cs_vNodes); - std::list vNodesDisconnected; + mutable Mutex cs_mapSocketToNode; std::unordered_map mapSocketToNode GUARDED_BY(cs_mapSocketToNode); - mutable RecursiveMutex cs_vNodes; - std::atomic nLastNodeId{0}; - unsigned int nPrevNodeCount{0}; /** * Cache responses to addr requests to minimize privacy leak. diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 7f79ab4892..3a33be97c7 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -374,38 +374,45 @@ public: bool ignore_incoming_txs); /** Overridden from CValidationInterface. */ - void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindexConnected) override; - void BlockDisconnected(const std::shared_ptr &block, const CBlockIndex* pindex) override; - void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; - void BlockChecked(const CBlock& block, const BlockValidationState& state) override; + void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindexConnected) override + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex); + void BlockDisconnected(const std::shared_ptr &block, const CBlockIndex* pindex) override + EXCLUSIVE_LOCKS_REQUIRED(!m_recent_confirmed_transactions_mutex); + void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void BlockChecked(const CBlock& block, const BlockValidationState& state) override + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr& pblock) override; /** Implement NetEventsInterface */ - void InitializeNode(CNode* pnode) override; - void FinalizeNode(const CNode& node) override; - bool ProcessMessages(CNode* pfrom, std::atomic& interrupt) override; - bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing); + void InitializeNode(CNode* pnode) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + bool ProcessMessages(CNode* pfrom, std::atomic& interrupt) override + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex); + bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing) + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex); /** Implement PeerManager */ void CheckForStaleTipAndEvictPeers() override; - bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const override; + bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); bool IgnoresIncomingTxs() override { return m_ignore_incoming_txs; } - void SendPings() override; - void PushInventory(NodeId nodeid, const CInv& inv) override; - void RelayInv(CInv &inv, const int minProtoVersion) override; - void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, const int minProtoVersion) override; - void RelayInvFiltered(CInv &inv, const uint256 &relatedTxHash, const int minProtoVersion) override; - void RelayTransaction(const uint256& txid) override; + void SendPings() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);; + void PushInventory(NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void RelayInv(CInv &inv, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void RelayInvFiltered(CInv &inv, const uint256 &relatedTxHash, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void RelayTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SetBestHeight(int height) override { m_best_height = height; }; - void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override; + void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, - const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) override; - bool IsBanned(NodeId pnode) override EXCLUSIVE_LOCKS_REQUIRED(cs_main); - bool IsInvInFilter(NodeId nodeid, const uint256& hash) const override; + const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) override + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex); + bool IsBanned(NodeId pnode) override EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex); + bool IsInvInFilter(NodeId nodeid, const uint256& hash) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); private: /** Helper to process result of external handlers of message */ - void ProcessPeerMsgRet(const PeerMsgRet& ret, CNode& pfrom); + void ProcessPeerMsgRet(const PeerMsgRet& ret, CNode& pfrom) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); /** Consider evicting an outbound peer based on the amount of time they've been behind our tip */ void ConsiderEviction(CNode& pto, int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); @@ -414,15 +421,15 @@ private: void EvictExtraOutboundPeers(int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); /** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */ - void ReattemptInitialBroadcast(CScheduler& scheduler); + void ReattemptInitialBroadcast(CScheduler& scheduler) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); /** Get a shared pointer to the Peer object. * May return an empty shared_ptr if the Peer object can't be found. */ - PeerRef GetPeerRef(NodeId id) const; + PeerRef GetPeerRef(NodeId id) const EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); /** Get a shared pointer to the Peer object and remove it from m_peer_map. * May return an empty shared_ptr if the Peer object can't be found. */ - PeerRef RemovePeer(NodeId id); + PeerRef RemovePeer(NodeId id) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); /** * Potentially mark a node discouraged based on the contents of a BlockValidationState object @@ -435,7 +442,8 @@ private: * @return Returns true if the peer was punished (probably disconnected) */ bool MaybePunishNodeForBlock(NodeId nodeid, const BlockValidationState& state, - bool via_compact_block, const std::string& message = ""); + bool via_compact_block, const std::string& message = "") + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); /** * Potentially ban a node based on the contents of a TxValidationState object @@ -444,7 +452,8 @@ private: * * Changes here may need to be reflected in TxRelayMayResultInDisconnect(). */ - bool MaybePunishNodeForTx(NodeId nodeid, const TxValidationState& state, const std::string& message = ""); + bool MaybePunishNodeForTx(NodeId nodeid, const TxValidationState& state, const std::string& message = "") + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); /** Maybe disconnect a peer and discourage future connections from its address. * @@ -454,14 +463,16 @@ private: */ bool MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer); - void ProcessOrphanTx(std::set& orphan_work_set) - EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans); + void ProcessOrphanTx(std::set& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans) + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); /** Process a single headers message from a peer. */ void ProcessHeadersMessage(CNode& pfrom, const Peer& peer, const std::vector& headers, - bool via_compact_block); + bool via_compact_block) + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - void SendBlockTransactions(CNode& pfrom, const CBlock& block, const BlockTransactionsRequest& req); + void SendBlockTransactions(CNode& pfrom, const CBlock& block, const BlockTransactionsRequest& req) + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); /** Send a version message to a peer */ void PushNodeVersion(CNode& pnode, const Peer& peer); @@ -482,7 +493,7 @@ private: * @param[in] fReachable Whether the address' network is reachable. We relay unreachable * addresses less. */ - void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable); + void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); const CChainParams& m_chainparams; CConnman& m_connman; @@ -608,7 +619,8 @@ private: /** Number of outbound peers with m_chain_sync.m_protect. */ int m_outbound_peers_with_protect_from_disconnect GUARDED_BY(cs_main) = 0; - bool AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + bool AlreadyHave(const CInv& inv) + EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_recent_confirmed_transactions_mutex); /** * Filter for transactions that were recently rejected by @@ -3357,7 +3369,7 @@ void PeerManagerImpl::ProcessMessage( pfrom.nServices = nServices; pfrom.SetAddrLocal(addrMe); { - LOCK(pfrom.cs_SubVer); + LOCK(pfrom.m_subver_mutex); pfrom.cleanSubVer = cleanSubVer; } peer->m_starting_height = starting_height; diff --git a/src/qt/clientmodel.h b/src/qt/clientmodel.h index f328ed7519..46068ca264 100644 --- a/src/qt/clientmodel.h +++ b/src/qt/clientmodel.h @@ -63,7 +63,7 @@ public: //! Return number of connections, default is in- and outbound (total) int getNumConnections(unsigned int flags = CONNECTIONS_ALL) const; int getNumBlocks() const; - uint256 getBestBlockHash(); + uint256 getBestBlockHash() EXCLUSIVE_LOCKS_REQUIRED(!m_cached_tip_mutex); int getHeaderTipHeight() const; int64_t getHeaderTipTime() const; diff --git a/src/qt/peertablemodel.cpp b/src/qt/peertablemodel.cpp index e1a4a1f46f..a85958769d 100644 --- a/src/qt/peertablemodel.cpp +++ b/src/qt/peertablemodel.cpp @@ -28,7 +28,7 @@ bool NodeLessThan::operator()(const CNodeCombinedStats &left, const CNodeCombine case PeerTableModel::NetNodeId: return pLeft->nodeid < pRight->nodeid; case PeerTableModel::Address: - return pLeft->addrName.compare(pRight->addrName) < 0; + return pLeft->m_addr_name.compare(pRight->m_addr_name) < 0; case PeerTableModel::Network: return pLeft->m_network < pRight->m_network; case PeerTableModel::Ping: @@ -163,7 +163,7 @@ QVariant PeerTableModel::data(const QModelIndex &index, int role) const return (qint64)rec->nodeStats.nodeid; case Address: // prepend to peer address down-arrow symbol for inbound connection and up-arrow for outbound connection - return QString(rec->nodeStats.fInbound ? "↓ " : "↑ ") + QString::fromStdString(rec->nodeStats.addrName); + return QString(rec->nodeStats.fInbound ? "↓ " : "↑ ") + QString::fromStdString(rec->nodeStats.m_addr_name); case Network: return GUIUtil::NetworkToQString(rec->nodeStats.m_network); case Ping: diff --git a/src/qt/rpcconsole.cpp b/src/qt/rpcconsole.cpp index 26d2acb1e0..627b59f3fe 100644 --- a/src/qt/rpcconsole.cpp +++ b/src/qt/rpcconsole.cpp @@ -1231,7 +1231,7 @@ void RPCConsole::updateDetailWidget() } const CNodeCombinedStats *stats = clientModel->getPeerTableModel()->getNodeStats(selected_rows.first().row()); // update the detail ui with latest node information - QString peerAddrDetails(QString::fromStdString(stats->nodeStats.addrName) + " "); + QString peerAddrDetails(QString::fromStdString(stats->nodeStats.m_addr_name) + " "); peerAddrDetails += tr("(peer id: %1)").arg(QString::number(stats->nodeStats.nodeid)); if (!stats->nodeStats.addrLocal.empty()) peerAddrDetails += "
" + tr("via %1").arg(QString::fromStdString(stats->nodeStats.addrLocal)); diff --git a/src/random.cpp b/src/random.cpp index 0c6129ea25..92ea4976e1 100644 --- a/src/random.cpp +++ b/src/random.cpp @@ -377,7 +377,7 @@ public: { } - void AddEvent(uint32_t event_info) noexcept + void AddEvent(uint32_t event_info) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_events_mutex) { LOCK(m_events_mutex); @@ -391,7 +391,7 @@ public: /** * Feed (the hash of) all events added through AddEvent() to hasher. */ - void SeedEvents(CSHA512& hasher) noexcept + void SeedEvents(CSHA512& hasher) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_events_mutex) { // We use only SHA256 for the events hashing to get the ASM speedups we have for SHA256, // since we want it to be fast as network peers may be able to trigger it repeatedly. @@ -410,7 +410,7 @@ public: * * If this function has never been called with strong_seed = true, false is returned. */ - bool MixExtract(unsigned char* out, size_t num, CSHA512&& hasher, bool strong_seed) noexcept + bool MixExtract(unsigned char* out, size_t num, CSHA512&& hasher, bool strong_seed) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { assert(num <= 32); unsigned char buf[64]; diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index b88e9de9c9..a5e4244995 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -200,7 +200,7 @@ static RPCHelpMan getpeerinfo() CNodeStateStats statestats; bool fStateStats = peerman.GetNodeStateStats(stats.nodeid, statestats); obj.pushKV("id", stats.nodeid); - obj.pushKV("addr", stats.addrName); + obj.pushKV("addr", stats.m_addr_name); if (stats.addrBind.IsValid()) { obj.pushKV("addrbind", stats.addrBind.ToString()); } diff --git a/src/scheduler.h b/src/scheduler.h index c2396da742..438fb8c126 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -46,10 +46,10 @@ public: typedef std::function Function; /** Call func at/after time t */ - void schedule(Function f, std::chrono::system_clock::time_point t); + void schedule(Function f, std::chrono::system_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); /** Call f once after the delta has passed */ - void scheduleFromNow(Function f, std::chrono::milliseconds delta) + void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) { schedule(std::move(f), std::chrono::system_clock::now() + delta); } @@ -60,29 +60,29 @@ public: * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more * accurate scheduling, don't use this method. */ - void scheduleEvery(Function f, std::chrono::milliseconds delta); + void scheduleEvery(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); /** * Mock the scheduler to fast forward in time. * Iterates through items on taskQueue and reschedules them * to be delta_seconds sooner. */ - void MockForward(std::chrono::seconds delta_seconds); + void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); /** * Services the queue 'forever'. Should be run in a thread. */ - void serviceQueue(); + void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); /** Tell any threads running serviceQueue to stop as soon as the current task is done */ - void stop() + void stop() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) { WITH_LOCK(newTaskMutex, stopRequested = true); newTaskScheduled.notify_all(); if (m_service_thread.joinable()) m_service_thread.join(); } /** Tell any threads running serviceQueue to stop when there is no work left to be done */ - void StopWhenDrained() + void StopWhenDrained() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) { WITH_LOCK(newTaskMutex, stopWhenEmpty = true); newTaskScheduled.notify_all(); @@ -94,10 +94,11 @@ public: * and first and last task times */ size_t getQueueInfo(std::chrono::system_clock::time_point& first, - std::chrono::system_clock::time_point& last) const; + std::chrono::system_clock::time_point& last) const + EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); /** Returns true if there are threads actively running in serviceQueue() */ - bool AreThreadsServicingQueue() const; + bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); private: mutable Mutex newTaskMutex; @@ -128,8 +129,8 @@ private: std::list> m_callbacks_pending GUARDED_BY(m_callbacks_mutex); bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false; - void MaybeScheduleProcessQueue(); - void ProcessQueue(); + void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); + void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); public: explicit SingleThreadedSchedulerClient(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {} @@ -140,15 +141,15 @@ public: * Practically, this means that callbacks can behave as if they are executed * in order by a single thread. */ - void AddToProcessQueue(std::function func); + void AddToProcessQueue(std::function func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); /** * Processes all remaining queue members on the calling thread, blocking until queue is empty * Must be called after the CScheduler has no remaining processing threads! */ - void EmptyQueue(); + void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); - size_t CallbacksPending(); + size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); }; #endif diff --git a/src/sync.h b/src/sync.h index 2a046a78d8..c75b6554cf 100644 --- a/src/sync.h +++ b/src/sync.h @@ -77,8 +77,6 @@ inline void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, inline void DeleteLock(void* cs) {} inline bool LockStackEmpty() { return true; } #endif -#define AssertLockHeld(cs) AssertLockHeldInternal(#cs, __FILE__, __LINE__, &cs) -#define AssertLockNotHeld(cs) AssertLockNotHeldInternal(#cs, __FILE__, __LINE__, &cs) /** * Template mixin that adds -Wthread-safety locking annotations and lock order @@ -138,10 +136,18 @@ public: using RecursiveMutex = AnnotatedMixin; /** Wrapped mutex: supports waiting but not recursive locking */ -typedef AnnotatedMixin Mutex; +using Mutex = AnnotatedMixin; + /** Wrapped shared mutex: supports read locking via .shared_lock, exlusive locking via .lock; * does not support recursive locking */ -typedef SharedAnnotatedMixin SharedMutex; +using SharedMutex = SharedAnnotatedMixin; + +#define AssertLockHeld(cs) AssertLockHeldInternal(#cs, __FILE__, __LINE__, &cs) + +inline void AssertLockNotHeldInline(const char* name, const char* file, int line, Mutex* cs) EXCLUSIVE_LOCKS_REQUIRED(!cs) { AssertLockNotHeldInternal(name, file, line, cs); } +inline void AssertLockNotHeldInline(const char* name, const char* file, int line, RecursiveMutex* cs) LOCKS_EXCLUDED(cs) { AssertLockNotHeldInternal(name, file, line, cs); } +inline void AssertLockNotHeldInline(const char* name, const char* file, int line, SharedMutex* cs) LOCKS_EXCLUDED(cs) { AssertLockNotHeldInternal(name, file, line, cs); } +#define AssertLockNotHeld(cs) AssertLockNotHeldInline(#cs, __FILE__, __LINE__, &cs) /** Prints a lock contention to the log */ void LockContention(const char* pszName, const char* pszFile, int nLine); diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp index fb1675e20d..c45d20be37 100644 --- a/src/test/denialofservice_tests.cpp +++ b/src/test/denialofservice_tests.cpp @@ -74,7 +74,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) // Mock an outbound peer CAddress addr1(ip(0xa0b0c001), NODE_NONE); - CNode dummyNode1(id++, ServiceFlags(NODE_NETWORK), INVALID_SOCKET, addr1, 0, 0, CAddress(), "", ConnectionType::OUTBOUND_FULL_RELAY); + CNode dummyNode1(id++, ServiceFlags(NODE_NETWORK), INVALID_SOCKET, addr1, /* nKeyedNetGroupIn */ 0, /* nLocalHostNonceIn */ 0, CAddress(), /* pszDest */ "", ConnectionType::OUTBOUND_FULL_RELAY, /* inbound_onion */ false); dummyNode1.SetCommonVersion(PROTOCOL_VERSION); peerLogic->InitializeNode(&dummyNode1); @@ -124,7 +124,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) static void AddRandomOutboundPeer(std::vector& vNodes, PeerManager& peerLogic, ConnmanTestMsg& connman) { CAddress addr(ip(g_insecure_rand_ctx.randbits(32)), NODE_NONE); - vNodes.emplace_back(new CNode(id++, ServiceFlags(NODE_NETWORK), INVALID_SOCKET, addr, 0, 0, CAddress(), "", ConnectionType::OUTBOUND_FULL_RELAY)); + vNodes.emplace_back(new CNode(id++, ServiceFlags(NODE_NETWORK), INVALID_SOCKET, addr, /* nKeyedNetGroupIn */ 0, /* nLocalHostNonceIn */ 0, CAddress(), /* pszDest */ "", ConnectionType::OUTBOUND_FULL_RELAY, /* inbound_onion */ false)); CNode &node = *vNodes.back(); node.SetCommonVersion(PROTOCOL_VERSION); @@ -220,7 +220,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) banman->ClearBanned(); CAddress addr1(ip(0xa0b0c001), NODE_NONE); - CNode dummyNode1(id++, NODE_NETWORK, INVALID_SOCKET, addr1, 0, 0, CAddress(), "", ConnectionType::INBOUND); + CNode dummyNode1(id++, NODE_NETWORK, INVALID_SOCKET, addr1, /* nKeyedNetGroupIn */ 0, /* nLocalHostNonceIn */ 0, CAddress(), /* pszDest */ "", ConnectionType::INBOUND, /* inbound_onion */ false); dummyNode1.SetCommonVersion(PROTOCOL_VERSION); peerLogic->InitializeNode(&dummyNode1); dummyNode1.fSuccessfullyConnected = true; @@ -233,7 +233,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) BOOST_CHECK(!banman->IsDiscouraged(ip(0xa0b0c001|0x0000ff00))); // Different IP, not discouraged CAddress addr2(ip(0xa0b0c002), NODE_NONE); - CNode dummyNode2(id++, NODE_NETWORK, INVALID_SOCKET, addr2, 1, 1, CAddress(), "", ConnectionType::INBOUND); + CNode dummyNode2(id++, NODE_NETWORK, INVALID_SOCKET, addr2, /* nKeyedNetGroupIn */ 1, /* nLocalHostNonceIn */ 1, CAddress(), /* pszDest */ "", ConnectionType::INBOUND, /* inbound_onion */ false); dummyNode2.SetCommonVersion(PROTOCOL_VERSION); peerLogic->InitializeNode(&dummyNode2); dummyNode2.fSuccessfullyConnected = true; @@ -271,7 +271,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) SetMockTime(nStartTime); // Overrides future calls to GetTime() CAddress addr(ip(0xa0b0c001), NODE_NONE); - CNode dummyNode(id++, NODE_NETWORK, INVALID_SOCKET, addr, 4, 4, CAddress(), "", ConnectionType::INBOUND); + CNode dummyNode(id++, NODE_NETWORK, INVALID_SOCKET, addr, /* nKeyedNetGroupIn */ 4, /* nLocalHostNonceIn */ 4, CAddress(), /* pszDest */ "", ConnectionType::INBOUND, /* inbound_onion */ false); dummyNode.SetCommonVersion(PROTOCOL_VERSION); peerLogic->InitializeNode(&dummyNode); dummyNode.fSuccessfullyConnected = true; diff --git a/src/test/fuzz/net.cpp b/src/test/fuzz/net.cpp index 84d6834af8..4d18a81754 100644 --- a/src/test/fuzz/net.cpp +++ b/src/test/fuzz/net.cpp @@ -40,9 +40,6 @@ FUZZ_TARGET_INIT(net, initialize_net) CConnman connman{fuzzed_data_provider.ConsumeIntegral(), fuzzed_data_provider.ConsumeIntegral(), addrman}; node.CloseSocketDisconnect(&connman); }, - [&] { - node.MaybeSetAddrName(fuzzed_data_provider.ConsumeRandomLengthString(32)); - }, [&] { const std::vector asmap = ConsumeRandomLengthBitVector(fuzzed_data_provider); if (!SanityCheckASMap(asmap)) { @@ -75,7 +72,6 @@ FUZZ_TARGET_INIT(net, initialize_net) } (void)node.GetAddrLocal(); - (void)node.GetAddrName(); (void)node.GetId(); (void)node.GetLocalNonce(); (void)node.GetLocalServices(); diff --git a/src/test/net_tests.cpp b/src/test/net_tests.cpp index 68c266694a..29b714c584 100644 --- a/src/test/net_tests.cpp +++ b/src/test/net_tests.cpp @@ -194,14 +194,15 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test) id++, NODE_NETWORK, hSocket, addr, /* nKeyedNetGroupIn = */ 0, /* nLocalHostNonceIn = */ 0, - CAddress(), pszDest, ConnectionType::OUTBOUND_FULL_RELAY); + CAddress(), pszDest, ConnectionType::OUTBOUND_FULL_RELAY, + /* inbound_onion = */ false); BOOST_CHECK(pnode1->IsFullOutboundConn() == true); BOOST_CHECK(pnode1->IsManualConn() == false); BOOST_CHECK(pnode1->IsBlockOnlyConn() == false); BOOST_CHECK(pnode1->IsFeelerConn() == false); BOOST_CHECK(pnode1->IsAddrFetchConn() == false); BOOST_CHECK(pnode1->IsInboundConn() == false); - BOOST_CHECK(pnode1->IsInboundOnion() == false); + BOOST_CHECK(pnode1->m_inbound_onion == false); BOOST_CHECK_EQUAL(pnode1->ConnectedThroughNetwork(), Network::NET_IPV4); std::unique_ptr pnode2 = std::make_unique( @@ -216,7 +217,7 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test) BOOST_CHECK(pnode2->IsFeelerConn() == false); BOOST_CHECK(pnode2->IsAddrFetchConn() == false); BOOST_CHECK(pnode2->IsInboundConn() == true); - BOOST_CHECK(pnode2->IsInboundOnion() == false); + BOOST_CHECK(pnode2->m_inbound_onion == false); BOOST_CHECK_EQUAL(pnode2->ConnectedThroughNetwork(), Network::NET_IPV4); std::unique_ptr pnode3 = std::make_unique( @@ -231,7 +232,7 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test) BOOST_CHECK(pnode3->IsFeelerConn() == false); BOOST_CHECK(pnode3->IsAddrFetchConn() == false); BOOST_CHECK(pnode3->IsInboundConn() == false); - BOOST_CHECK(pnode3->IsInboundOnion() == false); + BOOST_CHECK(pnode3->m_inbound_onion == false); BOOST_CHECK_EQUAL(pnode3->ConnectedThroughNetwork(), Network::NET_IPV4); std::unique_ptr pnode4 = std::make_unique( @@ -246,7 +247,7 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test) BOOST_CHECK(pnode4->IsFeelerConn() == false); BOOST_CHECK(pnode4->IsAddrFetchConn() == false); BOOST_CHECK(pnode4->IsInboundConn() == true); - BOOST_CHECK(pnode4->IsInboundOnion() == true); + BOOST_CHECK(pnode4->m_inbound_onion == true); BOOST_CHECK_EQUAL(pnode4->ConnectedThroughNetwork(), Network::NET_ONION); } @@ -740,7 +741,7 @@ BOOST_AUTO_TEST_CASE(ipv4_peer_with_ipv6_addrMe_test) in_addr ipv4AddrPeer; ipv4AddrPeer.s_addr = 0xa0b0c001; CAddress addr = CAddress(CService(ipv4AddrPeer, 7777), NODE_NETWORK); - std::unique_ptr pnode = std::make_unique(0, NODE_NETWORK, INVALID_SOCKET, addr, 0, 0, CAddress{}, std::string{}, ConnectionType::OUTBOUND_FULL_RELAY); + std::unique_ptr pnode = std::make_unique(0, NODE_NETWORK, INVALID_SOCKET, addr, /* nKeyedNetGroupIn */ 0, /* nLocalHostNonceIn */ 0, CAddress{}, /* pszDest */ std::string{}, ConnectionType::OUTBOUND_FULL_RELAY, /* inbound_onion */ false); pnode->fSuccessfullyConnected.store(true); // the peer claims to be reaching us via IPv6 diff --git a/src/test/util/net.h b/src/test/util/net.h index b27364fee8..26a67a3e9b 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -23,16 +23,16 @@ struct ConnmanTestMsg : public CConnman { void AddTestNode(CNode& node) { - LOCK(cs_vNodes); - vNodes.push_back(&node); + LOCK(m_nodes_mutex); + m_nodes.push_back(&node); } void ClearTestNodes() { - LOCK(cs_vNodes); - for (CNode* node : vNodes) { + LOCK(m_nodes_mutex); + for (CNode* node : m_nodes) { delete node; } - vNodes.clear(); + m_nodes.clear(); } void ProcessMessagesOnce(CNode& node) { m_msgproc->ProcessMessages(&node, flagInterruptMsgProc); } diff --git a/src/threadinterrupt.h b/src/threadinterrupt.h index c60ed3b336..c053c01499 100644 --- a/src/threadinterrupt.h +++ b/src/threadinterrupt.h @@ -22,7 +22,7 @@ public: using Clock = std::chrono::steady_clock; CThreadInterrupt(); explicit operator bool() const; - void operator()(); + void operator()() EXCLUSIVE_LOCKS_REQUIRED(!mut); void reset(); bool sleep_for(Clock::duration rel_time) EXCLUSIVE_LOCKS_REQUIRED(!mut); diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index cbee7d7cb8..ac8b19dcdc 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -47,7 +47,7 @@ public: explicit MainSignalsInstance(CScheduler& scheduler LIFETIMEBOUND) : m_schedulerClient(scheduler) {} - void Register(std::shared_ptr callbacks) + void Register(std::shared_ptr callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { LOCK(m_mutex); auto inserted = m_map.emplace(callbacks.get(), m_list.end()); @@ -55,7 +55,7 @@ public: inserted.first->second->callbacks = std::move(callbacks); } - void Unregister(CValidationInterface* callbacks) + void Unregister(CValidationInterface* callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { LOCK(m_mutex); auto it = m_map.find(callbacks); @@ -69,7 +69,7 @@ public: //! map entry. After this call, the list may still contain callbacks that //! are currently executing, but it will be cleared when they are done //! executing. - void Clear() + void Clear() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { LOCK(m_mutex); for (const auto& entry : m_map) { @@ -78,7 +78,7 @@ public: m_map.clear(); } - template void Iterate(F&& f) + template void Iterate(F&& f) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { WAIT_LOCK(m_mutex, lock); for (auto it = m_list.begin(); it != m_list.end();) { diff --git a/src/versionbits.h b/src/versionbits.h index c02769809e..a02fc8cce5 100644 --- a/src/versionbits.h +++ b/src/versionbits.h @@ -90,16 +90,16 @@ public: static uint32_t Mask(const Consensus::Params& params, Consensus::DeploymentPos pos); /** Get the BIP9 state for a given deployment for the block after pindexPrev. */ - ThresholdState State(const CBlockIndex* pindexPrev, const Consensus::Params& params, Consensus::DeploymentPos pos); + ThresholdState State(const CBlockIndex* pindexPrev, const Consensus::Params& params, Consensus::DeploymentPos pos) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); /** Get the block height at which the BIP9 deployment switched into the state for the block after pindexPrev. */ - int StateSinceHeight(const CBlockIndex* pindexPrev, const Consensus::Params& params, Consensus::DeploymentPos pos); + int StateSinceHeight(const CBlockIndex* pindexPrev, const Consensus::Params& params, Consensus::DeploymentPos pos) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); /** Determine what nVersion a new block should use */ - int32_t ComputeBlockVersion(const CBlockIndex* pindexPrev, const Consensus::Params& params); + int32_t ComputeBlockVersion(const CBlockIndex* pindexPrev, const Consensus::Params& params) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); - void Clear(); + void Clear() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); }; class AbstractEHFManager