merge bitcoin#21943: Dedup and RAII-fy the creation of a copy of CConnman::vNodes

This commit is contained in:
Kittywhiskers Van Gogh 2024-05-08 16:21:51 +00:00
parent bf98ad6a42
commit 362e3101ad
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
6 changed files with 225 additions and 115 deletions

View File

@ -1223,11 +1223,11 @@ void CGovernanceManager::RequestGovernanceObject(CNode* pfrom, const uint256& nH
int CGovernanceManager::RequestGovernanceObjectVotes(CNode& peer, CConnman& connman) const
{
std::array<CNode*, 1> nodeCopy{&peer};
return RequestGovernanceObjectVotes(nodeCopy, connman);
const std::vector<CNode*> vNodeCopy{&peer};
return RequestGovernanceObjectVotes(vNodeCopy, connman);
}
int CGovernanceManager::RequestGovernanceObjectVotes(Span<CNode*> vNodesCopy, CConnman& connman) const
int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector<CNode*>& vNodesCopy, CConnman& connman) const
{
static std::map<uint256, std::map<CService, int64_t> > mapAskedRecently;
@ -1501,7 +1501,7 @@ void CGovernanceManager::UpdatedBlockTip(const CBlockIndex* pindex, CConnman& co
void CGovernanceManager::RequestOrphanObjects(CConnman& connman)
{
std::vector<CNode*> vNodesCopy = connman.CopyNodeVector(CConnman::FullyConnectedOnly);
const CConnman::NodesSnapshot snap{connman, /* filter = */ CConnman::FullyConnectedOnly};
std::vector<uint256> vecHashesFiltered;
{
@ -1517,15 +1517,13 @@ void CGovernanceManager::RequestOrphanObjects(CConnman& connman)
LogPrint(BCLog::GOBJECT, "CGovernanceObject::RequestOrphanObjects -- number objects = %d\n", vecHashesFiltered.size());
for (const uint256& nHash : vecHashesFiltered) {
for (CNode* pnode : vNodesCopy) {
for (CNode* pnode : snap.Nodes()) {
if (!pnode->CanRelay()) {
continue;
}
RequestGovernanceObject(pnode, nHash, connman);
}
}
connman.ReleaseNodeVector(vNodesCopy);
}
void CGovernanceManager::CleanOrphanObjects()

View File

@ -358,7 +358,7 @@ public:
void InitOnLoad();
int RequestGovernanceObjectVotes(CNode& peer, CConnman& connman) const;
int RequestGovernanceObjectVotes(Span<CNode*> vNodesCopy, CConnman& connman) const;
int RequestGovernanceObjectVotes(const std::vector<CNode*>& vNodesCopy, CConnman& connman) const;
/*
* Trigger Management (formerly CGovernanceTriggerManager)

View File

@ -1092,14 +1092,13 @@ bool CSigSharesManager::SendMessages()
return session->sendSessionId;
};
std::vector<CNode*> vNodesCopy = connman.CopyNodeVector(CConnman::FullyConnectedOnly);
const CConnman::NodesSnapshot snap{connman, /* filter = */ CConnman::FullyConnectedOnly};
{
LOCK(cs);
CollectSigSharesToRequest(sigSharesToRequest);
CollectSigSharesToSend(sigShareBatchesToSend);
CollectSigSharesToAnnounce(sigSharesToAnnounce);
CollectSigSharesToSendConcentrated(sigSharesToSend, vNodesCopy);
CollectSigSharesToSendConcentrated(sigSharesToSend, snap.Nodes());
for (auto& [nodeId, sigShareMap] : sigSharesToRequest) {
for (auto& [hash, sigShareInv] : sigShareMap) {
@ -1120,7 +1119,7 @@ bool CSigSharesManager::SendMessages()
bool didSend = false;
for (auto& pnode : vNodesCopy) {
for (auto& pnode : snap.Nodes()) {
CNetMsgMaker msgMaker(pnode->GetCommonVersion());
if (const auto it1 = sigSessionAnnouncements.find(pnode->GetId()); it1 != sigSessionAnnouncements.end()) {
@ -1222,9 +1221,6 @@ bool CSigSharesManager::SendMessages()
}
}
// looped through all nodes, release them
connman.ReleaseNodeVector(vNodesCopy);
return didSend;
}

View File

@ -140,12 +140,11 @@ void CMasternodeSync::ProcessTick()
}
nTimeLastProcess = GetTime();
std::vector<CNode*> vNodesCopy = connman.CopyNodeVector(CConnman::FullyConnectedOnly);
const CConnman::NodesSnapshot snap{connman, /* filter = */ CConnman::FullyConnectedOnly};
// gradually request the rest of the votes after sync finished
if(IsSynced()) {
m_govman.RequestGovernanceObjectVotes(vNodesCopy, connman);
connman.ReleaseNodeVector(vNodesCopy);
m_govman.RequestGovernanceObjectVotes(snap.Nodes(), connman);
return;
}
@ -154,7 +153,7 @@ void CMasternodeSync::ProcessTick()
LogPrint(BCLog::MNSYNC, "CMasternodeSync::ProcessTick -- nTick %d nCurrentAsset %d nTriedPeerCount %d nSyncProgress %f\n", nTick, nCurrentAsset, nTriedPeerCount, nSyncProgress);
uiInterface.NotifyAdditionalDataSyncProgressChanged(nSyncProgress);
for (auto& pnode : vNodesCopy)
for (auto& pnode : snap.Nodes())
{
CNetMsgMaker msgMaker(pnode->GetCommonVersion());
@ -189,7 +188,7 @@ void CMasternodeSync::ProcessTick()
}
if (nCurrentAsset == MASTERNODE_SYNC_BLOCKCHAIN) {
int64_t nTimeSyncTimeout = vNodesCopy.size() > 3 ? MASTERNODE_SYNC_TICK_SECONDS : MASTERNODE_SYNC_TIMEOUT_SECONDS;
int64_t nTimeSyncTimeout = snap.Nodes().size() > 3 ? MASTERNODE_SYNC_TICK_SECONDS : MASTERNODE_SYNC_TIMEOUT_SECONDS;
if (fReachedBestHeader && (GetTime() - nTimeLastBumped > nTimeSyncTimeout)) {
// At this point we know that:
// a) there are peers (because we are looping on at least one of them);
@ -205,7 +204,7 @@ void CMasternodeSync::ProcessTick()
if (gArgs.GetBoolArg("-syncmempool", DEFAULT_SYNC_MEMPOOL)) {
// Now that the blockchain is synced request the mempool from the connected outbound nodes if possible
for (auto pNodeTmp : vNodesCopy) {
for (auto pNodeTmp : snap.Nodes()) {
bool fRequestedEarlier = m_netfulfilledman.HasFulfilledRequest(pNodeTmp->addr, "mempool-sync");
if (pNodeTmp->nVersion >= 70216 && !pNodeTmp->IsInboundConn() && !fRequestedEarlier && !pNodeTmp->IsBlockRelayOnly()) {
m_netfulfilledman.AddFulfilledRequest(pNodeTmp->addr, "mempool-sync");
@ -222,7 +221,6 @@ void CMasternodeSync::ProcessTick()
if(nCurrentAsset == MASTERNODE_SYNC_GOVERNANCE) {
if (!m_govman.IsValid()) {
SwitchToNextAsset();
connman.ReleaseNodeVector(vNodesCopy);
return;
}
LogPrint(BCLog::GOBJECT, "CMasternodeSync::ProcessTick -- nTick %d nCurrentAsset %d nTimeLastBumped %lld GetTime() %lld diff %lld\n", nTick, nCurrentAsset, nTimeLastBumped, GetTime(), GetTime() - nTimeLastBumped);
@ -235,7 +233,6 @@ void CMasternodeSync::ProcessTick()
// it's kind of ok to skip this for now, hopefully we'll catch up later?
}
SwitchToNextAsset();
connman.ReleaseNodeVector(vNodesCopy);
return;
}
@ -259,12 +256,11 @@ void CMasternodeSync::ProcessTick()
if (nCurrentAsset != MASTERNODE_SYNC_GOVERNANCE) {
// looped through all nodes and not syncing governance yet/already, release them
connman.ReleaseNodeVector(vNodesCopy);
return;
}
// request votes on per-obj basis from each node
for (const auto& pnode : vNodesCopy) {
for (const auto& pnode : snap.Nodes()) {
if(!m_netfulfilledman.HasFulfilledRequest(pnode->addr, "governance-sync")) {
continue; // to early for this node
}
@ -291,16 +287,12 @@ void CMasternodeSync::ProcessTick()
// reset nTimeNoObjectsLeft to be able to use the same condition on resync
nTimeNoObjectsLeft = 0;
SwitchToNextAsset();
connman.ReleaseNodeVector(vNodesCopy);
return;
}
nLastTick = nTick;
nLastVotes = m_govman.GetVoteCount();
}
}
// looped through all nodes, release them
connman.ReleaseNodeVector(vNodesCopy);
}
void CMasternodeSync::SendGovernanceSyncRequest(CNode* pnode) const

View File

@ -1451,8 +1451,8 @@ void CConnman::CalculateNumConnectionsChangedStats()
}
mapRecvBytesMsgStats[NET_MESSAGE_COMMAND_OTHER] = 0;
mapSentBytesMsgStats[NET_MESSAGE_COMMAND_OTHER] = 0;
auto vNodesCopy = CopyNodeVector(CConnman::FullyConnectedOnly);
for (auto pnode : vNodesCopy) {
const NodesSnapshot snap{*this, /* filter = */ CConnman::FullyConnectedOnly};
for (auto pnode : snap.Nodes()) {
{
LOCK(pnode->cs_vRecv);
for (const mapMsgCmdSize::value_type &i : pnode->mapRecvBytesPerMsgCmd)
@ -1481,7 +1481,6 @@ void CConnman::CalculateNumConnectionsChangedStats()
if (last_ping_time > 0)
statsClient.timing("peers.ping_us", last_ping_time, 1.0f);
}
ReleaseNodeVector(vNodesCopy);
for (const std::string &msg : getAllNetMessageTypes()) {
statsClient.gauge("bandwidth.message." + msg + ".totalBytesReceived", mapRecvBytesMsgStats[msg], 1.0f);
statsClient.gauge("bandwidth.message." + msg + ".totalBytesSent", mapSentBytesMsgStats[msg], 1.0f);
@ -1534,15 +1533,16 @@ bool CConnman::InactivityCheck(const CNode& node) const
return false;
}
bool CConnman::GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set)
{
for (const ListenSocket& hListenSocket : vhListenSocket) {
recv_set.insert(hListenSocket.socket);
}
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes)
for (CNode* pnode : nodes)
{
bool select_recv = !pnode->fHasRecvData;
bool select_send = !pnode->fCanSendData;
@ -1559,7 +1559,6 @@ bool CConnman::GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &s
recv_set.insert(pnode->hSocket);
}
}
}
#ifdef USE_WAKEUP_PIPE
// We add a pipe to the read set so that the select() call can be woken up from the outside
@ -1574,14 +1573,17 @@ bool CConnman::GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &s
}
#ifdef USE_KQUEUE
void CConnman::SocketEventsKqueue(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll)
void CConnman::SocketEventsKqueue(std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set,
bool only_poll)
{
const size_t maxEvents = 64;
struct kevent events[maxEvents];
struct timespec timeout;
timeout.tv_sec = fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000;
timeout.tv_nsec = (fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000;
timeout.tv_sec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000;
timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000;
wakeupSelectNeeded = true;
int n = kevent(kqueuefd, nullptr, 0, events, maxEvents, &timeout);
@ -1609,13 +1611,16 @@ void CConnman::SocketEventsKqueue(std::set<SOCKET> &recv_set, std::set<SOCKET> &
#endif
#ifdef USE_EPOLL
void CConnman::SocketEventsEpoll(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll)
void CConnman::SocketEventsEpoll(std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set,
bool only_poll)
{
const size_t maxEvents = 64;
epoll_event events[maxEvents];
wakeupSelectNeeded = true;
int n = epoll_wait(epollfd, events, maxEvents, fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
int n = epoll_wait(epollfd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
wakeupSelectNeeded = false;
for (int i = 0; i < n; i++) {
auto& e = events[i];
@ -1636,11 +1641,15 @@ void CConnman::SocketEventsEpoll(std::set<SOCKET> &recv_set, std::set<SOCKET> &s
#endif
#ifdef USE_POLL
void CConnman::SocketEventsPoll(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll)
void CConnman::SocketEventsPoll(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set,
bool only_poll)
{
std::set<SOCKET> recv_select_set, send_select_set, error_select_set;
if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) {
if (!fOnlyPoll) interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) {
if (!only_poll) interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
return;
}
@ -1668,7 +1677,7 @@ void CConnman::SocketEventsPoll(std::set<SOCKET> &recv_set, std::set<SOCKET> &se
}
wakeupSelectNeeded = true;
int r = poll(vpollfds.data(), vpollfds.size(), fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
int r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
wakeupSelectNeeded = false;
if (r < 0) {
return;
@ -1684,10 +1693,14 @@ void CConnman::SocketEventsPoll(std::set<SOCKET> &recv_set, std::set<SOCKET> &se
}
#endif
void CConnman::SocketEventsSelect(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll)
void CConnman::SocketEventsSelect(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set,
bool only_poll)
{
std::set<SOCKET> recv_select_set, send_select_set, error_select_set;
if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) {
if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) {
interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
return;
}
@ -1697,7 +1710,7 @@ void CConnman::SocketEventsSelect(std::set<SOCKET> &recv_set, std::set<SOCKET> &
//
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS * 1000; // frequency to poll pnode->vSend
timeout.tv_usec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS * 1000; // frequency to poll pnode->vSend
fd_set fdsetRecv;
fd_set fdsetSend;
@ -1759,26 +1772,30 @@ void CConnman::SocketEventsSelect(std::set<SOCKET> &recv_set, std::set<SOCKET> &
}
}
void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll)
void CConnman::SocketEvents(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set,
bool only_poll)
{
switch (socketEventsMode) {
#ifdef USE_KQUEUE
case SOCKETEVENTS_KQUEUE:
SocketEventsKqueue(recv_set, send_set, error_set, fOnlyPoll);
SocketEventsKqueue(recv_set, send_set, error_set, only_poll);
break;
#endif
#ifdef USE_EPOLL
case SOCKETEVENTS_EPOLL:
SocketEventsEpoll(recv_set, send_set, error_set, fOnlyPoll);
SocketEventsEpoll(recv_set, send_set, error_set, only_poll);
break;
#endif
#ifdef USE_POLL
case SOCKETEVENTS_POLL:
SocketEventsPoll(recv_set, send_set, error_set, fOnlyPoll);
SocketEventsPoll(nodes, recv_set, send_set, error_set, only_poll);
break;
#endif
case SOCKETEVENTS_SELECT:
SocketEventsSelect(recv_set, send_set, error_set, fOnlyPoll);
SocketEventsSelect(nodes, recv_set, send_set, error_set, only_poll);
break;
default:
assert(false);
@ -1787,15 +1804,20 @@ void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_s
void CConnman::SocketHandler(CMasternodeSync& mn_sync)
{
bool fOnlyPoll = [this]() {
// check if we have work to do and thus should avoid waiting for events
std::set<SOCKET> recv_set;
std::set<SOCKET> send_set;
std::set<SOCKET> error_set;
bool only_poll = [this]() {
// Check if we have work to do and thus should avoid waiting for events
LOCK2(cs_vNodes, cs_sendable_receivable_nodes);
if (!mapReceivableNodes.empty()) {
return true;
} else if (!mapSendableNodes.empty()) {
if (LOCK(cs_mapNodesWithDataToSend); !mapNodesWithDataToSend.empty()) {
// we must check if at least one of the nodes with pending messages is also sendable, as otherwise a single
// node would be able to make the network thread busy with polling
// We must check if at least one of the nodes with pending messages is also
// sendable, as otherwise a single node would be able to make the network
// thread busy with polling.
for (auto& p : mapNodesWithDataToSend) {
if (mapSendableNodes.count(p.first)) {
return true;
@ -1807,8 +1829,14 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
return false;
}();
std::set<SOCKET> recv_set, send_set, error_set;
SocketEvents(recv_set, send_set, error_set, fOnlyPoll);
{
const NodesSnapshot snap{*this, /* filter = */ CConnman::AllNodes, /* shuffle = */ false};
// Check for the readiness of the already connected sockets and the
// listening sockets in one call ("readiness" as in poll(2) or
// select(2)). If none are ready, wait for a short while and return
// empty sets.
SocketEvents(snap.Nodes(), recv_set, send_set, error_set, only_poll);
#ifdef USE_WAKEUP_PIPE
// drain the wakeup pipe
@ -1823,19 +1851,20 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
}
#endif
if (interruptNet) return;
// Service (send/receive) each of the already connected nodes.
SocketHandlerConnected(recv_set, send_set, error_set);
}
//
// Accept new connections
//
for (const ListenSocket& hListenSocket : vhListenSocket)
{
if (recv_set.count(hListenSocket.socket) > 0)
{
AcceptConnection(hListenSocket, mn_sync);
}
// Accept new connections from listening sockets.
SocketHandlerListening(recv_set, mn_sync);
}
void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
const std::set<SOCKET>& send_set,
const std::set<SOCKET>& error_set)
{
if (interruptNet) return;
std::vector<CNode*> vErrorNodes;
std::vector<CNode*> vReceivableNodes;
std::vector<CNode*> vSendableNodes;
@ -1955,9 +1984,15 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
if (bytes_sent) RecordBytesSent(bytes_sent);
}
ReleaseNodeVector(vErrorNodes);
ReleaseNodeVector(vReceivableNodes);
ReleaseNodeVector(vSendableNodes);
for (auto& node : vErrorNodes) {
node->Release();
}
for (auto& node : vReceivableNodes) {
node->Release();
}
for (auto& node : vSendableNodes) {
node->Release();
}
if (interruptNet) {
return;
@ -1978,6 +2013,18 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
}
}
void CConnman::SocketHandlerListening(const std::set<SOCKET>& recv_set, CMasternodeSync& mn_sync)
{
for (const ListenSocket& listen_socket : vhListenSocket) {
if (interruptNet) {
return;
}
if (recv_set.count(listen_socket.socket) > 0) {
AcceptConnection(listen_socket, mn_sync);
}
}
}
size_t CConnman::SocketRecvData(CNode *pnode)
{
// typical socket buffer is 8K-64K
@ -2942,8 +2989,6 @@ void CConnman::ThreadMessageHandler()
FastRandomContext rng;
while (!flagInterruptMsgProc)
{
std::vector<CNode*> vNodesCopy = CopyNodeVector();
bool fMoreWork = false;
bool fSkipSendMessagesForMasternodes = true;
@ -2951,13 +2996,13 @@ void CConnman::ThreadMessageHandler()
fSkipSendMessagesForMasternodes = false;
nLastSendMessagesTimeMasternodes = GetTimeMillis();
}
// Randomize the order in which we process messages from/to our peers.
// This prevents attacks in which an attacker exploits having multiple
// consecutive connections in the vNodes list.
Shuffle(vNodesCopy.begin(), vNodesCopy.end(), rng);
const NodesSnapshot snap{*this, /* filter = */ CConnman::AllNodes, /* shuffle = */ true};
for (CNode* pnode : vNodesCopy)
{
for (CNode* pnode : snap.Nodes()) {
if (pnode->fDisconnect)
continue;
@ -2976,8 +3021,6 @@ void CConnman::ThreadMessageHandler()
return;
}
ReleaseNodeVector(vNodesCopy);
WAIT_LOCK(mutexMsgProc, lock);
if (!fMoreWork) {
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this]() EXCLUSIVE_LOCKS_REQUIRED(mutexMsgProc) { return fMsgProcWake; });
@ -4133,26 +4176,28 @@ std::chrono::microseconds PoissonNextSend(std::chrono::microseconds now, std::ch
return now + std::chrono::duration_cast<std::chrono::microseconds>(unscaled * average_interval + 0.5us);
}
std::vector<CNode*> CConnman::CopyNodeVector(std::function<bool(const CNode* pnode)> cond)
CConnman::NodesSnapshot::NodesSnapshot(const CConnman& connman, std::function<bool(const CNode* pnode)> filter,
bool shuffle)
{
std::vector<CNode*> vecNodesCopy;
LOCK(cs_vNodes);
vecNodesCopy.reserve(vNodes.size());
for(size_t i = 0; i < vNodes.size(); ++i) {
CNode* pnode = vNodes[i];
if (!cond(pnode))
LOCK(connman.cs_vNodes);
m_nodes_copy.reserve(connman.vNodes.size());
for (auto& node : connman.vNodes) {
if (!filter(node))
continue;
pnode->AddRef();
vecNodesCopy.push_back(pnode);
}
return vecNodesCopy;
node->AddRef();
m_nodes_copy.push_back(node);
}
void CConnman::ReleaseNodeVector(const std::vector<CNode*>& vecNodes)
if (shuffle) {
Shuffle(m_nodes_copy.begin(), m_nodes_copy.end(), FastRandomContext{});
}
}
CConnman::NodesSnapshot::~NodesSnapshot()
{
for(size_t i = 0; i < vecNodes.size(); ++i) {
CNode* pnode = vecNodes[i];
pnode->Release();
for (auto& node : m_nodes_copy) {
node->Release();
}
}

View File

@ -1058,9 +1058,6 @@ public:
ForEachNodeThen(FullyConnectedOnly, pre, post);
}
std::vector<CNode*> CopyNodeVector(std::function<bool(const CNode* pnode)> cond = AllNodes);
void ReleaseNodeVector(const std::vector<CNode*>& vecNodes);
// Addrman functions
/**
* Return all or many randomly selected addresses, optionally by network.
@ -1182,6 +1179,26 @@ public:
/** Return true if we should disconnect the peer for failing an inactivity check. */
bool ShouldRunInactivityChecks(const CNode& node, std::chrono::seconds now) const;
/**
* RAII helper to atomically create a copy of `vNodes` and add a reference
* to each of the nodes. The nodes are released when this object is destroyed.
*/
class NodesSnapshot
{
public:
explicit NodesSnapshot(const CConnman& connman, std::function<bool(const CNode* pnode)> cond = AllNodes,
bool shuffle = false);
~NodesSnapshot();
const std::vector<CNode*>& Nodes() const
{
return m_nodes_copy;
}
private:
std::vector<CNode*> m_nodes_copy;
};
private:
struct ListenSocket {
public:
@ -1226,19 +1243,81 @@ private:
void CalculateNumConnectionsChangedStats();
/** Return true if the peer is inactive and should be disconnected. */
bool InactivityCheck(const CNode& node) const;
bool GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set);
/**
* Generate a collection of sockets to check for IO readiness.
* @param[in] nodes Select from these nodes' sockets.
* @param[out] recv_set Sockets to check for read readiness.
* @param[out] send_set Sockets to check for write readiness.
* @param[out] error_set Sockets to check for errors.
* @return true if at least one socket is to be checked (the returned set is not empty)
*/
bool GenerateSelectSet(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set);
/**
* Check which sockets are ready for IO.
* @param[in] nodes Select from these nodes' sockets (in supported event methods).
* @param[in] only_poll Permit zero timeout polling
* @param[out] recv_set Sockets which are ready for read.
* @param[out] send_set Sockets which are ready for write.
* @param[out] error_set Sockets which have errors.
* This calls `GenerateSelectSet()` to gather a list of sockets to check.
*/
void SocketEvents(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set,
bool only_poll);
#ifdef USE_KQUEUE
void SocketEventsKqueue(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll);
void SocketEventsKqueue(std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set,
bool only_poll);
#endif
#ifdef USE_EPOLL
void SocketEventsEpoll(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll);
void SocketEventsEpoll(std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set,
bool only_poll);
#endif
#ifdef USE_POLL
void SocketEventsPoll(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll);
void SocketEventsPoll(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set,
bool only_poll);
#endif
void SocketEventsSelect(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll);
void SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll);
void SocketEventsSelect(const std::vector<CNode*>& nodes,
std::set<SOCKET>& recv_set,
std::set<SOCKET>& send_set,
std::set<SOCKET>& error_set,
bool only_poll);
/**
* Check connected and listening sockets for IO readiness and process them accordingly.
*/
void SocketHandler(CMasternodeSync& mn_sync);
/**
* Do the read/write for connected sockets that are ready for IO.
* @param[in] recv_set Sockets that are ready for read.
* @param[in] send_set Sockets that are ready for send.
* @param[in] error_set Sockets that have an exceptional condition (error).
*/
void SocketHandlerConnected(const std::set<SOCKET>& recv_set,
const std::set<SOCKET>& send_set,
const std::set<SOCKET>& error_set);
/**
* Accept incoming connections, one from each read-ready listening socket.
* @param[in] recv_set Sockets that are ready for read.
*/
void SocketHandlerListening(const std::set<SOCKET>& recv_set, CMasternodeSync& mn_sync);
void ThreadSocketHandler(CMasternodeSync& mn_sync);
void ThreadDNSAddressSeed();
void ThreadOpenMasternodeConnections(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman,