diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py index 9221f000b0..807b2d680e 100755 --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -11,7 +11,8 @@ -zmqpubrawtx=tcp://127.0.0.1:28332 \ -zmqpubrawblock=tcp://127.0.0.1:28332 \ -zmqpubhashtx=tcp://127.0.0.1:28332 \ - -zmqpubhashblock=tcp://127.0.0.1:28332 + -zmqpubhashblock=tcp://127.0.0.1:28332 \ + -zmqpubsequence=tcp://127.0.0.1:28332 We use the asyncio library here. `self.handle()` installs itself as a future at the end of the function. Since it never returns with the event @@ -58,18 +59,14 @@ class ZMQHandler(): self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernanceobject") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawinstantsenddoublespend") + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence") self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) async def handle(self) : - msg = await self.zmqSubSocket.recv_multipart() - topic = msg[0] - body = msg[1] + topic, body, seq = await self.zmqSubSocket.recv_multipart() sequence = "Unknown" - - if len(msg[-1]) == 4: - msgSequence = struct.unpack('C : Blockhash connected + <32-byte hash>D : Blockhash disconnected + <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason + <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool + +Where the 8-byte uints correspond to the mempool sequence number. + +`rawtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`rawtx`), the second part is the serialized transaction, and the last part is a sequence number (representing the message count to detect lost messages). + + | rawtx | | + +`hashtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`hashtx`), the second part is the 32-byte transaction hash, and the last part is a sequence number (representing the message count to detect lost messages). + + | hashtx | <32-byte transaction hash in Little Endian> | + + +`rawblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`rawblock`), the second part is the serialized block, and the last part is a sequence number (representing the message count to detect lost messages). + + | rawblock | | + +`hashblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`hashblock`), the second part is the 32-byte block hash, and the last part is a sequence number (representing the message count to detect lost messages). + + | hashblock | <32-byte block hash in Little Endian> | + +**_NOTE:_** Note that the 32-byte hashes are in Little Endian and not in the Big Endian format that the RPC interface and block explorers use to display transaction and block hashes. ZeroMQ endpoint specifiers for TCP (and others) are documented in the [ZeroMQ API](http://api.zeromq.org/4-0:_start). @@ -143,6 +173,9 @@ Setting the keepalive values appropriately for your operating environment may improve connectivity in situations where long-lived connections are silently dropped by network middle boxes. +Also, the socket's ZMQ_IPV6 option is enabled to accept connections from IPv6 +hosts as well. If needed, this option has to be set on the client side too. + ## Remarks From the perspective of dashd, the ZeroMQ socket is write-only; PUB @@ -154,13 +187,20 @@ No authentication or authorization is done on connecting clients; it is assumed that the ZeroMQ port is exposed only to trusted entities, using other means such as firewalling. -Note that when the block chain tip changes, a reorganisation may occur -and just the tip will be notified. It is up to the subscriber to -retrieve the chain from the last known block to the new tip. Also note -that no notification occurs if the tip was in the active chain - this -is the case after calling invalidateblock RPC. +Note that for `*block` topics, when the block chain tip changes, +a reorganisation may occur and just the tip will be notified. +It is up to the subscriber to retrieve the chain from the last known +block to the new tip. Also note that no notification will occur if the tip +was in the active chain--as would be the case after calling invalidateblock RPC. +In contrast, the `sequence` topic publishes all block connections and +disconnections. There are several possibilities that ZMQ notification can get lost during transmission depending on the communication type you are using. Dashd appends an up-counting sequence number to each notification which allows listeners to detect lost notifications. + +The `sequence` topic refers specifically to the mempool sequence +number, which is also published along with all mempool events. This +is a different sequence value than in ZMQ itself in order to allow a total +ordering of mempool events to be constructed. diff --git a/src/dsnotificationinterface.cpp b/src/dsnotificationinterface.cpp index f948aa0148..880ada8a78 100644 --- a/src/dsnotificationinterface.cpp +++ b/src/dsnotificationinterface.cpp @@ -102,7 +102,8 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con } } -void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime) +void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime, + uint64_t mempool_sequence) { assert(m_cj_ctx && m_llmq_ctx); @@ -111,7 +112,8 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& m_cj_ctx->dstxman->TransactionAddedToMempool(ptx); } -void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) +void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, + uint64_t mempool_sequence) { assert(m_llmq_ctx); diff --git a/src/dsnotificationinterface.h b/src/dsnotificationinterface.h index dc4456de40..c98f913880 100644 --- a/src/dsnotificationinterface.h +++ b/src/dsnotificationinterface.h @@ -40,8 +40,9 @@ protected: void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload) override; void SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; - void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) override; + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override; + void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, + uint64_t mempool_sequence) override; void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) override; void BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) override; void NotifyMasternodeListChanged(bool undo, const CDeterministicMNList& oldMNList, const CDeterministicMNListDiff& diff) override; diff --git a/src/init.cpp b/src/init.cpp index a8b518d46b..9c66b06fd5 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -360,9 +360,8 @@ void PrepareShutdown(NodeContext& node) #if ENABLE_ZMQ if (g_zmq_notification_interface) { - UnregisterValidationInterface(g_zmq_notification_interface); - delete g_zmq_notification_interface; - g_zmq_notification_interface = nullptr; + UnregisterValidationInterface(g_zmq_notification_interface.get()); + g_zmq_notification_interface.reset(); } #endif @@ -646,6 +645,7 @@ void SetupServerArgs(NodeContext& node) argsman.AddArg("-zmqpubrawtx=
", "Enable publish raw transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlock=
", "Enable publish raw transaction (locked via InstantSend) in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlocksig=
", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubsequence=
", "Enable publish hash block and tx sequence in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashblockhwm=", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashchainlockhwm=", strprintf("Set publish hash chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashgovernanceobjecthwm=", strprintf("Set publish hash governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); @@ -664,6 +664,7 @@ void SetupServerArgs(NodeContext& node) argsman.AddArg("-zmqpubrawtxhwm=", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlockhwm=", strprintf("Set publish raw transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlocksighwm=", strprintf("Set publish raw transaction lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubsequencehwm=", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); #else hidden_args.emplace_back("-zmqpubhashblock=
"); hidden_args.emplace_back("-zmqpubhashchainlock=
"); @@ -683,6 +684,7 @@ void SetupServerArgs(NodeContext& node) hidden_args.emplace_back("-zmqpubrawtx=
"); hidden_args.emplace_back("-zmqpubrawtxlock=
"); hidden_args.emplace_back("-zmqpubrawtxlocksig=
"); + hidden_args.emplace_back("-zmqpubsequence="); hidden_args.emplace_back("-zmqpubhashblockhwm="); hidden_args.emplace_back("-zmqpubhashchainlockhwm="); hidden_args.emplace_back("-zmqpubhashgovernanceobjecthwm="); @@ -701,6 +703,7 @@ void SetupServerArgs(NodeContext& node) hidden_args.emplace_back("-zmqpubrawtxhwm="); hidden_args.emplace_back("-zmqpubrawtxlockhwm="); hidden_args.emplace_back("-zmqpubrawtxlocksighwm="); + hidden_args.emplace_back("-zmqpubsequencehwm="); #endif argsman.AddArg("-checkblockindex", strprintf("Do a consistency check for the block tree, and occasionally. (default: %u, regtest: %u)", defaultChainParams->DefaultConsistencyChecks(), regtestChainParams->DefaultConsistencyChecks()), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST); @@ -1736,7 +1739,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) g_zmq_notification_interface = CZMQNotificationInterface::Create(); if (g_zmq_notification_interface) { - RegisterValidationInterface(g_zmq_notification_interface); + RegisterValidationInterface(g_zmq_notification_interface.get()); } #endif diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h index d84688dcc0..37f434591d 100644 --- a/src/interfaces/chain.h +++ b/src/interfaces/chain.h @@ -258,8 +258,8 @@ public: { public: virtual ~Notifications() {} - virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) {} - virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {} + virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {} + virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {} virtual void blockConnected(const CBlock& block, int height) {} virtual void blockDisconnected(const CBlock& block, int height) {} virtual void updatedBlockTip() {} diff --git a/src/node/interfaces.cpp b/src/node/interfaces.cpp index d453ff8975..d29e95a3e4 100644 --- a/src/node/interfaces.cpp +++ b/src/node/interfaces.cpp @@ -611,13 +611,13 @@ public: explicit NotificationsProxy(std::shared_ptr notifications) : m_notifications(std::move(notifications)) {} virtual ~NotificationsProxy() = default; - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override { - m_notifications->transactionAddedToMempool(tx, nAcceptTime); + m_notifications->transactionAddedToMempool(tx, nAcceptTime, mempool_sequence); } - void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override { - m_notifications->transactionRemovedFromMempool(tx, reason); + m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence); } void BlockConnected(const std::shared_ptr& block, const CBlockIndex* index) override { @@ -997,7 +997,7 @@ public: if (!m_node.mempool) return; LOCK2(::cs_main, m_node.mempool->cs); for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) { - notifications.transactionAddedToMempool(entry.GetSharedTx(), 0); + notifications.transactionAddedToMempool(entry.GetSharedTx(), /* nAcceptTime = */ 0, /* mempool_sequence = */ 0); } } NodeContext& m_node; diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp index cd0fd147ee..b8592153a6 100644 --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -532,9 +532,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash())); } -UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose) +UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose, bool include_mempool_sequence) { if (verbose) { + if (include_mempool_sequence) { + throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values."); + } LOCK(pool.cs); UniValue o(UniValue::VOBJ); for (const CTxMemPoolEntry& e : pool.mapTx) { @@ -548,14 +551,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, } return o; } else { + uint64_t mempool_sequence; std::vector vtxid; - pool.queryHashes(vtxid); - + { + LOCK(pool.cs); + pool.queryHashes(vtxid); + mempool_sequence = pool.GetSequence(); + } UniValue a(UniValue::VARR); for (const uint256& hash : vtxid) a.push_back(hash.ToString()); - return a; + if (!include_mempool_sequence) { + return a; + } else { + UniValue o(UniValue::VOBJ); + o.pushKV("txids", a); + o.pushKV("mempool_sequence", mempool_sequence); + return o; + } } } @@ -566,6 +580,7 @@ static RPCHelpMan getrawmempool() "\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n", { {"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"}, + {"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."}, }, { RPCResult{"for verbose = false", @@ -578,6 +593,15 @@ static RPCHelpMan getrawmempool() { {RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()}, }}, + RPCResult{"for verbose = false and mempool_sequence = true", + RPCResult::Type::OBJ, "", "", + { + {RPCResult::Type::ARR, "txids", "", + { + {RPCResult::Type::STR_HEX, "", "The transaction id"}, + }}, + {RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."}, + }}, }, RPCExamples{ HelpExampleCli("getrawmempool", "true") @@ -589,10 +613,15 @@ static RPCHelpMan getrawmempool() if (!request.params[0].isNull()) fVerbose = request.params[0].get_bool(); + bool include_mempool_sequence = false; + if (!request.params[1].isNull()) { + include_mempool_sequence = request.params[1].get_bool(); + } + const NodeContext& node = EnsureAnyNodeContext(request.context); const CTxMemPool& mempool = EnsureMemPool(node); LLMQContext& llmq_ctx = EnsureLLMQContext(node); - return MempoolToJSON(mempool, llmq_ctx.isman, fVerbose); + return MempoolToJSON(mempool, llmq_ctx.isman, fVerbose, include_mempool_sequence); }, }; } diff --git a/src/rpc/blockchain.h b/src/rpc/blockchain.h index f8fc13d081..b67055bb66 100644 --- a/src/rpc/blockchain.h +++ b/src/rpc/blockchain.h @@ -47,7 +47,7 @@ UniValue blockToJSON(BlockManager& blockman, const CBlock& block, const CBlockIn UniValue MempoolInfoToJSON(const CTxMemPool& pool, llmq::CInstantSendManager& isman); /** Mempool to JSON */ -UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose = false); +UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose = false, bool include_mempool_sequence = false); /** Block header to JSON */ UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex, llmq::CChainLocksHandler& clhandler, llmq::CInstantSendManager& isman) LOCKS_EXCLUDED(cs_main); diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index b7b2e79604..be89fea835 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -168,6 +168,7 @@ static const CRPCConvertParam vRPCConvertParams[] = { "pruneblockchain", 0, "height" }, { "keypoolrefill", 0, "newsize" }, { "getrawmempool", 0, "verbose" }, + { "getrawmempool", 1, "mempool_sequence" }, { "estimatesmartfee", 0, "conf_target" }, { "estimaterawfee", 0, "conf_target" }, { "estimaterawfee", 1, "threshold" }, diff --git a/src/test/fuzz/tx_pool.cpp b/src/test/fuzz/tx_pool.cpp index a11c258b1a..cbdffe29d8 100644 --- a/src/test/fuzz/tx_pool.cpp +++ b/src/test/fuzz/tx_pool.cpp @@ -51,12 +51,12 @@ struct TransactionsDelta final : public CValidationInterface { explicit TransactionsDelta(std::set& r, std::set& a) : m_removed{r}, m_added{a} {} - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t /* nAcceptTime */) override + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t /* nAcceptTime */, uint64_t) override { Assert(m_added.insert(tx).second); } - void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t) override { Assert(m_removed.insert(tx).second); } diff --git a/src/txmempool.cpp b/src/txmempool.cpp index 149d3362a4..823d096bc5 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -665,12 +665,16 @@ void CTxMemPool::addUncheckedProTx(indexed_transaction_set::iterator& newit, con void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) { + // We increment mempool sequence value no matter removal reason + // even if not directly reported below. + uint64_t mempool_sequence = GetAndIncrementSequence(); + if (reason != MemPoolRemovalReason::BLOCK) { // Notify clients that a transaction has been removed from the mempool // for any reason except being included in a block. Clients interested // in transactions included in blocks can subscribe to the BlockConnected // notification. - GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason); + GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence); } const uint256 hash = it->GetTx().GetHash(); diff --git a/src/txmempool.h b/src/txmempool.h index 98b70d8547..e281ddaa4b 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -486,6 +486,11 @@ protected: mutable double rollingMinimumFeeRate GUARDED_BY(cs); //!< minimum fee to get into the pool, decreases exponentially mutable Epoch m_epoch GUARDED_BY(cs); + // In-memory counter for external mempool tracking purposes. + // This number is incremented once every time a transaction + // is added or removed from the mempool for any reason. + mutable uint64_t m_sequence_number{1}; + void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs); bool m_is_loaded GUARDED_BY(cs){false}; @@ -852,6 +857,15 @@ public: return (m_unbroadcast_txids.count(txid) != 0); } + /** Guards this internal counter for external reporting */ + uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) { + return m_sequence_number++; + } + + uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) { + return m_sequence_number; + } + private: /** UpdateForDescendants is used by UpdateTransactionsFromBlock to update * the descendants for a single transaction that has been added to the diff --git a/src/validation.cpp b/src/validation.cpp index 2082c90700..49c7ba77c1 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -889,7 +889,7 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef if (!Finalize(args, ws)) return MempoolAcceptResult::Failure(ws.m_state); const int64_t nAcceptTime = args.m_accept_time; - GetMainSignals().TransactionAddedToMempool(ptx, nAcceptTime); + GetMainSignals().TransactionAddedToMempool(ptx, nAcceptTime, m_pool.GetAndIncrementSequence()); const CTransaction& tx = *ptx; auto finish = Now(); diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index ac8b19dcdc..e00a8bdb53 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -208,17 +208,17 @@ void CMainSignals::SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, cons m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.SynchronousUpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); }); } -void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) { - auto event = [tx, nAcceptTime, this] { - m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, nAcceptTime); }); +void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) { + auto event = [tx, nAcceptTime, mempool_sequence, this] { + m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, nAcceptTime, mempool_sequence); }); }; ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__, tx->GetHash().ToString()); } -void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) { - auto event = [tx, reason, this] { - m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason); }); +void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { + auto event = [tx, reason, mempool_sequence, this] { + m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); }); }; ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__, tx->GetHash().ToString()); diff --git a/src/validationinterface.h b/src/validationinterface.h index c66b25a653..c860f7fb86 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -117,7 +117,8 @@ protected: * * Called on a background thread. */ - virtual void TransactionAddedToMempool(const CTransactionRef &xn, int64_t nAcceptTime) {} + virtual void TransactionAddedToMempool(const CTransactionRef &xn, int64_t nAcceptTime, uint64_t mempool_sequence) {} + /** * Notifies listeners of a transaction leaving mempool. * @@ -149,7 +150,7 @@ protected: * * Called on a background thread. */ - virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {} + virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {} /** * Notifies listeners of a block being connected. * Provides a vector of transactions evicted from the mempool as a result. @@ -226,8 +227,8 @@ public: void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload); void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void SynchronousUpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); - void TransactionAddedToMempool(const CTransactionRef&, int64_t); - void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason); + void TransactionAddedToMempool(const CTransactionRef&, int64_t, uint64_t mempool_sequence); + void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence); void BlockConnected(const std::shared_ptr &, const CBlockIndex *pindex); void BlockDisconnected(const std::shared_ptr &, const CBlockIndex* pindex); void NotifyTransactionLock(const CTransactionRef &tx, const std::shared_ptr& islock); diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 33b874597e..14ba64998d 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1267,7 +1267,7 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, CWalletTx::Confirmatio fAnonymizableTallyCachedNonDenom = false; } -void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) { +void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) { LOCK(cs_wallet); CWalletTx::Confirmation confirm(CWalletTx::Status::UNCONFIRMED, /* block_height */ 0, {}, /* nIndex */ 0); WalletBatch batch(GetDatabase()); @@ -1279,7 +1279,7 @@ void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcce } } -void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) { +void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { if (reason != MemPoolRemovalReason::CONFLICT) { LOCK(cs_wallet); auto it = mapWallet.find(tx->GetHash()); @@ -1330,7 +1330,7 @@ void CWallet::blockConnected(const CBlock& block, int height) WalletBatch batch(GetDatabase()); for (size_t index = 0; index < block.vtx.size(); index++) { SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height, block_hash, (int)index}, batch); - transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK); + transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK, 0 /* mempool_sequence */); } // reset cache to make sure no longer immature coins are included diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h index 0f33fbe0d9..194d734061 100644 --- a/src/wallet/wallet.h +++ b/src/wallet/wallet.h @@ -1078,7 +1078,7 @@ public: CWalletTx* AddToWallet(CTransactionRef tx, const CWalletTx::Confirmation& confirm, const UpdateWalletTxFn& update_wtx=nullptr, bool fFlushOnClose=true); bool LoadToWallet(const uint256& hash, const UpdateWalletTxFn& fill_wtx) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); - void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; + void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override; void blockConnected(const CBlock& block, int height) override; void blockDisconnected(const CBlock& block, int height) override; void updatedBlockTip() override; @@ -1100,7 +1100,7 @@ public: uint256 last_failed_block; }; ScanResult ScanForWalletTransactions(const uint256& start_block, int start_height, std::optional max_height, const WalletRescanReserver& reserver, bool fUpdate); - void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override; + void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); void ResendWalletTransactions(); struct Balance { diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 75d394326d..e3a1d98cfe 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -28,6 +28,26 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/ return true; } +bool CZMQAbstractNotifier::NotifyBlockConnect(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyBlockDisconnect(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionAcceptance(const CTransaction &/*transaction*/, uint64_t mempool_sequence) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transaction*/, uint64_t mempool_sequence) +{ + return true; +} + bool CZMQAbstractNotifier::NotifyTransactionLock(const CTransactionRef &/*transaction*/, const std::shared_ptr& /*islock*/) { return true; diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index b2e0b10bf7..534082d64a 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -58,9 +58,19 @@ public: virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; + // Notifies of ConnectTip result, i.e., new active tip only virtual bool NotifyBlock(const CBlockIndex *pindex); - virtual bool NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig); + // Notifies of every block connection + virtual bool NotifyBlockConnect(const CBlockIndex *pindex); + // Notifies of every block disconnection + virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex); + // Notifies of every mempool acceptance + virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence); + // Notifies of every mempool removal, except inclusion in blocks + virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence); + // Notifies of transactions added to mempool or appearing in blocks virtual bool NotifyTransaction(const CTransaction &transaction); + virtual bool NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig); virtual bool NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr& islock); virtual bool NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr& vote); virtual bool NotifyGovernanceObject(const std::shared_ptr& object); diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index a0d004820c..eaf83097ea 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -29,7 +29,7 @@ std::list CZMQNotificationInterface::GetActiveNotif return result; } -CZMQNotificationInterface* CZMQNotificationInterface::Create() +std::unique_ptr CZMQNotificationInterface::Create() { std::map factories; factories["pubhashblock"] = CZMQAbstractNotifier::Create; @@ -50,6 +50,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() factories["pubrawgovernanceobject"] = CZMQAbstractNotifier::Create; factories["pubrawinstantsenddoublespend"] = CZMQAbstractNotifier::Create; factories["pubrawrecoveredsig"] = CZMQAbstractNotifier::Create; + factories["pubsequence"] = CZMQAbstractNotifier::Create; std::list> notifiers; for (const auto& entry : factories) @@ -71,7 +72,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() notificationInterface->notifiers = std::move(notifiers); if (notificationInterface->Initialize()) { - return notificationInterface.release(); + return notificationInterface; } } @@ -157,31 +158,53 @@ void CZMQNotificationInterface::NotifyChainLock(const CBlockIndex *pindex, const }); } -void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime) +void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime, uint64_t mempool_sequence) { - // Used by BlockConnected and BlockDisconnected as well, because they're - // all the same external callback. const CTransaction& tx = *ptx; - TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { - return notifier->NotifyTransaction(tx); + TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence); + }); +} + +void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence) +{ + // Called for all non-block inclusion reasons + const CTransaction& tx = *ptx; + + TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransactionRemoval(tx, mempool_sequence); }); } void CZMQNotificationInterface::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindexConnected) { for (const CTransactionRef& ptx : pblock->vtx) { - // Do a normal notify for each transaction added in the block - TransactionAddedToMempool(ptx, 0); + const CTransaction& tx = *ptx; + TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockConnect listeners for *all* blocks + TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) { + return notifier->NotifyBlockConnect(pindexConnected); + }); } void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) { for (const CTransactionRef& ptx : pblock->vtx) { - // Do a normal notify for each transaction removed in block disconnection - TransactionAddedToMempool(ptx, 0); + const CTransaction& tx = *ptx; + TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockDisconnect listeners for *all* blocks + TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) { + return notifier->NotifyBlockDisconnect(pindexDisconnected); + }); } void CZMQNotificationInterface::NotifyTransactionLock(const CTransactionRef& tx, const std::shared_ptr& islock) @@ -219,4 +242,4 @@ void CZMQNotificationInterface::NotifyRecoveredSig(const std::shared_ptr g_zmq_notification_interface; diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index 230dd6c8ff..4dd078a45f 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -19,14 +19,15 @@ public: std::list GetActiveNotifiers() const; - static CZMQNotificationInterface* Create(); + static std::unique_ptr Create(); protected: bool Initialize(); void Shutdown(); // CValidationInterface - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override; + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindexConnected) override; void BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; @@ -44,6 +45,6 @@ private: std::list> notifiers; }; -extern CZMQNotificationInterface* g_zmq_notification_interface; +extern std::unique_ptr g_zmq_notification_interface; #endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 8cc4f9b33a..bebe1003e3 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -23,6 +24,7 @@ #include #include #include +#include #include #include @@ -46,6 +48,7 @@ static const char *MSG_RAWGVOTE = "rawgovernancevote"; static const char *MSG_RAWGOBJ = "rawgovernanceobject"; static const char *MSG_RAWISCON = "rawinstantsenddoublespend"; static const char *MSG_RAWRECSIG = "rawrecoveredsig"; +static const char *MSG_SEQUENCE = "sequence"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) @@ -90,6 +93,20 @@ static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) return 0; } +static bool IsZMQAddressIPV6(const std::string &zmq_address) +{ + const std::string tcp_prefix = "tcp://"; + const size_t tcp_index = zmq_address.rfind(tcp_prefix); + const size_t colon_index = zmq_address.rfind(":"); + if (tcp_index == 0 && colon_index != std::string::npos) { + const std::string ip = zmq_address.substr(tcp_prefix.length(), colon_index - tcp_prefix.length()); + CNetAddr addr; + LookupHost(ip, addr, false); + if (addr.IsIPv6()) return true; + } + return false; +} + bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) { assert(!psocket); @@ -124,6 +141,15 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) return false; } + // On some systems (e.g. OpenBSD) the ZMQ_IPV6 must not be enabled, if the address to bind isn't IPv6 + const int enable_ipv6 { IsZMQAddressIPV6(address) ? 1 : 0}; + rc = zmq_setsockopt(psocket, ZMQ_IPV6, &enable_ipv6, sizeof(enable_ipv6)); + if (rc != 0) { + zmqError("Failed to set ZMQ_IPV6"); + zmq_close(psocket); + return false; + } + rc = zmq_bind(psocket, address.c_str()); if (rc != 0) { @@ -200,16 +226,17 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address); - char data[32]; - for (unsigned int i = 0; i < 32; i++) + uint8_t data[32]; + for (unsigned int i = 0; i < 32; i++) { data[31 - i] = hash.begin()[i]; + } return SendZmqMessage(MSG_HASHBLOCK, data, 32); } bool CZMQPublishHashChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig) { uint256 hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashchainlock %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashchainlock %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -220,16 +247,17 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t { uint256 hash = transaction.GetHash(); LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address); - char data[32]; - for (unsigned int i = 0; i < 32; i++) + uint8_t data[32]; + for (unsigned int i = 0; i < 32; i++) { data[31 - i] = hash.begin()[i]; + } return SendZmqMessage(MSG_HASHTX, data, 32); } bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr& islock) { uint256 hash = transaction->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtxlock %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtxlock %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -239,7 +267,7 @@ bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransa bool CZMQPublishHashGovernanceVoteNotifier::NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr& vote) { uint256 hash = vote->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernancevote %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernancevote %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -249,7 +277,7 @@ bool CZMQPublishHashGovernanceVoteNotifier::NotifyGovernanceVote(const CDetermin bool CZMQPublishHashGovernanceObjectNotifier::NotifyGovernanceObject(const std::shared_ptr& object) { uint256 hash = object->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernanceobject %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernanceobject %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -259,7 +287,7 @@ bool CZMQPublishHashGovernanceObjectNotifier::NotifyGovernanceObject(const std:: bool CZMQPublishHashInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpendAttempt(const CTransactionRef& currentTx, const CTransactionRef& previousTx) { uint256 currentHash = currentTx->GetHash(), previousHash = previousTx->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashinstantsenddoublespend %s conflicts against %s\n", currentHash.ToString(), previousHash.ToString()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashinstantsenddoublespend %s conflicts against %s to %s\n", currentHash.ToString(), previousHash.ToString(), this->address); char dataCurrentHash[32], dataPreviousHash[32]; for (unsigned int i = 0; i < 32; i++) { dataCurrentHash[31 - i] = currentHash.begin()[i]; @@ -271,7 +299,7 @@ bool CZMQPublishHashInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpend bool CZMQPublishHashRecoveredSigNotifier::NotifyRecoveredSig(const std::shared_ptr &sig) { - LogPrint(BCLog::ZMQ, "zmq: Publish hashrecoveredsig %s\n", sig->getMsgHash().ToString()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashrecoveredsig %s to %s\n", sig->getMsgHash().ToString(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = sig->getMsgHash().begin()[i]; @@ -301,7 +329,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishRawChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlock %s\n", pindex->GetBlockHash().GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); const Consensus::Params& consensusParams = Params().GetConsensus(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); @@ -322,7 +350,7 @@ bool CZMQPublishRawChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex, bool CZMQPublishRawChainLockSigNotifier::NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlocksig %s\n", pindex->GetBlockHash().GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlocksig %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); const Consensus::Params& consensusParams = Params().GetConsensus(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); @@ -351,6 +379,47 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } +// Helper function to send a 'sequence' topic message with the following structure: +// <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional) +static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional sequence = {}) +{ + unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)]; + for (unsigned int i = 0; i < sizeof(hash); ++i) { + data[sizeof(hash) - 1 - i] = hash.begin()[i]; + } + data[sizeof(hash)] = label; + if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence); + return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label)); +} + +bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address); + return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C'); +} + +bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address); + return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D'); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address); + return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); + return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence); +} + bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr& islock) { uint256 hash = transaction->GetHash(); @@ -373,7 +442,7 @@ bool CZMQPublishRawTransactionLockSigNotifier::NotifyTransactionLock(const CTran bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr& vote) { uint256 nHash = vote->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, vote = %d\n", nHash.ToString(), this->address, vote->ToString(tip_mn_list)); + LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject %s with vote %d to %s\n", nHash.ToString(), vote->ToString(tip_mn_list), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << *vote; return SendZmqMessage(MSG_RAWGVOTE, &(*ss.begin()), ss.size()); @@ -382,7 +451,7 @@ bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const CDetermini bool CZMQPublishRawGovernanceObjectNotifier::NotifyGovernanceObject(const std::shared_ptr& govobj) { uint256 nHash = govobj->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, type = %d\n", nHash.ToString(), this->address, ToUnderlying(govobj->type)); + LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject %s with type %d to %s\n", nHash.ToString(), ToUnderlying(govobj->type), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << *govobj; return SendZmqMessage(MSG_RAWGOBJ, &(*ss.begin()), ss.size()); @@ -390,7 +459,7 @@ bool CZMQPublishRawGovernanceObjectNotifier::NotifyGovernanceObject(const std::s bool CZMQPublishRawInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpendAttempt(const CTransactionRef& currentTx, const CTransactionRef& previousTx) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawinstantsenddoublespend %s conflicts with %s\n", currentTx->GetHash().ToString(), previousTx->GetHash().ToString()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawinstantsenddoublespend %s conflicts with %s to %s\n", currentTx->GetHash().ToString(), previousTx->GetHash().ToString(), this->address); CDataStream ssCurrent(SER_NETWORK, PROTOCOL_VERSION), ssPrevious(SER_NETWORK, PROTOCOL_VERSION); ssCurrent << *currentTx; ssPrevious << *previousTx; @@ -400,7 +469,7 @@ bool CZMQPublishRawInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpendA bool CZMQPublishRawRecoveredSigNotifier::NotifyRecoveredSig(const std::shared_ptr& sig) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawrecoveredsig %s\n", sig->getMsgHash().ToString()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawrecoveredsig %s to %s\n", sig->getMsgHash().ToString(), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << *sig; diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index d674ef3891..7dc632d077 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -108,6 +108,15 @@ public: bool NotifyTransaction(const CTransaction &transaction) override; }; +class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlockConnect(const CBlockIndex *pindex) override; + bool NotifyBlockDisconnect(const CBlockIndex *pindex) override; + bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override; + bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override; +}; + class CZMQPublishRawTransactionLockNotifier : public CZMQAbstractPublishNotifier { public: diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 35227ae70b..aebc07f39b 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -5,15 +5,27 @@ """Test the ZMQ notification interface.""" import struct -from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE +from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE, ADDRESS_BCRT1_P2SH_OP_TRUE +from test_framework.blocktools import create_block, create_coinbase from test_framework.test_framework import BitcoinTestFramework from test_framework.messages import ( dashhash, hash256, + tx_from_hex, ) -from test_framework.util import assert_equal +from test_framework.util import ( + assert_equal, + assert_raises_rpc_error, +) +from test_framework.netutil import test_ipv6_local from time import sleep +# Test may be skipped and not have zmq installed +try: + import zmq +except ImportError: + pass + def hash256_reversed(byte_str): return hash256(byte_str)[::-1] @@ -22,28 +34,74 @@ def dashhash_reversed(byte_str): class ZMQSubscriber: def __init__(self, socket, topic): - self.sequence = 0 + self.sequence = None # no sequence number received yet self.socket = socket self.topic = topic - import zmq self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) - def receive(self): + # Receive message from publisher and verify that topic and sequence match + def _receive_from_publisher_and_check(self): topic, body, seq = self.socket.recv_multipart() # Topic should match the subscriber topic. assert_equal(topic, self.topic) # Sequence should be incremental. - assert_equal(struct.unpack('C : Blockhash connected + <32-byte hash>D : Blockhash disconnected + <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason + <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool + """ + self.log.info("Testing 'sequence' publisher") + [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) + self.disconnect_nodes(0, 1) + + # Mempool sequence number starts at 1 + seq_num = 1 + + # Generate 1 block in nodes[0] and receive all notifications + dc_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0] + + # Note: We are not notified of any block transactions, coinbase or mined + assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence()) + + # Generate 2 blocks in nodes[1] to a different address to ensure a chain split + self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2SH_OP_TRUE) + + # nodes[0] will reorg chain after connecting back nodes[1] + self.connect_nodes(0, 1) + + # Then we receive all block (dis)connect notifications for the 2 block reorg + assert_equal((dc_block, "D", None), seq.receive_sequence()) + block_count = self.nodes[1].getblockcount() + assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence()) + assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence()) + + # Rest of test requires wallet functionality + if self.is_wallet_compiled(): + self.log.info("Wait for tx from second node") + payment_txid = self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=5.0) + self.sync_all() + self.log.info("Testing sequence notifications with mempool sequence values") + + # Should receive the broadcasted txid. + assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) + seq_num += 1 + + # Doesn't get published when mined, make a block and tx to "flush" the possibility + # though the mempool sequence number does go up by the number of transactions + # removed from the mempool by the block mining it. + mempool_size = len(self.nodes[0].getrawmempool()) + c_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0] + self.sync_all() + # Make sure the number of mined transactions matches the number of txs out of mempool + mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool()) + assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta) + seq_num += mempool_size_delta + payment_txid_2 = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0) + self.sync_all() + assert_equal((c_block, "C", None), seq.receive_sequence()) + assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence()) + seq_num += 1 + + # Spot check getrawmempool results that they only show up when asked for + assert type(self.nodes[0].getrawmempool()) is list + assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list + assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True) + assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True) + assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num) + + self.log.info("Testing reorg notifications") + # Manually invalidate the last block to test mempool re-entry + # N.B. This part could be made more lenient in exact ordering + # since it greatly depends on inner-workings of blocks/mempool + # during "deep" re-orgs. Probably should "re-construct" + # blockchain/mempool state from notifications instead. + block_count = self.nodes[0].getblockcount() + best_hash = self.nodes[0].getbestblockhash() + self.nodes[0].invalidateblock(best_hash) + sleep(2) # Bit of room to make sure transaction things happened + + # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective + # of the time they were gathered. + assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num + + assert_equal((best_hash, "D", None), seq.receive_sequence()) + assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) + seq_num += 1 + + # Other things may happen but aren't wallet-deterministic so we don't test for them currently + self.nodes[0].reconsiderblock(best_hash) + self.nodes[1].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + self.sync_all() + + self.log.info("Evict mempool transaction by block conflict") + orig_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0) + + # More to be simply mined + more_tx = [] + for _ in range(5): + more_tx.append(self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1)) + + raw_tx = self.nodes[0].getrawtransaction(orig_txid) + # Mine the tx + block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1)) + tx = tx_from_hex(raw_tx) + block.vtx.append(tx) + for txid in more_tx: + tx = tx_from_hex(self.nodes[0].getrawtransaction(txid)) + block.vtx.append(tx) + block.hashMerkleRoot = block.calc_merkle_root() + block.solve() + assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None) + tip = self.nodes[0].getbestblockhash() + assert_equal(int(tip, 16), block.sha256) + orig_txid_2 = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0) + + # Flush old notifications until evicted tx original entry + (hash_str, label, mempool_seq) = seq.receive_sequence() + while hash_str != orig_txid: + (hash_str, label, mempool_seq) = seq.receive_sequence() + mempool_seq += 1 + + # Added original tx + assert_equal(label, "A") + # More transactions to be simply mined + for i in range(len(more_tx)): + assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence()) + mempool_seq += 1 + + mempool_seq += 1 + assert_equal((tip, "C", None), seq.receive_sequence()) + mempool_seq += len(more_tx) + # Last tx + assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence()) + mempool_seq += 1 + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + self.sync_all() # want to make sure we didn't break "consensus" for other tests + + def test_mempool_sync(self): + """ + Use sequence notification plus getrawmempool sequence results to "sync mempool" + """ + if not self.is_wallet_compiled(): + self.log.info("Skipping mempool sync test") + return + + self.log.info("Testing 'mempool sync' usage of sequence notifier") + [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) + + # In-memory counter, should always start at 1 + next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] + assert_equal(next_mempool_seq, 1) + + # Some transactions have been happening but we aren't consuming zmq notifications yet + # or we lost a ZMQ message somehow and want to start over + txids = [] + num_txs = 5 + for _ in range(num_txs): + txids.append(self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0)) + self.sync_all() + + # 1) Consume backlog until we get a mempool sequence number + (hash_str, label, zmq_mem_seq) = seq.receive_sequence() + while zmq_mem_seq is None: + (hash_str, label, zmq_mem_seq) = seq.receive_sequence() + + assert label == "A" or label == "R" + assert hash_str is not None + + # 2) We need to "seed" our view of the mempool + mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True) + mempool_view = set(mempool_snapshot["txids"]) + get_raw_seq = mempool_snapshot["mempool_sequence"] + assert_equal(get_raw_seq, 6) + # Snapshot may be too old compared to zmq message we read off latest + while zmq_mem_seq >= get_raw_seq: + sleep(2) + mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True) + mempool_view = set(mempool_snapshot["txids"]) + get_raw_seq = mempool_snapshot["mempool_sequence"] + + # Things continue to happen in the "interim" while waiting for snapshot results + for _ in range(num_txs): + txids.append(self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1)) + self.sync_all() + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + final_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1) + + # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot + while True: + if zmq_mem_seq == get_raw_seq - 1: + break + (hash_str, label, mempool_sequence) = seq.receive_sequence() + if mempool_sequence is not None: + zmq_mem_seq = mempool_sequence + if zmq_mem_seq > get_raw_seq: + raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq)) + + # 4) Moving forward, we apply the delta to our local view + # remaining txs(5) + 1 block connect + 1 final tx + expected_sequence = get_raw_seq + r_gap = 0 + for _ in range(num_txs + 1 + 1): + (hash_str, label, mempool_sequence) = seq.receive_sequence() + if mempool_sequence is not None: + if mempool_sequence != expected_sequence: + # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its + # position in the incoming block message "C" + if label == "R": + assert mempool_sequence > expected_sequence + r_gap += mempool_sequence - expected_sequence + else: + raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}") + if label == "A": + assert hash_str not in mempool_view + mempool_view.add(hash_str) + expected_sequence = mempool_sequence + 1 + elif label == "R": + assert hash_str in mempool_view + mempool_view.remove(hash_str) + expected_sequence = mempool_sequence + 1 + elif label == "C": + # (Attempt to) remove all txids from known block connects + block_txids = self.nodes[0].getblock(hash_str)["tx"][1:] + for txid in block_txids: + if txid in mempool_view: + expected_sequence += 1 + mempool_view.remove(txid) + expected_sequence -= r_gap + r_gap = 0 + elif label == "D": + # Not useful for mempool tracking per se + continue + else: + raise Exception("Unexpected ZMQ sequence label!") + + assert_equal(self.nodes[0].getrawmempool(), [final_txid]) + assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence) + + # 5) If you miss a zmq/mempool sequence number, go back to step (2) + + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + def test_multiple_interfaces(self): - import zmq # Set up two subscribers with different addresses - subscribers = [] - for i in range(2): - address = 'tcp://127.0.0.1:%d' % (28334 + i) - socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, 60000) - hashblock = ZMQSubscriber(socket, b"hashblock") - socket.connect(address) - subscribers.append({'address': address, 'hashblock': hashblock}) - - self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers]) - - # Relax so that the subscriber is ready before publishing zmq messages - sleep(0.2) + # (note that after the reorg test, syncing would fail due to different + # chain lengths on node0 and node1; for this test we only need node0, so + # we can disable syncing blocks on the setup) + subscribers = self.setup_zmq_test([ + ("hashblock", "tcp://127.0.0.1:28334"), + ("hashblock", "tcp://127.0.0.1:28335"), + ], sync_blocks=False) # Generate 1 block in nodes[0] and receive all notifications self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) # Should receive the same block hash on both subscribers - assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex()) - assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex()) + assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex()) + assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex()) + + def test_ipv6(self): + if not test_ipv6_local(): + self.log.info("Skipping IPv6 test, because IPv6 is not supported.") + return + self.log.info("Testing IPv6") + # Set up subscriber using IPv6 loopback address + subscribers = self.setup_zmq_test([ + ("hashblock", "tcp://[::1]:28332") + ], ipv6=True) + + # Generate 1 block in nodes[0] + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + + # Should receive the same block hash + assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex()) if __name__ == '__main__': diff --git a/test/sanitizer_suppressions/ubsan b/test/sanitizer_suppressions/ubsan index af17ca71d3..1fd1b38819 100644 --- a/test/sanitizer_suppressions/ubsan +++ b/test/sanitizer_suppressions/ubsan @@ -75,7 +75,6 @@ implicit-integer-sign-change:txmempool.cpp implicit-integer-sign-change:util/strencodings.cpp implicit-integer-sign-change:util/strencodings.h implicit-integer-sign-change:validation.cpp -implicit-integer-sign-change:zmq/zmqpublishnotifier.cpp implicit-signed-integer-truncation,implicit-integer-sign-change:chain.h implicit-signed-integer-truncation,implicit-integer-sign-change:test/skiplist_tests.cpp implicit-signed-integer-truncation:addrman.cpp