merge bitcoin#19911: guard vRecvGetData with cs_vRecv and orphan_work_set with g_cs_orphans
parent f2384ffa90
commit 60d589014c
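
For context, the pattern applied throughout this backport is Clang thread-safety annotation: each piece of per-peer state is tagged with the mutex that must be held to touch it, and every access site takes that mutex first, so the compiler can flag unguarded accesses. Below is a minimal standalone sketch of the idea, not code from this patch: it uses raw Clang attributes and std::mutex in place of the codebase's Mutex, GUARDED_BY, LOCK and EXCLUSIVE_LOCKS_REQUIRED macros from sync.h, and the function names are illustrative.

// Sketch only. Compile with: clang++ -std=c++17 -Wthread-safety sketch.cpp
// (the analysis needs annotated mutex/lock types, e.g. libc++ built with
// -D_LIBCPP_ENABLE_THREAD_SAFETY_ANNOTATIONS, or the annotated wrappers in sync.h).
#include <deque>
#include <mutex>

#define GUARDED_BY(m) __attribute__((guarded_by(m)))
#define EXCLUSIVE_LOCKS_REQUIRED(m) __attribute__((exclusive_locks_required(m)))

struct Inv { int type = 0; };

struct Peer {
    std::mutex m_getdata_requests_mutex;
    // May only be read or written while m_getdata_requests_mutex is held.
    std::deque<Inv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex);
};

// The annotation documents, and the analyzer enforces, that the caller
// already holds the peer's queue mutex when this function runs.
void ProcessGetData(Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(peer.m_getdata_requests_mutex)
{
    while (!peer.m_getdata_requests.empty()) {
        peer.m_getdata_requests.pop_front(); // ok: lock held by contract
    }
}

void OnGetData(Peer& peer, const std::deque<Inv>& invs)
{
    std::lock_guard<std::mutex> lock(peer.m_getdata_requests_mutex); // like LOCK(...)
    peer.m_getdata_requests.insert(peer.m_getdata_requests.end(), invs.begin(), invs.end());
    ProcessGetData(peer); // fine: the mutex is held, matching the annotation
}
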
@@ -958,7 +958,6 @@ public:
 
     CCriticalSection cs_sendProcessing;
 
-    std::deque<CInv> vRecvGetData;
     uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
     std::atomic<int> nRecvVersion{INIT_PROTO_VERSION};
 
@@ -1134,8 +1133,6 @@ public:
     // If true, we will send him all quorum related messages, even if he is not a member of our quorums
     std::atomic<bool> qwatch{false};
 
-    std::set<uint256> orphan_work_set;
-
     CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn = "", bool fInboundIn = false, bool block_relay_only = false, bool inbound_onion = false);
     ~CNode();
     CNode(const CNode&) = delete;
@@ -463,7 +463,15 @@ struct Peer {
     /** Whether this peer should be disconnected and marked as discouraged (unless it has the noban permission). */
     bool m_should_discourage GUARDED_BY(m_misbehavior_mutex){false};
 
-    Peer(NodeId id) : m_id(id) {}
+    /** Set of txids to reconsider once their parent transactions have been accepted **/
+    std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
+
+    /** Protects m_getdata_requests **/
+    Mutex m_getdata_requests_mutex;
+    /** Work queue of items requested by this peer **/
+    std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex);
+
+    explicit Peer(NodeId id) : m_id(id) {}
 };
 
 using PeerRef = std::shared_ptr<Peer>;
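
The handlers changed below obtain this per-peer state through GetPeerRef(), which returns a PeerRef, that is, a std::shared_ptr<Peer>, so the Peer object outlives a concurrent disconnect while a message is still being processed. A rough sketch of that lookup pattern, with a hypothetical map and mutex standing in for the peer-manager internals (only Peer, PeerRef and GetPeerRef appear in this patch):

#include <map>
#include <memory>
#include <mutex>

struct Peer { /* per-peer net-processing state, as in the struct above */ };
using PeerRef = std::shared_ptr<Peer>;
using NodeId = int64_t;

std::mutex g_peer_mutex;              // hypothetical
std::map<NodeId, PeerRef> g_peer_map; // hypothetical

PeerRef GetPeerRef(NodeId id)
{
    std::lock_guard<std::mutex> lock(g_peer_mutex);
    auto it = g_peer_map.find(id);
    // Copying the shared_ptr keeps the Peer alive even if the map entry is
    // erased (the peer disconnects) while a message is still in flight.
    return it != g_peer_map.end() ? it->second : nullptr;
}
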
@@ -1836,12 +1844,12 @@ void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, c
     }
 }
 
-void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool,
-                           LLMQContext& llmq_ctx, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
+void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool,
+                           LLMQContext& llmq_ctx, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main) EXCLUSIVE_LOCKS_REQUIRED(peer.m_getdata_requests_mutex)
 {
     AssertLockNotHeld(cs_main);
 
-    std::deque<CInv>::iterator it = pfrom.vRecvGetData.begin();
+    std::deque<CInv>::iterator it = peer.m_getdata_requests.begin();
     std::vector<CInv> vNotFound;
     const CNetMsgMaker msgMaker(pfrom.GetSendVersion());
 
@@ -1857,7 +1865,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
     // Process as many TX items from the front of the getdata queue as
     // possible, since they're common and it's efficient to batch process
    // them.
-    while (it != pfrom.vRecvGetData.end() && it->IsKnownType()) {
+    while (it != peer.m_getdata_requests.end() && it->IsKnownType()) {
         if (interruptMsgProc)
             return;
         // The send buffer provides backpressure. If there's no space in
@@ -2031,7 +2039,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
 
     // Only process one BLOCK item per call, since they're uncommon and can be
     // expensive to process.
-    if (it != pfrom.vRecvGetData.end() && !pfrom.fPauseSend) {
+    if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) {
         const CInv &inv = *it++;
         if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK) {
             ProcessGetBlockData(pfrom, chainparams, inv, connman, *llmq_ctx.isman);
@@ -2040,7 +2048,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
         // and continue processing the queue on the next call.
     }
 
-    pfrom.vRecvGetData.erase(pfrom.vRecvGetData.begin(), it);
+    peer.m_getdata_requests.erase(peer.m_getdata_requests.begin(), it);
 
     if (!vNotFound.empty()) {
         // Let the peer know that we didn't find what it asked for, so it doesn't
@@ -2609,6 +2617,9 @@ void PeerManager::ProcessMessage(
         return;
     }
 
+    PeerRef peer = GetPeerRef(pfrom.GetId());
+    if (peer == nullptr) return;
+
     if (!(pfrom.GetLocalServices() & NODE_BLOOM) &&
         (msg_type == NetMsgType::FILTERLOAD ||
          msg_type == NetMsgType::FILTERADD))
@@ -3106,8 +3117,11 @@ void PeerManager::ProcessMessage(
             LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId());
         }
 
-        pfrom.vRecvGetData.insert(pfrom.vRecvGetData.end(), vInv.begin(), vInv.end());
-        ProcessGetData(pfrom, m_chainparams, m_connman, m_mempool, *m_llmq_ctx, interruptMsgProc);
+        {
+            LOCK(peer->m_getdata_requests_mutex);
+            peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end());
+            ProcessGetData(pfrom, *peer, m_chainparams, m_connman, m_mempool, *m_llmq_ctx, interruptMsgProc);
+        }
         return;
     }
 
@@ -3197,6 +3211,7 @@ void PeerManager::ProcessMessage(
             return;
         }
 
+        {
         LOCK(cs_main);
 
         const CBlockIndex* pindex = g_chainman.m_blockman.LookupBlockIndex(req.blockhash);
@@ -3205,7 +3220,16 @@ void PeerManager::ProcessMessage(
             return;
         }
 
-        if (pindex->nHeight < ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) {
+        if (pindex->nHeight >= ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) {
+            CBlock block;
+            bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus());
+            assert(ret);
+
+            SendBlockTransactions(pfrom, block, req);
+            return;
+        }
+        }
+
         // If an older block is requested (should never happen in practice,
         // but can happen in tests) send a block response instead of a
         // blocktxn response. Sending a full block response instead of a
@@ -3215,21 +3239,13 @@ void PeerManager::ProcessMessage(
         // actually receive all the data read from disk over the network.
         LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH);
         CInv inv;
-        inv.type = MSG_BLOCK;
+        WITH_LOCK(cs_main, inv.type = MSG_BLOCK);
         inv.hash = req.blockhash;
-        pfrom.vRecvGetData.push_back(inv);
+        WITH_LOCK(peer->m_getdata_requests_mutex, peer->m_getdata_requests.push_back(inv));
         // The message processing loop will go around again (without pausing) and we'll respond then (without cs_main)
         return;
     }
-
-        CBlock block;
-        bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus());
-        assert(ret);
-
-        SendBlockTransactions(pfrom, block, req);
-        return;
-    }
 
     if (msg_type == NetMsgType::GETHEADERS || msg_type == NetMsgType::GETHEADERS2) {
         CBlockLocator locator;
         uint256 hashStop;
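
A note on the WITH_LOCK one-liners introduced in the hunk above: they evaluate an expression while the given mutex is held and release it immediately afterwards, which keeps a single guarded statement from needing its own scope. A simplified standalone equivalent, assuming std::mutex in place of the annotated Mutex and LOCK from sync.h:

#include <deque>
#include <mutex>

// Simplified stand-in for the codebase's WITH_LOCK(cs, code): run `code`
// inside an immediately-invoked lambda while `cs` is held.
#define WITH_LOCK(cs, code) [&]() -> decltype(auto) { std::lock_guard<std::mutex> _l(cs); code; }()

struct Inv { int type = 0; };

struct Peer {
    std::mutex m_getdata_requests_mutex;
    std::deque<Inv> m_getdata_requests;
};

void QueueBlockInv(Peer& peer, const Inv& inv)
{
    // Mirrors the patch's
    //   WITH_LOCK(peer->m_getdata_requests_mutex, peer->m_getdata_requests.push_back(inv));
    WITH_LOCK(peer.m_getdata_requests_mutex, peer.m_getdata_requests.push_back(inv));
}
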
@@ -3369,7 +3385,7 @@ void PeerManager::ProcessMessage(
             auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i));
             if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
                 for (const auto& elem : it_by_prev->second) {
-                    pfrom.orphan_work_set.insert(elem->first);
+                    peer->m_orphan_work_set.insert(elem->first);
                 }
             }
         }
@@ -3382,7 +3398,7 @@ void PeerManager::ProcessMessage(
                 m_mempool.size(), m_mempool.DynamicMemoryUsage() / 1000);
 
             // Recursively process any orphan transactions that depended on this one
-            ProcessOrphanTx(pfrom.orphan_work_set);
+            ProcessOrphanTx(peer->m_orphan_work_set);
         }
         else if (state.GetResult() == TxValidationResult::TX_MISSING_INPUTS)
         {
@@ -4201,21 +4217,37 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
     const CChainParams& chainparams = Params();
     bool fMoreWork = false;
 
-    if (!pfrom->vRecvGetData.empty())
-        ProcessGetData(*pfrom, m_chainparams, m_connman, m_mempool, *m_llmq_ctx, interruptMsgProc);
+    PeerRef peer = GetPeerRef(pfrom->GetId());
+    if (peer == nullptr) return false;
 
-    if (!pfrom->orphan_work_set.empty()) {
+    {
+        LOCK(peer->m_getdata_requests_mutex);
+        if (!peer->m_getdata_requests.empty()) {
+            ProcessGetData(*pfrom, *peer, m_chainparams, m_connman, m_mempool, *m_llmq_ctx, interruptMsgProc);
+        }
+    }
+
+    {
         LOCK2(cs_main, g_cs_orphans);
-        ProcessOrphanTx(pfrom->orphan_work_set);
+        if (!peer->m_orphan_work_set.empty()) {
+            ProcessOrphanTx(peer->m_orphan_work_set);
+        }
     }
 
     if (pfrom->fDisconnect)
         return false;
 
     // this maintains the order of responses
-    // and prevents vRecvGetData to grow unbounded
-    if (!pfrom->vRecvGetData.empty()) return true;
-    if (!pfrom->orphan_work_set.empty()) return true;
+    // and prevents m_getdata_requests to grow unbounded
+    {
+        LOCK(peer->m_getdata_requests_mutex);
+        if (!peer->m_getdata_requests.empty()) return true;
+    }
+
+    {
+        LOCK(g_cs_orphans);
+        if (!peer->m_orphan_work_set.empty()) return true;
+    }
 
     // Don't bother if send buffer is too full to respond anyway
     if (pfrom->fPauseSend)
@@ -4242,10 +4274,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
 
     try {
         ProcessMessage(*pfrom, msg_type, msg.m_recv, msg.m_time, interruptMsgProc);
-        if (interruptMsgProc)
-            return false;
-        if (!pfrom->vRecvGetData.empty())
-            fMoreWork = true;
+        if (interruptMsgProc) return false;
+        {
+            LOCK(peer->m_getdata_requests_mutex);
+            if (!peer->m_getdata_requests.empty()) fMoreWork = true;
+        }
     } catch (const std::exception& e) {
         LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg_type), nMessageSize, e.what(), typeid(e).name());
     } catch (...) {