From ca699cebaa85289f64c84e34d58c7cd1cc842a4b Mon Sep 17 00:00:00 2001 From: "Wladimir J. van der Laan" Date: Wed, 8 Jun 2016 14:01:05 +0200 Subject: [PATCH] Merge #8126: std::shared_ptr based CTransaction storage in mempool 288d85d Get rid of CTxMempool::lookup() entirely (Pieter Wuille) c2a4724 Optimization: use usec in expiration and reuse nNow (Pieter Wuille) e9b4780 Optimization: don't check the mempool at all if no mempool req ever (Pieter Wuille) dbfb426 Optimize the relay map to use shared_ptr's (Pieter Wuille) 8d39d7a Switch CTransaction storage in mempool to std::shared_ptr (Pieter Wuille) 1b9e6d3 Add support for unique_ptr and shared_ptr to memusage (Pieter Wuille) --- src/instantx.cpp | 2 +- src/memusage.h | 24 ++++++++ src/net_processing.cpp | 71 +++++++++------------- src/test/policyestimator_tests.cpp | 18 +++--- src/txmempool.cpp | 94 +++++++++++++++++++----------- src/txmempool.h | 29 +++++++-- src/validation.cpp | 4 +- 7 files changed, 149 insertions(+), 93 deletions(-) diff --git a/src/instantx.cpp b/src/instantx.cpp index 7cd95298d..f654a1a34 100644 --- a/src/instantx.cpp +++ b/src/instantx.cpp @@ -590,7 +590,7 @@ bool CInstantSend::ResolveConflicts(const CTxLockCandidate& txLockCandidate) return false; } else if (mempool.mapNextTx.count(txin.prevout)) { // check if it's in mempool - hashConflicting = mempool.mapNextTx[txin.prevout].ptx->GetHash(); + hashConflicting = mempool.mapNextTx.find(txin.prevout)->second->GetHash(); if(txHash == hashConflicting) continue; // matches current, not a conflict, skip to next txin // conflicts with tx in mempool LogPrintf("CInstantSend::ResolveConflicts -- ERROR: Failed to complete Transaction Lock, conflicts with mempool, txid=%s\n", txHash.ToString()); diff --git a/src/memusage.h b/src/memusage.h index 1043e20e1..59b3ee702 100644 --- a/src/memusage.h +++ b/src/memusage.h @@ -74,6 +74,15 @@ private: X x; }; +struct stl_shared_counter +{ + /* Various platforms use different sized counters here. + * Conservatively assume that they won't be larger than size_t. */ + void* class_type; + size_t use_count; + size_t weak_count; +}; + template static inline size_t DynamicUsage(const std::vector& v) { @@ -124,6 +133,21 @@ static inline size_t IncrementalDynamicUsage(const indirectmap& m) return MallocUsage(sizeof(stl_tree_node >)); } +template +static inline size_t DynamicUsage(const std::unique_ptr& p) +{ + return p ? MallocUsage(sizeof(X)) : 0; +} + +template +static inline size_t DynamicUsage(const std::shared_ptr& p) +{ + // A shared_ptr can either use a single continuous memory block for both + // the counter and the storage (when using std::make_shared), or separate. + // We can't observe the difference, however, so assume the worst. + return p ? MallocUsage(sizeof(X)) + MallocUsage(sizeof(stl_shared_counter)) : 0; +} + // Boost data structures template diff --git a/src/net_processing.cpp b/src/net_processing.cpp index a0e93fe1d..ab85ac200 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -52,10 +52,6 @@ int64_t nTimeBestReceived = 0; // Used only to inform the wallet of when we last extern FeeFilterRounder filterRounder; -std::map mapRelay; -std::deque > vRelayExpiration; -CCriticalSection cs_mapRelay; - struct COrphanTx { CTransaction tx; NodeId fromPeer; @@ -110,9 +106,14 @@ namespace { /** Number of preferable block download peers. */ int nPreferredDownload = 0; - /** Number of peers from which we're downloading blocks. */ int nPeersWithValidatedDownloads = 0; + + /** Relay map, protected by cs_main. */ + typedef std::map> MapRelay; + MapRelay mapRelay; + /** Expiration-time ordered list of (expire time, relay map entry) pairs, protected by cs_main). */ + std::deque> vRelayExpiration; } // anon namespace ////////////////////////////////////////////////////////////////////////////// @@ -886,34 +887,26 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam } else if (inv.IsKnownType()) { - CTransaction tx; // Send stream from relay memory bool push = false; - // Only serve MSG_TX from mapRelay. // Otherwise we may send out a normal TX instead of a IX if (inv.type == MSG_TX) { - LOCK(cs_mapRelay); - map::iterator mi = mapRelay.find(inv.hash); + auto mi = mapRelay.find(inv.hash); if (mi != mapRelay.end()) { - tx = (*mi).second; + connman.PushMessage(pfrom, NetMsgType::TX, *mi->second); push = true; + } else if (pfrom->timeLastMempoolReq) { + auto txinfo = mempool.info(inv.hash); + // To protect privacy, do not answer getdata using the mempool when + // that TX couldn't have been INVed in reply to a MEMPOOL request. + if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) { + connman.PushMessage(pfrom, NetMsgType::TX, *txinfo.tx); + push = true; + } } } - if (!push && inv.type == MSG_TX) { - int64_t txtime; - // To protect privacy, do not answer getdata using the mempool when - // that TX couldn't have been INVed in reply to a MEMPOOL request. - if (mempool.lookup(inv.hash, tx, txtime) && txtime <= pfrom->timeLastMempoolReq) { - push = true; - } - } - - if (push) { - pfrom->PushMessage(inv.GetCommand(), tx); - } - if (!push && inv.type == MSG_TXLOCK_REQUEST) { CTxLockRequest txLockRequest; if(instantsend.GetTxLockRequest(inv.hash, txLockRequest)) { @@ -2580,8 +2573,7 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic& interruptMsg // Respond to BIP35 mempool requests if (fSendTrickle && pto->fSendMempool) { - std::vector vtxid; - mempool.queryHashes(vtxid); + auto vtxinfo = mempool.infoAll(); pto->fSendMempool = false; CAmount filterrate = 0; { @@ -2591,20 +2583,16 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic& interruptMsg LOCK(pto->cs_filter); - BOOST_FOREACH(const uint256& hash, vtxid) { + for (const auto& txinfo : vtxinfo) { + const uint256& hash = txinfo.tx->GetHash(); CInv inv(MSG_TX, hash); pto->setInventoryTxToSend.erase(hash); if (filterrate) { - CFeeRate feeRate; - mempool.lookupFeeRate(hash, feeRate); - if (feeRate.GetFeePerK() < filterrate) + if (txinfo.feeRate.GetFeePerK() < filterrate) continue; } if (pto->pfilter) { - CTransaction tx; - bool fInMemPool = mempool.lookup(hash, tx); - if (!fInMemPool) continue; // another thread removed since queryHashes, maybe... - if (!pto->pfilter->IsRelevantAndUpdate(tx)) continue; + if (!pto->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; } pto->filterInventoryKnown.insert(hash); @@ -2652,31 +2640,28 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic& interruptMsg continue; } // Not in the mempool anymore? don't bother sending it. - CFeeRate feeRate; - if (!mempool.lookupFeeRate(hash, feeRate)) { + auto txinfo = mempool.info(hash); + if (!txinfo.tx) { continue; } - if (filterrate && feeRate.GetFeePerK() < filterrate) { + if (filterrate && txinfo.feeRate.GetFeePerK() < filterrate) { continue; } - CTransaction tx; - if (!mempool.lookup(hash, tx)) continue; - if (pto->pfilter && !pto->pfilter->IsRelevantAndUpdate(tx)) continue; + if (pto->pfilter && !pto->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; // Send vInv.push_back(CInv(MSG_TX, hash)); nRelayedTransactions++; { - LOCK(cs_mapRelay); // Expire old relay messages - while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime()) + while (!vRelayExpiration.empty() && vRelayExpiration.front().first < nNow) { mapRelay.erase(vRelayExpiration.front().second); vRelayExpiration.pop_front(); } - auto ret = mapRelay.insert(std::make_pair(hash, tx)); + auto ret = mapRelay.insert(std::make_pair(hash, std::move(txinfo.tx))); if (ret.second) { - vRelayExpiration.push_back(std::make_pair(GetTime() + 15 * 60, hash)); + vRelayExpiration.push_back(std::make_pair(nNow + 15 * 60 * 1000000, ret.first)); } } if (vInv.size() == MAX_INV_SZ) { diff --git a/src/test/policyestimator_tests.cpp b/src/test/policyestimator_tests.cpp index da7e2e418..31a9dab49 100644 --- a/src/test/policyestimator_tests.cpp +++ b/src/test/policyestimator_tests.cpp @@ -74,9 +74,9 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) // 9/10 blocks add 2nd highest and so on until ... // 1/10 blocks add lowest fee/pri transactions while (txHashes[9-h].size()) { - CTransaction btx; - if (mpool.lookup(txHashes[9-h].back(), btx)) - block.push_back(btx); + std::shared_ptr ptx = mpool.get(txHashes[9-h].back()); + if (ptx) + block.push_back(*ptx); txHashes[9-h].pop_back(); } } @@ -160,9 +160,9 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) // Estimates should still not be below original for (int j = 0; j < 10; j++) { while(txHashes[j].size()) { - CTransaction btx; - if (mpool.lookup(txHashes[j].back(), btx)) - block.push_back(btx); + std::shared_ptr ptx = mpool.get(txHashes[j].back()); + if (ptx) + block.push_back(*ptx); txHashes[j].pop_back(); } } @@ -181,9 +181,9 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) tx.vin[0].prevout.n = 10000*blocknum+100*j+k; uint256 hash = tx.GetHash(); mpool.addUnchecked(hash, entry.Fee(feeV[k/4][j]).Time(GetTime()).Priority(priV[k/4][j]).Height(blocknum).FromTx(tx, &mpool)); - CTransaction btx; - if (mpool.lookup(hash, btx)) - block.push_back(btx); + std::shared_ptr ptx = mpool.get(hash); + if (ptx) + block.push_back(*ptx); } } mpool.removeForBlock(block, ++blocknum, dummyConflicted); diff --git a/src/txmempool.cpp b/src/txmempool.cpp index ca7222380..36b13918b 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -24,18 +24,18 @@ CTxMemPoolEntry::CTxMemPoolEntry(const CTransaction& _tx, const CAmount& _nFee, int64_t _nTime, double _entryPriority, unsigned int _entryHeight, bool poolHasNoInputsOf, CAmount _inChainInputValue, bool _spendsCoinbase, unsigned int _sigOps, LockPoints lp): - tx(_tx), nFee(_nFee), nTime(_nTime), entryPriority(_entryPriority), entryHeight(_entryHeight), + tx(std::make_shared(_tx)), nFee(_nFee), nTime(_nTime), entryPriority(_entryPriority), entryHeight(_entryHeight), hadNoDependencies(poolHasNoInputsOf), inChainInputValue(_inChainInputValue), spendsCoinbase(_spendsCoinbase), sigOpCount(_sigOps), lockPoints(lp) { - nTxSize = ::GetSerializeSize(tx, SER_NETWORK, PROTOCOL_VERSION); - nModSize = tx.CalculateModifiedSize(nTxSize); - nUsageSize = RecursiveDynamicUsage(tx); + nTxSize = ::GetSerializeSize(_tx, SER_NETWORK, PROTOCOL_VERSION); + nModSize = _tx.CalculateModifiedSize(nTxSize); + nUsageSize = RecursiveDynamicUsage(*tx) + memusage::DynamicUsage(tx); nCountWithDescendants = 1; nSizeWithDescendants = nTxSize; nModFeesWithDescendants = nFee; - CAmount nValueIn = tx.GetValueOut()+nFee; + CAmount nValueIn = _tx.GetValueOut()+nFee; assert(inChainInputValue <= nValueIn); feeDelta = 0; @@ -904,50 +904,76 @@ bool CTxMemPool::CompareDepthAndScore(const uint256& hasha, const uint256& hashb namespace { class DepthAndScoreComparator { - CTxMemPool *mp; public: - DepthAndScoreComparator(CTxMemPool *mempool) : mp(mempool) {} - bool operator()(const uint256& a, const uint256& b) { return mp->CompareDepthAndScore(a, b); } + bool operator()(const CTxMemPool::indexed_transaction_set::const_iterator& a, const CTxMemPool::indexed_transaction_set::const_iterator& b) + { + uint64_t counta = a->GetCountWithAncestors(); + uint64_t countb = b->GetCountWithAncestors(); + if (counta == countb) { + return CompareTxMemPoolEntryByScore()(*a, *b); + } + return counta < countb; + } }; } +std::vector CTxMemPool::GetSortedDepthAndScore() const +{ + std::vector iters; + AssertLockHeld(cs); + + iters.reserve(mapTx.size()); + + for (indexed_transaction_set::iterator mi = mapTx.begin(); mi != mapTx.end(); ++mi) { + iters.push_back(mi); + } + std::sort(iters.begin(), iters.end(), DepthAndScoreComparator()); + return iters; +} + void CTxMemPool::queryHashes(vector& vtxid) { + LOCK(cs); + auto iters = GetSortedDepthAndScore(); + vtxid.clear(); - - LOCK(cs); vtxid.reserve(mapTx.size()); - for (indexed_transaction_set::iterator mi = mapTx.begin(); mi != mapTx.end(); ++mi) - vtxid.push_back(mi->GetTx().GetHash()); - std::sort(vtxid.begin(), vtxid.end(), DepthAndScoreComparator(this)); + for (auto it : iters) { + vtxid.push_back(it->GetTx().GetHash()); + } } - -bool CTxMemPool::lookup(uint256 hash, CTransaction& result, int64_t& time) const +std::vector CTxMemPool::infoAll() const { LOCK(cs); - indexed_transaction_set::const_iterator i = mapTx.find(hash); - if (i == mapTx.end()) return false; - result = i->GetTx(); - time = i->GetTime(); - return true; + auto iters = GetSortedDepthAndScore(); + + std::vector ret; + ret.reserve(mapTx.size()); + for (auto it : iters) { + ret.push_back(TxMempoolInfo{it->GetSharedTx(), it->GetTime(), CFeeRate(it->GetFee(), it->GetTxSize())}); + } + + return ret; } -bool CTxMemPool::lookup(uint256 hash, CTransaction& result) const -{ - int64_t time; - return CTxMemPool::lookup(hash, result, time); -} - -bool CTxMemPool::lookupFeeRate(const uint256& hash, CFeeRate& feeRate) const +std::shared_ptr CTxMemPool::get(const uint256& hash) const { LOCK(cs); indexed_transaction_set::const_iterator i = mapTx.find(hash); if (i == mapTx.end()) - return false; - feeRate = CFeeRate(i->GetFee(), i->GetTxSize()); - return true; + return nullptr; + return i->GetSharedTx(); +} + +TxMempoolInfo CTxMemPool::info(const uint256& hash) const +{ + LOCK(cs); + indexed_transaction_set::const_iterator i = mapTx.find(hash); + if (i == mapTx.end()) + return TxMempoolInfo(); + return TxMempoolInfo{i->GetSharedTx(), i->GetTime(), CFeeRate(i->GetFee(), i->GetTxSize())}; } CFeeRate CTxMemPool::estimateFee(int nBlocks) const @@ -1060,10 +1086,10 @@ bool CCoinsViewMemPool::GetCoin(const COutPoint &outpoint, Coin &coin) const { // If an entry in the mempool exists, always return that one, as it's guaranteed to never // conflict with the underlying cache, and it cannot have pruned entries (as it contains full) // transactions. First checking the underlying cache risks returning a pruned entry instead. - CTransaction tx; - if (mempool.lookup(outpoint.hash, tx)) { - if (outpoint.n < tx.vout.size()) { - coin = Coin(tx.vout[outpoint.n], MEMPOOL_HEIGHT, false); + shared_ptr ptx = mempool.get(outpoint.hash); + if (ptx) { + if (outpoint.n < ptx->vout.size()) { + coin = Coin(ptx->vout[outpoint.n], MEMPOOL_HEIGHT, false); return true; } else { return false; diff --git a/src/txmempool.h b/src/txmempool.h index 4c7dee01b..82c50a65c 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -7,6 +7,7 @@ #define BITCOIN_TXMEMPOOL_H #include +#include #include #include "addressindex.h" @@ -78,7 +79,7 @@ class CTxMemPool; class CTxMemPoolEntry { private: - CTransaction tx; + std::shared_ptr tx; CAmount nFee; //!< Cached to avoid expensive parent-transaction lookups size_t nTxSize; //!< ... and avoid recomputing tx size size_t nModSize; //!< ... and modified size for priority @@ -115,7 +116,8 @@ public: unsigned int nSigOps, LockPoints lp); CTxMemPoolEntry(const CTxMemPoolEntry& other); - const CTransaction& GetTx() const { return this->tx; } + const CTransaction& GetTx() const { return *this->tx; } + std::shared_ptr GetSharedTx() const { return this->tx; } /** * Fast calculation of lower bound of current priority as update * from entry priority. Only inputs that were originally in-chain will age. @@ -310,6 +312,21 @@ struct ancestor_score {}; class CBlockPolicyEstimator; +/** + * Information about a mempool transaction. + */ +struct TxMempoolInfo +{ + /** The transaction itself */ + std::shared_ptr tx; + + /** Time the transaction entered the mempool. */ + int64_t nTime; + + /** Feerate of the transaction. */ + CFeeRate feeRate; +}; + class SaltedTxidHasher { private: @@ -493,6 +510,8 @@ private: void UpdateParent(txiter entry, txiter parent, bool add); void UpdateChild(txiter entry, txiter child, bool add); + std::vector GetSortedDepthAndScore() const; + public: indirectmap mapNextTx; std::map > mapDeltas; @@ -634,9 +653,9 @@ public: return (it != mapTx.end() && outpoint.n < it->GetTx().vout.size()); } - bool lookup(uint256 hash, CTransaction& result) const; - bool lookup(uint256 hash, CTransaction& result, int64_t& time) const; - bool lookupFeeRate(const uint256& hash, CFeeRate& feeRate) const; + std::shared_ptr get(const uint256& hash) const; + TxMempoolInfo info(const uint256& hash) const; + std::vector infoAll() const; /** Estimate fee rate needed to get into the next nBlocks * If no answer can be given at nBlocks, return an estimate diff --git a/src/validation.cpp b/src/validation.cpp index 0a6ce741b..825a54c7b 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -1087,8 +1087,10 @@ bool GetTransaction(const uint256 &hash, CTransaction &txOut, const Consensus::P LOCK(cs_main); - if (mempool.lookup(hash, txOut)) + std::shared_ptr ptx = mempool.get(hash); + if (ptx) { + txOut = *ptx; return true; }