refactor: introduce EdgeTriggeredEvents, move {epoll, kqueue} fd there

This commit is contained in:
Kittywhiskers Van Gogh 2024-05-14 17:22:42 +00:00
parent 3b11ef9b89
commit 212df0677f
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
5 changed files with 131 additions and 51 deletions

View File

@ -327,6 +327,7 @@ BITCOIN_CORE_H = \
util/bip32.h \
util/bytevectorhash.h \
util/check.h \
util/edge.h \
util/enumerate.h \
util/epochguard.h \
util/error.h \
@ -776,6 +777,7 @@ libbitcoin_util_a_SOURCES = \
util/bip32.cpp \
util/bytevectorhash.cpp \
util/check.cpp \
util/edge.cpp \
util/error.cpp \
util/fees.cpp \
util/hasher.cpp \

View File

@ -1588,7 +1588,7 @@ void CConnman::SocketEventsKqueue(std::set<SOCKET>& recv_set,
timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000;
wakeupSelectNeeded = true;
int n = kevent(kqueuefd, nullptr, 0, events, maxEvents, &timeout);
int n = kevent(Assert(m_edge_trig_events)->m_fd, nullptr, 0, events, maxEvents, &timeout);
wakeupSelectNeeded = false;
if (n == -1) {
LogPrintf("kevent wait error\n");
@ -1622,7 +1622,7 @@ void CConnman::SocketEventsEpoll(std::set<SOCKET>& recv_set,
epoll_event events[maxEvents];
wakeupSelectNeeded = true;
int n = epoll_wait(epollfd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
int n = epoll_wait(Assert(m_edge_trig_events)->m_fd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
wakeupSelectNeeded = false;
for (int i = 0; i < n; i++) {
auto& e = events[i];
@ -3136,8 +3136,8 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError,
if (socketEventsMode == SocketEventsMode::KQueue) {
struct kevent event;
EV_SET(&event, sock->Get(), EVFILT_READ, EV_ADD, 0, 0, nullptr);
if (kevent(kqueuefd, &event, 1, nullptr, 0, nullptr) != 0) {
strError = strprintf(_("Error: failed to add socket to kqueuefd (kevent returned error %s)"), NetworkErrorString(WSAGetLastError()));
if (kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr) != 0) {
strError = strprintf(_("Error: failed to add socket to kqueue fd (kevent returned error %s)"), NetworkErrorString(WSAGetLastError()));
LogPrintf("%s\n", strError.original);
return false;
}
@ -3149,8 +3149,8 @@ bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError,
epoll_event event;
event.data.fd = sock->Get();
event.events = EPOLLIN;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sock->Get(), &event) != 0) {
strError = strprintf(_("Error: failed to add socket to epollfd (epoll_ctl returned error %s)"), NetworkErrorString(WSAGetLastError()));
if (epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, sock->Get(), &event) != 0) {
strError = strprintf(_("Error: failed to add socket to epoll fd (epoll_ctl returned error %s)"), NetworkErrorString(WSAGetLastError()));
LogPrintf("%s\n", strError.original);
return false;
}
@ -3301,25 +3301,14 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met
AssertLockNotHeld(m_total_bytes_sent_mutex);
Init(connOptions);
#ifdef USE_KQUEUE
if (socketEventsMode == SocketEventsMode::KQueue) {
kqueuefd = kqueue();
if (kqueuefd == -1) {
LogPrintf("kqueue failed\n");
if (socketEventsMode == SocketEventsMode::EPoll || socketEventsMode == SocketEventsMode::KQueue) {
m_edge_trig_events = std::make_unique<EdgeTriggeredEvents>(socketEventsMode);
if (!m_edge_trig_events->IsValid()) {
LogPrintf("Unable to initialize EdgeTriggeredEvents instance\n");
m_edge_trig_events.reset();
return false;
}
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SocketEventsMode::EPoll) {
epollfd = epoll_create1(0);
if (epollfd == -1) {
LogPrintf("epoll_create1 failed\n");
return false;
}
}
#endif
if (fListen && !InitBinds(connOptions.vBinds, connOptions.vWhiteBinds, connOptions.onion_binds)) {
if (clientInterface) {
@ -3408,10 +3397,10 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met
if (socketEventsMode == SocketEventsMode::KQueue) {
struct kevent event;
EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_ADD, 0, 0, nullptr);
int r = kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
int r = kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EV_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError()));
return false;
}
}
@ -3421,10 +3410,10 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met
epoll_event event;
event.events = EPOLLIN;
event.data.fd = wakeupPipe[0];
int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, wakeupPipe[0], &event);
int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, wakeupPipe[0], &event);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__,
epollfd, EPOLL_CTL_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EPOLL_CTL_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError()));
return false;
}
}
@ -3570,12 +3559,12 @@ void CConnman::StopNodes()
if (socketEventsMode == SocketEventsMode::KQueue) {
struct kevent event;
EV_SET(&event, hListenSocket.socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
kevent(Assert(m_edge_trig_events)->m_fd, &event, 1, nullptr, 0, nullptr);
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SocketEventsMode::EPoll) {
epoll_ctl(epollfd, EPOLL_CTL_DEL, hListenSocket.socket, nullptr);
epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_DEL, hListenSocket.socket, nullptr);
}
#endif
if (!CloseSocket(hListenSocket.socket))
@ -3606,24 +3595,22 @@ void CConnman::StopNodes()
semAddnode.reset();
#ifdef USE_KQUEUE
if (socketEventsMode == SocketEventsMode::KQueue && kqueuefd != -1) {
if (socketEventsMode == SocketEventsMode::KQueue && Assert(m_edge_trig_events)->m_fd != -1) {
#ifdef USE_WAKEUP_PIPE
struct kevent event;
EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_DELETE, 0, 0, nullptr);
kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
kevent(m_edge_trig_events->m_fd, &event, 1, nullptr, 0, nullptr);
#endif
close(kqueuefd);
m_edge_trig_events.reset();
}
kqueuefd = -1;
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SocketEventsMode::EPoll && epollfd != -1) {
if (socketEventsMode == SocketEventsMode::EPoll && Assert(m_edge_trig_events)->m_fd != -1) {
#ifdef USE_WAKEUP_PIPE
epoll_ctl(epollfd, EPOLL_CTL_DEL, wakeupPipe[0], nullptr);
epoll_ctl(m_edge_trig_events->m_fd, EPOLL_CTL_DEL, wakeupPipe[0], nullptr);
#endif
close(epollfd);
m_edge_trig_events.reset();
}
epollfd = -1;
#endif
#ifdef USE_WAKEUP_PIPE
@ -4245,10 +4232,10 @@ void CConnman::RegisterEvents(CNode *pnode)
EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_ADD, 0, 0, nullptr);
EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);
int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr);
int r = kevent(Assert(m_edge_trig_events)->m_fd, events, 2, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EV_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
@ -4262,10 +4249,10 @@ void CConnman::RegisterEvents(CNode *pnode)
e.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
e.data.fd = pnode->hSocket;
int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, pnode->hSocket, &e);
int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, pnode->hSocket, &e);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__,
epollfd, EPOLL_CTL_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EPOLL_CTL_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
@ -4284,10 +4271,10 @@ void CConnman::UnregisterEvents(CNode *pnode)
EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr);
int r = kevent(Assert(m_edge_trig_events)->m_fd, events, 2, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_DELETE, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EV_DELETE, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
@ -4298,10 +4285,10 @@ void CConnman::UnregisterEvents(CNode *pnode)
return;
}
int r = epoll_ctl(epollfd, EPOLL_CTL_DEL, pnode->hSocket, nullptr);
int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_DEL, pnode->hSocket, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__,
epollfd, EPOLL_CTL_DEL, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
m_edge_trig_events->m_fd, EPOLL_CTL_DEL, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif

