Merge #6007: refactor: move {epoll, kqueue} (de)init logic and wakeup pipes logic out of CConnman and into EdgeTriggeredEvents and WakeupPipes

bd8b5d4007 net: add more details to log information in ETE and `WakeupPipes` (Kittywhiskers Van Gogh)
ec99294976 net: restrict access `EdgeTriggerEvents` members (Kittywhiskers Van Gogh)
f24520a3a2 net: log `close` failures in `EdgeTriggerEvents` and `WakeupPipe` (Kittywhiskers Van Gogh)
b8c3b480eb refactor: introduce `WakeupPipe`, move wakeup select pipe logic there (Kittywhiskers Van Gogh)
ed7d976c3e refactor: move wakeup pipe (de)registration to ETE (Kittywhiskers Van Gogh)
f50c710028 refactor: move `CConnman::`(`Un`)`registerEvents` to ETE (Kittywhiskers Van Gogh)
3a9f386138 refactor: move `SOCKET` addition/removal from interest list to ETE (Kittywhiskers Van Gogh)
212df0677f refactor: introduce `EdgeTriggeredEvents`, move {epoll, kqueue} fd there (Kittywhiskers Van Gogh)
3b11ef9b89 refactor: move `CConnman::SocketEventsMode` to `util/sock.h` (Kittywhiskers Van Gogh)

Pull request description:

  ## Motivation

  `CConnman` is an entity that contains a lot of platform-specific implementation logic, both inherited from upstream and added upon by Dash (support for edge-triggered socket events modes like `epoll` on Linux and `kqueue` on FreeBSD/Darwin).

  Bitcoin has since moved to strip down `CConnman` by moving peer-related logic to the `Peer` struct in `net_processing` (portions of which are backported in #5982 and friends, tracking efforts from https://github.com/bitcoin/bitcoin/issues/19398) and moving socket-related logic to `Sock` (portions of which are aimed to be backported in #6004, tracking efforts from https://github.com/bitcoin/bitcoin/pull/21878).

  Due to the direction being taken and the difference in how edge-triggered events modes operate (utilizing interest lists and events instead of iterating over each socket) in comparison to level-triggered modes (which are inherited from upstream), it would be reasonable to therefore, isolate Dash-specific code into its own entities and minimize the information `CConnman` has about its internal workings.

  One of the visible benefits of this approach is comparing `develop` (as of this writing, d44b0d5dcb) and this pull request for interactions between wakeup pipes logic and {`epoll`, `kqueue`} logic.

  This is what construction looks like:

  d44b0d5dcb/src/net.cpp (L3358-L3397)

  But, if we segment wakeup pipes logic (that work on any platform with POSIX APIs and excludes Windows) and {`epoll`, `kqueue`} logic (calling them `EdgeTriggeredEvents` instead), construction looks different:

  907a351517/src/util/wpipe.cpp (L12-L38)

  Now wakeup pipes logic doesn't need to know what socket events mode is being used nor are the implementation aspects of (de)registering it its concern, that is now `EdgeTriggeredEvents` problem.

  ## Additional Information

  * This pull request will need testing on macOS (FreeBSD isn't a tier-one target) to ensure that lack of breakage in `kqueue`-specific logic.

  ## Breaking Changes

  * Dependency for https://github.com/dashpay/dash/pull/6018
  * More logging has been introduced and existing log messages have been made more exhaustive. If there is parsing that relies on a particular template, they will have to be updated.
  * If `EdgeTriggeredEvents` or `WakeupPipes` fail to initialize or are incorrectly initialized and not destroyed immediately, any further attempts at calling any of its functions will result in an `assert`-induced crash. Earlier behavior may have allowed for silent failure but segmentation of logic from `CConnman` means the newly created instances must only exist if the circumstances needed for it to initialize correctly are present.

    This is to ensure that `CConnman` doesn't have to concern itself with internal workings of either entities.

  ## Checklist:

  - [x] I have performed a self-review of my own code
  - [x] I have commented my code, particularly in hard-to-understand areas
  - [x] I have added or updated relevant unit/integration/functional/e2e tests **(note: N/A)**
  - [x] I have made corresponding changes to the documentation **(note: N/A)**
  - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_

ACKs for top commit:
  PastaPastaPasta:
    utACK bd8b5d4007

Tree-SHA512: 8f793d4b4f2d8091e05bb9cc108013e924bbfbf19081290d9c0dfd91b0f2c80652ccf853f1596562942b4433509149c526e111396937988db605707ae1fe7366
This commit is contained in:
pasta 2024-05-14 11:31:06 -05:00
commit 3b0323a683
No known key found for this signature in database
GPG Key ID: 52527BEDABE87984
10 changed files with 601 additions and 313 deletions

View File

@ -327,6 +327,7 @@ BITCOIN_CORE_H = \
util/bip32.h \ util/bip32.h \
util/bytevectorhash.h \ util/bytevectorhash.h \
util/check.h \ util/check.h \
util/edge.h \
util/enumerate.h \ util/enumerate.h \
util/epochguard.h \ util/epochguard.h \
util/error.h \ util/error.h \
@ -362,6 +363,7 @@ BITCOIN_CORE_H = \
util/ui_change_type.h \ util/ui_change_type.h \
util/url.h \ util/url.h \
util/vector.h \ util/vector.h \
util/wpipe.h \
validation.h \ validation.h \
validationinterface.h \ validationinterface.h \
versionbits.h \ versionbits.h \
@ -776,6 +778,7 @@ libbitcoin_util_a_SOURCES = \
util/bip32.cpp \ util/bip32.cpp \
util/bytevectorhash.cpp \ util/bytevectorhash.cpp \
util/check.cpp \ util/check.cpp \
util/edge.cpp \
util/error.cpp \ util/error.cpp \
util/fees.cpp \ util/fees.cpp \
util/hasher.cpp \ util/hasher.cpp \
@ -795,6 +798,7 @@ libbitcoin_util_a_SOURCES = \
util/thread.cpp \ util/thread.cpp \
util/threadnames.cpp \ util/threadnames.cpp \
util/tokenpipe.cpp \ util/tokenpipe.cpp \
util/wpipe.cpp \
$(BITCOIN_CORE_H) $(BITCOIN_CORE_H)
if USE_LIBEVENT if USE_LIBEVENT

View File

@ -2515,23 +2515,9 @@ bool AppInitMain(const CoreContext& context, NodeContext& node, interfaces::Bloc
} }
} }
std::string strSocketEventsMode = args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS); std::string sem_str = args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS);
if (strSocketEventsMode == "select") { if (SEMFromString(sem_str) == SocketEventsMode::Unknown) {
connOptions.socketEventsMode = CConnman::SOCKETEVENTS_SELECT; return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), sem_str, GetSupportedSocketEventsStr()));
#ifdef USE_POLL
} else if (strSocketEventsMode == "poll") {
connOptions.socketEventsMode = CConnman::SOCKETEVENTS_POLL;
#endif
#ifdef USE_EPOLL
} else if (strSocketEventsMode == "epoll") {
connOptions.socketEventsMode = CConnman::SOCKETEVENTS_EPOLL;
#endif
#ifdef USE_KQUEUE
} else if (strSocketEventsMode == "kqueue") {
connOptions.socketEventsMode = CConnman::SOCKETEVENTS_KQUEUE;
#endif
} else {
return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), strSocketEventsMode, GetSupportedSocketEventsStr()));
} }
const std::string& i2psam_arg = args.GetArg("-i2psam", ""); const std::string& i2psam_arg = args.GetArg("-i2psam", "");

