Implement kqueue support (#3892)

This commit is contained in:
UdjinM6 2020-12-30 22:34:42 +03:00 committed by GitHub
parent bfe6c7ca5e
commit a06eba3eb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 160 additions and 2 deletions

View File

@ -114,6 +114,10 @@ typedef char* sockopt_arg_type;
#define USE_EPOLL
#endif
#if defined(__FreeBSD__) || defined(__APPLE__)
#define USE_KQUEUE
#endif
bool static inline IsSelectableSocket(const SOCKET& s) {
#if defined(USE_POLL) || defined(WIN32)
return true;

View File

@ -439,6 +439,9 @@ std::string GetSupportedSocketEventsStr()
#endif
#ifdef USE_EPOLL
strSupportedModes += ", 'epoll'";
#endif
#ifdef USE_KQUEUE
strSupportedModes += ", 'kqueue'";
#endif
return strSupportedModes;
}
@ -2418,6 +2421,10 @@ bool AppInitMain()
#ifdef USE_EPOLL
} else if (strSocketEventsMode == "epoll") {
connOptions.socketEventsMode = CConnman::SOCKETEVENTS_EPOLL;
#endif
#ifdef USE_KQUEUE
} else if (strSocketEventsMode == "kqueue") {
connOptions.socketEventsMode = CConnman::SOCKETEVENTS_KQUEUE;
#endif
} else {
return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), strSocketEventsMode, GetSupportedSocketEventsStr()));

View File

