merge bitcoin#22829: various RecursiveMutex replacements in CConnman

This commit is contained in:
Kittywhiskers Van Gogh 2024-05-09 08:52:47 +00:00
parent 362e3101ad
commit 23b152cd37
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
3 changed files with 129 additions and 130 deletions

View File

@ -331,8 +331,8 @@ bool IsLocal(const CService& addr)
CNode* CConnman::FindNode(const CNetAddr& ip, bool fExcludeDisconnecting)
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
if (fExcludeDisconnecting && pnode->fDisconnect) {
continue;
}
@ -345,8 +345,8 @@ CNode* CConnman::FindNode(const CNetAddr& ip, bool fExcludeDisconnecting)
CNode* CConnman::FindNode(const CSubNet& subNet, bool fExcludeDisconnecting)
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
if (fExcludeDisconnecting && pnode->fDisconnect) {
continue;
}
@ -359,8 +359,8 @@ CNode* CConnman::FindNode(const CSubNet& subNet, bool fExcludeDisconnecting)
CNode* CConnman::FindNode(const std::string& addrName, bool fExcludeDisconnecting)
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
if (fExcludeDisconnecting && pnode->fDisconnect) {
continue;
}
@ -373,8 +373,8 @@ CNode* CConnman::FindNode(const std::string& addrName, bool fExcludeDisconnectin
CNode* CConnman::FindNode(const CService& addr, bool fExcludeDisconnecting)
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
if (fExcludeDisconnecting && pnode->fDisconnect) {
continue;
}
@ -392,8 +392,8 @@ bool CConnman::AlreadyConnectedToAddress(const CAddress& addr)
bool CConnman::CheckIncomingNonce(uint64_t nonce)
{
LOCK(cs_vNodes);
for (const CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && pnode->GetLocalNonce() == nonce)
return false;
}
@ -458,7 +458,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
}
// It is possible that we already have a connection to the IP/port pszDest resolved to.
// In that case, drop the connection that was just created.
LOCK(cs_vNodes);
LOCK(m_nodes_mutex);
CNode* pnode = FindNode(static_cast<CService>(addrConnect));
if (pnode) {
LogPrintf("Failed to open new connection, already connected\n");
@ -538,7 +538,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
void CNode::CloseSocketDisconnect(CConnman* connman)
{
AssertLockHeld(connman->cs_vNodes);
AssertLockHeld(connman->m_nodes_mutex);
fDisconnect = true;
LOCK2(connman->cs_mapSocketToNode, cs_hSocket);
@ -1070,9 +1070,9 @@ bool CConnman::AttemptToEvictConnection()
{
std::vector<NodeEvictionCandidate> vEvictionCandidates;
{
LOCK(cs_vNodes);
LOCK(m_nodes_mutex);
for (const CNode* node : vNodes) {
for (const CNode* node : m_nodes) {
if (node->HasPermission(NetPermissionFlags::NoBan))
continue;
if (!node->IsInboundConn())
@ -1112,8 +1112,8 @@ bool CConnman::AttemptToEvictConnection()
if (!node_id_to_evict) {
return false;
}
LOCK(cs_vNodes);
for (CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
if (pnode->GetId() == *node_id_to_evict) {
LogPrint(BCLog::NET_NETCONN, "selected %s connection for eviction peer=%d; disconnecting\n", pnode->ConnectionTypeAsString(), pnode->GetId());
pnode->fDisconnect = true;
@ -1170,8 +1170,8 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket,
}
{
LOCK(cs_vNodes);
for (const CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
if (pnode->IsInboundConn()) {
nInbound++;
if (!pnode->GetVerifiedProRegTxHash().IsNull()) {
@ -1271,8 +1271,8 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket,
}
{
LOCK(cs_vNodes);
vNodes.push_back(pnode);
LOCK(m_nodes_mutex);
m_nodes.push_back(pnode);
WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.emplace(hSocket, pnode));
RegisterEvents(pnode);
WakeSelect();
@ -1289,8 +1289,8 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
const int max_connections = conn_type == ConnectionType::OUTBOUND_FULL_RELAY ? m_max_outbound_full_relay : m_max_outbound_block_relay;
// Count existing connections
int existing_connections = WITH_LOCK(cs_vNodes,
return std::count_if(vNodes.begin(), vNodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
int existing_connections = WITH_LOCK(m_nodes_mutex,
return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
// Max connections of specified type already exist
if (existing_connections >= max_connections) return false;
@ -1306,11 +1306,11 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
void CConnman::DisconnectNodes()
{
{
LOCK(cs_vNodes);
LOCK(m_nodes_mutex);
if (!fNetworkActive) {
// Disconnect any connected nodes
for (CNode* pnode : vNodes) {
for (CNode* pnode : m_nodes) {
if (!pnode->fDisconnect) {
LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId());
pnode->fDisconnect = true;
@ -1319,7 +1319,7 @@ void CConnman::DisconnectNodes()
}
// Disconnect unused nodes
for (auto it = vNodes.begin(); it != vNodes.end(); )
for (auto it = m_nodes.begin(); it != m_nodes.end(); )
{
CNode* pnode = *it;
if (pnode->fDisconnect)
@ -1362,8 +1362,8 @@ void CConnman::DisconnectNodes()
pnode->GetId(), pnode->GetRefCount(), pnode->IsInboundConn(), pnode->m_masternode_connection, pnode->m_masternode_iqr_connection);
}
// remove from vNodes
it = vNodes.erase(it);
// remove from m_nodes
it = m_nodes.erase(it);
// release outbound grant (if any)
pnode->grantOutbound.Release();
@ -1373,7 +1373,7 @@ void CConnman::DisconnectNodes()
// hold in disconnected pool until all refs are released
pnode->Release();
vNodesDisconnected.push_back(pnode);
m_nodes_disconnected.push_back(pnode);
} else {
++it;
}
@ -1381,8 +1381,8 @@ void CConnman::DisconnectNodes()
}
{
// Delete disconnected nodes
std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
for (auto it = vNodesDisconnected.begin(); it != vNodesDisconnected.end(); )
std::list<CNode*> nodes_disconnected_copy = m_nodes_disconnected;
for (auto it = m_nodes_disconnected.begin(); it != m_nodes_disconnected.end(); )
{
CNode* pnode = *it;
// wait until threads are done using it
@ -1395,7 +1395,7 @@ void CConnman::DisconnectNodes()
}
}
if (fDelete) {
it = vNodesDisconnected.erase(it);
it = m_nodes_disconnected.erase(it);
DeleteNode(pnode);
}
}
@ -1408,22 +1408,22 @@ void CConnman::DisconnectNodes()
void CConnman::NotifyNumConnectionsChanged(CMasternodeSync& mn_sync)
{
size_t vNodesSize;
size_t nodes_size;
{
LOCK(cs_vNodes);
vNodesSize = vNodes.size();
LOCK(m_nodes_mutex);
nodes_size = m_nodes.size();
}
// If we had zero connections before and new connections now or if we just dropped
// to zero connections reset the sync process if its outdated.
if ((vNodesSize > 0 && nPrevNodeCount == 0) || (vNodesSize == 0 && nPrevNodeCount > 0)) {
if ((nodes_size > 0 && nPrevNodeCount == 0) || (nodes_size == 0 && nPrevNodeCount > 0)) {
mn_sync.Reset();
}
if(vNodesSize != nPrevNodeCount) {
nPrevNodeCount = vNodesSize;
if(nodes_size != nPrevNodeCount) {
nPrevNodeCount = nodes_size;
if(clientInterface)
clientInterface->NotifyNumConnectionsChanged(vNodesSize);
clientInterface->NotifyNumConnectionsChanged(nodes_size);
CalculateNumConnectionsChangedStats();
}
@ -1810,7 +1810,7 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
bool only_poll = [this]() {
// Check if we have work to do and thus should avoid waiting for events
LOCK2(cs_vNodes, cs_sendable_receivable_nodes);
LOCK2(m_nodes_mutex, cs_sendable_receivable_nodes);
if (!mapReceivableNodes.empty()) {
return true;
} else if (!mapSendableNodes.empty()) {
@ -2043,7 +2043,7 @@ size_t CConnman::SocketRecvData(CNode *pnode)
{
bool notify = false;
if (!pnode->ReceiveMsgBytes(Span<const uint8_t>(pchBuf, nBytes), notify)) {
LOCK(cs_vNodes);
LOCK(m_nodes_mutex);
pnode->CloseSocketDisconnect(this);
}
RecordBytesRecv(nBytes);
@ -2070,7 +2070,7 @@ size_t CConnman::SocketRecvData(CNode *pnode)
if (!pnode->fDisconnect) {
LogPrint(BCLog::NET, "socket closed for peer=%d\n", pnode->GetId());
}
LOCK(cs_vNodes);
LOCK(m_nodes_mutex);
pnode->fOtherSideDisconnected = true; // avoid lingering
pnode->CloseSocketDisconnect(this);
}
@ -2083,7 +2083,7 @@ size_t CConnman::SocketRecvData(CNode *pnode)
if (!pnode->fDisconnect){
LogPrint(BCLog::NET, "socket recv error for peer=%d: %s\n", pnode->GetId(), NetworkErrorString(nErr));
}
LOCK(cs_vNodes);
LOCK(m_nodes_mutex);
pnode->fOtherSideDisconnected = true; // avoid lingering
pnode->CloseSocketDisconnect(this);
}
@ -2188,8 +2188,8 @@ void CConnman::ThreadDNSAddressSeed()
int nRelevant = 0;
{
LOCK(cs_vNodes);
for (const CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
if (pnode->fSuccessfullyConnected && !pnode->IsFullOutboundConn() && !pnode->m_masternode_probe_connection) ++nRelevant;
}
}
@ -2298,8 +2298,8 @@ int CConnman::GetExtraFullOutboundCount() const
{
int full_outbound_peers = 0;
{
LOCK(cs_vNodes);
for (const CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
// don't count outbound masternodes
if (pnode->m_masternode_connection) {
continue;
@ -2316,8 +2316,8 @@ int CConnman::GetExtraBlockRelayCount() const
{
int block_relay_peers = 0;
{
LOCK(cs_vNodes);
for (const CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsBlockOnlyConn()) {
++block_relay_peers;
}
@ -2388,8 +2388,8 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
// Checking !dnsseed is cheaper before locking 2 mutexes.
if (!add_fixed_seeds_now && !dnsseed) {
LOCK2(m_addr_fetches_mutex, cs_vAddedNodes);
if (m_addr_fetches.empty() && vAddedNodes.empty()) {
LOCK2(m_addr_fetches_mutex, m_added_nodes_mutex);
if (m_addr_fetches.empty() && m_added_nodes.empty()) {
add_fixed_seeds_now = true;
LogPrintf("Adding fixed seeds as -dnsseed=0, -addnode is not provided and all -seednode(s) attempted\n");
}
@ -2414,8 +2414,8 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
int nOutboundBlockRelay = 0;
std::set<std::vector<unsigned char> > setConnected;
if (!Params().AllowMultipleAddressesFromGroup()) {
LOCK(cs_vNodes);
for (const CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
if (pnode->IsFullOutboundConn() && !pnode->m_masternode_connection) nOutboundFullRelay++;
if (pnode->IsBlockOnlyConn()) nOutboundBlockRelay++;
@ -2439,8 +2439,8 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
std::set<uint256> setConnectedMasternodes;
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
auto verifiedProRegTxHash = pnode->GetVerifiedProRegTxHash();
if (!verifiedProRegTxHash.IsNull()) {
setConnectedMasternodes.emplace(verifiedProRegTxHash);
@ -2630,8 +2630,8 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe
std::vector<CAddress> CConnman::GetCurrentBlockRelayOnlyConns() const
{
std::vector<CAddress> ret;
LOCK(cs_vNodes);
for (const CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
if (pnode->IsBlockRelayOnly()) {
ret.push_back(pnode->addr);
}
@ -2646,9 +2646,9 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
std::list<std::string> lAddresses(0);
{
LOCK(cs_vAddedNodes);
ret.reserve(vAddedNodes.size());
std::copy(vAddedNodes.cbegin(), vAddedNodes.cend(), std::back_inserter(lAddresses));
LOCK(m_added_nodes_mutex);
ret.reserve(m_added_nodes.size());
std::copy(m_added_nodes.cbegin(), m_added_nodes.cend(), std::back_inserter(lAddresses));
}
@ -2656,8 +2656,8 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
std::map<CService, bool> mapConnected;
std::map<std::string, std::pair<bool, CService>> mapConnectedByName;
{
LOCK(cs_vNodes);
for (const CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
if (pnode->addr.IsValid()) {
mapConnected[pnode->addr] = pnode->IsInboundConn();
}
@ -2837,7 +2837,7 @@ void CConnman::ThreadOpenMasternodeConnections(CDeterministicMNManager& dmnman,
auto getConnectToDmn = [&]() -> CDeterministicMNCPtr {
// don't hold lock while calling OpenMasternodeConnection as cs_main is locked deep inside
LOCK2(cs_vNodes, cs_vPendingMasternodes);
LOCK2(m_nodes_mutex, cs_vPendingMasternodes);
if (!vPendingMasternodes.empty()) {
auto dmn = mnList.GetValidMN(vPendingMasternodes.front());
@ -2970,8 +2970,8 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
m_msgproc->InitializeNode(pnode);
{
LOCK(cs_vNodes);
vNodes.push_back(pnode);
LOCK(m_nodes_mutex);
m_nodes.push_back(pnode);
RegisterEvents(pnode);
WakeSelect();
}
@ -2999,7 +2999,7 @@ void CConnman::ThreadMessageHandler()
// Randomize the order in which we process messages from/to our peers.
// This prevents attacks in which an attacker exploits having multiple
// consecutive connections in the vNodes list.
// consecutive connections in the m_nodes list.
const NodesSnapshot snap{*this, /* filter = */ CConnman::AllNodes, /* shuffle = */ true};
for (CNode* pnode : snap.Nodes()) {
@ -3550,10 +3550,10 @@ void CConnman::StopNodes()
}
{
LOCK(cs_vNodes);
LOCK(m_nodes_mutex);
// Close sockets
for (CNode *pnode : vNodes)
for (CNode *pnode : m_nodes)
pnode->CloseSocketDisconnect(this);
}
for (ListenSocket& hListenSocket : vhListenSocket)
@ -3576,11 +3576,11 @@ void CConnman::StopNodes()
// clean up some globals (to help leak detection)
std::vector<CNode*> nodes;
WITH_LOCK(cs_vNodes, nodes.swap(vNodes));
WITH_LOCK(m_nodes_mutex, nodes.swap(m_nodes));
for (CNode* pnode : nodes) {
DeleteNode(pnode);
}
for (CNode* pnode : vNodesDisconnected) {
for (CNode* pnode : m_nodes_disconnected) {
DeleteNode(pnode);
}
WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.clear());
@ -3592,7 +3592,7 @@ void CConnman::StopNodes()
LOCK(cs_mapNodesWithDataToSend);
mapNodesWithDataToSend.clear();
}
vNodesDisconnected.clear();
m_nodes_disconnected.clear();
vhListenSocket.clear();
semOutbound.reset();
semAddnode.reset();
@ -3695,21 +3695,21 @@ std::vector<CAddress> CConnman::GetAddresses(CNode& requestor, size_t max_addres
bool CConnman::AddNode(const std::string& strNode)
{
LOCK(cs_vAddedNodes);
for (const std::string& it : vAddedNodes) {
LOCK(m_added_nodes_mutex);
for (const std::string& it : m_added_nodes) {
if (strNode == it) return false;
}
vAddedNodes.push_back(strNode);
m_added_nodes.push_back(strNode);
return true;
}
bool CConnman::RemoveAddedNode(const std::string& strNode)
{
LOCK(cs_vAddedNodes);
for(std::vector<std::string>::iterator it = vAddedNodes.begin(); it != vAddedNodes.end(); ++it) {
LOCK(m_added_nodes_mutex);
for(std::vector<std::string>::iterator it = m_added_nodes.begin(); it != m_added_nodes.end(); ++it) {
if (strNode == *it) {
vAddedNodes.erase(it);
m_added_nodes.erase(it);
return true;
}
}
@ -3782,7 +3782,7 @@ std::set<uint256> CConnman::GetMasternodeQuorums(Consensus::LLMQType llmqType)
std::set<NodeId> CConnman::GetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const
{
LOCK2(cs_vNodes, cs_vPendingMasternodes);
LOCK2(m_nodes_mutex, cs_vPendingMasternodes);
auto it = masternodeQuorumNodes.find(std::make_pair(llmqType, quorumHash));
if (it == masternodeQuorumNodes.end()) {
return {};
@ -3790,7 +3790,7 @@ std::set<NodeId> CConnman::GetMasternodeQuorumNodes(Consensus::LLMQType llmqType
const auto& proRegTxHashes = it->second;
std::set<NodeId> nodes;
for (const auto pnode : vNodes) {
for (const auto pnode : m_nodes) {
if (pnode->fDisconnect) {
continue;
}
@ -3861,10 +3861,10 @@ void CConnman::AddPendingProbeConnections(const std::set<uint256> &proTxHashes)
size_t CConnman::GetNodeCount(ConnectionDirection flags) const
{
LOCK(cs_vNodes);
LOCK(m_nodes_mutex);
int nNum = 0;
for (const auto& pnode : vNodes) {
for (const auto& pnode : m_nodes) {
if (pnode->fDisconnect) {
continue;
}
@ -3889,9 +3889,9 @@ size_t CConnman::GetMaxOutboundNodeCount()
void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats) const
{
vstats.clear();
LOCK(cs_vNodes);
vstats.reserve(vNodes.size());
for (CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
vstats.reserve(m_nodes.size());
for (CNode* pnode : m_nodes) {
if (pnode->fDisconnect) {
continue;
}
@ -3902,7 +3902,7 @@ void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats) const
bool CConnman::DisconnectNode(const std::string& strNode)
{
LOCK(cs_vNodes);
LOCK(m_nodes_mutex);
if (CNode* pnode = FindNode(strNode)) {
LogPrint(BCLog::NET_NETCONN, "disconnect by address%s matched peer=%d; disconnecting\n", (fLogIPs ? strprintf("=%s", strNode) : ""), pnode->GetId());
pnode->fDisconnect = true;
@ -3914,8 +3914,8 @@ bool CConnman::DisconnectNode(const std::string& strNode)
bool CConnman::DisconnectNode(const CSubNet& subnet)
{
bool disconnected = false;
LOCK(cs_vNodes);
for (CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
if (subnet.Match(pnode->addr)) {
LogPrint(BCLog::NET_NETCONN, "disconnect by subnet%s matched peer=%d; disconnecting\n", (fLogIPs ? strprintf("=%s", subnet.ToString()) : ""), pnode->GetId());
pnode->fDisconnect = true;
@ -3932,8 +3932,8 @@ bool CConnman::DisconnectNode(const CNetAddr& addr)
bool CConnman::DisconnectNode(NodeId id)
{
LOCK(cs_vNodes);
for(CNode* pnode : vNodes) {
LOCK(m_nodes_mutex);
for(CNode* pnode : m_nodes) {
if (id == pnode->GetId()) {
LogPrint(BCLog::NET_NETCONN, "disconnect by id peer=%d; disconnecting\n", pnode->GetId());
pnode->fDisconnect = true;
@ -3945,7 +3945,6 @@ bool CConnman::DisconnectNode(NodeId id)
void CConnman::RecordBytesRecv(uint64_t bytes)
{
LOCK(cs_totalBytesRecv);
nTotalBytesRecv += bytes;
statsClient.count("bandwidth.bytesReceived", bytes, 0.1f);
statsClient.gauge("bandwidth.totalBytesReceived", nTotalBytesRecv, 0.01f);
@ -4025,7 +4024,6 @@ uint64_t CConnman::GetOutboundTargetBytesLeft() const
uint64_t CConnman::GetTotalBytesRecv() const
{
LOCK(cs_totalBytesRecv);
return nTotalBytesRecv;
}
@ -4112,9 +4110,9 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
LOCK(cs_mapNodesWithDataToSend);
// we're not holding cs_vNodes here, so there is a chance of this node being disconnected shortly before
// we're not holding m_nodes_mutex here, so there is a chance of this node being disconnected shortly before
// we get here. Whoever called PushMessage still has a ref to CNode*, but will later Release() it, so we
// might end up having an entry in mapNodesWithDataToSend that is not in vNodes anymore. We need to
// might end up having an entry in mapNodesWithDataToSend that is not in m_nodes anymore. We need to
// Add/Release refs when adding/erasing mapNodesWithDataToSend.
if (mapNodesWithDataToSend.emplace(pnode->GetId(), pnode).second) {
pnode->AddRef();
@ -4130,8 +4128,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
bool CConnman::ForNode(const CService& addr, std::function<bool(const CNode* pnode)> cond, std::function<bool(CNode* pnode)> func)
{
CNode* found = nullptr;
LOCK(cs_vNodes);
for (auto&& pnode : vNodes) {
LOCK(m_nodes_mutex);
for (auto&& pnode : m_nodes) {
if((CService)pnode->addr == addr) {
found = pnode;
break;
@ -4143,8 +4141,8 @@ bool CConnman::ForNode(const CService& addr, std::function<bool(const CNode* pno
bool CConnman::ForNode(NodeId id, std::function<bool(const CNode* pnode)> cond, std::function<bool(CNode* pnode)> func)
{
CNode* found = nullptr;
LOCK(cs_vNodes);
for (auto&& pnode : vNodes) {
LOCK(m_nodes_mutex);
for (auto&& pnode : m_nodes) {
if(pnode->GetId() == id) {
found = pnode;
break;
@ -4179,10 +4177,10 @@ std::chrono::microseconds PoissonNextSend(std::chrono::microseconds now, std::ch
CConnman::NodesSnapshot::NodesSnapshot(const CConnman& connman, std::function<bool(const CNode* pnode)> filter,
bool shuffle)
{
LOCK(connman.cs_vNodes);
m_nodes_copy.reserve(connman.vNodes.size());
LOCK(connman.m_nodes_mutex);
m_nodes_copy.reserve(connman.m_nodes.size());
for (auto& node : connman.vNodes) {
for (auto& node : connman.m_nodes) {
if (!filter(node))
continue;
node->AddRef();

View File

@ -878,8 +878,8 @@ public:
}
vWhitelistedRange = connOptions.vWhitelistedRange;
{
LOCK(cs_vAddedNodes);
vAddedNodes = connOptions.m_added_nodes;
LOCK(m_added_nodes_mutex);
m_added_nodes = connOptions.m_added_nodes;
}
socketEventsMode = connOptions.socketEventsMode;
m_onion_binds = connOptions.onion_binds;
@ -961,8 +961,8 @@ public:
template<typename Condition, typename Callable>
bool ForEachNodeContinueIf(const Condition& cond, Callable&& func)
{
LOCK(cs_vNodes);
for (auto&& node : vNodes)
LOCK(m_nodes_mutex);
for (auto&& node : m_nodes)
if (cond(node))
if(!func(node))
return false;
@ -978,8 +978,8 @@ public:
template<typename Condition, typename Callable>
bool ForEachNodeContinueIf(const Condition& cond, Callable&& func) const
{
LOCK(cs_vNodes);
for (const auto& node : vNodes)
LOCK(m_nodes_mutex);
for (const auto& node : m_nodes)
if (cond(node))
if(!func(node))
return false;
@ -995,8 +995,8 @@ public:
template<typename Condition, typename Callable>
void ForEachNode(const Condition& cond, Callable&& func)
{
LOCK(cs_vNodes);
for (auto&& node : vNodes) {
LOCK(m_nodes_mutex);
for (auto&& node : m_nodes) {
if (cond(node))
func(node);
}
@ -1011,8 +1011,8 @@ public:
template<typename Condition, typename Callable>
void ForEachNode(const Condition& cond, Callable&& func) const
{
LOCK(cs_vNodes);
for (auto&& node : vNodes) {
LOCK(m_nodes_mutex);
for (auto&& node : m_nodes) {
if (cond(node))
func(node);
}
@ -1027,8 +1027,8 @@ public:
template<typename Condition, typename Callable, typename CallableAfter>
void ForEachNodeThen(const Condition& cond, Callable&& pre, CallableAfter&& post)
{
LOCK(cs_vNodes);
for (auto&& node : vNodes) {
LOCK(m_nodes_mutex);
for (auto&& node : m_nodes) {
if (cond(node))
pre(node);
}
@ -1044,8 +1044,8 @@ public:
template<typename Condition, typename Callable, typename CallableAfter>
void ForEachNodeThen(const Condition& cond, Callable&& pre, CallableAfter&& post) const
{
LOCK(cs_vNodes);
for (auto&& node : vNodes) {
LOCK(m_nodes_mutex);
for (auto&& node : m_nodes) {
if (cond(node))
pre(node);
}
@ -1180,7 +1180,7 @@ public:
bool ShouldRunInactivityChecks(const CNode& node, std::chrono::seconds now) const;
/**
* RAII helper to atomically create a copy of `vNodes` and add a reference
* RAII helper to atomically create a copy of `m_nodes` and add a reference
* to each of the nodes. The nodes are released when this object is destroyed.
*/
class NodesSnapshot
@ -1226,7 +1226,7 @@ private:
/**
* Create a `CNode` object from a socket that has just been accepted and add the node to
* the `vNodes` member.
* the `m_nodes` member.
* @param[in] hSocket Connected socket to communicate with the peer.
* @param[in] permissionFlags The peer's permissions.
* @param[in] addr_bind The address and port at our side of the connection.
@ -1364,9 +1364,8 @@ private:
void UnregisterEvents(CNode* pnode);
// Network usage totals
mutable RecursiveMutex cs_totalBytesRecv;
mutable RecursiveMutex cs_totalBytesSent;
uint64_t nTotalBytesRecv GUARDED_BY(cs_totalBytesRecv) {0};
std::atomic<uint64_t> nTotalBytesRecv{0};
uint64_t nTotalBytesSent GUARDED_BY(cs_totalBytesSent) {0};
// outbound limit & stats
@ -1389,21 +1388,23 @@ private:
bool fAddressesInitialized{false};
CAddrMan& addrman;
std::deque<std::string> m_addr_fetches GUARDED_BY(m_addr_fetches_mutex);
RecursiveMutex m_addr_fetches_mutex;
std::vector<std::string> vAddedNodes GUARDED_BY(cs_vAddedNodes);
mutable RecursiveMutex cs_vAddedNodes;
Mutex m_addr_fetches_mutex;
std::vector<std::string> m_added_nodes GUARDED_BY(m_added_nodes_mutex);
mutable Mutex m_added_nodes_mutex;
std::vector<CNode*> m_nodes GUARDED_BY(m_nodes_mutex);
std::list<CNode*> m_nodes_disconnected;
mutable RecursiveMutex m_nodes_mutex;
std::atomic<NodeId> nLastNodeId{0};
unsigned int nPrevNodeCount{0};
std::vector<uint256> vPendingMasternodes;
mutable RecursiveMutex cs_vPendingMasternodes;
std::map<std::pair<Consensus::LLMQType, uint256>, std::set<uint256>> masternodeQuorumNodes GUARDED_BY(cs_vPendingMasternodes);
std::map<std::pair<Consensus::LLMQType, uint256>, std::set<uint256>> masternodeQuorumRelayMembers GUARDED_BY(cs_vPendingMasternodes);
std::set<uint256> masternodePendingProbes GUARDED_BY(cs_vPendingMasternodes);
std::vector<CNode*> vNodes GUARDED_BY(cs_vNodes);
std::list<CNode*> vNodesDisconnected;
mutable Mutex cs_mapSocketToNode;
std::unordered_map<SOCKET, CNode*> mapSocketToNode GUARDED_BY(cs_mapSocketToNode);
mutable RecursiveMutex cs_vNodes;
std::atomic<NodeId> nLastNodeId{0};
unsigned int nPrevNodeCount{0};
/**
* Cache responses to addr requests to minimize privacy leak.

View File

@ -23,16 +23,16 @@ struct ConnmanTestMsg : public CConnman {
void AddTestNode(CNode& node)
{
LOCK(cs_vNodes);
vNodes.push_back(&node);
LOCK(m_nodes_mutex);
m_nodes.push_back(&node);
}
void ClearTestNodes()
{
LOCK(cs_vNodes);
for (CNode* node : vNodes) {
LOCK(m_nodes_mutex);
for (CNode* node : m_nodes) {
delete node;
}
vNodes.clear();
m_nodes.clear();
}
void ProcessMessagesOnce(CNode& node) { m_msgproc->ProcessMessages(&node, flagInterruptMsgProc); }