mirror of
https://github.com/dashpay/dash.git
synced 2024-12-27 04:52:59 +01:00
net: add a flag to indicate when a node's send buffer is full
Similar to the recv flag, but this one indicates whether or not the net's send buffer is full. The socket handler checks the send queue when a new message is added and pauses if necessary, and possibly unpauses after each message is drained from its buffer.
This commit is contained in:
parent
c6e8a9bcff
commit
991955ee81
11
src/net.cpp
11
src/net.cpp
@ -761,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const
|
|||||||
|
|
||||||
|
|
||||||
// requires LOCK(cs_vSend)
|
// requires LOCK(cs_vSend)
|
||||||
size_t SocketSendData(CNode *pnode)
|
size_t CConnman::SocketSendData(CNode *pnode)
|
||||||
{
|
{
|
||||||
auto it = pnode->vSendMsg.begin();
|
auto it = pnode->vSendMsg.begin();
|
||||||
size_t nSentSize = 0;
|
size_t nSentSize = 0;
|
||||||
@ -778,6 +778,7 @@ size_t SocketSendData(CNode *pnode)
|
|||||||
if (pnode->nSendOffset == data.size()) {
|
if (pnode->nSendOffset == data.size()) {
|
||||||
pnode->nSendOffset = 0;
|
pnode->nSendOffset = 0;
|
||||||
pnode->nSendSize -= data.size();
|
pnode->nSendSize -= data.size();
|
||||||
|
pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
|
||||||
it++;
|
it++;
|
||||||
} else {
|
} else {
|
||||||
// could not send full message; stop sending more
|
// could not send full message; stop sending more
|
||||||
@ -1286,10 +1287,11 @@ void CConnman::ThreadSocketHandler()
|
|||||||
TRY_LOCK(pnode->cs_vSend, lockSend);
|
TRY_LOCK(pnode->cs_vSend, lockSend);
|
||||||
if (lockSend) {
|
if (lockSend) {
|
||||||
size_t nBytes = SocketSendData(pnode);
|
size_t nBytes = SocketSendData(pnode);
|
||||||
if (nBytes)
|
if (nBytes) {
|
||||||
RecordBytesSent(nBytes);
|
RecordBytesSent(nBytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Inactivity checking
|
// Inactivity checking
|
||||||
@ -1868,7 +1870,7 @@ void CConnman::ThreadMessageHandler()
|
|||||||
if (lockRecv)
|
if (lockRecv)
|
||||||
{
|
{
|
||||||
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
|
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
|
||||||
fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize());
|
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (flagInterruptMsgProc)
|
if (flagInterruptMsgProc)
|
||||||
@ -2595,6 +2597,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
|
|||||||
lastSentFeeFilter = 0;
|
lastSentFeeFilter = 0;
|
||||||
nextSendTimeFeeFilter = 0;
|
nextSendTimeFeeFilter = 0;
|
||||||
fPauseRecv = false;
|
fPauseRecv = false;
|
||||||
|
fPauseSend = false;
|
||||||
nProcessQueueSize = 0;
|
nProcessQueueSize = 0;
|
||||||
|
|
||||||
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
|
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
|
||||||
@ -2675,6 +2678,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
|
|||||||
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
|
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
|
||||||
pnode->nSendSize += nTotalSize;
|
pnode->nSendSize += nTotalSize;
|
||||||
|
|
||||||
|
if (pnode->nSendSize > nSendBufferMaxSize)
|
||||||
|
pnode->fPauseSend = true;
|
||||||
pnode->vSendMsg.push_back(std::move(serializedHeader));
|
pnode->vSendMsg.push_back(std::move(serializedHeader));
|
||||||
if (nMessageSize)
|
if (nMessageSize)
|
||||||
pnode->vSendMsg.push_back(std::move(msg.data));
|
pnode->vSendMsg.push_back(std::move(msg.data));
|
||||||
|
@ -358,6 +358,7 @@ private:
|
|||||||
|
|
||||||
NodeId GetNewNodeId();
|
NodeId GetNewNodeId();
|
||||||
|
|
||||||
|
size_t SocketSendData(CNode *pnode);
|
||||||
//!check is the banlist has unwritten changes
|
//!check is the banlist has unwritten changes
|
||||||
bool BannedSetIsDirty();
|
bool BannedSetIsDirty();
|
||||||
//!set the "dirty" flag for the banlist
|
//!set the "dirty" flag for the banlist
|
||||||
@ -444,7 +445,6 @@ void Discover(boost::thread_group& threadGroup);
|
|||||||
void MapPort(bool fUseUPnP);
|
void MapPort(bool fUseUPnP);
|
||||||
unsigned short GetListenPort();
|
unsigned short GetListenPort();
|
||||||
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
|
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
|
||||||
size_t SocketSendData(CNode *pnode);
|
|
||||||
|
|
||||||
struct CombinerAll
|
struct CombinerAll
|
||||||
{
|
{
|
||||||
@ -652,6 +652,7 @@ public:
|
|||||||
|
|
||||||
const uint64_t nKeyedNetGroup;
|
const uint64_t nKeyedNetGroup;
|
||||||
std::atomic_bool fPauseRecv;
|
std::atomic_bool fPauseRecv;
|
||||||
|
std::atomic_bool fPauseSend;
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
mapMsgCmdSize mapSendBytesPerMsgCmd;
|
mapMsgCmdSize mapSendBytesPerMsgCmd;
|
||||||
|
@ -889,14 +889,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma
|
|||||||
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
|
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
|
||||||
{
|
{
|
||||||
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
|
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
|
||||||
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
|
|
||||||
vector<CInv> vNotFound;
|
vector<CInv> vNotFound;
|
||||||
CNetMsgMaker msgMaker(pfrom->GetSendVersion());
|
CNetMsgMaker msgMaker(pfrom->GetSendVersion());
|
||||||
LOCK(cs_main);
|
LOCK(cs_main);
|
||||||
|
|
||||||
while (it != pfrom->vRecvGetData.end()) {
|
while (it != pfrom->vRecvGetData.end()) {
|
||||||
// Don't bother if send buffer is too full to respond anyway
|
// Don't bother if send buffer is too full to respond anyway
|
||||||
if (pfrom->nSendSize >= nMaxSendBufferSize)
|
if (pfrom->fPauseSend)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
const CInv &inv = *it;
|
const CInv &inv = *it;
|
||||||
@ -2444,7 +2443,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||||||
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
|
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
|
||||||
{
|
{
|
||||||
const CChainParams& chainparams = Params();
|
const CChainParams& chainparams = Params();
|
||||||
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
|
|
||||||
//
|
//
|
||||||
// Message format
|
// Message format
|
||||||
// (4) message start
|
// (4) message start
|
||||||
@ -2465,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
|
|||||||
if (!pfrom->vRecvGetData.empty()) return true;
|
if (!pfrom->vRecvGetData.empty()) return true;
|
||||||
|
|
||||||
// Don't bother if send buffer is too full to respond anyway
|
// Don't bother if send buffer is too full to respond anyway
|
||||||
if (pfrom->nSendSize >= nMaxSendBufferSize)
|
if (pfrom->fPauseSend)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
std::list<CNetMessage> msgs;
|
std::list<CNetMessage> msgs;
|
||||||
|
Loading…
Reference in New Issue
Block a user