diff --git a/src/Makefile.am b/src/Makefile.am index 472d88aa2c..03e2458813 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -327,6 +327,7 @@ BITCOIN_CORE_H = \ util/bip32.h \ util/bytevectorhash.h \ util/check.h \ + util/edge.h \ util/enumerate.h \ util/epochguard.h \ util/error.h \ @@ -362,6 +363,7 @@ BITCOIN_CORE_H = \ util/ui_change_type.h \ util/url.h \ util/vector.h \ + util/wpipe.h \ validation.h \ validationinterface.h \ versionbits.h \ @@ -776,6 +778,7 @@ libbitcoin_util_a_SOURCES = \ util/bip32.cpp \ util/bytevectorhash.cpp \ util/check.cpp \ + util/edge.cpp \ util/error.cpp \ util/fees.cpp \ util/hasher.cpp \ @@ -795,6 +798,7 @@ libbitcoin_util_a_SOURCES = \ util/thread.cpp \ util/threadnames.cpp \ util/tokenpipe.cpp \ + util/wpipe.cpp \ $(BITCOIN_CORE_H) if USE_LIBEVENT diff --git a/src/init.cpp b/src/init.cpp index d52ab19225..c88a6c0af2 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -2515,23 +2515,9 @@ bool AppInitMain(const CoreContext& context, NodeContext& node, interfaces::Bloc } } - std::string strSocketEventsMode = args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS); - if (strSocketEventsMode == "select") { - connOptions.socketEventsMode = CConnman::SOCKETEVENTS_SELECT; -#ifdef USE_POLL - } else if (strSocketEventsMode == "poll") { - connOptions.socketEventsMode = CConnman::SOCKETEVENTS_POLL; -#endif -#ifdef USE_EPOLL - } else if (strSocketEventsMode == "epoll") { - connOptions.socketEventsMode = CConnman::SOCKETEVENTS_EPOLL; -#endif -#ifdef USE_KQUEUE - } else if (strSocketEventsMode == "kqueue") { - connOptions.socketEventsMode = CConnman::SOCKETEVENTS_KQUEUE; -#endif - } else { - return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), strSocketEventsMode, GetSupportedSocketEventsStr())); + std::string sem_str = args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS); + if (SEMFromString(sem_str) == SocketEventsMode::Unknown) { + return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), sem_str, GetSupportedSocketEventsStr())); } const std::string& i2psam_arg = args.GetArg("-i2psam", ""); diff --git a/src/net.cpp b/src/net.cpp index 90957d5f53..0426396257 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include // for fDIP0001ActiveAtTip #include @@ -119,7 +120,7 @@ static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 50; // We are however still somewhat limited in how long we can sleep as there is periodic work (cleanup) to be done in // the socket handler thread static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 500; -#endif +#endif /* USE_WAKEUP_PIPE */ const std::string NET_MESSAGE_COMMAND_OTHER = "*other*"; @@ -563,7 +564,9 @@ void CNode::CloseSocketDisconnect(CConnman* connman) } } - connman->UnregisterEvents(this); + if (connman->m_edge_trig_events && !connman->m_edge_trig_events->UnregisterEvents(hSocket)) { + LogPrint(BCLog::NET, "EdgeTriggeredEvents::UnregisterEvents() failed\n"); + } LogPrint(BCLog::NET, "disconnecting peer=%d\n", id); CloseSocket(hSocket); @@ -1276,8 +1279,15 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket, LOCK(m_nodes_mutex); m_nodes.push_back(pnode); WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.emplace(hSocket, pnode)); - RegisterEvents(pnode); - WakeSelect(); + if (m_edge_trig_events) { + LOCK(pnode->cs_hSocket); + if (!m_edge_trig_events->RegisterEvents(pnode->hSocket)) { + LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n"); + } + } + if (m_wakeup_pipe) { + m_wakeup_pipe->Write(); + } } // We received a new connection, harvest entropy from the time (and our peer count) @@ -1562,14 +1572,14 @@ bool CConnman::GenerateSelectSet(const std::vector& nodes, } } -#ifdef USE_WAKEUP_PIPE - // We add a pipe to the read set so that the select() call can be woken up from the outside - // This is done when data is added to send buffers (vSendMsg) or when new peers are added - // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to - // timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually - // run on Linux and friends. - recv_set.insert(wakeupPipe[0]); -#endif + if (m_wakeup_pipe) { + // We add a pipe to the read set so that the select() call can be woken up from the outside + // This is done when data is added to send buffers (vSendMsg) or when new peers are added + // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to + // timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually + // run on Linux and friends. + recv_set.insert(m_wakeup_pipe->m_pipe[0]); + } return !recv_set.empty() || !send_set.empty() || !error_set.empty(); } @@ -1587,9 +1597,8 @@ void CConnman::SocketEventsKqueue(std::set& recv_set, timeout.tv_sec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000; timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000; - wakeupSelectNeeded = true; - int n = kevent(kqueuefd, nullptr, 0, events, maxEvents, &timeout); - wakeupSelectNeeded = false; + int n{-1}; + ToggleWakeupPipe([&](){n = kevent(Assert(m_edge_trig_events)->GetFileDescriptor(), nullptr, 0, events, maxEvents, &timeout);}); if (n == -1) { LogPrintf("kevent wait error\n"); } else if (n > 0) { @@ -1621,9 +1630,8 @@ void CConnman::SocketEventsEpoll(std::set& recv_set, const size_t maxEvents = 64; epoll_event events[maxEvents]; - wakeupSelectNeeded = true; - int n = epoll_wait(epollfd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); - wakeupSelectNeeded = false; + int n{-1}; + ToggleWakeupPipe([&](){n = epoll_wait(Assert(m_edge_trig_events)->GetFileDescriptor(), events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);}); for (int i = 0; i < n; i++) { auto& e = events[i]; if((e.events & EPOLLERR) || (e.events & EPOLLHUP)) { @@ -1678,9 +1686,8 @@ void CConnman::SocketEventsPoll(const std::vector& nodes, vpollfds.push_back(std::move(it.second)); } - wakeupSelectNeeded = true; - int r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); - wakeupSelectNeeded = false; + int r{-1}; + ToggleWakeupPipe([&](){r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);}); if (r < 0) { return; } @@ -1737,9 +1744,8 @@ void CConnman::SocketEventsSelect(const std::vector& nodes, hSocketMax = std::max(hSocketMax, hSocket); } - wakeupSelectNeeded = true; - int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - wakeupSelectNeeded = false; + int nSelect{-1}; + ToggleWakeupPipe([&](){nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);}); if (interruptNet) return; @@ -1782,21 +1788,21 @@ void CConnman::SocketEvents(const std::vector& nodes, { switch (socketEventsMode) { #ifdef USE_KQUEUE - case SOCKETEVENTS_KQUEUE: + case SocketEventsMode::KQueue: SocketEventsKqueue(recv_set, send_set, error_set, only_poll); break; #endif #ifdef USE_EPOLL - case SOCKETEVENTS_EPOLL: + case SocketEventsMode::EPoll: SocketEventsEpoll(recv_set, send_set, error_set, only_poll); break; #endif #ifdef USE_POLL - case SOCKETEVENTS_POLL: + case SocketEventsMode::Poll: SocketEventsPoll(nodes, recv_set, send_set, error_set, only_poll); break; #endif - case SOCKETEVENTS_SELECT: + case SocketEventsMode::Select: SocketEventsSelect(nodes, recv_set, send_set, error_set, only_poll); break; default: @@ -1842,18 +1848,10 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync) // empty sets. SocketEvents(snap.Nodes(), recv_set, send_set, error_set, only_poll); -#ifdef USE_WAKEUP_PIPE - // drain the wakeup pipe - if (recv_set.count(wakeupPipe[0])) { - char buf[128]; - while (true) { - int r = read(wakeupPipe[0], buf, sizeof(buf)); - if (r <= 0) { - break; - } - } + // Drain the wakeup pipe + if (m_wakeup_pipe && recv_set.count(m_wakeup_pipe->m_pipe[0])) { + m_wakeup_pipe->Drain(); } -#endif // Service (send/receive) each of the already connected nodes. SocketHandlerConnected(recv_set, send_set, error_set); @@ -2131,22 +2129,6 @@ void CConnman::WakeMessageHandler() condMsgProc.notify_one(); } -void CConnman::WakeSelect() -{ -#ifdef USE_WAKEUP_PIPE - if (wakeupPipe[1] == -1) { - return; - } - - char buf{0}; - if (write(wakeupPipe[1], &buf, sizeof(buf)) != 1) { - LogPrint(BCLog::NET, "write to wakeupPipe failed\n"); - } -#endif - - wakeupSelectNeeded = false; -} - void CConnman::ThreadDNSAddressSeed() { FastRandomContext rng; @@ -2980,8 +2962,15 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai { LOCK(m_nodes_mutex); m_nodes.push_back(pnode); - RegisterEvents(pnode); - WakeSelect(); + if (m_edge_trig_events) { + LOCK(pnode->cs_hSocket); + if (!m_edge_trig_events->RegisterEvents(pnode->hSocket)) { + LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n"); + } + } + if (m_wakeup_pipe) { + m_wakeup_pipe->Write(); + } } } @@ -3132,30 +3121,10 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, return false; } -#ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { - struct kevent event; - EV_SET(&event, sock->Get(), EVFILT_READ, EV_ADD, 0, 0, nullptr); - if (kevent(kqueuefd, &event, 1, nullptr, 0, nullptr) != 0) { - strError = strprintf(_("Error: failed to add socket to kqueuefd (kevent returned error %s)"), NetworkErrorString(WSAGetLastError())); - LogPrintf("%s\n", strError.original); - return false; - } + if (m_edge_trig_events && !m_edge_trig_events->AddSocket(sock->Get())) { + LogPrintf("Error: EdgeTriggeredEvents::AddSocket() failed\n"); + return false; } -#endif - -#ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { - epoll_event event; - event.data.fd = sock->Get(); - event.events = EPOLLIN; - if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sock->Get(), &event) != 0) { - strError = strprintf(_("Error: failed to add socket to epollfd (epoll_ctl returned error %s)"), NetworkErrorString(WSAGetLastError())); - LogPrintf("%s\n", strError.original); - return false; - } - } -#endif vhListenSocket.push_back(ListenSocket(sock->Release(), permissions)); @@ -3301,25 +3270,14 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met AssertLockNotHeld(m_total_bytes_sent_mutex); Init(connOptions); -#ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { - kqueuefd = kqueue(); - if (kqueuefd == -1) { - LogPrintf("kqueue failed\n"); + if (socketEventsMode == SocketEventsMode::EPoll || socketEventsMode == SocketEventsMode::KQueue) { + m_edge_trig_events = std::make_unique(socketEventsMode); + if (!m_edge_trig_events->IsValid()) { + LogPrintf("Unable to initialize EdgeTriggeredEvents instance\n"); + m_edge_trig_events.reset(); return false; } } -#endif - -#ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { - epollfd = epoll_create1(0); - if (epollfd == -1) { - LogPrintf("epoll_create1 failed\n"); - return false; - } - } -#endif if (fListen && !InitBinds(connOptions.vBinds, connOptions.vWhiteBinds, connOptions.onion_binds)) { if (clientInterface) { @@ -3392,45 +3350,13 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met } #ifdef USE_WAKEUP_PIPE - if (pipe(wakeupPipe) != 0) { - wakeupPipe[0] = wakeupPipe[1] = -1; - LogPrint(BCLog::NET, "pipe() for wakeupPipe failed\n"); - } else { - int fFlags = fcntl(wakeupPipe[0], F_GETFL, 0); - if (fcntl(wakeupPipe[0], F_SETFL, fFlags | O_NONBLOCK) == -1) { - LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n"); - } - fFlags = fcntl(wakeupPipe[1], F_GETFL, 0); - if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) { - LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n"); - } -#ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { - struct kevent event; - EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_ADD, 0, 0, nullptr); - int r = kevent(kqueuefd, &event, 1, nullptr, 0, nullptr); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__, - kqueuefd, EV_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError())); - return false; - } - } -#endif -#ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { - epoll_event event; - event.events = EPOLLIN; - event.data.fd = wakeupPipe[0]; - int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, wakeupPipe[0], &event); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__, - epollfd, EPOLL_CTL_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError())); - return false; - } - } -#endif + m_wakeup_pipe = std::make_unique(m_edge_trig_events.get()); + if (!m_wakeup_pipe->IsValid()) { + /* We log the error but do not halt initialization */ + LogPrintf("Unable to initialize WakeupPipe instance\n"); + m_wakeup_pipe.reset(); } -#endif +#endif /* USE_WAKEUP_PIPE */ // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&util::TraceThread, "net", [this, &mn_sync] { ThreadSocketHandler(mn_sync); }); @@ -3564,23 +3490,15 @@ void CConnman::StopNodes() for (CNode *pnode : m_nodes) pnode->CloseSocketDisconnect(this); } - for (ListenSocket& hListenSocket : vhListenSocket) + for (ListenSocket& hListenSocket : vhListenSocket) { if (hListenSocket.socket != INVALID_SOCKET) { -#ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { - struct kevent event; - EV_SET(&event, hListenSocket.socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr); - kevent(kqueuefd, &event, 1, nullptr, 0, nullptr); + if (m_edge_trig_events && !m_edge_trig_events->RemoveSocket(hListenSocket.socket)) { + LogPrintf("EdgeTriggeredEvents::RemoveSocket() failed\n"); } -#endif -#ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { - epoll_ctl(epollfd, EPOLL_CTL_DEL, hListenSocket.socket, nullptr); - } -#endif if (!CloseSocket(hListenSocket.socket)) LogPrintf("CloseSocket(hListenSocket) failed with error %s\n", NetworkErrorString(WSAGetLastError())); } + } // clean up some globals (to help leak detection) std::vector nodes; @@ -3604,33 +3522,12 @@ void CConnman::StopNodes() vhListenSocket.clear(); semOutbound.reset(); semAddnode.reset(); - -#ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE && kqueuefd != -1) { -#ifdef USE_WAKEUP_PIPE - struct kevent event; - EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_DELETE, 0, 0, nullptr); - kevent(kqueuefd, &event, 1, nullptr, 0, nullptr); -#endif - close(kqueuefd); - } - kqueuefd = -1; -#endif -#ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL && epollfd != -1) { -#ifdef USE_WAKEUP_PIPE - epoll_ctl(epollfd, EPOLL_CTL_DEL, wakeupPipe[0], nullptr); -#endif - close(epollfd); - } - epollfd = -1; -#endif - -#ifdef USE_WAKEUP_PIPE - if (wakeupPipe[0] != -1) close(wakeupPipe[0]); - if (wakeupPipe[1] != -1) close(wakeupPipe[1]); - wakeupPipe[0] = wakeupPipe[1] = -1; -#endif + /** + * m_wakeup_pipe must be reset *before* m_edge_trig_events as it may + * attempt to call EdgeTriggeredEvents::UnregisterPipe() in its destructor + */ + m_wakeup_pipe.reset(); + m_edge_trig_events.reset(); } void CConnman::DeleteNode(CNode* pnode) @@ -4143,8 +4040,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) } // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) - if (!hasPendingData && wakeupSelectNeeded) - WakeSelect(); + if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load())) + m_wakeup_pipe->Write(); } } @@ -4234,79 +4131,6 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize(); } -void CConnman::RegisterEvents(CNode *pnode) -{ -#ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { - LOCK(pnode->cs_hSocket); - assert(pnode->hSocket != INVALID_SOCKET); - - struct kevent events[2]; - EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_ADD, 0, 0, nullptr); - EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr); - - int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__, - kqueuefd, EV_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError())); - } - } -#endif -#ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { - LOCK(pnode->cs_hSocket); - assert(pnode->hSocket != INVALID_SOCKET); - - epoll_event e; - // We're using edge-triggered mode, so it's important that we drain sockets even if no signals come in - e.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP; - e.data.fd = pnode->hSocket; - - int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, pnode->hSocket, &e); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__, - epollfd, EPOLL_CTL_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError())); - } - } -#endif -} - -void CConnman::UnregisterEvents(CNode *pnode) -{ -#ifdef USE_KQUEUE - if (socketEventsMode == SOCKETEVENTS_KQUEUE) { - AssertLockHeld(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) { - return; - } - - struct kevent events[2]; - EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_DELETE, 0, 0, nullptr); - EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); - - int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__, - kqueuefd, EV_DELETE, pnode->hSocket, NetworkErrorString(WSAGetLastError())); - } - } -#endif -#ifdef USE_EPOLL - if (socketEventsMode == SOCKETEVENTS_EPOLL) { - AssertLockHeld(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) { - return; - } - - int r = epoll_ctl(epollfd, EPOLL_CTL_DEL, pnode->hSocket, nullptr); - if (r != 0) { - LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__, - epollfd, EPOLL_CTL_DEL, pnode->hSocket, NetworkErrorString(WSAGetLastError())); - } - } -#endif -} - void CaptureMessageToFile(const CAddress& addr, const std::string& msg_type, Span data, diff --git a/src/net.h b/src/net.h index a8b6505099..4e40817027 100644 --- a/src/net.h +++ b/src/net.h @@ -28,9 +28,11 @@ #include #include #include -#include -#include #include +#include +#include +#include +#include #include #include @@ -44,10 +46,6 @@ #include #include -#ifndef WIN32 -#define USE_WAKEUP_PIPE -#endif - class CConnman; class CDeterministicMNList; class CDeterministicMNManager; @@ -822,13 +820,6 @@ class CConnman { friend class CNode; public: - enum SocketEventsMode { - SOCKETEVENTS_SELECT = 0, - SOCKETEVENTS_POLL = 1, - SOCKETEVENTS_EPOLL = 2, - SOCKETEVENTS_KQUEUE = 3, - }; - struct Options { ServiceFlags nLocalServices = NODE_NONE; @@ -852,7 +843,7 @@ public: bool m_use_addrman_outgoing = true; std::vector m_specified_outgoing; std::vector m_added_nodes; - SocketEventsMode socketEventsMode = SOCKETEVENTS_SELECT; + SocketEventsMode socketEventsMode = SocketEventsMode::Select; std::vector m_asmap; bool m_i2p_accept_incoming; }; @@ -1174,7 +1165,6 @@ public: unsigned int GetReceiveFloodSize() const; void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); - void WakeSelect() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); /** Attempts to obfuscate tx time through exponentially distributed emitting. Works assuming that a single interval is used. @@ -1377,9 +1367,6 @@ private: // Whether the node should be passed out in ForEach* callbacks static bool NodeFullyConnected(const CNode* pnode); - void RegisterEvents(CNode* pnode); - void UnregisterEvents(CNode* pnode); - // Network usage totals mutable Mutex m_total_bytes_sent_mutex; std::atomic nTotalBytesRecv{0}; @@ -1514,19 +1501,19 @@ private: */ std::unique_ptr m_i2p_sam_session; -#ifdef USE_WAKEUP_PIPE - /** a pipe which is added to select() calls to wakeup before the timeout */ - int wakeupPipe[2]{-1,-1}; -#endif - std::atomic wakeupSelectNeeded{false}; - SocketEventsMode socketEventsMode; -#ifdef USE_KQUEUE - int kqueuefd{-1}; -#endif -#ifdef USE_EPOLL - int epollfd{-1}; -#endif + std::unique_ptr m_edge_trig_events{nullptr}; + std::unique_ptr m_wakeup_pipe{nullptr}; + + template + void ToggleWakeupPipe(Callable&& func) + { + if (m_wakeup_pipe) { + m_wakeup_pipe->Toggle(func); + } else { + func(); + } + } Mutex cs_sendable_receivable_nodes; std::unordered_map mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes); diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index a5e4244995..5794c7c05b 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -669,24 +669,9 @@ static RPCHelpMan getnetworkinfo() obj.pushKV("connections_mn", (int)node.connman->GetNodeCount(ConnectionDirection::Verified)); obj.pushKV("connections_mn_in", (int)node.connman->GetNodeCount(ConnectionDirection::VerifiedIn)); obj.pushKV("connections_mn_out", (int)node.connman->GetNodeCount(ConnectionDirection::VerifiedOut)); - std::string strSocketEvents; - switch (node.connman->GetSocketEventsMode()) { - case CConnman::SOCKETEVENTS_SELECT: - strSocketEvents = "select"; - break; - case CConnman::SOCKETEVENTS_POLL: - strSocketEvents = "poll"; - break; - case CConnman::SOCKETEVENTS_EPOLL: - strSocketEvents = "epoll"; - break; - case CConnman::SOCKETEVENTS_KQUEUE: - strSocketEvents = "kqueue"; - break; - default: - CHECK_NONFATAL(false); - } - obj.pushKV("socketevents", strSocketEvents); + std::string sem_str = SEMToString(node.connman->GetSocketEventsMode()); + CHECK_NONFATAL(sem_str != "unknown"); + obj.pushKV("socketevents", sem_str); } obj.pushKV("networks", GetNetworksInfo()); obj.pushKV("relayfee", ValueFromAmount(::minRelayTxFee.GetFeePerK())); diff --git a/src/util/edge.cpp b/src/util/edge.cpp new file mode 100644 index 0000000000..bb7f867127 --- /dev/null +++ b/src/util/edge.cpp @@ -0,0 +1,239 @@ +// Copyright (c) 2020-2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include + +#ifdef USE_EPOLL +#include +#endif + +#ifdef USE_KQUEUE +#include +#endif + +EdgeTriggeredEvents::EdgeTriggeredEvents(SocketEventsMode events_mode) + : m_mode(events_mode) +{ + if (m_mode == SocketEventsMode::EPoll) { +#ifdef USE_EPOLL + m_fd = epoll_create1(0); + if (m_fd == -1) { + LogPrintf("Unable to initialize EdgeTriggeredEvents, epoll_create1 returned -1 with error %s\n", + NetworkErrorString(WSAGetLastError())); + return; + } +#else + LogPrintf("Attempting to initialize EdgeTriggeredEvents for epoll without support compiled in!\n"); + return; +#endif /* USE_EPOLL */ + } else if (m_mode == SocketEventsMode::KQueue) { +#ifdef USE_KQUEUE + m_fd = kqueue(); + if (m_fd == -1) { + LogPrintf("Unable to initialize EdgeTriggeredEvents, kqueue returned -1 with error %s\n", + NetworkErrorString(WSAGetLastError())); + return; + } +#else + LogPrintf("Attempting to initialize EdgeTriggeredEvents for kqueue without support compiled in!\n"); + return; +#endif /* USE_KQUEUE */ + } else { + assert(false); + } + m_valid = true; +} + +EdgeTriggeredEvents::~EdgeTriggeredEvents() +{ + if (m_valid) { +#if defined(USE_KQUEUE) || defined(USE_EPOLL) + if (close(m_fd) != 0) { + LogPrintf("Destroying EdgeTriggeredEvents instance, close() failed for m_fd = %d with error %s\n", m_fd, + NetworkErrorString(WSAGetLastError())); + } +#else + assert(false); +#endif /* defined(USE_KQUEUE) || defined(USE_EPOLL) */ + } +} + +bool EdgeTriggeredEvents::RegisterEntity(int entity, std::string entity_name) const +{ + assert(m_valid); + + if (m_mode == SocketEventsMode::EPoll) { +#ifdef USE_EPOLL + epoll_event event; + event.data.fd = entity; + event.events = EPOLLIN; + if (epoll_ctl(m_fd, EPOLL_CTL_ADD, entity, &event) != 0) { + LogPrintf("Failed to add %s to epoll fd (epoll_ctl returned error %s)\n", entity_name, + NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_EPOLL */ + } else if (m_mode == SocketEventsMode::KQueue) { +#ifdef USE_KQUEUE + struct kevent event; + EV_SET(&event, entity, EVFILT_READ, EV_ADD, 0, 0, nullptr); + if (kevent(m_fd, &event, 1, nullptr, 0, nullptr) != 0) { + LogPrintf("Failed to add %s to kqueue fd (kevent returned error %s)\n", entity_name, + NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_KQUEUE */ + } else { + assert(false); + } + return true; +} + +bool EdgeTriggeredEvents::UnregisterEntity(int entity, std::string entity_name) const +{ + assert(m_valid); + + if (m_mode == SocketEventsMode::EPoll) { +#ifdef USE_EPOLL + if (epoll_ctl(m_fd, EPOLL_CTL_DEL, entity, nullptr) != 0) { + LogPrintf("Failed to remove %s from epoll fd (epoll_ctl returned error %s)\n", entity_name, + NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_EPOLL */ + } else if (m_mode == SocketEventsMode::KQueue) { +#ifdef USE_KQUEUE + struct kevent event; + EV_SET(&event, entity, EVFILT_READ, EV_DELETE, 0, 0, nullptr); + if (kevent(m_fd, &event, 1, nullptr, 0, nullptr) != 0) { + LogPrintf("Failed to remove %s from kqueue fd (kevent returned error %s)\n", entity_name, + NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_KQUEUE */ + } else { + assert(false); + } + return true; +} + +bool EdgeTriggeredEvents::AddSocket(SOCKET socket) const +{ + return RegisterEntity(socket, "socket"); +} + +bool EdgeTriggeredEvents::RemoveSocket(SOCKET socket) const +{ + return UnregisterEntity(socket, "socket"); +} + +bool EdgeTriggeredEvents::RegisterPipe(int wakeup_pipe) +{ + if (m_pipe_registered) { + LogPrintf("Pipe already registered, ignoring new registration request\n"); + return false; + } + bool ret = RegisterEntity(wakeup_pipe, "wakeup pipe"); + if (ret) m_pipe_registered = true; + return ret; +} + +bool EdgeTriggeredEvents::UnregisterPipe(int wakeup_pipe) +{ + if (!m_pipe_registered) { + LogPrintf("No pipe currently registered to unregister, ignoring request\n"); + return false; + } + bool ret = UnregisterEntity(wakeup_pipe, "wakeup pipe"); + if (ret) m_pipe_registered = false; + return ret; +} + +bool EdgeTriggeredEvents::RegisterEvents(SOCKET socket) const +{ + assert(m_valid && socket != INVALID_SOCKET); + + if (m_mode == SocketEventsMode::EPoll) { +#ifdef USE_EPOLL + epoll_event e; + // We're using edge-triggered mode, so it's important that we drain sockets even if no signals come in + e.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP; + e.data.fd = socket; + + if (epoll_ctl(m_fd, EPOLL_CTL_ADD, socket, &e) != 0) { + LogPrintf("Failed to register events for socket -- epoll_ctl(%d, %d, %d, ...) returned error: %s\n", + m_fd, EPOLL_CTL_ADD, socket, NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_EPOLL */ + } else if (m_mode == SocketEventsMode::KQueue) { +#ifdef USE_KQUEUE + struct kevent events[2]; + EV_SET(&events[0], socket, EVFILT_READ, EV_ADD, 0, 0, nullptr); + EV_SET(&events[1], socket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr); + + if (kevent(m_fd, events, 2, nullptr, 0, nullptr) != 0) { + LogPrintf("Failed to register events for socket -- kevent(%d, %d, %d, ...) returned error: %s\n", + m_fd, EV_ADD, socket, NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_KQUEUE */ + } else { + assert(false); + } + return true; +} + +bool EdgeTriggeredEvents::UnregisterEvents(SOCKET socket) const +{ + assert(m_valid); + + if (socket == INVALID_SOCKET) { + LogPrintf("Cannot unregister events for invalid socket\n"); + return false; + } + + if (m_mode == SocketEventsMode::EPoll) { +#ifdef USE_EPOLL + if (epoll_ctl(m_fd, EPOLL_CTL_DEL, socket, nullptr) != 0) { + LogPrintf("Failed to unregister events for socket -- epoll_ctl(%d, %d, %d, ...) returned error: %s\n", + m_fd, EPOLL_CTL_DEL, socket, NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_EPOLL */ + } else if (m_mode == SocketEventsMode::KQueue) { +#ifdef USE_KQUEUE + struct kevent events[2]; + EV_SET(&events[0], socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr); + EV_SET(&events[1], socket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); + if (kevent(m_fd, events, 2, nullptr, 0, nullptr) != 0) { + LogPrintf("Failed to unregister events for socket -- kevent(%d, %d, %d, ...) returned error: %s\n", + m_fd, EV_DELETE, socket, NetworkErrorString(WSAGetLastError())); + return false; + } +#else + assert(false); +#endif /* USE_KQUEUE */ + } else { + assert(false); + } + return true; +} diff --git a/src/util/edge.h b/src/util/edge.h new file mode 100644 index 0000000000..b605cf817c --- /dev/null +++ b/src/util/edge.h @@ -0,0 +1,61 @@ +// Copyright (c) 2020-2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_UTIL_EDGE_H +#define BITCOIN_UTIL_EDGE_H + +#include + +#include +#include +#include + +enum class SocketEventsMode : int8_t; + +/** + * A manager for abstracting logic surrounding edge-triggered socket events + * modes like kqueue and epoll. + */ +class EdgeTriggeredEvents +{ +public: + explicit EdgeTriggeredEvents(SocketEventsMode events_mode); + ~EdgeTriggeredEvents(); + + bool IsValid() const { return m_valid; } + int GetFileDescriptor() const { assert(m_fd != -1); return m_fd; } + + /* Add socket to interest list */ + bool AddSocket(SOCKET socket) const; + /* Remove socket from interest list */ + bool RemoveSocket(SOCKET socket) const; + + /* Register events for socket */ + bool RegisterEvents(SOCKET socket) const; + /* Unregister events for socket */ + bool UnregisterEvents(SOCKET socket) const; + +private: + friend class WakeupPipe; + /* Register wakeup pipe with EdgeTriggeredEvents instance */ + bool RegisterPipe(int wakeup_pipe); + /* Unregister wakeup pipe with EdgeTriggeredEvents instance */ + bool UnregisterPipe(int wakeup_pipe); + +private: + bool RegisterEntity(int entity, std::string entity_name) const; + bool UnregisterEntity(int entity, std::string entity_name) const; + +private: + /* Flag set if pipe has been registered with instance */ + bool m_pipe_registered{false}; + /* Instance validity flag set during construction */ + bool m_valid{false}; + /* Flag for storing selected socket events mode */ + SocketEventsMode m_mode; + /* File descriptor used to interact with events mode */ + int m_fd{-1}; +}; + +#endif /* BITCOIN_UTIL_EDGE_H */ diff --git a/src/util/sock.h b/src/util/sock.h index 8099c702a1..324e0c763e 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -18,6 +18,54 @@ */ static constexpr auto MAX_WAIT_FOR_IO = 1s; +enum class SocketEventsMode : int8_t { + Select = 0, + Poll = 1, + EPoll = 2, + KQueue = 3, + + Unknown = -1 +}; + +/* Converts SocketEventsMode value to string with additional check to report modes not compiled for as unknown */ +static std::string SEMToString(const SocketEventsMode val) +{ + switch (val) { + case (SocketEventsMode::Select): + return "select"; +#ifdef USE_POLL + case (SocketEventsMode::Poll): + return "poll"; +#endif /* USE_POLL */ +#ifdef USE_EPOLL + case (SocketEventsMode::EPoll): + return "epoll"; +#endif /* USE_EPOLL */ +#ifdef USE_KQUEUE + case (SocketEventsMode::KQueue): + return "kqueue"; +#endif /* USE_KQUEUE */ + default: + return "unknown"; + }; +} + +/* Converts string to SocketEventsMode value with additional check to report modes not compiled for as unknown */ +static SocketEventsMode SEMFromString(const std::string str) +{ + if (str == "select") { return SocketEventsMode::Select; } +#ifdef USE_POLL + else if (str == "poll") { return SocketEventsMode::Poll; } +#endif /* USE_POLL */ +#ifdef USE_EPOLL + else if (str == "epoll") { return SocketEventsMode::EPoll; } +#endif /* USE_EPOLL */ +#ifdef USE_KQUEUE + else if (str == "kqueue") { return SocketEventsMode::KQueue; } +#endif /* USE_KQUEUE */ + else { return SocketEventsMode::Unknown; } +} + /** * RAII helper class that manages a socket. Mimics `std::unique_ptr`, but instead of a pointer it * contains a socket and closes it automatically when it goes out of scope. diff --git a/src/util/wpipe.cpp b/src/util/wpipe.cpp new file mode 100644 index 0000000000..732e23c791 --- /dev/null +++ b/src/util/wpipe.cpp @@ -0,0 +1,95 @@ +// Copyright (c) 2020-2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include +#include + +static constexpr int EXPECTED_PIPE_WRITTEN_BYTES = 1; + +WakeupPipe::WakeupPipe(EdgeTriggeredEvents* edge_trig_events) + : m_edge_trig_events{edge_trig_events} +{ +#ifdef USE_WAKEUP_PIPE + if (pipe(m_pipe.data()) != 0) { + LogPrintf("Unable to initialize WakeupPipe, pipe() for m_pipe failed with error %s\n", + NetworkErrorString(WSAGetLastError())); + return; + } + for (size_t idx = 0; idx < m_pipe.size(); idx++) { + int flags = fcntl(m_pipe[idx], F_GETFL, 0); + if (fcntl(m_pipe[idx], F_SETFL, flags | O_NONBLOCK) == -1) { + LogPrintf("Unable to initialize WakeupPipe, fcntl for O_NONBLOCK on m_pipe[%d] failed with error %s\n", idx, + NetworkErrorString(WSAGetLastError())); + return; + } + } + if (edge_trig_events && !edge_trig_events->RegisterPipe(m_pipe[0])) { + LogPrintf("Unable to initialize WakeupPipe, EdgeTriggeredEvents::RegisterPipe() failed for m_pipe[0] = %d\n", + m_pipe[0]); + return; + } + m_valid = true; +#else + LogPrintf("Attempting to initialize WakeupPipe without support compiled in!\n"); +#endif /* USE_WAKEUP_PIPE */ +} + +WakeupPipe::~WakeupPipe() +{ + if (m_valid) { +#ifdef USE_WAKEUP_PIPE + if (m_edge_trig_events && !m_edge_trig_events->UnregisterPipe(m_pipe[0])) { + LogPrintf("Destroying WakeupPipe instance, EdgeTriggeredEvents::UnregisterPipe() failed for m_pipe[0] = %d\n", + m_pipe[0]); + } + for (size_t idx = 0; idx < m_pipe.size(); idx++) { + if (close(m_pipe[idx]) != 0) { + LogPrintf("Destroying WakeupPipe instance, close() failed for m_pipe[%d] = %d with error %s\n", + idx, m_pipe[idx], NetworkErrorString(WSAGetLastError())); + } + } +#else + assert(false); +#endif /* USE_WAKEUP_PIPE */ + } +} + +void WakeupPipe::Drain() const +{ +#ifdef USE_WAKEUP_PIPE + assert(m_valid && m_pipe[0] != -1); + + int ret{0}; + std::array buf; + do { + ret = read(m_pipe[0], buf.data(), buf.size()); + } while (ret > 0); +#else + assert(false); +#endif /* USE_WAKEUP_PIPE */ +} + +void WakeupPipe::Write() +{ +#ifdef USE_WAKEUP_PIPE + assert(m_valid && m_pipe[1] != -1); + + std::array buf; + int ret = write(m_pipe[1], buf.data(), buf.size()); + if (ret == -1) { + LogPrintf("write() to m_pipe[1] = %d failed with error %s\n", m_pipe[1], NetworkErrorString(WSAGetLastError())); + } + if (ret != EXPECTED_PIPE_WRITTEN_BYTES) { + LogPrintf("write() to m_pipe[1] = %d succeeded with unexpected result %d (expected %d)\n", m_pipe[1], ret, + EXPECTED_PIPE_WRITTEN_BYTES); + } + + m_need_wakeup = false; +#else + assert(false); +#endif /* USE_WAKEUP_PIPE */ +} diff --git a/src/util/wpipe.h b/src/util/wpipe.h new file mode 100644 index 0000000000..c4fff558ec --- /dev/null +++ b/src/util/wpipe.h @@ -0,0 +1,59 @@ +// Copyright (c) 2020-2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_UTIL_WPIPE_H +#define BITCOIN_UTIL_WPIPE_H + +#include +#include +#include + +#ifndef WIN32 +#define USE_WAKEUP_PIPE +#endif + +class EdgeTriggeredEvents; + +/** + * A manager for abstracting logic surrounding wakeup pipes. Supported only on + * platforms with a POSIX API. Disabled on Windows. + */ +class WakeupPipe +{ +public: + explicit WakeupPipe(EdgeTriggeredEvents* edge_trig_events); + ~WakeupPipe(); + + bool IsValid() const { return m_valid; }; + + /* Drain pipe of all contents */ + void Drain() const; + /* Write a byte to the pipe */ + void Write(); + + /* Used to wrap calls around m_need_wakeup toggling */ + template + void Toggle(Callable&& func) + { + assert(m_valid); + + m_need_wakeup = true; + func(); + m_need_wakeup = false; + } + +public: + /* File descriptors for read and write data channels */ + std::array m_pipe{{ -1, -1 }}; + /* Flag used to determine if Write() needs to be called. Used occasionally */ + std::atomic m_need_wakeup{false}; + +private: + /* Instance validity flag set during construction */ + bool m_valid{false}; + /* Pointer to EdgeTriggeredEvents instance used for pipe (de)registration if using supported events modes */ + EdgeTriggeredEvents* m_edge_trig_events{nullptr}; +}; + +#endif /* BITCOIN_UTIL_WPIPE_H */