Merge #7840: Several performance and privacy improvements to inv/mempool handling
b559914 Move bloom and feerate filtering to just prior to tx sending. (Gregory Maxwell)
4578215 Return mempool queries in dependency order (Pieter Wuille)
ed70683 Handle mempool requests in send loop, subject to trickle (Pieter Wuille)
dc13dcd Split up and optimize transaction and block inv queues (Pieter Wuille)
f2d3ba7 Eliminate TX trickle bypass, sort TX invs for privacy and priority. (Gregory Maxwell)
This commit is contained in:
parent 33233409a9
commit 5d8e94a26f
src/net.cpp (16 changed lines)
@@ -2514,20 +2514,7 @@ void CConnman::RelayTransaction(const CTransaction& tx, CFeeRate feerate, const
     LOCK(cs_vNodes);
     BOOST_FOREACH(CNode* pnode, vNodes)
     {
-        if(!pnode->fRelayTxes)
-            continue;
-        {
-            LOCK(pnode->cs_feeFilter);
-            if (feerate.GetFeePerK() < pnode->minFeeFilter)
-                continue;
-        }
-        LOCK(pnode->cs_filter);
-        if (pnode->pfilter)
-        {
-            if (pnode->pfilter->IsRelevantAndUpdate(tx))
-                pnode->PushInventory(inv);
-        } else
-            pnode->PushInventory(inv);
+        pnode->PushInventory(inv);
     }
 }
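The first commit (b559914) strips all per-peer filtering out of RelayTransaction: every peer's queue now receives the announcement unconditionally, and the bloom and feefilter checks run later, at the moment the inv is actually transmitted, against the peer's filter state at that time. A minimal standalone sketch of the pattern (hypothetical names, not the project's API):

#include <cstdio>
#include <map>
#include <set>

int main() {
    std::map<int, int> mempool_fee_per_k = {{1, 500}, {2, 2000}, {3, 80}};
    std::set<int> queued = {1, 2, 3};   // relay just inserts, no filtering
    int min_fee_filter = 1000;          // peer raised its filter after queueing

    for (int txid : queued)             // the send loop applies the filter late,
        if (mempool_fee_per_k[txid] >= min_fee_filter)   // so updates still count
            printf("announce tx %d\n", txid);            // only tx 2 passes
}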
@@ -2703,6 +2690,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
     hashContinue = uint256();
     nStartingHeight = -1;
     filterInventoryKnown.reset();
+    fSendMempool = false;
     fGetAddr = false;
     nNextLocalAddrSend = 0;
     nNextAddrSend = 0;
src/net.h (23 changed lines)
@@ -708,7 +708,7 @@ public:
     // a) it allows us to not relay tx invs before receiving the peer's version message
     // b) the peer may tell us in its version message that we should not relay tx invs
     //    unless it loads a bloom filter.
-    bool fRelayTxes;
+    bool fRelayTxes; //protected by cs_filter
     bool fSentAddr;
     // If 'true' this node will be disconnected on CMasternodeMan::ProcessMasternodeConnections()
     bool fMasternode;
@@ -740,7 +740,13 @@ public:

     // inventory based relay
     CRollingBloomFilter filterInventoryKnown;
-    std::vector<CInv> vInventoryToSend;
+    // Set of transaction ids we still have to announce.
+    // They are sorted by the mempool before relay, so the order is not important.
+    std::set<uint256> setInventoryTxToSend;
+    // List of block ids we still have to announce.
+    // There is no final sorting before sending, as they are always sent immediately
+    // and in the order requested.
+    std::vector<uint256> vInventoryBlockToSend;
     CCriticalSection cs_inventory;
     std::set<uint256> setAskFor;
     std::multimap<int64_t, CInv> mapAskFor;
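The split uses two deliberately different containers: a std::set for transactions, which deduplicates repeated announcements and whose iteration order is irrelevant because the mempool re-sorts candidates at send time, and a std::vector for blocks, which go out immediately and in arrival order. A toy illustration of the difference (self-contained, not project code):

#include <cstdio>
#include <set>
#include <vector>

int main() {
    std::set<int> tx_queue;          // dedups; order re-derived at send time
    std::vector<int> block_queue;    // preserves announcement order exactly

    for (int id : {7, 3, 7, 9}) {    // id 7 announced twice
        tx_queue.insert(id);         // second insert of 7 is a no-op
        block_queue.push_back(id);   // a vector would keep both, in order
    }
    printf("tx queue size: %zu, block queue size: %zu\n",
           tx_queue.size(), block_queue.size());   // 3 vs 4
}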
@@ -751,6 +757,8 @@ public:
     // Blocks received by INV while headers chain was too far behind. These are used to delay GETHEADERS messages
     // Also protected by cs_inventory
     std::vector<uint256> vBlockHashesFromINV;
+    // Used for BIP35 mempool sending, also protected by cs_inventory
+    bool fSendMempool;

     // Block and TXN accept times
     std::atomic<int64_t> nLastBlockTime;
@@ -872,14 +880,15 @@ public:

     void PushInventory(const CInv& inv)
     {
-        {
-            LOCK(cs_inventory);
-            if (inv.type == MSG_TX && filterInventoryKnown.contains(inv.hash)) {
-                LogPrint("net", "PushInventory -- filtered inv: %s peer=%d\n", inv.ToString(), id);
-                return;
+        LOCK(cs_inventory);
+        if (inv.type == MSG_TX) {
+            if (!filterInventoryKnown.contains(inv.hash)) {
+                setInventoryTxToSend.insert(inv.hash);
             }
-            LogPrint("net", "PushInventory -- inv: %s peer=%d\n", inv.ToString(), id);
-            vInventoryToSend.push_back(inv);
+        } else if (inv.type == MSG_BLOCK) {
+            vInventoryBlockToSend.push_back(inv.hash);
         }
     }
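PushInventory is now just a cheap enqueue: transactions the peer is already known to have are skipped up front, everything else lands in the per-type queue, and all policy (trickle timing, bloom, feefilter) is applied later in SendMessages. A hedged sketch of the dispatch logic in isolation (simplified types, hypothetical names):

#include <set>
#include <string>
#include <vector>

enum class InvType { TX, BLOCK };

struct Node {
    std::set<std::string> known;              // stand-in for filterInventoryKnown
    std::set<std::string> tx_to_send;         // stand-in for setInventoryTxToSend
    std::vector<std::string> blocks_to_send;  // stand-in for vInventoryBlockToSend

    void PushInventory(InvType type, const std::string& hash) {
        if (type == InvType::TX) {
            if (!known.count(hash))           // skip what the peer already has
                tx_to_send.insert(hash);      // set: repeat pushes are no-ops
        } else if (type == InvType::BLOCK) {
            blocks_to_send.push_back(hash);   // blocks always queued, in order
        }
    }
};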
src/main.cpp

@@ -1955,40 +1955,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,

     else if (strCommand == NetMsgType::MEMPOOL)
     {
         if (connman.OutboundTargetReached(false) && !pfrom->fWhitelisted)
         {
             LogPrint("net", "mempool request with bandwidth limit reached, disconnect peer=%d\n", pfrom->GetId());
             pfrom->fDisconnect = true;
             return true;
         }
-        LOCK2(cs_main, pfrom->cs_filter);
-
-        std::vector<uint256> vtxid;
-        mempool.queryHashes(vtxid);
-        vector<CInv> vInv;
-        BOOST_FOREACH(uint256& hash, vtxid) {
-            CInv inv(MSG_TX, hash);
-            if (pfrom->pfilter) {
-                CTransaction tx;
-                bool fInMemPool = mempool.lookup(hash, tx);
-                if (!fInMemPool) continue; // another thread removed since queryHashes, maybe...
-                if (!pfrom->pfilter->IsRelevantAndUpdate(tx)) continue;
-            }
-            if (pfrom->minFeeFilter) {
-                CFeeRate feeRate;
-                mempool.lookupFeeRate(hash, feeRate);
-                LOCK(pfrom->cs_feeFilter);
-                if (feeRate.GetFeePerK() < pfrom->minFeeFilter)
-                    continue;
-            }
-            vInv.push_back(inv);
-            if (vInv.size() == MAX_INV_SZ) {
-                connman.PushMessage(pfrom, NetMsgType::INV, vInv);
-                vInv.clear();
-            }
-        }
-        if (vInv.size() > 0)
-            connman.PushMessage(pfrom, NetMsgType::INV, vInv);
+        LOCK(pfrom->cs_inventory);
+        pfrom->fSendMempool = true;
     }
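The handler no longer answers a MEMPOOL request inline; it only records the request, and the reply is assembled in the send loop on the next trickle tick, where it shares the same filtering and rate limiting as ordinary announcements. The shape of the pattern, reduced to essentials (hypothetical names, illustration only):

#include <cstdio>

// Request handler: record intent only; no reply built while processing the message.
struct Peer { bool send_mempool_requested = false; };

void HandleMempoolMessage(Peer& peer) {
    peer.send_mempool_requested = true;   // serviced later by the send loop
}

// Send loop: service the flag on the periodic trickle tick, so the mempool
// reply leaks no more timing information than any other inv batch.
void SendLoopTick(Peer& peer, bool trickle_due) {
    if (trickle_due && peer.send_mempool_requested) {
        peer.send_mempool_requested = false;
        printf("assemble and send the filtered mempool inv batch here\n");
    }
}

int main() {
    Peer p;
    HandleMempoolMessage(p);
    SendLoopTick(p, /*trickle_due=*/true);
}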
@@ -2107,6 +2075,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
         CBloomFilter filter;
         vRecv >> filter;

+        LOCK(pfrom->cs_filter);
+
         if (!filter.IsWithinSizeConstraints())
         {
             // There is no excuse for sending a too-large filter
@@ -2115,7 +2085,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
         }
         else
         {
-            LOCK(pfrom->cs_filter);
             delete pfrom->pfilter;
             pfrom->pfilter = new CBloomFilter(filter);
             pfrom->pfilter->UpdateEmptyFull();
@@ -2338,6 +2307,22 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
     return fMoreWork;
 }

+class CompareInvMempoolOrder
+{
+    CTxMemPool *mp;
+public:
+    CompareInvMempoolOrder(CTxMemPool *mempool)
+    {
+        mp = mempool;
+    }
+
+    bool operator()(std::set<uint256>::iterator a, std::set<uint256>::iterator b)
+    {
+        /* As std::make_heap produces a max-heap, we want the entries with the
+         * fewest ancestors/highest fee to sort later. */
+        return mp->CompareDepthAndScore(*b, *a);
+    }
+};
+
 bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsgProc)
 {
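The comparator deliberately swaps its arguments: std::make_heap builds a max-heap and std::pop_heap removes the comparator-maximal element, so ranking the best entry as "largest" makes it pop first, and only the handful of entries actually sent ever incur ordering work. A self-contained demonstration of the same trick on plain integers:

#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
    // Pretend these are ancestor counts; smaller = should be announced first.
    std::vector<int> depth = {4, 1, 3, 9, 2, 7};

    // "a sorts below b when b is better": inverting the comparison makes the
    // max-heap a best-first queue, exactly as CompareInvMempoolOrder does.
    auto cmp = [](int a, int b) { return b < a; };

    std::make_heap(depth.begin(), depth.end(), cmp);   // O(n), no full sort
    for (int i = 0; i < 3; ++i) {                      // draw only the top 3
        std::pop_heap(depth.begin(), depth.end(), cmp);
        printf("%d ", depth.back());                   // prints: 1 2 3
        depth.pop_back();
    }
    printf("\n");
}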
@@ -2590,57 +2575,133 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
         // Message: inventory
         //
         vector<CInv> vInv;
-        vector<CInv> vInvWait;
         {
-            bool fSendTrickle = pto->fWhitelisted;
-            if (pto->nNextInvSend < nNow) {
-                fSendTrickle = true;
-                pto->nNextInvSend = PoissonNextSend(nNow, AVG_INVENTORY_BROADCAST_INTERVAL);
-            }
             LOCK(pto->cs_inventory);
-            vInv.reserve(std::min<size_t>(1000, pto->vInventoryToSend.size()));
-            vInvWait.reserve(pto->vInventoryToSend.size());
-            BOOST_FOREACH(const CInv& inv, pto->vInventoryToSend)
-            {
-                if (inv.type == MSG_TX && pto->filterInventoryKnown.contains(inv.hash))
-                    continue;
-
-                // trickle out tx inv to protect privacy
-                if (inv.type == MSG_TX && !fSendTrickle)
-                {
-                    // 1/4 of tx invs blast to all immediately
-                    static uint256 hashSalt;
-                    if (hashSalt.IsNull())
-                        hashSalt = GetRandHash();
-                    uint256 hashRand = ArithToUint256(UintToArith256(inv.hash) ^ UintToArith256(hashSalt));
-                    hashRand = Hash(BEGIN(hashRand), END(hashRand));
-                    bool fTrickleWait = ((UintToArith256(hashRand) & 3) != 0);
-
-                    if (fTrickleWait)
-                    {
-                        LogPrint("net", "SendMessages -- queued inv(vInvWait): %s index=%d peer=%d\n", inv.ToString(), vInvWait.size(), pto->id);
-                        vInvWait.push_back(inv);
-                        continue;
-                    }
-                }
-
-                pto->filterInventoryKnown.insert(inv.hash);
-
-                LogPrint("net", "SendMessages -- queued inv: %s index=%d peer=%d\n", inv.ToString(), vInv.size(), pto->id);
-                vInv.push_back(inv);
-                if (vInv.size() >= 1000)
-                {
-                    LogPrint("net", "SendMessages -- pushing inv's: count=%d peer=%d\n", vInv.size(), pto->id);
+            vInv.reserve(std::max<size_t>(pto->vInventoryBlockToSend.size(), INVENTORY_BROADCAST_MAX));
+
+            // Add blocks
+            BOOST_FOREACH(const uint256& hash, pto->vInventoryBlockToSend) {
+                vInv.push_back(CInv(MSG_BLOCK, hash));
+                if (vInv.size() == MAX_INV_SZ) {
                     connman.PushMessage(pto, NetMsgType::INV, vInv);
                     vInv.clear();
                 }
             }
-            pto->vInventoryToSend = vInvWait;
+            pto->vInventoryBlockToSend.clear();
+
+            // Check whether periodic sends should happen
+            bool fSendTrickle = pto->fWhitelisted;
+            if (pto->nNextInvSend < nNow) {
+                fSendTrickle = true;
+                // Use half the delay for outbound peers, as there is less privacy concern for them.
+                pto->nNextInvSend = PoissonNextSend(nNow, INVENTORY_BROADCAST_INTERVAL >> !pto->fInbound);
+            }
+
+            // Time to send but the peer has requested we not relay transactions.
+            if (fSendTrickle) {
+                LOCK(pto->cs_filter);
+                if (!pto->fRelayTxes) pto->setInventoryTxToSend.clear();
+            }
+
+            // Respond to BIP35 mempool requests
+            if (fSendTrickle && pto->fSendMempool) {
+                std::vector<uint256> vtxid;
+                mempool.queryHashes(vtxid);
+                pto->fSendMempool = false;
+                CAmount filterrate = 0;
+                {
+                    LOCK(pto->cs_feeFilter);
+                    filterrate = pto->minFeeFilter;
+                }
+
+                LOCK(pto->cs_filter);
+
+                BOOST_FOREACH(const uint256& hash, vtxid) {
+                    CInv inv(MSG_TX, hash);
+                    pto->setInventoryTxToSend.erase(hash);
+                    if (filterrate) {
+                        CFeeRate feeRate;
+                        mempool.lookupFeeRate(hash, feeRate);
+                        if (feeRate.GetFeePerK() < filterrate)
+                            continue;
+                    }
+                    if (pto->pfilter) {
+                        CTransaction tx;
+                        bool fInMemPool = mempool.lookup(hash, tx);
+                        if (!fInMemPool) continue; // another thread removed since queryHashes, maybe...
+                        if (!pto->pfilter->IsRelevantAndUpdate(tx)) continue;
+                    }
+                    pto->filterInventoryKnown.insert(hash);
+
+                    LogPrint("net", "SendMessages -- queued inv: %s index=%d peer=%d\n", inv.ToString(), vInv.size(), pto->id);
+                    vInv.push_back(inv);
+                    if (vInv.size() == MAX_INV_SZ) {
+                        LogPrint("net", "SendMessages -- pushing inv's: count=%d peer=%d\n", vInv.size(), pto->id);
+                        connman.PushMessage(pto, NetMsgType::INV, vInv);
+                        vInv.clear();
+                    }
+                }
+            }
+
+            // Determine transactions to relay
+            if (fSendTrickle) {
+                // Produce a vector with all candidates for sending
+                vector<std::set<uint256>::iterator> vInvTx;
+                vInvTx.reserve(pto->setInventoryTxToSend.size());
+                for (std::set<uint256>::iterator it = pto->setInventoryTxToSend.begin(); it != pto->setInventoryTxToSend.end(); it++) {
+                    vInvTx.push_back(it);
+                }
+                CAmount filterrate = 0;
+                {
+                    LOCK(pto->cs_feeFilter);
+                    filterrate = pto->minFeeFilter;
+                }
+                // Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
+                // A heap is used so that not all items need sorting if only a few are being sent.
+                CompareInvMempoolOrder compareInvMempoolOrder(&mempool);
+                std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
+                // No reason to drain out at many times the network's capacity,
+                // especially since we have many peers and some will draw much shorter delays.
+                unsigned int nRelayedTransactions = 0;
+                LOCK(pto->cs_filter);
+                while (!vInvTx.empty() && nRelayedTransactions < INVENTORY_BROADCAST_MAX) {
+                    // Fetch the top element from the heap
+                    std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
+                    std::set<uint256>::iterator it = vInvTx.back();
+                    vInvTx.pop_back();
+                    uint256 hash = *it;
+                    // Remove it from the to-be-sent set
+                    pto->setInventoryTxToSend.erase(it);
+                    // Check if not in the filter already
+                    if (pto->filterInventoryKnown.contains(hash)) {
+                        continue;
+                    }
+                    // Not in the mempool anymore? don't bother sending it.
+                    CFeeRate feeRate;
+                    if (!mempool.lookupFeeRate(hash, feeRate)) {
+                        continue;
+                    }
+                    if (filterrate && feeRate.GetFeePerK() < filterrate) {
+                        continue;
+                    }
+                    if (pto->pfilter) {
+                        CTransaction tx;
+                        if (!mempool.lookup(hash, tx)) continue;
+                        if (!pto->pfilter->IsRelevantAndUpdate(tx)) continue;
+                    }
+                    // Send
+                    vInv.push_back(CInv(MSG_TX, hash));
+                    nRelayedTransactions++;
+                    if (vInv.size() == MAX_INV_SZ) {
+                        connman.PushMessage(pto, NetMsgType::INV, vInv);
+                        vInv.clear();
+                    }
+                    pto->filterInventoryKnown.insert(hash);
+                }
+            }
         }
-        if (!vInv.empty())
+        if (!vInv.empty()) {
+            LogPrint("net", "SendMessages -- pushing tailing inv's: count=%d peer=%d\n", vInv.size(), pto->id);
             connman.PushMessage(pto, NetMsgType::INV, vInv);
+        }

         // Detect whether we're stalling
         nNow = GetTimeMicros();
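PoissonNextSend draws the next trickle time from an exponential distribution, so the gaps between inv batches are memoryless and an observer learns little from the timing of any single batch. A minimal sketch of such a sampler (an illustration, not the project's exact PoissonNextSend implementation):

#include <cstdint>
#include <random>

// Next send time, on average `avg_interval_sec` from now. Events separated by
// exponential gaps form a Poisson process: memoryless, so past batch times
// reveal nothing about when the next batch fires.
int64_t NextTrickleMicros(int64_t now_usec, double avg_interval_sec) {
    static std::mt19937_64 rng{std::random_device{}()};
    std::exponential_distribution<double> gap(1.0 / avg_interval_sec);
    return now_usec + static_cast<int64_t>(gap(rng) * 1e6);
}

int main() {
    unsigned int interval = 5;                 // INVENTORY_BROADCAST_INTERVAL
    bool fInbound = false;
    // As in the patch: interval >> !fInbound gives 5s for inbound peers and
    // 2s for outbound (integer halving of the 5-second average).
    unsigned int avg = interval >> !fInbound;
    int64_t next = NextTrickleMicros(/*now_usec=*/0, static_cast<double>(avg));
    (void)next;
}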
src/txmempool.cpp

@@ -888,6 +888,31 @@ void CTxMemPool::check(const CCoinsViewCache *pcoins) const
     assert(innerUsage == cachedInnerUsage);
 }

+bool CTxMemPool::CompareDepthAndScore(const uint256& hasha, const uint256& hashb)
+{
+    LOCK(cs);
+    indexed_transaction_set::const_iterator i = mapTx.find(hasha);
+    if (i == mapTx.end()) return false;
+    indexed_transaction_set::const_iterator j = mapTx.find(hashb);
+    if (j == mapTx.end()) return true;
+    uint64_t counta = i->GetCountWithAncestors();
+    uint64_t countb = j->GetCountWithAncestors();
+    if (counta == countb) {
+        return CompareTxMemPoolEntryByScore()(*i, *j);
+    }
+    return counta < countb;
+}
+
+namespace {
+class DepthAndScoreComparator
+{
+    CTxMemPool *mp;
+public:
+    DepthAndScoreComparator(CTxMemPool *mempool) : mp(mempool) {}
+    bool operator()(const uint256& a, const uint256& b) { return mp->CompareDepthAndScore(a, b); }
+};
+}
+
 void CTxMemPool::queryHashes(vector<uint256>& vtxid)
 {
     vtxid.clear();
@@ -896,6 +921,8 @@ void CTxMemPool::queryHashes(vector<uint256>& vtxid)
     vtxid.reserve(mapTx.size());
     for (indexed_transaction_set::iterator mi = mapTx.begin(); mi != mapTx.end(); ++mi)
         vtxid.push_back(mi->GetTx().GetHash());
+
+    std::sort(vtxid.begin(), vtxid.end(), DepthAndScoreComparator(this));
 }

 bool CTxMemPool::lookup(uint256 hash, CTransaction& result) const
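CompareDepthAndScore orders by in-mempool ancestor count first and score second, so after queryHashes sorts with it, every parent precedes its children: a child always has strictly more ancestors than any of its parents. Reduced to a plain struct, the ordering looks roughly like this (hypothetical fields, for illustration):

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

struct Entry { const char* name; uint64_t ancestors; double score; };

// Fewer ancestors first (parents before children); ties broken by higher score.
bool DepthAndScoreLess(const Entry& a, const Entry& b) {
    if (a.ancestors != b.ancestors) return a.ancestors < b.ancestors;
    return a.score > b.score;
}

int main() {
    // The child spends the parent's output, so it counts one more ancestor.
    std::vector<Entry> txs = {{"child", 2, 9.0}, {"parent", 1, 1.0}};
    std::sort(txs.begin(), txs.end(), DepthAndScoreLess);
    printf("%s before %s\n", txs[0].name, txs[1].name);  // parent before child
}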
src/txmempool.h

@@ -549,6 +549,7 @@ public:
                     std::list<CTransaction>& conflicts, bool fCurrentEstimate = true);
     void clear();
     void _clear(); //lock free
+    bool CompareDepthAndScore(const uint256& hasha, const uint256& hashb);
     void queryHashes(std::vector<uint256>& vtxid);
     bool isSpent(const COutPoint& outpoint);
     unsigned int GetTransactionsUpdated() const;
src/main.h

@@ -107,9 +107,12 @@ static const unsigned int MAX_REJECT_MESSAGE_LENGTH = 111;
 static const unsigned int AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL = 24 * 24 * 60;
 /** Average delay between peer address broadcasts in seconds. */
 static const unsigned int AVG_ADDRESS_BROADCAST_INTERVAL = 30;
-/** Average delay between trickled inventory broadcasts in seconds.
- *  Blocks, whitelisted receivers, and a random 25% of transactions bypass this. */
-static const unsigned int AVG_INVENTORY_BROADCAST_INTERVAL = 5;
+/** Average delay between trickled inventory transmissions in seconds.
+ *  Blocks and whitelisted receivers bypass this, outbound peers get half this delay. */
+static const unsigned int INVENTORY_BROADCAST_INTERVAL = 5;
+/** Maximum number of inventory items to send per transmission.
+ *  Limits the impact of low-fee transaction floods. */
+static const unsigned int INVENTORY_BROADCAST_MAX = 7 * INVENTORY_BROADCAST_INTERVAL;
 /** Average delay between feefilter broadcasts in seconds. */
 static const unsigned int AVG_FEEFILTER_BROADCAST_INTERVAL = 10 * 60;
 /** Maximum feefilter broadcast delay after significant change. */
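The new cap composes with the trickle interval: INVENTORY_BROADCAST_MAX = 7 * 5 = 35 items per batch, and with batches averaging one per 5 seconds that bounds transaction announcements to about 7 per second per inbound peer. A worked check of the arithmetic (not project code):

#include <cstdio>

int main() {
    const unsigned int INVENTORY_BROADCAST_INTERVAL = 5;                            // seconds, average
    const unsigned int INVENTORY_BROADCAST_MAX = 7 * INVENTORY_BROADCAST_INTERVAL;  // 35 items per batch

    // Average announcement ceiling per peer: batch size / average batch gap.
    printf("inbound:  %.1f tx/s\n",
           INVENTORY_BROADCAST_MAX / (double)INVENTORY_BROADCAST_INTERVAL);         // 7.0
    // Outbound peers use the halved (integer-shifted) interval, 5 >> 1 == 2s.
    printf("outbound: %.1f tx/s\n",
           INVENTORY_BROADCAST_MAX / (double)(INVENTORY_BROADCAST_INTERVAL >> 1));  // 17.5
}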