View File

@ -29,6 +29,7 @@
#include <util/thread.h> #include <util/thread.h>
#include <util/time.h> #include <util/time.h>
#include <util/translation.h> #include <util/translation.h>
#include <util/wpipe.h>
#include <validation.h> // for fDIP0001ActiveAtTip #include <validation.h> // for fDIP0001ActiveAtTip
#include <masternode/meta.h> #include <masternode/meta.h>
@ -119,7 +120,7 @@ static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 50;
// We are however still somewhat limited in how long we can sleep as there is periodic work (cleanup) to be done in // We are however still somewhat limited in how long we can sleep as there is periodic work (cleanup) to be done in
// the socket handler thread // the socket handler thread
static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 500; static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 500;
#endif #endif /* USE_WAKEUP_PIPE */
const std::string NET_MESSAGE_COMMAND_OTHER = "*other*"; const std::string NET_MESSAGE_COMMAND_OTHER = "*other*";
@ -563,7 +564,9 @@ void CNode::CloseSocketDisconnect(CConnman* connman)
} }
} }
connman->UnregisterEvents(this); if (connman->m_edge_trig_events && !connman->m_edge_trig_events->UnregisterEvents(hSocket)) {
LogPrint(BCLog::NET, "EdgeTriggeredEvents::UnregisterEvents() failed\n");
}
LogPrint(BCLog::NET, "disconnecting peer=%d\n", id); LogPrint(BCLog::NET, "disconnecting peer=%d\n", id);
CloseSocket(hSocket); CloseSocket(hSocket);
@ -1276,8 +1279,15 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket,
LOCK(m_nodes_mutex); LOCK(m_nodes_mutex);
m_nodes.push_back(pnode); m_nodes.push_back(pnode);
WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.emplace(hSocket, pnode)); WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.emplace(hSocket, pnode));
RegisterEvents(pnode); if (m_edge_trig_events) {
WakeSelect(); LOCK(pnode->cs_hSocket);
if (!m_edge_trig_events->RegisterEvents(pnode->hSocket)) {
LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n");
}
}
if (m_wakeup_pipe) {
m_wakeup_pipe->Write();
}
} }
// We received a new connection, harvest entropy from the time (and our peer count) // We received a new connection, harvest entropy from the time (and our peer count)
@ -1562,14 +1572,14 @@ bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
} }
} }
#ifdef USE_WAKEUP_PIPE if (m_wakeup_pipe) {
// We add a pipe to the read set so that the select() call can be woken up from the outside // We add a pipe to the read set so that the select() call can be woken up from the outside
// This is done when data is added to send buffers (vSendMsg) or when new peers are added // This is done when data is added to send buffers (vSendMsg) or when new peers are added
// This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
// timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually // timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
// run on Linux and friends. // run on Linux and friends.
recv_set.insert(wakeupPipe[0]); recv_set.insert(m_wakeup_pipe->m_pipe[0]);
#endif }
return !recv_set.empty() || !send_set.empty() || !error_set.empty(); return !recv_set.empty() || !send_set.empty() || !error_set.empty();
} }
@ -1587,9 +1597,8 @@ void CConnman::SocketEventsKqueue(std::set<SOCKET>& recv_set,
timeout.tv_sec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000; timeout.tv_sec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000;
timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000; timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000;
wakeupSelectNeeded = true; int n{-1};
int n = kevent(kqueuefd, nullptr, 0, events, maxEvents, &timeout); ToggleWakeupPipe([&](){n = kevent(Assert(m_edge_trig_events)->GetFileDescriptor(), nullptr, 0, events, maxEvents, &timeout);});
wakeupSelectNeeded = false;
if (n == -1) { if (n == -1) {
LogPrintf("kevent wait error\n"); LogPrintf("kevent wait error\n");
} else if (n > 0) { } else if (n > 0) {
@ -1621,9 +1630,8 @@ void CConnman::SocketEventsEpoll(std::set<SOCKET>& recv_set,
const size_t maxEvents = 64; const size_t maxEvents = 64;
epoll_event events[maxEvents]; epoll_event events[maxEvents];
wakeupSelectNeeded = true; int n{-1};
int n = epoll_wait(epollfd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); ToggleWakeupPipe([&](){n = epoll_wait(Assert(m_edge_trig_events)->GetFileDescriptor(), events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);});
wakeupSelectNeeded = false;
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
auto& e = events[i]; auto& e = events[i];
if((e.events & EPOLLERR) || (e.events & EPOLLHUP)) { if((e.events & EPOLLERR) || (e.events & EPOLLHUP)) {
@ -1678,9 +1686,8 @@ void CConnman::SocketEventsPoll(const std::vector<CNode*>& nodes,
vpollfds.push_back(std::move(it.second)); vpollfds.push_back(std::move(it.second));
} }
wakeupSelectNeeded = true; int r{-1};
int r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); ToggleWakeupPipe([&](){r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);});
wakeupSelectNeeded = false;
if (r < 0) { if (r < 0) {
return; return;
} }
@ -1737,9 +1744,8 @@ void CConnman::SocketEventsSelect(const std::vector<CNode*>& nodes,
hSocketMax = std::max(hSocketMax, hSocket); hSocketMax = std::max(hSocketMax, hSocket);
} }
wakeupSelectNeeded = true; int nSelect{-1};
int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); ToggleWakeupPipe([&](){nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);});
wakeupSelectNeeded = false;
if (interruptNet) if (interruptNet)
return; return;
@ -1782,21 +1788,21 @@ void CConnman::SocketEvents(const std::vector<CNode*>& nodes,
{ {
switch (socketEventsMode) { switch (socketEventsMode) {
#ifdef USE_KQUEUE #ifdef USE_KQUEUE
case SOCKETEVENTS_KQUEUE: case SocketEventsMode::KQueue:
SocketEventsKqueue(recv_set, send_set, error_set, only_poll); SocketEventsKqueue(recv_set, send_set, error_set, only_poll);
break; break;
#endif #endif
#ifdef USE_EPOLL #ifdef USE_EPOLL
case SOCKETEVENTS_EPOLL: case SocketEventsMode::EPoll:
SocketEventsEpoll(recv_set, send_set, error_set, only_poll); SocketEventsEpoll(recv_set, send_set, error_set, only_poll);
break; break;
#endif #endif
#ifdef USE_POLL #ifdef USE_POLL
case SOCKETEVENTS_POLL: case SocketEventsMode::Poll:
SocketEventsPoll(nodes, recv_set, send_set, error_set, only_poll); SocketEventsPoll(nodes, recv_set, send_set, error_set, only_poll);
break; break;
#endif #endif
case SOCKETEVENTS_SELECT: case SocketEventsMode::Select:
SocketEventsSelect(nodes, recv_set, send_set, error_set, only_poll); SocketEventsSelect(nodes, recv_set, send_set, error_set, only_poll);
break; break;
default: default:
@ -1842,18 +1848,10 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
// empty sets. // empty sets.
SocketEvents(snap.Nodes(), recv_set, send_set, error_set, only_poll); SocketEvents(snap.Nodes(), recv_set, send_set, error_set, only_poll);
#ifdef USE_WAKEUP_PIPE // Drain the wakeup pipe
// drain the wakeup pipe if (m_wakeup_pipe && recv_set.count(m_wakeup_pipe->m_pipe[0])) {
if (recv_set.count(wakeupPipe[0])) { m_wakeup_pipe->Drain();
char buf[128];
while (true) {
int r = read(wakeupPipe[0], buf, sizeof(buf));
if (r <= 0) {
break;
}
}
} }
#endif
// Service (send/receive) each of the already connected nodes. // Service (send/receive) each of the already connected nodes.
SocketHandlerConnected(recv_set, send_set, error_set); SocketHandlerConnected(recv_set, send_set, error_set);
@ -2131,22 +2129,6 @@ void CConnman::WakeMessageHandler()
condMsgProc.notify_one(); condMsgProc.notify_one();
} }
void CConnman::WakeSelect()
{
#ifdef USE_WAKEUP_PIPE
if (wakeupPipe[1] == -1) {
return;
}
char buf{0};
if (write(wakeupPipe[1], &buf, sizeof(buf)) != 1) {
LogPrint(BCLog::NET, "write to wakeupPipe failed\n");
}
#endif
wakeupSelectNeeded = false;
}
void CConnman::ThreadDNSAddressSeed() void CConnman::ThreadDNSAddressSeed()
{ {
FastRandomContext rng; FastRandomContext rng;
@ -2980,8 +2962,15 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
{ {
LOCK(m_nodes_mutex); LOCK(m_nodes_mutex);
m_nodes.push_back(pnode); m_nodes.push_back(pnode);
RegisterEvents(pnode); if (m_edge_trig_events) {
WakeSelect(); LOCK(pnode->cs_hSocket);
if (!m_edge_trig_events->RegisterEvents(pnode->hSocket)) {
LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n");
}
}
if (m_wakeup_pipe) {
m_wakeup_pipe->Write();
}
} }
} }
@ -3132,30 +3121,10 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError,
return false; return false;
} }
#ifdef USE_KQUEUE if (m_edge_trig_events && !m_edge_trig_events->AddSocket(sock->Get())) {
if (socketEventsMode == SOCKETEVENTS_KQUEUE) { LogPrintf("Error: EdgeTriggeredEvents::AddSocket() failed\n");
struct kevent event; return false;
EV_SET(&event, sock->Get(), EVFILT_READ, EV_ADD, 0, 0, nullptr);
if (kevent(kqueuefd, &event, 1, nullptr, 0, nullptr) != 0) {
strError = strprintf(_("Error: failed to add socket to kqueuefd (kevent returned error %s)"), NetworkErrorString(WSAGetLastError()));
LogPrintf("%s\n", strError.original);
return false;
}
} }
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL) {
epoll_event event;
event.data.fd = sock->Get();
event.events = EPOLLIN;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sock->Get(), &event) != 0) {
strError = strprintf(_("Error: failed to add socket to epollfd (epoll_ctl returned error %s)"), NetworkErrorString(WSAGetLastError()));
LogPrintf("%s\n", strError.original);
return false;
}
}
#endif
vhListenSocket.push_back(ListenSocket(sock->Release(), permissions)); vhListenSocket.push_back(ListenSocket(sock->Release(), permissions));
@ -3301,25 +3270,14 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met
AssertLockNotHeld(m_total_bytes_sent_mutex); AssertLockNotHeld(m_total_bytes_sent_mutex);
Init(connOptions); Init(connOptions);
#ifdef USE_KQUEUE if (socketEventsMode == SocketEventsMode::EPoll || socketEventsMode == SocketEventsMode::KQueue) {
if (socketEventsMode == SOCKETEVENTS_KQUEUE) { m_edge_trig_events = std::make_unique<EdgeTriggeredEvents>(socketEventsMode);
kqueuefd = kqueue(); if (!m_edge_trig_events->IsValid()) {
if (kqueuefd == -1) { LogPrintf("Unable to initialize EdgeTriggeredEvents instance\n");
LogPrintf("kqueue failed\n"); m_edge_trig_events.reset();
return false; return false;
} }
} }
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL) {
epollfd = epoll_create1(0);
if (epollfd == -1) {
LogPrintf("epoll_create1 failed\n");
return false;
}
}
#endif
if (fListen && !InitBinds(connOptions.vBinds, connOptions.vWhiteBinds, connOptions.onion_binds)) { if (fListen && !InitBinds(connOptions.vBinds, connOptions.vWhiteBinds, connOptions.onion_binds)) {
if (clientInterface) { if (clientInterface) {
@ -3392,45 +3350,13 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met
} }
#ifdef USE_WAKEUP_PIPE #ifdef USE_WAKEUP_PIPE
if (pipe(wakeupPipe) != 0) { m_wakeup_pipe = std::make_unique<WakeupPipe>(m_edge_trig_events.get());
wakeupPipe[0] = wakeupPipe[1] = -1; if (!m_wakeup_pipe->IsValid()) {
LogPrint(BCLog::NET, "pipe() for wakeupPipe failed\n"); /* We log the error but do not halt initialization */
} else { LogPrintf("Unable to initialize WakeupPipe instance\n");
int fFlags = fcntl(wakeupPipe[0], F_GETFL, 0); m_wakeup_pipe.reset();
if (fcntl(wakeupPipe[0], F_SETFL, fFlags | O_NONBLOCK) == -1) {
LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n");
}
fFlags = fcntl(wakeupPipe[1], F_GETFL, 0);
if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) {
LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n");
}
#ifdef USE_KQUEUE
if (socketEventsMode == SOCKETEVENTS_KQUEUE) {
struct kevent event;
EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_ADD, 0, 0, nullptr);
int r = kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError()));
return false;
}
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL) {
epoll_event event;
event.events = EPOLLIN;
event.data.fd = wakeupPipe[0];
int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, wakeupPipe[0], &event);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__,
epollfd, EPOLL_CTL_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError()));
return false;
}
}
#endif
} }
#endif #endif /* USE_WAKEUP_PIPE */
// Send and receive from sockets, accept connections // Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&util::TraceThread, "net", [this, &mn_sync] { ThreadSocketHandler(mn_sync); }); threadSocketHandler = std::thread(&util::TraceThread, "net", [this, &mn_sync] { ThreadSocketHandler(mn_sync); });
@ -3564,23 +3490,15 @@ void CConnman::StopNodes()
for (CNode *pnode : m_nodes) for (CNode *pnode : m_nodes)
pnode->CloseSocketDisconnect(this); pnode->CloseSocketDisconnect(this);
} }
for (ListenSocket& hListenSocket : vhListenSocket) for (ListenSocket& hListenSocket : vhListenSocket) {
if (hListenSocket.socket != INVALID_SOCKET) { if (hListenSocket.socket != INVALID_SOCKET) {
#ifdef USE_KQUEUE if (m_edge_trig_events && !m_edge_trig_events->RemoveSocket(hListenSocket.socket)) {
if (socketEventsMode == SOCKETEVENTS_KQUEUE) { LogPrintf("EdgeTriggeredEvents::RemoveSocket() failed\n");
struct kevent event;
EV_SET(&event, hListenSocket.socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
} }
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL) {
epoll_ctl(epollfd, EPOLL_CTL_DEL, hListenSocket.socket, nullptr);
}
#endif
if (!CloseSocket(hListenSocket.socket)) if (!CloseSocket(hListenSocket.socket))
LogPrintf("CloseSocket(hListenSocket) failed with error %s\n", NetworkErrorString(WSAGetLastError())); LogPrintf("CloseSocket(hListenSocket) failed with error %s\n", NetworkErrorString(WSAGetLastError()));
} }
}
// clean up some globals (to help leak detection) // clean up some globals (to help leak detection)
std::vector<CNode*> nodes; std::vector<CNode*> nodes;
@ -3604,33 +3522,12 @@ void CConnman::StopNodes()
vhListenSocket.clear(); vhListenSocket.clear();
semOutbound.reset(); semOutbound.reset();
semAddnode.reset(); semAddnode.reset();
/**
#ifdef USE_KQUEUE * m_wakeup_pipe must be reset *before* m_edge_trig_events as it may
if (socketEventsMode == SOCKETEVENTS_KQUEUE && kqueuefd != -1) { * attempt to call EdgeTriggeredEvents::UnregisterPipe() in its destructor
#ifdef USE_WAKEUP_PIPE */
struct kevent event; m_wakeup_pipe.reset();
EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_DELETE, 0, 0, nullptr); m_edge_trig_events.reset();
kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
#endif
close(kqueuefd);
}
kqueuefd = -1;
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL && epollfd != -1) {
#ifdef USE_WAKEUP_PIPE
epoll_ctl(epollfd, EPOLL_CTL_DEL, wakeupPipe[0], nullptr);
#endif
close(epollfd);
}
epollfd = -1;
#endif
#ifdef USE_WAKEUP_PIPE
if (wakeupPipe[0] != -1) close(wakeupPipe[0]);
if (wakeupPipe[1] != -1) close(wakeupPipe[1]);
wakeupPipe[0] = wakeupPipe[1] = -1;
#endif
} }
void CConnman::DeleteNode(CNode* pnode) void CConnman::DeleteNode(CNode* pnode)
@ -4143,8 +4040,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
} }
// wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending)
if (!hasPendingData && wakeupSelectNeeded) if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()))
WakeSelect(); m_wakeup_pipe->Write();
} }
} }
@ -4234,79 +4131,6 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const
return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize(); return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize();
} }
void CConnman::RegisterEvents(CNode *pnode)
{
#ifdef USE_KQUEUE
if (socketEventsMode == SOCKETEVENTS_KQUEUE) {
LOCK(pnode->cs_hSocket);
assert(pnode->hSocket != INVALID_SOCKET);
struct kevent events[2];
EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_ADD, 0, 0, nullptr);
EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);
int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL) {
LOCK(pnode->cs_hSocket);
assert(pnode->hSocket != INVALID_SOCKET);
epoll_event e;
// We're using edge-triggered mode, so it's important that we drain sockets even if no signals come in
e.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
e.data.fd = pnode->hSocket;
int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, pnode->hSocket, &e);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__,
epollfd, EPOLL_CTL_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
}
void CConnman::UnregisterEvents(CNode *pnode)
{
#ifdef USE_KQUEUE
if (socketEventsMode == SOCKETEVENTS_KQUEUE) {
AssertLockHeld(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET) {
return;
}
struct kevent events[2];
EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_DELETE, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL) {
AssertLockHeld(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET) {
return;
}
int r = epoll_ctl(epollfd, EPOLL_CTL_DEL, pnode->hSocket, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__,
epollfd, EPOLL_CTL_DEL, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
}
void CaptureMessageToFile(const CAddress& addr, void CaptureMessageToFile(const CAddress& addr,
const std::string& msg_type, const std::string& msg_type,
Span<const unsigned char> data, Span<const unsigned char> data,

View File

@ -28,9 +28,11 @@
#include <sync.h> #include <sync.h>
#include <threadinterrupt.h> #include <threadinterrupt.h>
#include <uint256.h> #include <uint256.h>
#include <util/system.h>
#include <consensus/params.h>
#include <util/check.h> #include <util/check.h>
#include <util/edge.h>
#include <util/system.h>
#include <util/wpipe.h>
#include <consensus/params.h>
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>
@ -44,10 +46,6 @@
#include <queue> #include <queue>
#include <vector> #include <vector>
#ifndef WIN32
#define USE_WAKEUP_PIPE
#endif
class CConnman; class CConnman;
class CDeterministicMNList; class CDeterministicMNList;
class CDeterministicMNManager; class CDeterministicMNManager;
@ -822,13 +820,6 @@ class CConnman
{ {
friend class CNode; friend class CNode;
public: public:
enum SocketEventsMode {
SOCKETEVENTS_SELECT = 0,
SOCKETEVENTS_POLL = 1,
SOCKETEVENTS_EPOLL = 2,
SOCKETEVENTS_KQUEUE = 3,
};
struct Options struct Options
{ {
ServiceFlags nLocalServices = NODE_NONE; ServiceFlags nLocalServices = NODE_NONE;
@ -852,7 +843,7 @@ public:
bool m_use_addrman_outgoing = true; bool m_use_addrman_outgoing = true;
std::vector<std::string> m_specified_outgoing; std::vector<std::string> m_specified_outgoing;
std::vector<std::string> m_added_nodes; std::vector<std::string> m_added_nodes;
SocketEventsMode socketEventsMode = SOCKETEVENTS_SELECT; SocketEventsMode socketEventsMode = SocketEventsMode::Select;
std::vector<bool> m_asmap; std::vector<bool> m_asmap;
bool m_i2p_accept_incoming; bool m_i2p_accept_incoming;
}; };
@ -1174,7 +1165,6 @@ public:
unsigned int GetReceiveFloodSize() const; unsigned int GetReceiveFloodSize() const;
void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void WakeSelect() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
/** Attempts to obfuscate tx time through exponentially distributed emitting. /** Attempts to obfuscate tx time through exponentially distributed emitting.
Works assuming that a single interval is used. Works assuming that a single interval is used.
@ -1377,9 +1367,6 @@ private:
// Whether the node should be passed out in ForEach* callbacks // Whether the node should be passed out in ForEach* callbacks
static bool NodeFullyConnected(const CNode* pnode); static bool NodeFullyConnected(const CNode* pnode);
void RegisterEvents(CNode* pnode);
void UnregisterEvents(CNode* pnode);
// Network usage totals // Network usage totals
mutable Mutex m_total_bytes_sent_mutex; mutable Mutex m_total_bytes_sent_mutex;
std::atomic<uint64_t> nTotalBytesRecv{0}; std::atomic<uint64_t> nTotalBytesRecv{0};
@ -1514,19 +1501,19 @@ private:
*/ */
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session; std::unique_ptr<i2p::sam::Session> m_i2p_sam_session;
#ifdef USE_WAKEUP_PIPE
/** a pipe which is added to select() calls to wakeup before the timeout */
int wakeupPipe[2]{-1,-1};
#endif
std::atomic<bool> wakeupSelectNeeded{false};
SocketEventsMode socketEventsMode; SocketEventsMode socketEventsMode;
#ifdef USE_KQUEUE std::unique_ptr<EdgeTriggeredEvents> m_edge_trig_events{nullptr};
int kqueuefd{-1}; std::unique_ptr<WakeupPipe> m_wakeup_pipe{nullptr};
#endif
#ifdef USE_EPOLL template <typename Callable>
int epollfd{-1}; void ToggleWakeupPipe(Callable&& func)
#endif {
if (m_wakeup_pipe) {
m_wakeup_pipe->Toggle(func);
} else {
func();
}
}
Mutex cs_sendable_receivable_nodes; Mutex cs_sendable_receivable_nodes;
std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes); std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes);

View File

@ -669,24 +669,9 @@ static RPCHelpMan getnetworkinfo()
obj.pushKV("connections_mn", (int)node.connman->GetNodeCount(ConnectionDirection::Verified)); obj.pushKV("connections_mn", (int)node.connman->GetNodeCount(ConnectionDirection::Verified));
obj.pushKV("connections_mn_in", (int)node.connman->GetNodeCount(ConnectionDirection::VerifiedIn)); obj.pushKV("connections_mn_in", (int)node.connman->GetNodeCount(ConnectionDirection::VerifiedIn));
obj.pushKV("connections_mn_out", (int)node.connman->GetNodeCount(ConnectionDirection::VerifiedOut)); obj.pushKV("connections_mn_out", (int)node.connman->GetNodeCount(ConnectionDirection::VerifiedOut));
std::string strSocketEvents; std::string sem_str = SEMToString(node.connman->GetSocketEventsMode());
switch (node.connman->GetSocketEventsMode()) { CHECK_NONFATAL(sem_str != "unknown");
case CConnman::SOCKETEVENTS_SELECT: obj.pushKV("socketevents", sem_str);
strSocketEvents = "select";
break;
case CConnman::SOCKETEVENTS_POLL:
strSocketEvents = "poll";
break;
case CConnman::SOCKETEVENTS_EPOLL:
strSocketEvents = "epoll";
break;
case CConnman::SOCKETEVENTS_KQUEUE:
strSocketEvents = "kqueue";
break;
default:
CHECK_NONFATAL(false);
}
obj.pushKV("socketevents", strSocketEvents);
} }
obj.pushKV("networks", GetNetworksInfo()); obj.pushKV("networks", GetNetworksInfo());
obj.pushKV("relayfee", ValueFromAmount(::minRelayTxFee.GetFeePerK())); obj.pushKV("relayfee", ValueFromAmount(::minRelayTxFee.GetFeePerK()));

239
src/util/edge.cpp Normal file
View File

@ -0,0 +1,239 @@
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <util/edge.h>
#include <logging.h>
#include <util/sock.h>
#ifdef USE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef USE_KQUEUE
#include <sys/event.h>
#endif
EdgeTriggeredEvents::EdgeTriggeredEvents(SocketEventsMode events_mode)
: m_mode(events_mode)
{
if (m_mode == SocketEventsMode::EPoll) {
#ifdef USE_EPOLL
m_fd = epoll_create1(0);
if (m_fd == -1) {
LogPrintf("Unable to initialize EdgeTriggeredEvents, epoll_create1 returned -1 with error %s\n",
NetworkErrorString(WSAGetLastError()));
return;
}
#else
LogPrintf("Attempting to initialize EdgeTriggeredEvents for epoll without support compiled in!\n");
return;
#endif /* USE_EPOLL */
} else if (m_mode == SocketEventsMode::KQueue) {
#ifdef USE_KQUEUE
m_fd = kqueue();
if (m_fd == -1) {
LogPrintf("Unable to initialize EdgeTriggeredEvents, kqueue returned -1 with error %s\n",
NetworkErrorString(WSAGetLastError()));
return;
}
#else
LogPrintf("Attempting to initialize EdgeTriggeredEvents for kqueue without support compiled in!\n");
return;
#endif /* USE_KQUEUE */
} else {
assert(false);
}
m_valid = true;
}
EdgeTriggeredEvents::~EdgeTriggeredEvents()
{
if (m_valid) {
#if defined(USE_KQUEUE) || defined(USE_EPOLL)
if (close(m_fd) != 0) {
LogPrintf("Destroying EdgeTriggeredEvents instance, close() failed for m_fd = %d with error %s\n", m_fd,
NetworkErrorString(WSAGetLastError()));
}
#else
assert(false);
#endif /* defined(USE_KQUEUE) || defined(USE_EPOLL) */
}
}
bool EdgeTriggeredEvents::RegisterEntity(int entity, std::string entity_name) const
{
assert(m_valid);
if (m_mode == SocketEventsMode::EPoll) {
#ifdef USE_EPOLL
epoll_event event;
event.data.fd = entity;
event.events = EPOLLIN;
if (epoll_ctl(m_fd, EPOLL_CTL_ADD, entity, &event) != 0) {
LogPrintf("Failed to add %s to epoll fd (epoll_ctl returned error %s)\n", entity_name,
NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_EPOLL */
} else if (m_mode == SocketEventsMode::KQueue) {
#ifdef USE_KQUEUE
struct kevent event;
EV_SET(&event, entity, EVFILT_READ, EV_ADD, 0, 0, nullptr);
if (kevent(m_fd, &event, 1, nullptr, 0, nullptr) != 0) {
LogPrintf("Failed to add %s to kqueue fd (kevent returned error %s)\n", entity_name,
NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_KQUEUE */
} else {
assert(false);
}
return true;
}
bool EdgeTriggeredEvents::UnregisterEntity(int entity, std::string entity_name) const
{
assert(m_valid);
if (m_mode == SocketEventsMode::EPoll) {
#ifdef USE_EPOLL
if (epoll_ctl(m_fd, EPOLL_CTL_DEL, entity, nullptr) != 0) {
LogPrintf("Failed to remove %s from epoll fd (epoll_ctl returned error %s)\n", entity_name,
NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_EPOLL */
} else if (m_mode == SocketEventsMode::KQueue) {
#ifdef USE_KQUEUE
struct kevent event;
EV_SET(&event, entity, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
if (kevent(m_fd, &event, 1, nullptr, 0, nullptr) != 0) {
LogPrintf("Failed to remove %s from kqueue fd (kevent returned error %s)\n", entity_name,
NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_KQUEUE */
} else {
assert(false);
}
return true;
}
bool EdgeTriggeredEvents::AddSocket(SOCKET socket) const
{
return RegisterEntity(socket, "socket");
}
bool EdgeTriggeredEvents::RemoveSocket(SOCKET socket) const
{
return UnregisterEntity(socket, "socket");
}
bool EdgeTriggeredEvents::RegisterPipe(int wakeup_pipe)
{
if (m_pipe_registered) {
LogPrintf("Pipe already registered, ignoring new registration request\n");
return false;
}
bool ret = RegisterEntity(wakeup_pipe, "wakeup pipe");
if (ret) m_pipe_registered = true;
return ret;
}
bool EdgeTriggeredEvents::UnregisterPipe(int wakeup_pipe)
{
if (!m_pipe_registered) {
LogPrintf("No pipe currently registered to unregister, ignoring request\n");
return false;
}
bool ret = UnregisterEntity(wakeup_pipe, "wakeup pipe");
if (ret) m_pipe_registered = false;
return ret;
}
bool EdgeTriggeredEvents::RegisterEvents(SOCKET socket) const
{
assert(m_valid && socket != INVALID_SOCKET);
if (m_mode == SocketEventsMode::EPoll) {
#ifdef USE_EPOLL
epoll_event e;
// We're using edge-triggered mode, so it's important that we drain sockets even if no signals come in
e.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
e.data.fd = socket;
if (epoll_ctl(m_fd, EPOLL_CTL_ADD, socket, &e) != 0) {
LogPrintf("Failed to register events for socket -- epoll_ctl(%d, %d, %d, ...) returned error: %s\n",
m_fd, EPOLL_CTL_ADD, socket, NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_EPOLL */
} else if (m_mode == SocketEventsMode::KQueue) {
#ifdef USE_KQUEUE
struct kevent events[2];
EV_SET(&events[0], socket, EVFILT_READ, EV_ADD, 0, 0, nullptr);
EV_SET(&events[1], socket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);
if (kevent(m_fd, events, 2, nullptr, 0, nullptr) != 0) {
LogPrintf("Failed to register events for socket -- kevent(%d, %d, %d, ...) returned error: %s\n",
m_fd, EV_ADD, socket, NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_KQUEUE */
} else {
assert(false);
}
return true;
}
bool EdgeTriggeredEvents::UnregisterEvents(SOCKET socket) const
{
assert(m_valid);
if (socket == INVALID_SOCKET) {
LogPrintf("Cannot unregister events for invalid socket\n");
return false;
}
if (m_mode == SocketEventsMode::EPoll) {
#ifdef USE_EPOLL
if (epoll_ctl(m_fd, EPOLL_CTL_DEL, socket, nullptr) != 0) {
LogPrintf("Failed to unregister events for socket -- epoll_ctl(%d, %d, %d, ...) returned error: %s\n",
m_fd, EPOLL_CTL_DEL, socket, NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_EPOLL */
} else if (m_mode == SocketEventsMode::KQueue) {
#ifdef USE_KQUEUE
struct kevent events[2];
EV_SET(&events[0], socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
EV_SET(&events[1], socket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
if (kevent(m_fd, events, 2, nullptr, 0, nullptr) != 0) {
LogPrintf("Failed to unregister events for socket -- kevent(%d, %d, %d, ...) returned error: %s\n",
m_fd, EV_DELETE, socket, NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_KQUEUE */
} else {
assert(false);
}
return true;
}

61
src/util/edge.h Normal file
View File

@ -0,0 +1,61 @@
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_UTIL_EDGE_H
#define BITCOIN_UTIL_EDGE_H
#include <compat.h>
#include <assert.h>
#include <cstdint>
#include <string>
enum class SocketEventsMode : int8_t;
/**
* A manager for abstracting logic surrounding edge-triggered socket events
* modes like kqueue and epoll.
*/
class EdgeTriggeredEvents
{
public:
explicit EdgeTriggeredEvents(SocketEventsMode events_mode);
~EdgeTriggeredEvents();
bool IsValid() const { return m_valid; }
int GetFileDescriptor() const { assert(m_fd != -1); return m_fd; }
/* Add socket to interest list */
bool AddSocket(SOCKET socket) const;
/* Remove socket from interest list */
bool RemoveSocket(SOCKET socket) const;
/* Register events for socket */
bool RegisterEvents(SOCKET socket) const;
/* Unregister events for socket */
bool UnregisterEvents(SOCKET socket) const;
private:
friend class WakeupPipe;
/* Register wakeup pipe with EdgeTriggeredEvents instance */
bool RegisterPipe(int wakeup_pipe);
/* Unregister wakeup pipe with EdgeTriggeredEvents instance */
bool UnregisterPipe(int wakeup_pipe);
private:
bool RegisterEntity(int entity, std::string entity_name) const;
bool UnregisterEntity(int entity, std::string entity_name) const;
private:
/* Flag set if pipe has been registered with instance */
bool m_pipe_registered{false};
/* Instance validity flag set during construction */
bool m_valid{false};
/* Flag for storing selected socket events mode */
SocketEventsMode m_mode;
/* File descriptor used to interact with events mode */
int m_fd{-1};
};
#endif /* BITCOIN_UTIL_EDGE_H */

View File

@ -18,6 +18,54 @@
*/ */
static constexpr auto MAX_WAIT_FOR_IO = 1s; static constexpr auto MAX_WAIT_FOR_IO = 1s;
enum class SocketEventsMode : int8_t {
Select = 0,
Poll = 1,
EPoll = 2,
KQueue = 3,
Unknown = -1
};
/* Converts SocketEventsMode value to string with additional check to report modes not compiled for as unknown */
static std::string SEMToString(const SocketEventsMode val)
{
switch (val) {
case (SocketEventsMode::Select):
return "select";
#ifdef USE_POLL
case (SocketEventsMode::Poll):
return "poll";
#endif /* USE_POLL */
#ifdef USE_EPOLL
case (SocketEventsMode::EPoll):
return "epoll";
#endif /* USE_EPOLL */
#ifdef USE_KQUEUE
case (SocketEventsMode::KQueue):
return "kqueue";
#endif /* USE_KQUEUE */
default:
return "unknown";
};
}
/* Converts string to SocketEventsMode value with additional check to report modes not compiled for as unknown */
static SocketEventsMode SEMFromString(const std::string str)
{
if (str == "select") { return SocketEventsMode::Select; }
#ifdef USE_POLL
else if (str == "poll") { return SocketEventsMode::Poll; }
#endif /* USE_POLL */
#ifdef USE_EPOLL
else if (str == "epoll") { return SocketEventsMode::EPoll; }
#endif /* USE_EPOLL */
#ifdef USE_KQUEUE
else if (str == "kqueue") { return SocketEventsMode::KQueue; }
#endif /* USE_KQUEUE */
else { return SocketEventsMode::Unknown; }
}
/** /**
* RAII helper class that manages a socket. Mimics `std::unique_ptr`, but instead of a pointer it * RAII helper class that manages a socket. Mimics `std::unique_ptr`, but instead of a pointer it
* contains a socket and closes it automatically when it goes out of scope. * contains a socket and closes it automatically when it goes out of scope.

95
src/util/wpipe.cpp Normal file
View File

@ -0,0 +1,95 @@
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <util/wpipe.h>
#include <logging.h>
#include <util/edge.h>
#include <util/sock.h>
static constexpr int EXPECTED_PIPE_WRITTEN_BYTES = 1;
WakeupPipe::WakeupPipe(EdgeTriggeredEvents* edge_trig_events)
: m_edge_trig_events{edge_trig_events}
{
#ifdef USE_WAKEUP_PIPE
if (pipe(m_pipe.data()) != 0) {
LogPrintf("Unable to initialize WakeupPipe, pipe() for m_pipe failed with error %s\n",
NetworkErrorString(WSAGetLastError()));
return;
}
for (size_t idx = 0; idx < m_pipe.size(); idx++) {
int flags = fcntl(m_pipe[idx], F_GETFL, 0);
if (fcntl(m_pipe[idx], F_SETFL, flags | O_NONBLOCK) == -1) {
LogPrintf("Unable to initialize WakeupPipe, fcntl for O_NONBLOCK on m_pipe[%d] failed with error %s\n", idx,
NetworkErrorString(WSAGetLastError()));
return;
}
}
if (edge_trig_events && !edge_trig_events->RegisterPipe(m_pipe[0])) {
LogPrintf("Unable to initialize WakeupPipe, EdgeTriggeredEvents::RegisterPipe() failed for m_pipe[0] = %d\n",
m_pipe[0]);
return;
}
m_valid = true;
#else
LogPrintf("Attempting to initialize WakeupPipe without support compiled in!\n");
#endif /* USE_WAKEUP_PIPE */
}
WakeupPipe::~WakeupPipe()
{
if (m_valid) {
#ifdef USE_WAKEUP_PIPE
if (m_edge_trig_events && !m_edge_trig_events->UnregisterPipe(m_pipe[0])) {
LogPrintf("Destroying WakeupPipe instance, EdgeTriggeredEvents::UnregisterPipe() failed for m_pipe[0] = %d\n",
m_pipe[0]);
}
for (size_t idx = 0; idx < m_pipe.size(); idx++) {
if (close(m_pipe[idx]) != 0) {
LogPrintf("Destroying WakeupPipe instance, close() failed for m_pipe[%d] = %d with error %s\n",
idx, m_pipe[idx], NetworkErrorString(WSAGetLastError()));
}
}
#else
assert(false);
#endif /* USE_WAKEUP_PIPE */
}
}
void WakeupPipe::Drain() const
{
#ifdef USE_WAKEUP_PIPE
assert(m_valid && m_pipe[0] != -1);
int ret{0};
std::array<uint8_t, 128> buf;
do {
ret = read(m_pipe[0], buf.data(), buf.size());
} while (ret > 0);
#else
assert(false);
#endif /* USE_WAKEUP_PIPE */
}
void WakeupPipe::Write()
{
#ifdef USE_WAKEUP_PIPE
assert(m_valid && m_pipe[1] != -1);
std::array<uint8_t, EXPECTED_PIPE_WRITTEN_BYTES> buf;
int ret = write(m_pipe[1], buf.data(), buf.size());
if (ret == -1) {
LogPrintf("write() to m_pipe[1] = %d failed with error %s\n", m_pipe[1], NetworkErrorString(WSAGetLastError()));
}
if (ret != EXPECTED_PIPE_WRITTEN_BYTES) {
LogPrintf("write() to m_pipe[1] = %d succeeded with unexpected result %d (expected %d)\n", m_pipe[1], ret,
EXPECTED_PIPE_WRITTEN_BYTES);
}
m_need_wakeup = false;
#else
assert(false);
#endif /* USE_WAKEUP_PIPE */
}

59
src/util/wpipe.h Normal file
View File

@ -0,0 +1,59 @@
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_UTIL_WPIPE_H
#define BITCOIN_UTIL_WPIPE_H
#include <array>
#include <assert.h>
#include <atomic>
#ifndef WIN32
#define USE_WAKEUP_PIPE
#endif
class EdgeTriggeredEvents;
/**
* A manager for abstracting logic surrounding wakeup pipes. Supported only on
* platforms with a POSIX API. Disabled on Windows.
*/
class WakeupPipe
{
public:
explicit WakeupPipe(EdgeTriggeredEvents* edge_trig_events);
~WakeupPipe();
bool IsValid() const { return m_valid; };
/* Drain pipe of all contents */
void Drain() const;
/* Write a byte to the pipe */
void Write();
/* Used to wrap calls around m_need_wakeup toggling */
template <typename Callable>
void Toggle(Callable&& func)
{
assert(m_valid);
m_need_wakeup = true;
func();
m_need_wakeup = false;
}
public:
/* File descriptors for read and write data channels */
std::array<int, 2> m_pipe{{ -1, -1 }};
/* Flag used to determine if Write() needs to be called. Used occasionally */
std::atomic<bool> m_need_wakeup{false};
private:
/* Instance validity flag set during construction */
bool m_valid{false};
/* Pointer to EdgeTriggeredEvents instance used for pipe (de)registration if using supported events modes */
EdgeTriggeredEvents* m_edge_trig_events{nullptr};
};
#endif /* BITCOIN_UTIL_WPIPE_H */