mirror of
https://github.com/dashpay/dash.git
synced 2024-12-25 12:02:48 +01:00
merge bitcoin#18038: Mempool tracks locally submitted transactions to improve wallet privacy
This commit is contained in:
parent
bb4be52b48
commit
6390cae926
@ -905,6 +905,19 @@ void PeerLogicValidation::InitializeNode(CNode *pnode) {
|
||||
PushNodeVersion(pnode, connman, GetTime());
|
||||
}
|
||||
|
||||
void PeerLogicValidation::ReattemptInitialBroadcast(CScheduler& scheduler) const
|
||||
{
|
||||
std::set<uint256> unbroadcast_txids = m_mempool.GetUnbroadcastTxs();
|
||||
|
||||
for (const uint256& txid : unbroadcast_txids) {
|
||||
RelayTransaction(txid, *connman);
|
||||
}
|
||||
|
||||
// schedule next run for 10-15 minutes in the future
|
||||
const std::chrono::milliseconds delta = std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5});
|
||||
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta.count());
|
||||
}
|
||||
|
||||
void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
|
||||
fUpdateConnectionTime = false;
|
||||
LOCK(cs_main);
|
||||
@ -1170,6 +1183,10 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CS
|
||||
// timer.
|
||||
static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer");
|
||||
scheduler.scheduleEvery(std::bind(&PeerLogicValidation::CheckForStaleTipAndEvictPeers, this, consensusParams), EXTRA_PEER_CHECK_INTERVAL * 1000);
|
||||
|
||||
// schedule next run for 10-15 minutes in the future
|
||||
const std::chrono::milliseconds delta = std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5});
|
||||
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta.count());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1637,7 +1654,7 @@ void static ProcessGetBlockData(CNode* pfrom, const CChainParams& chainparams, c
|
||||
}
|
||||
}
|
||||
|
||||
void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, const CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
|
||||
void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
|
||||
{
|
||||
AssertLockNotHeld(cs_main);
|
||||
|
||||
@ -1690,6 +1707,13 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm
|
||||
}
|
||||
}
|
||||
|
||||
if (push) {
|
||||
// We interpret fulfilling a GETDATA for a transaction as a
|
||||
// successful initial broadcast and remove it from our
|
||||
// unbroadcast set.
|
||||
mempool.RemoveUnbroadcastTx(inv.hash);
|
||||
}
|
||||
|
||||
if (!push && inv.type == MSG_SPORK) {
|
||||
CSporkMessage spork;
|
||||
if (sporkManager.GetSporkByHash(inv.hash, spork)) {
|
||||
@ -1744,6 +1768,7 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm
|
||||
push = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!push && (inv.type == MSG_QUORUM_COMPLAINT)) {
|
||||
llmq::CDKGComplaint o;
|
||||
if (llmq::quorumDKGSessionManager->GetComplaint(inv.hash, o)) {
|
||||
@ -1751,6 +1776,7 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm
|
||||
push = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!push && (inv.type == MSG_QUORUM_JUSTIFICATION)) {
|
||||
llmq::CDKGJustification o;
|
||||
if (llmq::quorumDKGSessionManager->GetJustification(inv.hash, o)) {
|
||||
@ -1758,6 +1784,7 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm
|
||||
push = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!push && (inv.type == MSG_QUORUM_PREMATURE_COMMITMENT)) {
|
||||
llmq::CDKGPrematureCommitment o;
|
||||
if (llmq::quorumDKGSessionManager->GetPrematureCommitment(inv.hash, o)) {
|
||||
@ -1765,6 +1792,7 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm
|
||||
push = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!push && (inv.type == MSG_QUORUM_RECOVERED_SIG)) {
|
||||
llmq::CRecoveredSig o;
|
||||
if (llmq::quorumSigningManager->GetRecoveredSigForGetData(inv.hash, o)) {
|
||||
@ -1790,9 +1818,10 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm
|
||||
}
|
||||
}
|
||||
|
||||
if (!push)
|
||||
if (!push) {
|
||||
vNotFound.push_back(inv);
|
||||
}
|
||||
}
|
||||
} // release cs_main
|
||||
|
||||
if (it != pfrom->vRecvGetData.end() && !pfrom->fPauseSend) {
|
||||
|
@ -76,6 +76,8 @@ public:
|
||||
void CheckForStaleTipAndEvictPeers(const Consensus::Params &consensusParams);
|
||||
/** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */
|
||||
void EvictExtraOutboundPeers(int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
|
||||
/** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */
|
||||
void ReattemptInitialBroadcast(CScheduler& scheduler) const;
|
||||
|
||||
private:
|
||||
int64_t m_stale_tip_check_time; //!< Next time to check for stale tip
|
||||
|
@ -78,6 +78,10 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
|
||||
}
|
||||
|
||||
if (relay) {
|
||||
// the mempool tracks locally submitted transactions to make a
|
||||
// best-effort of initial broadcast
|
||||
node.mempool->AddUnbroadcastTx(hashTx);
|
||||
|
||||
RelayTransaction(hashTx, *node.connman);
|
||||
}
|
||||
|
||||
|
@ -549,6 +549,11 @@ std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max)
|
||||
return std::chrono::microseconds{GetRand(duration_max.count())};
|
||||
}
|
||||
|
||||
std::chrono::milliseconds GetRandMillis(std::chrono::milliseconds duration_max) noexcept
|
||||
{
|
||||
return std::chrono::milliseconds{GetRand(duration_max.count())};
|
||||
}
|
||||
|
||||
void GetRandBytes(unsigned char* buf, int num) noexcept { ProcRand(buf, num, RNGLevel::FAST); }
|
||||
void GetStrongRandBytes(unsigned char* buf, int num) noexcept { ProcRand(buf, num, RNGLevel::SLOW); }
|
||||
void RandAddPeriodic() noexcept { ProcRand(nullptr, 0, RNGLevel::PERIODIC); }
|
||||
|
@ -69,6 +69,7 @@
|
||||
void GetRandBytes(unsigned char* buf, int num) noexcept;
|
||||
uint64_t GetRand(uint64_t nMax) noexcept;
|
||||
std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept;
|
||||
std::chrono::milliseconds GetRandMillis(std::chrono::milliseconds duration_max) noexcept;
|
||||
int GetRandInt(int nMax) noexcept;
|
||||
uint256 GetRandHash() noexcept;
|
||||
|
||||
|
@ -624,6 +624,8 @@ void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
|
||||
for (const CTxIn& txin : it->GetTx().vin)
|
||||
mapNextTx.erase(txin.prevout);
|
||||
|
||||
RemoveUnbroadcastTx(hash, true /* add logging because unchecked */ );
|
||||
|
||||
if (vTxHashes.size() > 1) {
|
||||
vTxHashes[it->vTxHashesIdx] = std::move(vTxHashes.back());
|
||||
vTxHashes[it->vTxHashesIdx].second->vTxHashesIdx = it->vTxHashesIdx;
|
||||
@ -1427,6 +1429,15 @@ size_t CTxMemPool::DynamicMemoryUsage() const {
|
||||
return memusage::MallocUsage(sizeof(CTxMemPoolEntry) + 12 * sizeof(void*)) * mapTx.size() + memusage::DynamicUsage(mapNextTx) + memusage::DynamicUsage(mapDeltas) + memusage::DynamicUsage(mapLinks) + memusage::DynamicUsage(vTxHashes) + cachedInnerUsage;
|
||||
}
|
||||
|
||||
void CTxMemPool::RemoveUnbroadcastTx(const uint256& txid, const bool unchecked) {
|
||||
LOCK(cs);
|
||||
|
||||
if (m_unbroadcast_txids.erase(txid))
|
||||
{
|
||||
LogPrint(BCLog::MEMPOOL, "Removed %i from set of unbroadcast txns%s\n", txid.GetHex(), (unchecked ? " before confirmation that txn was sent out" : ""));
|
||||
}
|
||||
}
|
||||
|
||||
void CTxMemPool::RemoveStaged(setEntries &stage, bool updateDescendants, MemPoolRemovalReason reason) {
|
||||
AssertLockHeld(cs);
|
||||
UpdateForRemoveFromMempool(stage, updateDescendants);
|
||||
|
@ -581,6 +581,9 @@ private:
|
||||
|
||||
std::vector<indexed_transaction_set::const_iterator> GetSortedDepthAndScore() const EXCLUSIVE_LOCKS_REQUIRED(cs);
|
||||
|
||||
/** track locally submitted transactions to periodically retry initial broadcast */
|
||||
std::set<uint256> m_unbroadcast_txids GUARDED_BY(cs);
|
||||
|
||||
public:
|
||||
indirectmap<COutPoint, const CTransaction*> mapNextTx GUARDED_BY(cs);
|
||||
std::map<uint256, CAmount> mapDeltas;
|
||||
@ -750,6 +753,21 @@ public:
|
||||
boost::signals2::signal<void (CTransactionRef)> NotifyEntryAdded;
|
||||
boost::signals2::signal<void (CTransactionRef, MemPoolRemovalReason)> NotifyEntryRemoved;
|
||||
|
||||
/** Adds a transaction to the unbroadcast set */
|
||||
void AddUnbroadcastTx(const uint256& txid) {
|
||||
LOCK(cs);
|
||||
m_unbroadcast_txids.insert(txid);
|
||||
}
|
||||
|
||||
/** Removes a transaction from the unbroadcast set */
|
||||
void RemoveUnbroadcastTx(const uint256& txid, const bool unchecked = false);
|
||||
|
||||
/** Returns transactions in unbroadcast set */
|
||||
const std::set<uint256> GetUnbroadcastTxs() const {
|
||||
LOCK(cs);
|
||||
return m_unbroadcast_txids;
|
||||
}
|
||||
|
||||
private:
|
||||
/** UpdateForDescendants is used by UpdateTransactionsFromBlock to update
|
||||
* the descendants for a single transaction that has been added to the
|
||||
|
@ -5296,6 +5296,7 @@ bool LoadMempool(CTxMemPool& pool)
|
||||
int64_t expired = 0;
|
||||
int64_t failed = 0;
|
||||
int64_t already_there = 0;
|
||||
int64_t unbroadcast = 0;
|
||||
int64_t nNow = GetTime();
|
||||
|
||||
try {
|
||||
@ -5349,12 +5350,21 @@ bool LoadMempool(CTxMemPool& pool)
|
||||
for (const auto& i : mapDeltas) {
|
||||
pool.PrioritiseTransaction(i.first, i.second);
|
||||
}
|
||||
|
||||
std::set<uint256> unbroadcast_txids;
|
||||
file >> unbroadcast_txids;
|
||||
unbroadcast = unbroadcast_txids.size();
|
||||
|
||||
for (const auto& txid : unbroadcast_txids) {
|
||||
pool.AddUnbroadcastTx(txid);
|
||||
}
|
||||
|
||||
} catch (const std::exception& e) {
|
||||
LogPrintf("Failed to deserialize mempool data on disk: %s. Continuing anyway.\n", e.what());
|
||||
return false;
|
||||
}
|
||||
|
||||
LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there\n", count, failed, expired, already_there);
|
||||
LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there, %i waiting for initial broadcast\n", count, failed, expired, already_there, unbroadcast);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -5364,6 +5374,7 @@ bool DumpMempool(const CTxMemPool& pool)
|
||||
|
||||
std::map<uint256, CAmount> mapDeltas;
|
||||
std::vector<TxMempoolInfo> vinfo;
|
||||
std::set<uint256> unbroadcast_txids;
|
||||
|
||||
static Mutex dump_mutex;
|
||||
LOCK(dump_mutex);
|
||||
@ -5374,6 +5385,7 @@ bool DumpMempool(const CTxMemPool& pool)
|
||||
mapDeltas[i.first] = i.second;
|
||||
}
|
||||
vinfo = pool.infoAll();
|
||||
unbroadcast_txids = pool.GetUnbroadcastTxs();
|
||||
}
|
||||
|
||||
int64_t mid = GetTimeMicros();
|
||||
@ -5398,6 +5410,10 @@ bool DumpMempool(const CTxMemPool& pool)
|
||||
}
|
||||
|
||||
file << mapDeltas;
|
||||
|
||||
LogPrintf("Writing %d unbroadcast transactions to disk.\n", unbroadcast_txids.size());
|
||||
file << unbroadcast_txids;
|
||||
|
||||
if (!FileCommit(file.Get()))
|
||||
throw std::runtime_error("FileCommit failed");
|
||||
file.fclose();
|
||||
|
@ -2853,7 +2853,8 @@ void CWallet::ResendWalletTransactions()
|
||||
// that these are our transactions.
|
||||
if (GetTime() < nNextResend || !fBroadcastTransactions) return;
|
||||
bool fFirst = (nNextResend == 0);
|
||||
nNextResend = GetTime() + GetRand(30 * 60);
|
||||
// resend 12-36 hours from now, ~1 day on average.
|
||||
nNextResend = GetTime() + (12 * 60 * 60) + GetRand(24 * 60 * 60);
|
||||
if (fFirst) return;
|
||||
|
||||
// Only do it if there's been a new block since last time
|
||||
|
@ -39,7 +39,8 @@ from decimal import Decimal
|
||||
import os
|
||||
|
||||
from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.util import assert_equal, assert_raises_rpc_error, wait_until
|
||||
# from test_framework.mininode import P2PTxInvStore
|
||||
from test_framework.util import assert_equal, assert_raises_rpc_error, connect_nodes, disconnect_nodes, wait_until
|
||||
|
||||
|
||||
class MempoolPersistTest(BitcoinTestFramework):
|
||||
@ -75,6 +76,11 @@ class MempoolPersistTest(BitcoinTestFramework):
|
||||
fees = self.nodes[0].getmempoolentry(txid=last_txid)['fees']
|
||||
assert_equal(fees['base'] + Decimal('0.00001000'), fees['modified'])
|
||||
|
||||
# disconnect nodes & make a txn that remains in the unbroadcast set.
|
||||
disconnect_nodes(self.nodes[0], 2)
|
||||
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("12"))
|
||||
connect_nodes(self.nodes[0], 2)
|
||||
|
||||
self.log.debug("Stop-start the nodes. Verify that node0 has the transactions in its mempool and node1 does not. Verify that node2 calculates its balance correctly after loading wallet transactions.")
|
||||
self.stop_nodes()
|
||||
# Give this node a head-start, so we can be "extra-sure" that it didn't load anything later
|
||||
@ -84,7 +90,7 @@ class MempoolPersistTest(BitcoinTestFramework):
|
||||
self.start_node(2)
|
||||
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"], timeout=1)
|
||||
wait_until(lambda: self.nodes[2].getmempoolinfo()["loaded"], timeout=1)
|
||||
assert_equal(len(self.nodes[0].getrawmempool()), 5)
|
||||
assert_equal(len(self.nodes[0].getrawmempool()), 6)
|
||||
assert_equal(len(self.nodes[2].getrawmempool()), 5)
|
||||
# The others have loaded their mempool. If node_1 loaded anything, we'd probably notice by now:
|
||||
assert_equal(len(self.nodes[1].getrawmempool()), 0)
|
||||
@ -97,9 +103,10 @@ class MempoolPersistTest(BitcoinTestFramework):
|
||||
self.nodes[2].syncwithvalidationinterfacequeue() # Flush mempool to wallet
|
||||
assert_equal(node2_balance, self.nodes[2].getbalance())
|
||||
|
||||
# start node0 with wallet disabled so wallet transactions don't get resubmitted
|
||||
self.log.debug("Stop-start node0 with -persistmempool=0. Verify that it doesn't load its mempool.dat file.")
|
||||
self.stop_nodes()
|
||||
self.start_node(0, extra_args=["-persistmempool=0"])
|
||||
self.start_node(0, extra_args=["-persistmempool=0", "-disablewallet"])
|
||||
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"])
|
||||
assert_equal(len(self.nodes[0].getrawmempool()), 0)
|
||||
|
||||
@ -107,7 +114,7 @@ class MempoolPersistTest(BitcoinTestFramework):
|
||||
self.stop_nodes()
|
||||
self.start_node(0)
|
||||
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"])
|
||||
assert_equal(len(self.nodes[0].getrawmempool()), 5)
|
||||
assert_equal(len(self.nodes[0].getrawmempool()), 6)
|
||||
|
||||
mempooldat0 = os.path.join(self.nodes[0].datadir, self.chain, 'mempool.dat')
|
||||
mempooldat1 = os.path.join(self.nodes[1].datadir, self.chain, 'mempool.dat')
|
||||
@ -116,12 +123,12 @@ class MempoolPersistTest(BitcoinTestFramework):
|
||||
self.nodes[0].savemempool()
|
||||
assert os.path.isfile(mempooldat0)
|
||||
|
||||
self.log.debug("Stop nodes, make node1 use mempool.dat from node0. Verify it has 5 transactions")
|
||||
self.log.debug("Stop nodes, make node1 use mempool.dat from node0. Verify it has 6 transactions")
|
||||
os.rename(mempooldat0, mempooldat1)
|
||||
self.stop_nodes()
|
||||
self.start_node(1, extra_args=[])
|
||||
wait_until(lambda: self.nodes[1].getmempoolinfo()["loaded"])
|
||||
assert_equal(len(self.nodes[1].getrawmempool()), 5)
|
||||
assert_equal(len(self.nodes[1].getrawmempool()), 6)
|
||||
|
||||
self.log.debug("Prevent dashd from writing mempool.dat to disk. Verify that `savemempool` fails")
|
||||
# to test the exception we are creating a tmp folder called mempool.dat.new
|
||||
@ -131,6 +138,27 @@ class MempoolPersistTest(BitcoinTestFramework):
|
||||
assert_raises_rpc_error(-1, "Unable to dump mempool to disk", self.nodes[1].savemempool)
|
||||
os.rmdir(mempooldotnew1)
|
||||
|
||||
self.test_persist_unbroadcast()
|
||||
|
||||
def test_persist_unbroadcast(self):
|
||||
node0 = self.nodes[0]
|
||||
self.start_node(0)
|
||||
|
||||
# clear out mempool
|
||||
node0.generate(1)
|
||||
|
||||
# disconnect nodes to make a txn that remains in the unbroadcast set.
|
||||
disconnect_nodes(node0, 1)
|
||||
node0.sendtoaddress(self.nodes[1].getnewaddress(), Decimal("12"))
|
||||
|
||||
# shutdown, then startup with wallet disabled
|
||||
self.stop_nodes()
|
||||
self.start_node(0, extra_args=["-disablewallet"])
|
||||
|
||||
# check that txn gets broadcast due to unbroadcast logic
|
||||
# conn = node0.add_p2p_connection(P2PTxInvStore())
|
||||
# node0.mockscheduler(16*60) # 15 min + 1 for buffer
|
||||
# wait_until(lambda: len(conn.get_invs()) == 1)
|
||||
|
||||
if __name__ == '__main__':
|
||||
MempoolPersistTest().main()
|
||||
|
99
test/functional/mempool_unbroadcast.py
Executable file
99
test/functional/mempool_unbroadcast.py
Executable file
@ -0,0 +1,99 @@
|
||||
#!/usr/bin/env python3
|
||||
# Copyright (c) 2017-2020 The Bitcoin Core developers
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
"""Test that the mempool ensures transaction delivery by periodically sending
|
||||
to peers until a GETDATA is received."""
|
||||
|
||||
import time
|
||||
|
||||
from test_framework.mininode import P2PTxInvStore
|
||||
from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.util import (
|
||||
assert_equal,
|
||||
connect_nodes,
|
||||
create_confirmed_utxos,
|
||||
disconnect_nodes,
|
||||
)
|
||||
|
||||
|
||||
class MempoolUnbroadcastTest(BitcoinTestFramework):
|
||||
def set_test_params(self):
|
||||
self.num_nodes = 2
|
||||
|
||||
def skip_test_if_missing_module(self):
|
||||
self.skip_if_no_wallet()
|
||||
|
||||
def run_test(self):
|
||||
self.test_broadcast()
|
||||
self.test_txn_removal()
|
||||
|
||||
def test_broadcast(self):
|
||||
self.log.info("Test that mempool reattempts delivery of locally submitted transaction")
|
||||
node = self.nodes[0]
|
||||
|
||||
min_relay_fee = node.getnetworkinfo()["relayfee"]
|
||||
utxos = create_confirmed_utxos(min_relay_fee, node, 10)
|
||||
|
||||
disconnect_nodes(node, 1)
|
||||
|
||||
self.log.info("Generate transactions that only node 0 knows about")
|
||||
|
||||
# generate a wallet txn
|
||||
addr = node.getnewaddress()
|
||||
wallet_tx_hsh = node.sendtoaddress(addr, 0.0001)
|
||||
|
||||
# generate a txn using sendrawtransaction
|
||||
us0 = utxos.pop()
|
||||
inputs = [{"txid": us0["txid"], "vout": us0["vout"]}]
|
||||
outputs = {addr: 0.0001}
|
||||
tx = node.createrawtransaction(inputs, outputs)
|
||||
node.settxfee(min_relay_fee)
|
||||
txF = node.fundrawtransaction(tx)
|
||||
txFS = node.signrawtransactionwithwallet(txF["hex"])
|
||||
rpc_tx_hsh = node.sendrawtransaction(txFS["hex"])
|
||||
|
||||
# check that second node doesn't have these two txns
|
||||
mempool = self.nodes[1].getrawmempool()
|
||||
assert rpc_tx_hsh not in mempool
|
||||
assert wallet_tx_hsh not in mempool
|
||||
|
||||
# ensure that unbroadcast txs are persisted to mempool.dat
|
||||
self.restart_node(0)
|
||||
|
||||
self.log.info("Reconnect nodes & check if they are sent to node 1")
|
||||
connect_nodes(node, 1)
|
||||
|
||||
# fast forward into the future & ensure that the second node has the txns
|
||||
node.mockscheduler(15 * 60) # 15 min in seconds
|
||||
self.sync_mempools(timeout=30)
|
||||
mempool = self.nodes[1].getrawmempool()
|
||||
assert rpc_tx_hsh in mempool
|
||||
assert wallet_tx_hsh in mempool
|
||||
|
||||
self.log.info("Add another connection & ensure transactions aren't broadcast again")
|
||||
|
||||
conn = node.add_p2p_connection(P2PTxInvStore())
|
||||
node.mockscheduler(15 * 60)
|
||||
time.sleep(5)
|
||||
assert_equal(len(conn.get_invs()), 0)
|
||||
|
||||
def test_txn_removal(self):
|
||||
self.log.info("Test that transactions removed from mempool are removed from unbroadcast set")
|
||||
node = self.nodes[0]
|
||||
disconnect_nodes(node, 1)
|
||||
node.disconnect_p2ps
|
||||
|
||||
# since the node doesn't have any connections, it will not receive
|
||||
# any GETDATAs & thus the transaction will remain in the unbroadcast set.
|
||||
addr = node.getnewaddress()
|
||||
txhsh = node.sendtoaddress(addr, 0.0001)
|
||||
|
||||
# check transaction was removed from unbroadcast set due to presence in
|
||||
# a block
|
||||
removal_reason = "Removed {} from set of unbroadcast txns before confirmation that txn was sent out".format(txhsh)
|
||||
with node.assert_debug_log([removal_reason]):
|
||||
node.generate(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
MempoolUnbroadcastTest().main()
|
@ -13,6 +13,8 @@ P2PConnection: A low-level connection object to a node's P2P interface
|
||||
P2PInterface: A high-level interface object for communicating to a node over P2P
|
||||
P2PDataStore: A p2p interface class that keeps a store of transactions and blocks
|
||||
and can respond correctly to getdata and getheaders messages
|
||||
P2PTxInvStore: A p2p interface class that inherits from P2PDataStore, and keeps
|
||||
a count of how many times each txid has been announced.
|
||||
"""
|
||||
import asyncio
|
||||
from collections import defaultdict
|
||||
@ -707,3 +709,20 @@ class P2PDataStore(P2PInterface):
|
||||
# Check that none of the txs are now in the mempool
|
||||
for tx in txs:
|
||||
assert tx.hash not in raw_mempool, "{} tx found in mempool".format(tx.hash)
|
||||
|
||||
class P2PTxInvStore(P2PInterface):
|
||||
"""A P2PInterface which stores a count of how many times each txid has been announced."""
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.tx_invs_received = defaultdict(int)
|
||||
|
||||
def on_inv(self, message):
|
||||
# Store how many times invs have been received for each tx.
|
||||
for i in message.inv:
|
||||
if i.type == MSG_TX:
|
||||
# save txid
|
||||
self.tx_invs_received[i.hash] += 1
|
||||
|
||||
def get_invs(self):
|
||||
with mininode_lock:
|
||||
return list(self.tx_invs_received.keys())
|
||||
|
@ -235,6 +235,7 @@ BASE_SCRIPTS = [
|
||||
'p2p_blockfilters.py',
|
||||
'feature_asmap.py',
|
||||
'feature_includeconf.py',
|
||||
'mempool_unbroadcast.py',
|
||||
'rpc_deriveaddresses.py',
|
||||
'rpc_deriveaddresses.py --usecli',
|
||||
'rpc_scantxoutset.py',
|
||||
|
@ -3,29 +3,14 @@
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
"""Test that the wallet resends transactions periodically."""
|
||||
from collections import defaultdict
|
||||
import time
|
||||
|
||||
from test_framework.blocktools import create_block, create_coinbase
|
||||
from test_framework.messages import ToHex
|
||||
from test_framework.mininode import P2PInterface, mininode_lock
|
||||
from test_framework.mininode import P2PTxInvStore, mininode_lock
|
||||
from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.util import assert_equal, wait_until
|
||||
|
||||
|
||||
class P2PStoreTxInvs(P2PInterface):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.tx_invs_received = defaultdict(int)
|
||||
|
||||
def on_inv(self, message):
|
||||
# Store how many times invs have been received for each tx.
|
||||
for i in message.inv:
|
||||
if i.type == 1:
|
||||
# save txid
|
||||
self.tx_invs_received[i.hash] += 1
|
||||
|
||||
|
||||
class ResendWalletTransactionsTest(BitcoinTestFramework):
|
||||
def set_test_params(self):
|
||||
self.num_nodes = 1
|
||||
@ -36,7 +21,7 @@ class ResendWalletTransactionsTest(BitcoinTestFramework):
|
||||
def run_test(self):
|
||||
node = self.nodes[0] # alias
|
||||
|
||||
node.add_p2p_connection(P2PStoreTxInvs())
|
||||
node.add_p2p_connection(P2PTxInvStore())
|
||||
|
||||
self.log.info("Create a new transaction and wait until it's broadcast")
|
||||
txid = int(node.sendtoaddress(node.getnewaddress(), 1), 16)
|
||||
@ -54,7 +39,7 @@ class ResendWalletTransactionsTest(BitcoinTestFramework):
|
||||
wait_until(wait_p2p, lock=mininode_lock)
|
||||
|
||||
# Add a second peer since txs aren't rebroadcast to the same peer (see filterInventoryKnown)
|
||||
node.add_p2p_connection(P2PStoreTxInvs())
|
||||
node.add_p2p_connection(P2PTxInvStore())
|
||||
|
||||
self.log.info("Create a block")
|
||||
# Create and submit a block without the transaction.
|
||||
@ -72,9 +57,10 @@ class ResendWalletTransactionsTest(BitcoinTestFramework):
|
||||
node.p2ps[1].sync_with_ping()
|
||||
assert_equal(node.p2ps[1].tx_invs_received[txid], 0)
|
||||
|
||||
self.log.info("Transaction should be rebroadcast after 30 minutes")
|
||||
# Use mocktime and give an extra 5 minutes to be sure.
|
||||
rebroadcast_time = self.mocktime + 41 * 60
|
||||
self.log.info("Bump time & check that transaction is rebroadcast")
|
||||
# Transaction should be rebroadcast approximately 24 hours in the future,
|
||||
# but can range from 12-36. So bump 36 hours to be sure.
|
||||
rebroadcast_time = self.mocktime + 36 * 60 * 60
|
||||
node.setmocktime(rebroadcast_time)
|
||||
self.mocktime = rebroadcast_time
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user