diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py index 9221f000b0..807b2d680e 100755 --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -11,7 +11,8 @@ -zmqpubrawtx=tcp://127.0.0.1:28332 \ -zmqpubrawblock=tcp://127.0.0.1:28332 \ -zmqpubhashtx=tcp://127.0.0.1:28332 \ - -zmqpubhashblock=tcp://127.0.0.1:28332 + -zmqpubhashblock=tcp://127.0.0.1:28332 \ + -zmqpubsequence=tcp://127.0.0.1:28332 We use the asyncio library here. `self.handle()` installs itself as a future at the end of the function. Since it never returns with the event @@ -58,18 +59,14 @@ class ZMQHandler(): self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernanceobject") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawinstantsenddoublespend") + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence") self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) async def handle(self) : - msg = await self.zmqSubSocket.recv_multipart() - topic = msg[0] - body = msg[1] + topic, body, seq = await self.zmqSubSocket.recv_multipart() sequence = "Unknown" - - if len(msg[-1]) == 4: - msgSequence = struct.unpack('C : Blockhash connected + <32-byte hash>D : Blockhash disconnected + <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason + <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool + +Where the 8-byte uints correspond to the mempool sequence number. These options can also be provided in dash.conf. @@ -154,13 +164,20 @@ No authentication or authorization is done on connecting clients; it is assumed that the ZeroMQ port is exposed only to trusted entities, using other means such as firewalling. -Note that when the block chain tip changes, a reorganisation may occur -and just the tip will be notified. It is up to the subscriber to -retrieve the chain from the last known block to the new tip. Also note -that no notification occurs if the tip was in the active chain - this -is the case after calling invalidateblock RPC. +Note that for `*block` topics, when the block chain tip changes, +a reorganisation may occur and just the tip will be notified. +It is up to the subscriber to retrieve the chain from the last known +block to the new tip. Also note that no notification will occur if the tip +was in the active chain--as would be the case after calling invalidateblock RPC. +In contrast, the `sequence` topic publishes all block connections and +disconnections. There are several possibilities that ZMQ notification can get lost during transmission depending on the communication type you are using. Dashd appends an up-counting sequence number to each notification which allows listeners to detect lost notifications. + +The `sequence` topic refers specifically to the mempool sequence +number, which is also published along with all mempool events. This +is a different sequence value than in ZMQ itself in order to allow a total +ordering of mempool events to be constructed. diff --git a/src/dsnotificationinterface.cpp b/src/dsnotificationinterface.cpp index 3a30fec2c0..3b8a8e0710 100644 --- a/src/dsnotificationinterface.cpp +++ b/src/dsnotificationinterface.cpp @@ -103,7 +103,8 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con } } -void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime) +void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime, + uint64_t mempool_sequence) { assert(m_cj_ctx && m_llmq_ctx); @@ -112,7 +113,8 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& m_cj_ctx->dstxman->TransactionAddedToMempool(ptx); } -void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) +void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, + uint64_t mempool_sequence) { assert(m_llmq_ctx); diff --git a/src/dsnotificationinterface.h b/src/dsnotificationinterface.h index dc4456de40..c98f913880 100644 --- a/src/dsnotificationinterface.h +++ b/src/dsnotificationinterface.h @@ -40,8 +40,9 @@ protected: void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload) override; void SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; - void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) override; + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override; + void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, + uint64_t mempool_sequence) override; void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) override; void BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) override; void NotifyMasternodeListChanged(bool undo, const CDeterministicMNList& oldMNList, const CDeterministicMNListDiff& diff) override; diff --git a/src/init.cpp b/src/init.cpp index eeb9514f08..42042ab4d5 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -645,6 +645,7 @@ void SetupServerArgs(NodeContext& node) argsman.AddArg("-zmqpubrawtx=
", "Enable publish raw transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlock=
", "Enable publish raw transaction (locked via InstantSend) in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlocksig=
", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubsequence=
", "Enable publish hash block and tx sequence in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashblockhwm=", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashchainlockhwm=", strprintf("Set publish hash chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashgovernanceobjecthwm=", strprintf("Set publish hash governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); @@ -663,6 +664,7 @@ void SetupServerArgs(NodeContext& node) argsman.AddArg("-zmqpubrawtxhwm=", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlockhwm=", strprintf("Set publish raw transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlocksighwm=", strprintf("Set publish raw transaction lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubsequencehwm=", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); #else hidden_args.emplace_back("-zmqpubhashblock=
"); hidden_args.emplace_back("-zmqpubhashchainlock=
"); @@ -682,6 +684,7 @@ void SetupServerArgs(NodeContext& node) hidden_args.emplace_back("-zmqpubrawtx=
"); hidden_args.emplace_back("-zmqpubrawtxlock=
"); hidden_args.emplace_back("-zmqpubrawtxlocksig=
"); + hidden_args.emplace_back("-zmqpubsequence="); hidden_args.emplace_back("-zmqpubhashblockhwm="); hidden_args.emplace_back("-zmqpubhashchainlockhwm="); hidden_args.emplace_back("-zmqpubhashgovernanceobjecthwm="); @@ -700,6 +703,7 @@ void SetupServerArgs(NodeContext& node) hidden_args.emplace_back("-zmqpubrawtxhwm="); hidden_args.emplace_back("-zmqpubrawtxlockhwm="); hidden_args.emplace_back("-zmqpubrawtxlocksighwm="); + hidden_args.emplace_back("-zmqpubsequencehwm="); #endif argsman.AddArg("-checkblockindex", strprintf("Do a consistency check for the block tree, and occasionally. (default: %u, regtest: %u)", defaultChainParams->DefaultConsistencyChecks(), regtestChainParams->DefaultConsistencyChecks()), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST); diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h index dcc399a29d..0e8f302dfc 100644 --- a/src/interfaces/chain.h +++ b/src/interfaces/chain.h @@ -258,8 +258,8 @@ public: { public: virtual ~Notifications() {} - virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) {} - virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {} + virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {} + virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {} virtual void blockConnected(const CBlock& block, int height) {} virtual void blockDisconnected(const CBlock& block, int height) {} virtual void updatedBlockTip() {} diff --git a/src/node/interfaces.cpp b/src/node/interfaces.cpp index eba23b0195..e2d19d3ff3 100644 --- a/src/node/interfaces.cpp +++ b/src/node/interfaces.cpp @@ -611,13 +611,13 @@ public: explicit NotificationsProxy(std::shared_ptr notifications) : m_notifications(std::move(notifications)) {} virtual ~NotificationsProxy() = default; - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override { - m_notifications->transactionAddedToMempool(tx, nAcceptTime); + m_notifications->transactionAddedToMempool(tx, nAcceptTime, mempool_sequence); } - void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override { - m_notifications->transactionRemovedFromMempool(tx, reason); + m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence); } void BlockConnected(const std::shared_ptr& block, const CBlockIndex* index) override { @@ -997,7 +997,7 @@ public: if (!m_node.mempool) return; LOCK2(::cs_main, m_node.mempool->cs); for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) { - notifications.transactionAddedToMempool(entry.GetSharedTx(), 0); + notifications.transactionAddedToMempool(entry.GetSharedTx(), /* nAcceptTime = */ 0, /* mempool_sequence = */ 0); } } NodeContext& m_node; diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp index cd0fd147ee..b8592153a6 100644 --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -532,9 +532,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash())); } -UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose) +UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose, bool include_mempool_sequence) { if (verbose) { + if (include_mempool_sequence) { + throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values."); + } LOCK(pool.cs); UniValue o(UniValue::VOBJ); for (const CTxMemPoolEntry& e : pool.mapTx) { @@ -548,14 +551,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, } return o; } else { + uint64_t mempool_sequence; std::vector vtxid; - pool.queryHashes(vtxid); - + { + LOCK(pool.cs); + pool.queryHashes(vtxid); + mempool_sequence = pool.GetSequence(); + } UniValue a(UniValue::VARR); for (const uint256& hash : vtxid) a.push_back(hash.ToString()); - return a; + if (!include_mempool_sequence) { + return a; + } else { + UniValue o(UniValue::VOBJ); + o.pushKV("txids", a); + o.pushKV("mempool_sequence", mempool_sequence); + return o; + } } } @@ -566,6 +580,7 @@ static RPCHelpMan getrawmempool() "\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n", { {"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"}, + {"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."}, }, { RPCResult{"for verbose = false", @@ -578,6 +593,15 @@ static RPCHelpMan getrawmempool() { {RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()}, }}, + RPCResult{"for verbose = false and mempool_sequence = true", + RPCResult::Type::OBJ, "", "", + { + {RPCResult::Type::ARR, "txids", "", + { + {RPCResult::Type::STR_HEX, "", "The transaction id"}, + }}, + {RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."}, + }}, }, RPCExamples{ HelpExampleCli("getrawmempool", "true") @@ -589,10 +613,15 @@ static RPCHelpMan getrawmempool() if (!request.params[0].isNull()) fVerbose = request.params[0].get_bool(); + bool include_mempool_sequence = false; + if (!request.params[1].isNull()) { + include_mempool_sequence = request.params[1].get_bool(); + } + const NodeContext& node = EnsureAnyNodeContext(request.context); const CTxMemPool& mempool = EnsureMemPool(node); LLMQContext& llmq_ctx = EnsureLLMQContext(node); - return MempoolToJSON(mempool, llmq_ctx.isman, fVerbose); + return MempoolToJSON(mempool, llmq_ctx.isman, fVerbose, include_mempool_sequence); }, }; } diff --git a/src/rpc/blockchain.h b/src/rpc/blockchain.h index f8fc13d081..b67055bb66 100644 --- a/src/rpc/blockchain.h +++ b/src/rpc/blockchain.h @@ -47,7 +47,7 @@ UniValue blockToJSON(BlockManager& blockman, const CBlock& block, const CBlockIn UniValue MempoolInfoToJSON(const CTxMemPool& pool, llmq::CInstantSendManager& isman); /** Mempool to JSON */ -UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose = false); +UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose = false, bool include_mempool_sequence = false); /** Block header to JSON */ UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex, llmq::CChainLocksHandler& clhandler, llmq::CInstantSendManager& isman) LOCKS_EXCLUDED(cs_main); diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index b7b2e79604..be89fea835 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -168,6 +168,7 @@ static const CRPCConvertParam vRPCConvertParams[] = { "pruneblockchain", 0, "height" }, { "keypoolrefill", 0, "newsize" }, { "getrawmempool", 0, "verbose" }, + { "getrawmempool", 1, "mempool_sequence" }, { "estimatesmartfee", 0, "conf_target" }, { "estimaterawfee", 0, "conf_target" }, { "estimaterawfee", 1, "threshold" }, diff --git a/src/test/fuzz/tx_pool.cpp b/src/test/fuzz/tx_pool.cpp index a11c258b1a..cbdffe29d8 100644 --- a/src/test/fuzz/tx_pool.cpp +++ b/src/test/fuzz/tx_pool.cpp @@ -51,12 +51,12 @@ struct TransactionsDelta final : public CValidationInterface { explicit TransactionsDelta(std::set& r, std::set& a) : m_removed{r}, m_added{a} {} - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t /* nAcceptTime */) override + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t /* nAcceptTime */, uint64_t) override { Assert(m_added.insert(tx).second); } - void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t) override { Assert(m_removed.insert(tx).second); } diff --git a/src/txmempool.cpp b/src/txmempool.cpp index 1f125a4c17..4f771dac8a 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -607,12 +607,16 @@ void CTxMemPool::addUncheckedProTx(indexed_transaction_set::iterator& newit, con void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) { + // We increment mempool sequence value no matter removal reason + // even if not directly reported below. + uint64_t mempool_sequence = GetAndIncrementSequence(); + if (reason != MemPoolRemovalReason::BLOCK) { // Notify clients that a transaction has been removed from the mempool // for any reason except being included in a block. Clients interested // in transactions included in blocks can subscribe to the BlockConnected // notification. - GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason); + GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence); } const uint256 hash = it->GetTx().GetHash(); diff --git a/src/txmempool.h b/src/txmempool.h index 12efa08b51..8581e2122b 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -485,6 +485,11 @@ protected: mutable double rollingMinimumFeeRate GUARDED_BY(cs); //!< minimum fee to get into the pool, decreases exponentially mutable Epoch m_epoch GUARDED_BY(cs); + // In-memory counter for external mempool tracking purposes. + // This number is incremented once every time a transaction + // is added or removed from the mempool for any reason. + mutable uint64_t m_sequence_number{1}; + void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs); bool m_is_loaded GUARDED_BY(cs){false}; @@ -810,6 +815,15 @@ public: return (m_unbroadcast_txids.count(txid) != 0); } + /** Guards this internal counter for external reporting */ + uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) { + return m_sequence_number++; + } + + uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) { + return m_sequence_number; + } + private: /** UpdateForDescendants is used by UpdateTransactionsFromBlock to update * the descendants for a single transaction that has been added to the diff --git a/src/validation.cpp b/src/validation.cpp index b4f7c2c422..04fddb4cb3 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -889,7 +889,7 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef if (!Finalize(args, ws)) return MempoolAcceptResult::Failure(ws.m_state); const int64_t nAcceptTime = args.m_accept_time; - GetMainSignals().TransactionAddedToMempool(ptx, nAcceptTime); + GetMainSignals().TransactionAddedToMempool(ptx, nAcceptTime, m_pool.GetAndIncrementSequence()); const CTransaction& tx = *ptx; auto finish = Now(); diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index ac8b19dcdc..e00a8bdb53 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -208,17 +208,17 @@ void CMainSignals::SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, cons m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.SynchronousUpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); }); } -void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) { - auto event = [tx, nAcceptTime, this] { - m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, nAcceptTime); }); +void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) { + auto event = [tx, nAcceptTime, mempool_sequence, this] { + m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, nAcceptTime, mempool_sequence); }); }; ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__, tx->GetHash().ToString()); } -void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) { - auto event = [tx, reason, this] { - m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason); }); +void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { + auto event = [tx, reason, mempool_sequence, this] { + m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); }); }; ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__, tx->GetHash().ToString()); diff --git a/src/validationinterface.h b/src/validationinterface.h index c66b25a653..c860f7fb86 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -117,7 +117,8 @@ protected: * * Called on a background thread. */ - virtual void TransactionAddedToMempool(const CTransactionRef &xn, int64_t nAcceptTime) {} + virtual void TransactionAddedToMempool(const CTransactionRef &xn, int64_t nAcceptTime, uint64_t mempool_sequence) {} + /** * Notifies listeners of a transaction leaving mempool. * @@ -149,7 +150,7 @@ protected: * * Called on a background thread. */ - virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {} + virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {} /** * Notifies listeners of a block being connected. * Provides a vector of transactions evicted from the mempool as a result. @@ -226,8 +227,8 @@ public: void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload); void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void SynchronousUpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); - void TransactionAddedToMempool(const CTransactionRef&, int64_t); - void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason); + void TransactionAddedToMempool(const CTransactionRef&, int64_t, uint64_t mempool_sequence); + void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence); void BlockConnected(const std::shared_ptr &, const CBlockIndex *pindex); void BlockDisconnected(const std::shared_ptr &, const CBlockIndex* pindex); void NotifyTransactionLock(const CTransactionRef &tx, const std::shared_ptr& islock); diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index f648fc2ced..4257cb6f57 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1267,7 +1267,7 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, CWalletTx::Confirmatio fAnonymizableTallyCachedNonDenom = false; } -void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) { +void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) { LOCK(cs_wallet); CWalletTx::Confirmation confirm(CWalletTx::Status::UNCONFIRMED, /* block_height */ 0, {}, /* nIndex */ 0); WalletBatch batch(GetDatabase()); @@ -1279,7 +1279,7 @@ void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcce } } -void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) { +void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { if (reason != MemPoolRemovalReason::CONFLICT) { LOCK(cs_wallet); auto it = mapWallet.find(tx->GetHash()); @@ -1330,7 +1330,7 @@ void CWallet::blockConnected(const CBlock& block, int height) WalletBatch batch(GetDatabase()); for (size_t index = 0; index < block.vtx.size(); index++) { SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height, block_hash, (int)index}, batch); - transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK); + transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK, 0 /* mempool_sequence */); } // reset cache to make sure no longer immature coins are included diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h index a94217ee9d..46cfe3f1b0 100644 --- a/src/wallet/wallet.h +++ b/src/wallet/wallet.h @@ -1078,7 +1078,7 @@ public: CWalletTx* AddToWallet(CTransactionRef tx, const CWalletTx::Confirmation& confirm, const UpdateWalletTxFn& update_wtx=nullptr, bool fFlushOnClose=true); bool LoadToWallet(const uint256& hash, const UpdateWalletTxFn& fill_wtx) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); - void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; + void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override; void blockConnected(const CBlock& block, int height) override; void blockDisconnected(const CBlock& block, int height) override; void updatedBlockTip() override; @@ -1100,7 +1100,7 @@ public: uint256 last_failed_block; }; ScanResult ScanForWalletTransactions(const uint256& start_block, int start_height, std::optional max_height, const WalletRescanReserver& reserver, bool fUpdate); - void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override; + void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); void ResendWalletTransactions(); struct Balance { diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 75d394326d..e3a1d98cfe 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -28,6 +28,26 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/ return true; } +bool CZMQAbstractNotifier::NotifyBlockConnect(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyBlockDisconnect(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionAcceptance(const CTransaction &/*transaction*/, uint64_t mempool_sequence) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transaction*/, uint64_t mempool_sequence) +{ + return true; +} + bool CZMQAbstractNotifier::NotifyTransactionLock(const CTransactionRef &/*transaction*/, const std::shared_ptr& /*islock*/) { return true; diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index b2e0b10bf7..534082d64a 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -58,9 +58,19 @@ public: virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; + // Notifies of ConnectTip result, i.e., new active tip only virtual bool NotifyBlock(const CBlockIndex *pindex); - virtual bool NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig); + // Notifies of every block connection + virtual bool NotifyBlockConnect(const CBlockIndex *pindex); + // Notifies of every block disconnection + virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex); + // Notifies of every mempool acceptance + virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence); + // Notifies of every mempool removal, except inclusion in blocks + virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence); + // Notifies of transactions added to mempool or appearing in blocks virtual bool NotifyTransaction(const CTransaction &transaction); + virtual bool NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig); virtual bool NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr& islock); virtual bool NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr& vote); virtual bool NotifyGovernanceObject(const std::shared_ptr& object); diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index 7539ca3114..eaf83097ea 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -50,6 +50,7 @@ std::unique_ptr CZMQNotificationInterface::Create() factories["pubrawgovernanceobject"] = CZMQAbstractNotifier::Create; factories["pubrawinstantsenddoublespend"] = CZMQAbstractNotifier::Create; factories["pubrawrecoveredsig"] = CZMQAbstractNotifier::Create; + factories["pubsequence"] = CZMQAbstractNotifier::Create; std::list> notifiers; for (const auto& entry : factories) @@ -157,31 +158,53 @@ void CZMQNotificationInterface::NotifyChainLock(const CBlockIndex *pindex, const }); } -void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime) +void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime, uint64_t mempool_sequence) { - // Used by BlockConnected and BlockDisconnected as well, because they're - // all the same external callback. const CTransaction& tx = *ptx; - TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { - return notifier->NotifyTransaction(tx); + TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence); + }); +} + +void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence) +{ + // Called for all non-block inclusion reasons + const CTransaction& tx = *ptx; + + TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransactionRemoval(tx, mempool_sequence); }); } void CZMQNotificationInterface::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindexConnected) { for (const CTransactionRef& ptx : pblock->vtx) { - // Do a normal notify for each transaction added in the block - TransactionAddedToMempool(ptx, 0); + const CTransaction& tx = *ptx; + TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockConnect listeners for *all* blocks + TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) { + return notifier->NotifyBlockConnect(pindexConnected); + }); } void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) { for (const CTransactionRef& ptx : pblock->vtx) { - // Do a normal notify for each transaction removed in block disconnection - TransactionAddedToMempool(ptx, 0); + const CTransaction& tx = *ptx; + TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockDisconnect listeners for *all* blocks + TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) { + return notifier->NotifyBlockDisconnect(pindexDisconnected); + }); } void CZMQNotificationInterface::NotifyTransactionLock(const CTransactionRef& tx, const std::shared_ptr& islock) diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index 8cfc12de37..4dd078a45f 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -26,7 +26,8 @@ protected: void Shutdown(); // CValidationInterface - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override; + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindexConnected) override; void BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index edbe879882..5b863d426c 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -46,6 +46,7 @@ static const char *MSG_RAWGVOTE = "rawgovernancevote"; static const char *MSG_RAWGOBJ = "rawgovernanceobject"; static const char *MSG_RAWISCON = "rawinstantsenddoublespend"; static const char *MSG_RAWRECSIG = "rawrecoveredsig"; +static const char *MSG_SEQUENCE = "sequence"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) @@ -351,6 +352,53 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } +// TODO: Dedup this code to take label char, log string +bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address); + char data[sizeof(uint256)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(data) - 1] = 'C'; // Block (C)onnect + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address); + char data[sizeof(uint256)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(data) - 1] = 'D'; // Block (D)isconnect + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address); + unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance + WriteLE64(data+sizeof(uint256)+1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); + unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(uint256)] = 'R'; // Mempool (R)emoval + WriteLE64(data+sizeof(uint256)+1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr& islock) { uint256 hash = transaction->GetHash(); diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index d674ef3891..7dc632d077 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -108,6 +108,15 @@ public: bool NotifyTransaction(const CTransaction &transaction) override; }; +class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlockConnect(const CBlockIndex *pindex) override; + bool NotifyBlockDisconnect(const CBlockIndex *pindex) override; + bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override; + bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override; +}; + class CZMQPublishRawTransactionLockNotifier : public CZMQAbstractPublishNotifier { public: diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 35227ae70b..88ecc7186d 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -5,15 +5,26 @@ """Test the ZMQ notification interface.""" import struct -from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE +from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE, ADDRESS_BCRT1_P2SH_OP_TRUE +from test_framework.blocktools import create_block, create_coinbase from test_framework.test_framework import BitcoinTestFramework from test_framework.messages import ( dashhash, hash256, + tx_from_hex, +) +from test_framework.util import ( + assert_equal, + assert_raises_rpc_error, ) -from test_framework.util import assert_equal from time import sleep +# Test may be skipped and not have zmq installed +try: + import zmq +except ImportError: + pass + def hash256_reversed(byte_str): return hash256(byte_str)[::-1] @@ -26,7 +37,6 @@ class ZMQSubscriber: self.socket = socket self.topic = topic - import zmq self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) def receive(self): @@ -38,6 +48,22 @@ class ZMQSubscriber: self.sequence += 1 return body + def receive_sequence(self): + topic, body, seq = self.socket.recv_multipart() + # Topic should match the subscriber topic. + assert_equal(topic, self.topic) + # Sequence should be incremental. + assert_equal(struct.unpack('C : Blockhash connected + <32-byte hash>D : Blockhash disconnected + <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason + <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool + """ + self.log.info("Testing 'sequence' publisher") + address = 'tcp://127.0.0.1:28333' + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, 60000) + seq = ZMQSubscriber(socket, b'sequence') + + self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)]) + socket.connect(address) + # Relax so that the subscriber is ready before publishing zmq messages + sleep(0.2) + + # Mempool sequence number starts at 1 + seq_num = 1 + + # Generate 1 block in nodes[0] and receive all notifications + dc_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0] + + # Note: We are not notified of any block transactions, coinbase or mined + assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence()) + + # Generate 2 blocks in nodes[1] to a different address to ensure a chain split + self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2SH_OP_TRUE) + + # nodes[0] will reorg chain after connecting back nodes[1] + self.connect_nodes(0, 1) + + # Then we receive all block (dis)connect notifications for the 2 block reorg + assert_equal((dc_block, "D", None), seq.receive_sequence()) + block_count = self.nodes[1].getblockcount() + assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence()) + assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence()) + + # Rest of test requires wallet functionality + if self.is_wallet_compiled(): + self.log.info("Wait for tx from second node") + payment_txid = self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=5.0) + self.sync_all() + self.log.info("Testing sequence notifications with mempool sequence values") + + # Should receive the broadcasted txid. + assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) + seq_num += 1 + + # Doesn't get published when mined, make a block and tx to "flush" the possibility + # though the mempool sequence number does go up by the number of transactions + # removed from the mempool by the block mining it. + mempool_size = len(self.nodes[0].getrawmempool()) + c_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0] + self.sync_all() + # Make sure the number of mined transactions matches the number of txs out of mempool + mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool()) + assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta) + seq_num += mempool_size_delta + payment_txid_2 = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0) + self.sync_all() + assert_equal((c_block, "C", None), seq.receive_sequence()) + assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence()) + seq_num += 1 + + # Spot check getrawmempool results that they only show up when asked for + assert type(self.nodes[0].getrawmempool()) is list + assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list + assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True) + assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True) + assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num) + + self.log.info("Testing reorg notifications") + # Manually invalidate the last block to test mempool re-entry + # N.B. This part could be made more lenient in exact ordering + # since it greatly depends on inner-workings of blocks/mempool + # during "deep" re-orgs. Probably should "re-construct" + # blockchain/mempool state from notifications instead. + block_count = self.nodes[0].getblockcount() + best_hash = self.nodes[0].getbestblockhash() + self.nodes[0].invalidateblock(best_hash) + sleep(2) # Bit of room to make sure transaction things happened + + # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective + # of the time they were gathered. + assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num + + assert_equal((best_hash, "D", None), seq.receive_sequence()) + assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) + seq_num += 1 + + # Other things may happen but aren't wallet-deterministic so we don't test for them currently + self.nodes[0].reconsiderblock(best_hash) + self.nodes[1].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + self.sync_all() + + self.log.info("Evict mempool transaction by block conflict") + orig_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0) + + # More to be simply mined + more_tx = [] + for _ in range(5): + more_tx.append(self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1)) + + raw_tx = self.nodes[0].getrawtransaction(orig_txid) + # Mine the tx + block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1)) + tx = tx_from_hex(raw_tx) + block.vtx.append(tx) + for txid in more_tx: + tx = tx_from_hex(self.nodes[0].getrawtransaction(txid)) + block.vtx.append(tx) + block.hashMerkleRoot = block.calc_merkle_root() + block.solve() + assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None) + tip = self.nodes[0].getbestblockhash() + assert_equal(int(tip, 16), block.sha256) + orig_txid_2 = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0) + + # Flush old notifications until evicted tx original entry + (hash_str, label, mempool_seq) = seq.receive_sequence() + while hash_str != orig_txid: + (hash_str, label, mempool_seq) = seq.receive_sequence() + mempool_seq += 1 + + # Added original tx + assert_equal(label, "A") + # More transactions to be simply mined + for i in range(len(more_tx)): + assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence()) + mempool_seq += 1 + + mempool_seq += 1 + assert_equal((tip, "C", None), seq.receive_sequence()) + mempool_seq += len(more_tx) + # Last tx + assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence()) + mempool_seq += 1 + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + self.sync_all() # want to make sure we didn't break "consensus" for other tests + + def test_mempool_sync(self): + """ + Use sequence notification plus getrawmempool sequence results to "sync mempool" + """ + if not self.is_wallet_compiled(): + self.log.info("Skipping mempool sync test") + return + + self.log.info("Testing 'mempool sync' usage of sequence notifier") + address = 'tcp://127.0.0.1:28333' + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, 60000) + seq = ZMQSubscriber(socket, b'sequence') + + self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)]) + self.connect_nodes(0, 1) + socket.connect(address) + # Relax so that the subscriber is ready before publishing zmq messages + sleep(0.2) + + # In-memory counter, should always start at 1 + next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] + assert_equal(next_mempool_seq, 1) + + # Some transactions have been happening but we aren't consuming zmq notifications yet + # or we lost a ZMQ message somehow and want to start over + txids = [] + num_txs = 5 + for _ in range(num_txs): + txids.append(self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0)) + self.sync_all() + + # 1) Consume backlog until we get a mempool sequence number + (hash_str, label, zmq_mem_seq) = seq.receive_sequence() + while zmq_mem_seq is None: + (hash_str, label, zmq_mem_seq) = seq.receive_sequence() + + assert label == "A" or label == "R" + assert hash_str is not None + + # 2) We need to "seed" our view of the mempool + mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True) + mempool_view = set(mempool_snapshot["txids"]) + get_raw_seq = mempool_snapshot["mempool_sequence"] + assert_equal(get_raw_seq, 6) + # Snapshot may be too old compared to zmq message we read off latest + while zmq_mem_seq >= get_raw_seq: + sleep(2) + mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True) + mempool_view = set(mempool_snapshot["txids"]) + get_raw_seq = mempool_snapshot["mempool_sequence"] + + # Things continue to happen in the "interim" while waiting for snapshot results + for _ in range(num_txs): + txids.append(self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1)) + self.sync_all() + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + final_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1) + + # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot + while True: + if zmq_mem_seq == get_raw_seq - 1: + break + (hash_str, label, mempool_sequence) = seq.receive_sequence() + if mempool_sequence is not None: + zmq_mem_seq = mempool_sequence + if zmq_mem_seq > get_raw_seq: + raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq)) + + # 4) Moving forward, we apply the delta to our local view + # remaining txs(5) + 1 block connect + 1 final tx + expected_sequence = get_raw_seq + r_gap = 0 + for _ in range(num_txs + 1 + 1): + (hash_str, label, mempool_sequence) = seq.receive_sequence() + if mempool_sequence is not None: + if mempool_sequence != expected_sequence: + # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its + # position in the incoming block message "C" + if label == "R": + assert mempool_sequence > expected_sequence + r_gap += mempool_sequence - expected_sequence + else: + raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}") + if label == "A": + assert hash_str not in mempool_view + mempool_view.add(hash_str) + expected_sequence = mempool_sequence + 1 + elif label == "R": + assert hash_str in mempool_view + mempool_view.remove(hash_str) + expected_sequence = mempool_sequence + 1 + elif label == "C": + # (Attempt to) remove all txids from known block connects + block_txids = self.nodes[0].getblock(hash_str)["tx"][1:] + for txid in block_txids: + if txid in mempool_view: + expected_sequence += 1 + mempool_view.remove(txid) + expected_sequence -= r_gap + r_gap = 0 + elif label == "D": + # Not useful for mempool tracking per se + continue + else: + raise Exception("Unexpected ZMQ sequence label!") + + assert_equal(self.nodes[0].getrawmempool(), [final_txid]) + assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence) + + # 5) If you miss a zmq/mempool sequence number, go back to step (2) + + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + def test_multiple_interfaces(self): - import zmq # Set up two subscribers with different addresses subscribers = [] for i in range(2):