@ -44,6 +44,10 @@
#include <sys/epoll.h>
#endif
#ifdef USE_KQUEUE
#include <sys/event.h>
#endif
#ifdef USE_UPNP
#include <miniupnpc/miniupnpc.h>
#include <miniupnpc/miniwget.h>
@ -1544,6 +1548,41 @@ bool CConnman::GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &s
return !recv_set.empty() || !send_set.empty() || !error_set.empty();
}
#ifdef USE_KQUEUE
void CConnman::SocketEventsKqueue(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll)
{
const size_t maxEvents = 64;
struct kevent events[maxEvents];
struct timespec timeout;
timeout.tv_sec = fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000;
timeout.tv_nsec = (fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000;
wakeupSelectNeeded = true;
int n = kevent(kqueuefd, nullptr, 0, events, maxEvents, &timeout);
wakeupSelectNeeded = false;
if (n == -1) {
LogPrintf("kevent wait error\n");
} else if (n > 0) {
for (int i = 0; i < n; i++) {
auto& event = events[i];
if ((event.flags & EV_ERROR) || (event.flags & EV_EOF)) {
error_set.insert((SOCKET)event.ident);
continue;
}
if (event.filter == EVFILT_READ) {
recv_set.insert((SOCKET)event.ident);
}
if (event.filter == EVFILT_WRITE) {
send_set.insert((SOCKET)event.ident);
}
}
}
}
#endif
#ifdef USE_EPOLL
void CConnman::SocketEventsEpoll(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll)
{
@ -1698,6 +1737,11 @@ void CConnman::SocketEventsSelect(std::set<SOCKET> &recv_set, std::set<SOCKET> &
void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll)
{
switch (socketEventsMode) {
#ifdef USE_KQUEUE
case SOCKETEVENTS_KQUEUE:
SocketEventsKqueue(recv_set, send_set, error_set, fOnlyPoll);
break;
#endif
#ifdef USE_EPOLL
case SOCKETEVENTS_EPOLL:
SocketEventsEpoll(recv_set, send_set, error_set, fOnlyPoll);
@ -2890,6 +2934,19 @@ bool CConnman::BindListenPort(const CService &addrBind, std::string& strError, b
return false;
}
#ifdef USE_KQUEUE
if (socketEventsMode == SOCKETEVENTS_KQUEUE) {
struct kevent event;
EV_SET(&event, hListenSocket, EVFILT_READ, EV_ADD, 0, 0, nullptr);
if (kevent(kqueuefd, &event, 1, nullptr, 0, nullptr) != 0) {
strError = strprintf(_("Error: failed to add socket to kqueuefd (kevent returned error %s)"), NetworkErrorString(WSAGetLastError()));
LogPrintf("%s\n", strError);
CloseSocket(hListenSocket);
return false;
}
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL) {
epoll_event event;
@ -3050,6 +3107,16 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
nMaxOutboundCycleStartTime = 0;
}
#ifdef USE_KQUEUE
if (socketEventsMode == SOCKETEVENTS_KQUEUE) {
kqueuefd = kqueue();
if (kqueuefd == -1) {
LogPrintf("kqueue failed\n");
return false;
}
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL) {
epollfd = epoll_create1(0);
@ -3146,6 +3213,18 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) {
LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n");
}
#ifdef USE_KQUEUE
if (socketEventsMode == SOCKETEVENTS_KQUEUE) {
struct kevent event;
EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_ADD, 0, 0, nullptr);
int r = kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_ADD, wakeupPipe[0], NetworkErrorString(WSAGetLastError()));
return false;
}
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL) {
epoll_event event;
@ -3273,6 +3352,13 @@ void CConnman::Stop()
}
for (ListenSocket& hListenSocket : vhListenSocket)
if (hListenSocket.socket != INVALID_SOCKET) {
#ifdef USE_KQUEUE
if (socketEventsMode == SOCKETEVENTS_KQUEUE) {
struct kevent event;
EV_SET(&event, hListenSocket.socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL) {
epoll_ctl(epollfd, EPOLL_CTL_DEL, hListenSocket.socket, nullptr);
@ -3301,6 +3387,17 @@ void CConnman::Stop()
semOutbound.reset();
semAddnode.reset();
#ifdef USE_KQUEUE
if (socketEventsMode == SOCKETEVENTS_KQUEUE && kqueuefd != -1) {
#ifdef USE_WAKEUP_PIPE
struct kevent event;
EV_SET(&event, wakeupPipe[0], EVFILT_READ, EV_DELETE, 0, 0, nullptr);
kevent(kqueuefd, &event, 1, nullptr, 0, nullptr);
#endif
close(kqueuefd);
}
kqueuefd = -1;
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SOCKETEVENTS_EPOLL && epollfd != -1) {
#ifdef USE_WAKEUP_PIPE
@ -3956,6 +4053,24 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const
void CConnman::RegisterEvents(CNode *pnode)
{
#ifdef USE_KQUEUE
if (socketEventsMode != SOCKETEVENTS_KQUEUE) {
return;
}
LOCK(pnode->cs_hSocket);
assert(pnode->hSocket != INVALID_SOCKET);
struct kevent events[2];
EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_ADD, 0, 0, nullptr);
EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);
int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode != SOCKETEVENTS_EPOLL) {
return;
@ -3979,6 +4094,26 @@ void CConnman::RegisterEvents(CNode *pnode)
void CConnman::UnregisterEvents(CNode *pnode)
{
#ifdef USE_KQUEUE
if (socketEventsMode != SOCKETEVENTS_KQUEUE) {
return;
}
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET) {
return;
}
struct kevent events[2];
EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
int r = kevent(kqueuefd, events, 2, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
kqueuefd, EV_DELETE, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode != SOCKETEVENTS_EPOLL) {
return;

View File

@ -98,7 +98,9 @@ static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000;
// NOTE: When adjusting this, update rpcnet:setban's help ("24h")
static const unsigned int DEFAULT_MISBEHAVING_BANTIME = 60 * 60 * 24; // Default 24-hour ban
#if defined USE_EPOLL
#if defined USE_KQUEUE
#define DEFAULT_SOCKETEVENTS "kqueue"
#elif defined USE_EPOLL
#define DEFAULT_SOCKETEVENTS "epoll"
#elif defined USE_POLL
#define DEFAULT_SOCKETEVENTS "poll"
@ -149,6 +151,7 @@ public:
SOCKETEVENTS_SELECT = 0,
SOCKETEVENTS_POLL = 1,
SOCKETEVENTS_EPOLL = 2,
SOCKETEVENTS_KQUEUE = 3,
};
struct Options
@ -493,6 +496,9 @@ private:
void CalculateNumConnectionsChangedStats();
void InactivityCheck(CNode *pnode);
bool GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set);
#ifdef USE_KQUEUE
void SocketEventsKqueue(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll);
#endif
#ifdef USE_EPOLL
void SocketEventsEpoll(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set, bool fOnlyPoll);
#endif
@ -620,6 +626,9 @@ private:
std::atomic<bool> wakeupSelectNeeded{false};
SocketEventsMode socketEventsMode;
#ifdef USE_KQUEUE
int kqueuefd{-1};
#endif
#ifdef USE_EPOLL
int epollfd{-1};
#endif

View File

@ -439,7 +439,7 @@ UniValue getnetworkinfo(const JSONRPCRequest& request)
" \"timeoffset\": xxxxx, (numeric) the time offset\n"
" \"connections\": xxxxx, (numeric) the number of connections\n"
" \"networkactive\": true|false, (bool) whether p2p networking is enabled\n"
" \"socketevents\": \"xxx/\", (string) the socket events mode, either epoll, poll or select\n"
" \"socketevents\": \"xxx/\", (string) the socket events mode, either kqueue, epoll, poll or select\n"
" \"networks\": [ (array) information per network\n"
" {\n"
" \"name\": \"xxx\", (string) network (ipv4, ipv6 or onion)\n"
@ -490,6 +490,9 @@ UniValue getnetworkinfo(const JSONRPCRequest& request)
case CConnman::SOCKETEVENTS_EPOLL:
strSocketEvents = "epoll";
break;
case CConnman::SOCKETEVENTS_KQUEUE:
strSocketEvents = "kqueue";
break;
default:
assert(false);
}