View File

@ -28,9 +28,10 @@
#include <sync.h>
#include <threadinterrupt.h>
#include <uint256.h>
#include <util/check.h>
#include <util/edge.h>
#include <util/system.h>
#include <consensus/params.h>
#include <util/check.h>
#include <atomic>
#include <condition_variable>
@ -1514,12 +1515,7 @@ private:
std::atomic<bool> wakeupSelectNeeded{false};
SocketEventsMode socketEventsMode;
#ifdef USE_KQUEUE
int kqueuefd{-1};
#endif
#ifdef USE_EPOLL
int epollfd{-1};
#endif
std::unique_ptr<EdgeTriggeredEvents> m_edge_trig_events{nullptr};
Mutex cs_sendable_receivable_nodes;
std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes);

60
src/util/edge.cpp Normal file
View File

@ -0,0 +1,60 @@
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <util/edge.h>
#include <logging.h>
#include <util/sock.h>
#include <assert.h>
#ifdef USE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef USE_KQUEUE
#include <sys/event.h>
#endif
EdgeTriggeredEvents::EdgeTriggeredEvents(SocketEventsMode events_mode)
: m_mode(events_mode)
{
if (m_mode == SocketEventsMode::EPoll) {
#ifdef USE_EPOLL
m_fd = epoll_create1(0);
if (m_fd == -1) {
LogPrintf("Unable to initialize EdgeTriggeredEvents, epoll_create1 returned -1\n");
return;
}
#else
LogPrintf("Attempting to initialize EdgeTriggeredEvents for epoll without support compiled in!\n");
return;
#endif /* USE_EPOLL */
} else if (m_mode == SocketEventsMode::KQueue) {
#ifdef USE_KQUEUE
m_fd = kqueue();
if (m_fd == -1) {
LogPrintf("Unable to initialize EdgeTriggeredEvents, kqueue returned -1\n");
return;
}
#else
LogPrintf("Attempting to initialize EdgeTriggeredEvents for kqueue without support compiled in!\n");
return;
#endif /* USE_KQUEUE */
} else {
assert(false);
}
m_valid = true;
}
EdgeTriggeredEvents::~EdgeTriggeredEvents()
{
if (m_valid) {
#if defined(USE_KQUEUE) || defined(USE_EPOLL)
close(m_fd);
#else
assert(false);
#endif /* defined(USE_KQUEUE) || defined(USE_EPOLL) */
}
}

35
src/util/edge.h Normal file
View File

@ -0,0 +1,35 @@
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_UTIL_EDGE_H
#define BITCOIN_UTIL_EDGE_H
#include <cstdint>
enum class SocketEventsMode : int8_t;
/**
* A manager for abstracting logic surrounding edge-triggered socket events
* modes like kqueue and epoll.
*/
class EdgeTriggeredEvents
{
public:
explicit EdgeTriggeredEvents(SocketEventsMode events_mode);
~EdgeTriggeredEvents();
bool IsValid() const { return m_valid; }
public:
/* File descriptor used to interact with events mode */
int m_fd{-1};
private:
/* Instance validity flag set during construction */
bool m_valid{false};
/* Flag for storing selected socket events mode */
SocketEventsMode m_mode;
};
#endif /* BITCOIN_UTIL_EDGE_H */