net: add a flag to indicate when a node's send buffer is full

Similar to the recv flag, but this one indicates whether or not the net's send
buffer is full.

The socket handler checks the send queue when a new message is added and pauses
if necessary, and possibly unpauses after each message is drained from its buffer.
This commit is contained in:
Cory Fields 2016-12-31 02:05:32 -05:00
parent c6e8a9bcff
commit 991955ee81
3 changed files with 12 additions and 8 deletions

View File

@ -761,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const
// requires LOCK(cs_vSend) // requires LOCK(cs_vSend)
size_t SocketSendData(CNode *pnode) size_t CConnman::SocketSendData(CNode *pnode)
{ {
auto it = pnode->vSendMsg.begin(); auto it = pnode->vSendMsg.begin();
size_t nSentSize = 0; size_t nSentSize = 0;
@ -778,6 +778,7 @@ size_t SocketSendData(CNode *pnode)
if (pnode->nSendOffset == data.size()) { if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0; pnode->nSendOffset = 0;
pnode->nSendSize -= data.size(); pnode->nSendSize -= data.size();
pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
it++; it++;
} else { } else {
// could not send full message; stop sending more // could not send full message; stop sending more
@ -1286,10 +1287,11 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend); TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) { if (lockSend) {
size_t nBytes = SocketSendData(pnode); size_t nBytes = SocketSendData(pnode);
if (nBytes) if (nBytes) {
RecordBytesSent(nBytes); RecordBytesSent(nBytes);
} }
} }
}
// //
// Inactivity checking // Inactivity checking
@ -1868,7 +1870,7 @@ void CConnman::ThreadMessageHandler()
if (lockRecv) if (lockRecv)
{ {
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize()); fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
} }
} }
if (flagInterruptMsgProc) if (flagInterruptMsgProc)
@ -2595,6 +2597,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
lastSentFeeFilter = 0; lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0; nextSendTimeFeeFilter = 0;
fPauseRecv = false; fPauseRecv = false;
fPauseSend = false;
nProcessQueueSize = 0; nProcessQueueSize = 0;
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
@ -2675,6 +2678,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
pnode->nSendSize += nTotalSize; pnode->nSendSize += nTotalSize;
if (pnode->nSendSize > nSendBufferMaxSize)
pnode->fPauseSend = true;
pnode->vSendMsg.push_back(std::move(serializedHeader)); pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize) if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data)); pnode->vSendMsg.push_back(std::move(msg.data));

View File

@ -358,6 +358,7 @@ private:
NodeId GetNewNodeId(); NodeId GetNewNodeId();
size_t SocketSendData(CNode *pnode);
//!check is the banlist has unwritten changes //!check is the banlist has unwritten changes
bool BannedSetIsDirty(); bool BannedSetIsDirty();
//!set the "dirty" flag for the banlist //!set the "dirty" flag for the banlist
@ -444,7 +445,6 @@ void Discover(boost::thread_group& threadGroup);
void MapPort(bool fUseUPnP); void MapPort(bool fUseUPnP);
unsigned short GetListenPort(); unsigned short GetListenPort();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
size_t SocketSendData(CNode *pnode);
struct CombinerAll struct CombinerAll
{ {
@ -652,6 +652,7 @@ public:
const uint64_t nKeyedNetGroup; const uint64_t nKeyedNetGroup;
std::atomic_bool fPauseRecv; std::atomic_bool fPauseRecv;
std::atomic_bool fPauseSend;
protected: protected:
mapMsgCmdSize mapSendBytesPerMsgCmd; mapMsgCmdSize mapSendBytesPerMsgCmd;

View File

@ -889,14 +889,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc) void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{ {
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
vector<CInv> vNotFound; vector<CInv> vNotFound;
CNetMsgMaker msgMaker(pfrom->GetSendVersion()); CNetMsgMaker msgMaker(pfrom->GetSendVersion());
LOCK(cs_main); LOCK(cs_main);
while (it != pfrom->vRecvGetData.end()) { while (it != pfrom->vRecvGetData.end()) {
// Don't bother if send buffer is too full to respond anyway // Don't bother if send buffer is too full to respond anyway
if (pfrom->nSendSize >= nMaxSendBufferSize) if (pfrom->fPauseSend)
break; break;
const CInv &inv = *it; const CInv &inv = *it;
@ -2444,7 +2443,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc) bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{ {
const CChainParams& chainparams = Params(); const CChainParams& chainparams = Params();
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
// //
// Message format // Message format
// (4) message start // (4) message start
@ -2465,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (!pfrom->vRecvGetData.empty()) return true; if (!pfrom->vRecvGetData.empty()) return true;
// Don't bother if send buffer is too full to respond anyway // Don't bother if send buffer is too full to respond anyway
if (pfrom->nSendSize >= nMaxSendBufferSize) if (pfrom->fPauseSend)
return false; return false;
std::list<CNetMessage> msgs; std::list<CNetMessage> msgs;