Merge pull request #3396 from codablock/pr_backport_poll

Backport poll() code from Bitcoin
Alexander Block 2020-04-09 00:12:20 +02:00 committed by GitHub
commit 410d4a68c2
7 changed files with 449 additions and 288 deletions
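For context (editorial note, not part of the patch text): select() can only watch descriptors whose value is below FD_SETSIZE, so the number of connections a node can service is capped by that constant. poll() takes an explicit array of descriptors and has no such ceiling, which is why this backport enables it on Linux (USE_POLL) and keeps select() as the fallback elsewhere. A minimal sketch of the poll() pattern that CConnman::SocketEvents below builds on, simplified for illustration rather than quoted from the patch:

#include <poll.h>
#include <vector>

// Simplified relative to SocketEvents: the real code merges sockets that appear in
// both sets through an unordered_map and honours the interruptNet flag.
static void WaitForEvents(const std::vector<int>& recv_fds,
                          const std::vector<int>& send_fds,
                          int timeout_ms)
{
    std::vector<struct pollfd> pfds;
    for (int fd : recv_fds) {
        struct pollfd p{};
        p.fd = fd;
        p.events = POLLIN;
        pfds.push_back(p);
    }
    for (int fd : send_fds) {
        struct pollfd p{};
        p.fd = fd;
        p.events = POLLOUT;
        pfds.push_back(p);
    }
    // Unlike select(), poll() takes an explicit array: no FD_SETSIZE ceiling on
    // descriptor values or on how many sockets can be watched.
    int r = poll(pfds.data(), pfds.size(), timeout_ms);
    if (r <= 0) return; // 0 = timeout, <0 = error
    for (const struct pollfd& p : pfds) {
        if (p.revents & POLLIN)  { /* readable: accept or recv */ }
        if (p.revents & POLLOUT) { /* writable: flush send queue */ }
        if (p.revents & (POLLERR | POLLHUP)) { /* treat as error set */ }
    }
}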

View File

@ -102,8 +102,15 @@ typedef void* sockopt_arg_type;
typedef char* sockopt_arg_type;
#endif
// Note these both should work with the current usage of poll, but best to be safe
// WIN32 poll is broken https://daniel.haxx.se/blog/2012/10/10/wsapoll-is-broken/
// __APPLE__ poll is broken https://github.com/bitcoin/bitcoin/pull/14336#issuecomment-437384408
#if defined(__linux__)
#define USE_POLL
#endif
bool static inline IsSelectableSocket(const SOCKET& s) {
#ifdef WIN32
#if defined(USE_POLL) || defined(WIN32)
return true;
#else
return (s < FD_SETSIZE);
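A short illustration of why IsSelectableSocket() exists on the select() path (an editorial sketch, not part of the patch): on POSIX, fd_set is a fixed bitmap of FD_SETSIZE slots, so calling FD_SET() on a larger descriptor writes out of bounds and such sockets must be refused. With USE_POLL, or on WIN32 where fd_set stores socket handles rather than a value-indexed bitmap, every socket is acceptable, hence the unconditional true.

#include <sys/select.h>

// Hypothetical helper: shows the FD_SETSIZE constraint select() imposes.
static bool CanWatchWithSelect(int fd)
{
    if (fd < 0 || fd >= FD_SETSIZE) return false; // FD_SET() here would be out of bounds
    fd_set readfds;
    FD_ZERO(&readfds);
    FD_SET(fd, &readfds);
    return true;
}
// poll() has no such ceiling, which is why USE_POLL builds can let
// IsSelectableSocket() return true unconditionally.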

View File

@ -1117,8 +1117,14 @@ bool AppInitParameterInteraction()
nMaxConnections = std::max(nUserMaxConnections, 0);
// Trim requested connection counts, to fit into system limitations
nMaxConnections = std::max(std::min(nMaxConnections, (int)(FD_SETSIZE - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS)), 0);
// <int> in std::min<int>(...) to work around FreeBSD compilation issue described in #2695
nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS);
#ifdef USE_POLL
int fd_max = nFD;
#else
int fd_max = FD_SETSIZE;
#endif
nMaxConnections = std::max(std::min<int>(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS), 0);
if (nFD < MIN_CORE_FILEDESCRIPTORS)
return InitError(_("Not enough file descriptors available."));
nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS, nMaxConnections);
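Rough effect of the new fd_max, with illustrative numbers (nBind, MIN_CORE_FILEDESCRIPTORS and MAX_ADDNODE_CONNECTIONS below are assumptions for the arithmetic, not quoted from the tree):

// Worked example with assumed values:
constexpr int nBind = 2;
constexpr int MIN_CORE_FILEDESCRIPTORS = 150;
constexpr int MAX_ADDNODE_CONNECTIONS = 8;

// select() build: fd_max = FD_SETSIZE (typically 1024)
constexpr int select_cap = 1024 - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS; // 864
// poll() build: fd_max = nFD, whatever RaiseFileDescriptorLimit() obtained, e.g. 4096
constexpr int poll_cap   = 4096 - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS; // 3936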

View File

@ -34,6 +34,10 @@
#include <fcntl.h>
#endif
#ifdef USE_POLL
#include <poll.h>
#endif
#ifdef USE_UPNP
#include <miniupnpc/miniupnpc.h>
#include <miniupnpc/miniwget.h>
@ -41,6 +45,7 @@
#include <miniupnpc/upnperrors.h>
#endif
#include <unordered_map>
#include <math.h>
@ -78,6 +83,10 @@ enum BindFlags {
BF_WHITELIST = (1U << 2),
};
// The set of sockets cannot be modified while waiting
// The sleep time needs to be small to avoid new sockets stalling
static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 50;
const static std::string NET_MESSAGE_COMMAND_OTHER = "*other*";
constexpr const CConnman::CFullyConnectedOnly CConnman::FullyConnectedOnly;
@ -1228,330 +1237,442 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
}
}
void CConnman::ThreadSocketHandler()
void CConnman::DisconnectNodes()
{
unsigned int nPrevNodeCount = 0;
while (!interruptNet)
{
//
// Disconnect nodes
//
{
LOCK(cs_vNodes);
// Disconnect unused nodes
for (auto it = vNodes.begin(); it != vNodes.end(); )
{
CNode* pnode = *it;
if (pnode->fDisconnect)
{
if (fLogIPs) {
LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fInbound=%d fMasternode=%d\n",
pnode->GetId(), pnode->addr.ToString(), pnode->GetRefCount(), pnode->fInbound, pnode->fMasternode);
} else {
LogPrintf("ThreadSocketHandler -- removing node: peer=%d nRefCount=%d fInbound=%d fMasternode=%d\n",
pnode->GetId(), pnode->GetRefCount(), pnode->fInbound, pnode->fMasternode);
}
LOCK(cs_vNodes);
// remove from vNodes
it = vNodes.erase(it);
// release outbound grant (if any)
pnode->grantOutbound.Release();
// close socket and cleanup
pnode->CloseSocketDisconnect();
// hold in disconnected pool until all refs are released
pnode->Release();
vNodesDisconnected.push_back(pnode);
} else {
++it;
if (!fNetworkActive) {
// Disconnect any connected nodes
for (CNode* pnode : vNodes) {
if (!pnode->fDisconnect) {
LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId());
pnode->fDisconnect = true;
}
}
}
// Disconnect unused nodes
for (auto it = vNodes.begin(); it != vNodes.end(); )
{
// Delete disconnected nodes
std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
for (auto it = vNodesDisconnected.begin(); it != vNodesDisconnected.end(); )
CNode* pnode = *it;
if (pnode->fDisconnect)
{
CNode* pnode = *it;
// wait until threads are done using it
bool fDelete = false;
if (pnode->GetRefCount() <= 0) {
{
TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv) {
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
fDelete = true;
}
if (fLogIPs) {
LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fInbound=%d fMasternode=%d\n",
pnode->GetId(), pnode->addr.ToString(), pnode->GetRefCount(), pnode->fInbound, pnode->fMasternode);
} else {
LogPrintf("ThreadSocketHandler -- removing node: peer=%d nRefCount=%d fInbound=%d fMasternode=%d\n",
pnode->GetId(), pnode->GetRefCount(), pnode->fInbound, pnode->fMasternode);
}
// remove from vNodes
it = vNodes.erase(it);
// release outbound grant (if any)
pnode->grantOutbound.Release();
// close socket and cleanup
pnode->CloseSocketDisconnect();
// hold in disconnected pool until all refs are released
pnode->Release();
vNodesDisconnected.push_back(pnode);
} else {
++it;
}
}
}
{
// Delete disconnected nodes
std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
for (auto it = vNodesDisconnected.begin(); it != vNodesDisconnected.end(); )
{
CNode* pnode = *it;
// wait until threads are done using it
bool fDelete = false;
if (pnode->GetRefCount() <= 0) {
{
TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv) {
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
fDelete = true;
}
}
if (fDelete) {
it = vNodesDisconnected.erase(it);
DeleteNode(pnode);
}
}
if (!fDelete) {
++it;
if (fDelete) {
it = vNodesDisconnected.erase(it);
DeleteNode(pnode);
}
}
if (!fDelete) {
++it;
}
}
size_t vNodesSize;
}
}
void CConnman::NotifyNumConnectionsChanged()
{
size_t vNodesSize;
{
LOCK(cs_vNodes);
vNodesSize = vNodes.size();
}
if(vNodesSize != nPrevNodeCount) {
nPrevNodeCount = vNodesSize;
if(clientInterface)
clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount);
}
}
void CConnman::InactivityCheck(CNode *pnode)
{
int64_t nTime = GetSystemTimeInSeconds();
if (nTime - pnode->nTimeConnected > 60)
{
if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
{
LOCK(cs_vNodes);
vNodesSize = vNodes.size();
LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId());
pnode->fDisconnect = true;
}
if(vNodesSize != nPrevNodeCount) {
nPrevNodeCount = vNodesSize;
if(clientInterface)
clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount);
else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL)
{
LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend);
pnode->fDisconnect = true;
}
else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60))
{
LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv);
pnode->fDisconnect = true;
}
else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros())
{
LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart));
pnode->fDisconnect = true;
}
else if (!pnode->fSuccessfullyConnected)
{
LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId());
pnode->fDisconnect = true;
}
}
}
//
// Find which sockets have data to receive
//
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 50000; // frequency to poll pnode->vSend
bool CConnman::GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
{
for (const ListenSocket& hListenSocket : vhListenSocket) {
recv_set.insert(hListenSocket.socket);
}
fd_set fdsetRecv;
fd_set fdsetSend;
fd_set fdsetError;
FD_ZERO(&fdsetRecv);
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);
SOCKET hSocketMax = 0;
bool have_fds = false;
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes)
{
// Implement the following logic:
// * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer, select() for
// receiving data.
// * Hand off all complete messages to the processor, to be handled without
// blocking here.
#ifndef WIN32
// We add a pipe to the read set so that the select() call can be woken up from the outside
// This is done when data is available for sending and at the same time optimistic sending was disabled
// when pushing the data.
// This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
// timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
// run on Linux and friends.
FD_SET(wakeupPipe[0], &fdsetRecv);
hSocketMax = std::max(hSocketMax, (SOCKET)wakeupPipe[0]);
have_fds = true;
bool select_recv = !pnode->fPauseRecv;
bool select_send;
{
LOCK(pnode->cs_vSend);
select_send = !pnode->vSendMsg.empty();
}
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
continue;
error_set.insert(pnode->hSocket);
if (select_send) {
send_set.insert(pnode->hSocket);
continue;
}
if (select_recv) {
recv_set.insert(pnode->hSocket);
}
}
}
#ifdef USE_WAKEUP_PIPE
// We add a pipe to the read set so that the select() call can be woken up from the outside
// This is done when data is available for sending and at the same time optimistic sending was disabled
// when pushing the data.
// This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
// timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
// run on Linux and friends.
recv_set.insert(wakeupPipe[0]);
#endif
for (const ListenSocket& hListenSocket : vhListenSocket) {
FD_SET(hListenSocket.socket, &fdsetRecv);
hSocketMax = std::max(hSocketMax, hListenSocket.socket);
have_fds = true;
return !recv_set.empty() || !send_set.empty() || !error_set.empty();
}
#ifdef USE_POLL
void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
{
std::set<SOCKET> recv_select_set, send_select_set, error_select_set;
if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) {
interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
return;
}
std::unordered_map<SOCKET, struct pollfd> pollfds;
for (SOCKET socket_id : recv_select_set) {
pollfds[socket_id].fd = socket_id;
pollfds[socket_id].events |= POLLIN;
}
for (SOCKET socket_id : send_select_set) {
pollfds[socket_id].fd = socket_id;
pollfds[socket_id].events |= POLLOUT;
}
for (SOCKET socket_id : error_select_set) {
pollfds[socket_id].fd = socket_id;
// These flags are ignored, but we set them for clarity
pollfds[socket_id].events |= POLLERR|POLLHUP;
}
std::vector<struct pollfd> vpollfds;
vpollfds.reserve(pollfds.size());
for (auto it : pollfds) {
vpollfds.push_back(std::move(it.second));
}
wakeupSelectNeeded = true;
int r = poll(vpollfds.data(), vpollfds.size(), SELECT_TIMEOUT_MILLISECONDS);
wakeupSelectNeeded = false;
if (r < 0) {
return;
}
if (interruptNet) return;
for (struct pollfd pollfd_entry : vpollfds) {
if (pollfd_entry.revents & POLLIN) recv_set.insert(pollfd_entry.fd);
if (pollfd_entry.revents & POLLOUT) send_set.insert(pollfd_entry.fd);
if (pollfd_entry.revents & (POLLERR|POLLHUP)) error_set.insert(pollfd_entry.fd);
}
}
#else
void CConnman::SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set)
{
std::set<SOCKET> recv_select_set, send_select_set, error_select_set;
if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) {
interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
return;
}
//
// Find which sockets have data to receive
//
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = SELECT_TIMEOUT_MILLISECONDS * 1000; // frequency to poll pnode->vSend
fd_set fdsetRecv;
fd_set fdsetSend;
fd_set fdsetError;
FD_ZERO(&fdsetRecv);
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);
SOCKET hSocketMax = 0;
for (SOCKET hSocket : recv_select_set) {
FD_SET(hSocket, &fdsetRecv);
hSocketMax = std::max(hSocketMax, hSocket);
}
for (SOCKET hSocket : send_select_set) {
FD_SET(hSocket, &fdsetSend);
hSocketMax = std::max(hSocketMax, hSocket);
}
for (SOCKET hSocket : error_select_set) {
FD_SET(hSocket, &fdsetError);
hSocketMax = std::max(hSocketMax, hSocket);
}
wakeupSelectNeeded = true;
int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
wakeupSelectNeeded = false;
if (interruptNet)
return;
if (nSelect == SOCKET_ERROR)
{
int nErr = WSAGetLastError();
LogPrintf("socket select error %s\n", NetworkErrorString(nErr));
for (unsigned int i = 0; i <= hSocketMax; i++)
FD_SET(i, &fdsetRecv);
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);
if (!interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)))
return;
}
for (SOCKET hSocket : recv_select_set) {
if (FD_ISSET(hSocket, &fdsetRecv)) {
recv_set.insert(hSocket);
}
}
{
LOCK(cs_vNodes);
for (CNode* pnode : vNodes)
{
// Implement the following logic:
// * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer, select() for
// receiving data.
// * Hand off all complete messages to the processor, to be handled without
// blocking here.
for (SOCKET hSocket : send_select_set) {
if (FD_ISSET(hSocket, &fdsetSend)) {
send_set.insert(hSocket);
}
}
bool select_recv = !pnode->fPauseRecv;
bool select_send;
{
LOCK(pnode->cs_vSend);
select_send = !pnode->vSendMsg.empty();
}
for (SOCKET hSocket : error_select_set) {
if (FD_ISSET(hSocket, &fdsetError)) {
error_set.insert(hSocket);
}
}
}
#endif
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
continue;
void CConnman::SocketHandler()
{
std::set<SOCKET> recv_set, send_set, error_set;
SocketEvents(recv_set, send_set, error_set);
FD_SET(pnode->hSocket, &fdsetError);
hSocketMax = std::max(hSocketMax, pnode->hSocket);
have_fds = true;
if (select_send) {
FD_SET(pnode->hSocket, &fdsetSend);
continue;
}
if (select_recv) {
FD_SET(pnode->hSocket, &fdsetRecv);
}
#ifdef USE_WAKEUP_PIPE
// drain the wakeup pipe
if (recv_set.count(wakeupPipe[0])) {
char buf[128];
while (true) {
int r = read(wakeupPipe[0], buf, sizeof(buf));
if (r <= 0) {
break;
}
}
}
#endif
wakeupSelectNeeded = true;
int nSelect = select(have_fds ? hSocketMax + 1 : 0,
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
wakeupSelectNeeded = false;
if (interruptNet) return;
//
// Accept new connections
//
for (const ListenSocket& hListenSocket : vhListenSocket)
{
if (hListenSocket.socket != INVALID_SOCKET && recv_set.count(hListenSocket.socket) > 0)
{
AcceptConnection(hListenSocket);
}
}
//
// Service each socket
//
std::vector<CNode*> vNodesCopy = CopyNodeVector();
for (CNode* pnode : vNodesCopy)
{
if (interruptNet)
return;
if (nSelect == SOCKET_ERROR)
//
// Receive
//
bool recvSet = false;
bool sendSet = false;
bool errorSet = false;
{
if (have_fds)
{
int nErr = WSAGetLastError();
LogPrintf("socket select error %s\n", NetworkErrorString(nErr));
for (unsigned int i = 0; i <= hSocketMax; i++)
FD_SET(i, &fdsetRecv);
}
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);
if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
return;
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
continue;
recvSet = recv_set.count(pnode->hSocket) > 0;
sendSet = send_set.count(pnode->hSocket) > 0;
errorSet = error_set.count(pnode->hSocket) > 0;
}
#ifndef WIN32
// drain the wakeup pipe
if (FD_ISSET(wakeupPipe[0], &fdsetRecv)) {
char buf[128];
while (true) {
int r = read(wakeupPipe[0], buf, sizeof(buf));
if (r <= 0) {
break;
}
}
}
#endif
//
// Accept new connections
//
for (const ListenSocket& hListenSocket : vhListenSocket)
if (recvSet || errorSet)
{
if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv))
{
AcceptConnection(hListenSocket);
}
}
//
// Service each socket
//
std::vector<CNode*> vNodesCopy = CopyNodeVector();
for (CNode* pnode : vNodesCopy)
{
if (interruptNet)
return;
//
// Receive
//
bool recvSet = false;
bool sendSet = false;
bool errorSet = false;
// typical socket buffer is 8K-64K
char pchBuf[0x10000];
int nBytes = 0;
{
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
continue;
recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
}
if (recvSet || errorSet)
if (nBytes > 0)
{
// typical socket buffer is 8K-64K
char pchBuf[0x10000];
int nBytes = 0;
{
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
continue;
nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
}
if (nBytes > 0)
{
bool notify = false;
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
RecordBytesRecv(nBytes);
if (notify) {
size_t nSizeAdded = 0;
auto it(pnode->vRecvMsg.begin());
for (; it != pnode->vRecvMsg.end(); ++it) {
if (!it->complete())
break;
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
pnode->nProcessQueueSize += nSizeAdded;
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
}
WakeMessageHandler();
}
}
else if (nBytes == 0)
{
// socket closed gracefully
if (!pnode->fDisconnect) {
LogPrint(BCLog::NET, "socket closed\n");
}
bool notify = false;
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
}
else if (nBytes < 0)
{
// error
int nErr = WSAGetLastError();
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
{
if (!pnode->fDisconnect)
LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
pnode->CloseSocketDisconnect();
RecordBytesRecv(nBytes);
if (notify) {
size_t nSizeAdded = 0;
auto it(pnode->vRecvMsg.begin());
for (; it != pnode->vRecvMsg.end(); ++it) {
if (!it->complete())
break;
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
pnode->nProcessQueueSize += nSizeAdded;
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
}
WakeMessageHandler();
}
}
//
// Send
//
if (sendSet)
else if (nBytes == 0)
{
LOCK(pnode->cs_vSend);
size_t nBytes = SocketSendData(pnode);
if (nBytes) {
RecordBytesSent(nBytes);
// socket closed gracefully
if (!pnode->fDisconnect) {
LogPrint(BCLog::NET, "socket closed\n");
}
pnode->CloseSocketDisconnect();
}
//
// Inactivity checking
//
int64_t nTime = GetSystemTimeInSeconds();
if (nTime - pnode->nTimeConnected > 60)
else if (nBytes < 0)
{
if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
// error
int nErr = WSAGetLastError();
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
{
LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId());
pnode->fDisconnect = true;
}
else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL)
{
LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend);
pnode->fDisconnect = true;
}
else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60))
{
LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv);
pnode->fDisconnect = true;
}
else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros())
{
LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart));
pnode->fDisconnect = true;
}
else if (!pnode->fSuccessfullyConnected)
{
LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId());
pnode->fDisconnect = true;
if (!pnode->fDisconnect)
LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
pnode->CloseSocketDisconnect();
}
}
}
ReleaseNodeVector(vNodesCopy);
//
// Send
//
if (sendSet)
{
LOCK(pnode->cs_vSend);
size_t nBytes = SocketSendData(pnode);
if (nBytes) {
RecordBytesSent(nBytes);
}
}
InactivityCheck(pnode);
}
ReleaseNodeVector(vNodesCopy);
}
void CConnman::ThreadSocketHandler()
{
while (!interruptNet)
{
DisconnectNodes();
NotifyNumConnectionsChanged();
SocketHandler();
}
}
@ -1566,7 +1687,7 @@ void CConnman::WakeMessageHandler()
void CConnman::WakeSelect()
{
#ifndef WIN32
#ifdef USE_WAKEUP_PIPE
if (wakeupPipe[1] == -1) {
return;
}
@ -2487,14 +2608,6 @@ void CConnman::SetNetworkActive(bool active)
fNetworkActive = active;
if (!fNetworkActive) {
LOCK(cs_vNodes);
// Close sockets to all nodes
for (CNode* pnode : vNodes) {
pnode->CloseSocketDisconnect();
}
}
uiInterface.NotifyNetworkActiveChanged(fNetworkActive);
}
@ -2506,6 +2619,7 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) :
setBannedIsDirty = false;
fAddressesInitialized = false;
nLastNodeId = 0;
nPrevNodeCount = 0;
nSendBufferMaxSize = 0;
nReceiveFloodSize = 0;
flagInterruptMsgProc = false;
@ -2639,7 +2753,7 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
fMsgProcWake = false;
}
#ifndef WIN32
#ifdef USE_WAKEUP_PIPE
if (pipe(wakeupPipe) != 0) {
wakeupPipe[0] = wakeupPipe[1] = -1;
LogPrint(BCLog::NET, "pipe() for wakeupPipe failed\n");
@ -2778,7 +2892,7 @@ void CConnman::Stop()
semOutbound.reset();
semAddnode.reset();
#ifndef WIN32
#ifdef USE_WAKEUP_PIPE
if (wakeupPipe[0] != -1) close(wakeupPipe[0]);
if (wakeupPipe[1] != -1) close(wakeupPipe[1]);
wakeupPipe[0] = wakeupPipe[1] = -1;
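The wakeupPipe changes above (the "#ifndef WIN32" guards becoming "#ifdef USE_WAKEUP_PIPE") generalize the existing self-pipe trick: the read end of a pipe is added to the watched set so another thread can interrupt a blocking poll()/select() by writing a byte, which the socket thread then drains. A minimal standalone sketch of the pattern (POSIX, simplified; the real code additionally pairs the write side with the wakeupSelectNeeded flag visible around the poll()/select() calls above):

#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

static int wakeup_pipe[2] = {-1, -1}; // [0] read end (watched), [1] write end (other threads)

static bool InitWakeupPipe()
{
    if (pipe(wakeup_pipe) != 0) return false;
    // Non-blocking read end so draining stops once the pipe is empty
    fcntl(wakeup_pipe[0], F_SETFL, O_NONBLOCK);
    return true;
}

// Called from another thread to interrupt a blocking poll()/select()
static void Wake() { char b = 0; (void)write(wakeup_pipe[1], &b, 1); }

static void WaitOnce(int timeout_ms)
{
    struct pollfd pfd{};
    pfd.fd = wakeup_pipe[0];
    pfd.events = POLLIN;
    if (poll(&pfd, 1, timeout_ms) > 0 && (pfd.revents & POLLIN)) {
        char buf[128];
        while (read(wakeup_pipe[0], buf, sizeof(buf)) > 0) {} // drain, as the patch does
    }
}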

View File

@ -50,6 +50,7 @@
#define DEFAULT_ALLOW_OPTIMISTIC_SEND true
#else
#define DEFAULT_ALLOW_OPTIMISTIC_SEND false
#define USE_WAKEUP_PIPE
#endif
class CScheduler;
@ -474,6 +475,12 @@ private:
void ThreadOpenConnections(std::vector<std::string> connect);
void ThreadMessageHandler();
void AcceptConnection(const ListenSocket& hListenSocket);
void DisconnectNodes();
void NotifyNumConnectionsChanged();
void InactivityCheck(CNode *pnode);
bool GenerateSelectSet(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set);
void SocketEvents(std::set<SOCKET> &recv_set, std::set<SOCKET> &send_set, std::set<SOCKET> &error_set);
void SocketHandler();
void ThreadSocketHandler();
void ThreadDNSAddressSeed();
void ThreadOpenMasternodeConnections();
@ -549,6 +556,7 @@ private:
std::list<CNode*> vNodesDisconnected;
mutable CCriticalSection cs_vNodes;
std::atomic<NodeId> nLastNodeId;
unsigned int nPrevNodeCount;
/** Services this instance offers */
ServiceFlags nLocalServices;
@ -575,7 +583,7 @@ private:
CThreadInterrupt interruptNet;
#ifndef WIN32
#ifdef USE_WAKEUP_PIPE
/** a pipe which is added to select() calls to wake up before the timeout */
int wakeupPipe[2]{-1,-1};
#endif

View File

@ -20,6 +20,9 @@
#include <boost/algorithm/string/case_conv.hpp> // for to_lower()
#include <boost/algorithm/string/predicate.hpp> // for startswith() and endswith()
#ifdef USE_POLL
#include <poll.h>
#endif
#if !defined(HAVE_MSG_NOSIGNAL)
#define MSG_NOSIGNAL 0
@ -261,11 +264,19 @@ static IntrRecvError InterruptibleRecv(uint8_t* data, size_t len, int timeout, c
if (!IsSelectableSocket(hSocket)) {
return IntrRecvError::NetworkError;
}
struct timeval tval = MillisToTimeval(std::min(endTime - curTime, maxWait));
int timeout_ms = std::min(endTime - curTime, maxWait);
#ifdef USE_POLL
struct pollfd pollfd = {};
pollfd.fd = hSocket;
pollfd.events = POLLIN | POLLOUT;
int nRet = poll(&pollfd, 1, timeout_ms);
#else
struct timeval tval = MillisToTimeval(timeout_ms);
fd_set fdset;
FD_ZERO(&fdset);
FD_SET(hSocket, &fdset);
int nRet = select(hSocket + 1, &fdset, nullptr, nullptr, &tval);
#endif
if (nRet == SOCKET_ERROR) {
return IntrRecvError::NetworkError;
}
@ -486,11 +497,18 @@ bool ConnectSocketDirectly(const CService &addrConnect, const SOCKET& hSocket, i
// WSAEINVAL is here because some legacy version of winsock uses it
if (nErr == WSAEINPROGRESS || nErr == WSAEWOULDBLOCK || nErr == WSAEINVAL)
{
#ifdef USE_POLL
struct pollfd pollfd = {};
pollfd.fd = hSocket;
pollfd.events = POLLIN | POLLOUT;
int nRet = poll(&pollfd, 1, nTimeout);
#else
struct timeval timeout = MillisToTimeval(nTimeout);
fd_set fdset;
FD_ZERO(&fdset);
FD_SET(hSocket, &fdset);
int nRet = select(hSocket + 1, nullptr, &fdset, nullptr, &timeout);
#endif
if (nRet == 0)
{
LogPrint(BCLog::NET, "connection to %s timeout\n", addrConnect.ToString());
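In both netbase.cpp call sites the pattern is the same: after a non-blocking connect() returns EINPROGRESS/WSAEWOULDBLOCK, the socket is waited on until it becomes writable; the patch swaps the select() on the write set for a poll() with POLLIN | POLLOUT. A hedged sketch of that wait, including the getsockopt(SO_ERROR) check that conventionally follows it (the SO_ERROR step mirrors the usual pattern and the surrounding code, and is an assumption here rather than a quote of the hunk):

#include <poll.h>
#include <sys/socket.h>

// Hypothetical helper: wait for a non-blocking connect() to finish, then verify it.
static bool WaitForConnect(int sock, int timeout_ms)
{
    struct pollfd pfd{};
    pfd.fd = sock;
    pfd.events = POLLIN | POLLOUT;   // the select() path instead watches the write set
    int n = poll(&pfd, 1, timeout_ms);
    if (n == 0) return false;        // timed out
    if (n < 0) return false;         // poll() itself failed
    int err = 0;
    socklen_t len = sizeof(err);
    if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &len) != 0) return false;
    return err == 0;                 // 0 means the connection is established
}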

View File

@ -188,6 +188,9 @@ class ExampleTest(BitcoinTestFramework):
self.log.info("Connect node2 and node1")
connect_nodes(self.nodes[1], 2)
self.log.info("Wait for node2 to receive all the blocks from node1")
self.sync_all()
self.log.info("Add P2P connection to node2")
# We can't add additional P2P connections once the network thread has started. Disconnect the connection
# to node0, wait for the network thread to terminate, then connect to node2. This is specific to
@ -199,7 +202,7 @@ class ExampleTest(BitcoinTestFramework):
network_thread_start()
self.nodes[2].p2p.wait_for_verack()
self.log.info("Wait for node2 reach current tip. Test that it has propagated all the blocks to us")
self.log.info("Test that node2 propagates all the blocks to us")
getdata_request = msg_getdata()
for block in blocks:

View File

@ -23,6 +23,11 @@ class NetTest(BitcoinTestFramework):
self.num_nodes = 2
def run_test(self):
# Wait for one ping/pong to finish so that we can be sure that there is no chatter between nodes for some time
# Especially the exchange of messages like getheaders and friends causes test failures here
self.nodes[0].ping()
wait_until(lambda: all(['pingtime' in n for n in self.nodes[0].getpeerinfo()]))
self._test_connection_count()
self._test_getnettotals()
self._test_getnetworkinginfo()
@ -60,8 +65,8 @@ class NetTest(BitcoinTestFramework):
peer_info_after_ping = self.nodes[0].getpeerinfo()
for before, after in zip(peer_info, peer_info_after_ping):
assert_greater_than_or_equal(after['bytesrecv_per_msg']['pong'], before['bytesrecv_per_msg']['pong'] + 32)
assert_greater_than_or_equal(after['bytessent_per_msg']['ping'], before['bytessent_per_msg']['ping'] + 32)
assert_greater_than_or_equal(after['bytesrecv_per_msg'].get('pong', 0), before['bytesrecv_per_msg'].get('pong', 0) + 32)
assert_greater_than_or_equal(after['bytessent_per_msg'].get('ping', 0), before['bytessent_per_msg'].get('ping', 0) + 32)
def _test_getnetworkinginfo(self):
assert_equal(self.nodes[0].getnetworkinfo()['networkactive'], True)