merge bitcoin#19572: Create "sequence" notifier, enabling client-side mempool tracking

This commit is contained in:
Kittywhiskers Van Gogh 2024-08-26 15:35:12 +00:00
parent b0b4e0fa7f
commit 982c1f03d4
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
25 changed files with 548 additions and 71 deletions

View File

@ -11,7 +11,8 @@
-zmqpubrawtx=tcp://127.0.0.1:28332 \ -zmqpubrawtx=tcp://127.0.0.1:28332 \
-zmqpubrawblock=tcp://127.0.0.1:28332 \ -zmqpubrawblock=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://127.0.0.1:28332 \ -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashblock=tcp://127.0.0.1:28332 -zmqpubhashblock=tcp://127.0.0.1:28332 \
-zmqpubsequence=tcp://127.0.0.1:28332
We use the asyncio library here. `self.handle()` installs itself as a We use the asyncio library here. `self.handle()` installs itself as a
future at the end of the function. Since it never returns with the event future at the end of the function. Since it never returns with the event
@ -58,18 +59,14 @@ class ZMQHandler():
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernanceobject") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernanceobject")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawinstantsenddoublespend") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawinstantsenddoublespend")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
async def handle(self) : async def handle(self) :
msg = await self.zmqSubSocket.recv_multipart() topic, body, seq = await self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
sequence = "Unknown" sequence = "Unknown"
if len(seq) == 4:
if len(msg[-1]) == 4: sequence = str(struct.unpack('<I', seq)[-1])
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)
if topic == b"hashblock": if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -') print('- HASH BLOCK ('+sequence+') -')
print(body.hex()) print(body.hex())
@ -118,6 +115,12 @@ class ZMQHandler():
elif topic == b"rawinstantsenddoublespend": elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -') print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(body.hex()) print(body.hex())
elif topic == b"sequence":
hash = body[:32].hex()
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
print('- SEQUENCE ('+sequence+') -')
print(hash, label, mempool_sequence)
# schedule ourselves to receive the next message # schedule ourselves to receive the next message
asyncio.ensure_future(self.handle()) asyncio.ensure_future(self.handle())

View File

@ -77,6 +77,7 @@ Currently, the following notifications are supported:
-zmqpubrawgovernanceobject=address -zmqpubrawgovernanceobject=address
-zmqpubrawinstantsenddoublespend=address -zmqpubrawinstantsenddoublespend=address
-zmqpubrawrecoveredsig=address -zmqpubrawrecoveredsig=address
-zmqpubsequence=address
The socket type is PUB and the address must be a valid ZeroMQ socket The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification. address. The same address can be used in more than one notification.
@ -103,6 +104,7 @@ The option to set the PUB socket's outbound message high water mark
-zmqpubrawgovernanceobjecthwm=n -zmqpubrawgovernanceobjecthwm=n
-zmqpubrawinstantsenddoublespendhwm=n -zmqpubrawinstantsenddoublespendhwm=n
-zmqpubrawrecoveredsighwm=n -zmqpubrawrecoveredsighwm=n
-zmqpubsequencehwm=address
The high water mark value must be an integer greater than or equal to 0. The high water mark value must be an integer greater than or equal to 0.
@ -117,7 +119,15 @@ Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the corresponds to the notification type. For instance, for the
notification `-zmqpubhashtx` the topic is `hashtx` (no null notification `-zmqpubhashtx` the topic is `hashtx` (no null
terminator) and the body is the transaction hash (32 terminator) and the body is the transaction hash (32
bytes). bytes) for all but `sequence` topic. For `sequence`, the body
is structured as the following based on the type of message:
<32-byte hash>C : Blockhash connected
<32-byte hash>D : Blockhash disconnected
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
Where the 8-byte uints correspond to the mempool sequence number.
These options can also be provided in dash.conf. These options can also be provided in dash.conf.
@ -154,13 +164,20 @@ No authentication or authorization is done on connecting clients; it
is assumed that the ZeroMQ port is exposed only to trusted entities, is assumed that the ZeroMQ port is exposed only to trusted entities,
using other means such as firewalling. using other means such as firewalling.
Note that when the block chain tip changes, a reorganisation may occur Note that for `*block` topics, when the block chain tip changes,
and just the tip will be notified. It is up to the subscriber to a reorganisation may occur and just the tip will be notified.
retrieve the chain from the last known block to the new tip. Also note It is up to the subscriber to retrieve the chain from the last known
that no notification occurs if the tip was in the active chain - this block to the new tip. Also note that no notification will occur if the tip
is the case after calling invalidateblock RPC. was in the active chain--as would be the case after calling invalidateblock RPC.
In contrast, the `sequence` topic publishes all block connections and
disconnections.
There are several possibilities that ZMQ notification can get lost There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type you are during transmission depending on the communication type you are
using. Dashd appends an up-counting sequence number to each using. Dashd appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications. notification which allows listeners to detect lost notifications.
The `sequence` topic refers specifically to the mempool sequence
number, which is also published along with all mempool events. This
is a different sequence value than in ZMQ itself in order to allow a total
ordering of mempool events to be constructed.

View File

@ -103,7 +103,8 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con
} }
} }
void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime) void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime,
uint64_t mempool_sequence)
{ {
assert(m_cj_ctx && m_llmq_ctx); assert(m_cj_ctx && m_llmq_ctx);
@ -112,7 +113,8 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef&
m_cj_ctx->dstxman->TransactionAddedToMempool(ptx); m_cj_ctx->dstxman->TransactionAddedToMempool(ptx);
} }
void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason,
uint64_t mempool_sequence)
{ {
assert(m_llmq_ctx); assert(m_llmq_ctx);

View File

@ -40,8 +40,9 @@ protected:
void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload) override; void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload) override;
void SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; void SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override;
void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) override; void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason,
uint64_t mempool_sequence) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) override; void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override; void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override;
void NotifyMasternodeListChanged(bool undo, const CDeterministicMNList& oldMNList, const CDeterministicMNListDiff& diff) override; void NotifyMasternodeListChanged(bool undo, const CDeterministicMNList& oldMNList, const CDeterministicMNListDiff& diff) override;

View File

@ -645,6 +645,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlock=<address>", "Enable publish raw transaction (locked via InstantSend) in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlock=<address>", "Enable publish raw transaction (locked via InstantSend) in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlocksig=<address>", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlocksig=<address>", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashchainlockhwm=<n>", strprintf("Set publish hash chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashchainlockhwm=<n>", strprintf("Set publish hash chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashgovernanceobjecthwm=<n>", strprintf("Set publish hash governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashgovernanceobjecthwm=<n>", strprintf("Set publish hash governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
@ -663,6 +664,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlockhwm=<n>", strprintf("Set publish raw transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlockhwm=<n>", strprintf("Set publish raw transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlocksighwm=<n>", strprintf("Set publish raw transaction lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlocksighwm=<n>", strprintf("Set publish raw transaction lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else #else
hidden_args.emplace_back("-zmqpubhashblock=<address>"); hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashchainlock=<address>"); hidden_args.emplace_back("-zmqpubhashchainlock=<address>");
@ -682,6 +684,7 @@ void SetupServerArgs(NodeContext& node)
hidden_args.emplace_back("-zmqpubrawtx=<address>"); hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubrawtxlock=<address>"); hidden_args.emplace_back("-zmqpubrawtxlock=<address>");
hidden_args.emplace_back("-zmqpubrawtxlocksig=<address>"); hidden_args.emplace_back("-zmqpubrawtxlocksig=<address>");
hidden_args.emplace_back("-zmqpubsequence=<n>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>"); hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashchainlockhwm=<n>"); hidden_args.emplace_back("-zmqpubhashchainlockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashgovernanceobjecthwm=<n>"); hidden_args.emplace_back("-zmqpubhashgovernanceobjecthwm=<n>");
@ -700,6 +703,7 @@ void SetupServerArgs(NodeContext& node)
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>"); hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlockhwm=<n>"); hidden_args.emplace_back("-zmqpubrawtxlockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlocksighwm=<n>"); hidden_args.emplace_back("-zmqpubrawtxlocksighwm=<n>");
hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
#endif #endif
argsman.AddArg("-checkblockindex", strprintf("Do a consistency check for the block tree, and occasionally. (default: %u, regtest: %u)", defaultChainParams->DefaultConsistencyChecks(), regtestChainParams->DefaultConsistencyChecks()), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST); argsman.AddArg("-checkblockindex", strprintf("Do a consistency check for the block tree, and occasionally. (default: %u, regtest: %u)", defaultChainParams->DefaultConsistencyChecks(), regtestChainParams->DefaultConsistencyChecks()), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);

View File

@ -258,8 +258,8 @@ public:
{ {
public: public:
virtual ~Notifications() {} virtual ~Notifications() {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) {} virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {} virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
virtual void blockConnected(const CBlock& block, int height) {} virtual void blockConnected(const CBlock& block, int height) {}
virtual void blockDisconnected(const CBlock& block, int height) {} virtual void blockDisconnected(const CBlock& block, int height) {}
virtual void updatedBlockTip() {} virtual void updatedBlockTip() {}

View File

@ -611,13 +611,13 @@ public:
explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications) explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications)
: m_notifications(std::move(notifications)) {} : m_notifications(std::move(notifications)) {}
virtual ~NotificationsProxy() = default; virtual ~NotificationsProxy() = default;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override
{ {
m_notifications->transactionAddedToMempool(tx, nAcceptTime); m_notifications->transactionAddedToMempool(tx, nAcceptTime, mempool_sequence);
} }
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
{ {
m_notifications->transactionRemovedFromMempool(tx, reason); m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence);
} }
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override
{ {
@ -997,7 +997,7 @@ public:
if (!m_node.mempool) return; if (!m_node.mempool) return;
LOCK2(::cs_main, m_node.mempool->cs); LOCK2(::cs_main, m_node.mempool->cs);
for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) { for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) {
notifications.transactionAddedToMempool(entry.GetSharedTx(), 0); notifications.transactionAddedToMempool(entry.GetSharedTx(), /* nAcceptTime = */ 0, /* mempool_sequence = */ 0);
} }
} }
NodeContext& m_node; NodeContext& m_node;

View File

@ -532,9 +532,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool
info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash())); info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash()));
} }
UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose) UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose, bool include_mempool_sequence)
{ {
if (verbose) { if (verbose) {
if (include_mempool_sequence) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values.");
}
LOCK(pool.cs); LOCK(pool.cs);
UniValue o(UniValue::VOBJ); UniValue o(UniValue::VOBJ);
for (const CTxMemPoolEntry& e : pool.mapTx) { for (const CTxMemPoolEntry& e : pool.mapTx) {
@ -548,14 +551,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman,
} }
return o; return o;
} else { } else {
uint64_t mempool_sequence;
std::vector<uint256> vtxid; std::vector<uint256> vtxid;
pool.queryHashes(vtxid); {
LOCK(pool.cs);
pool.queryHashes(vtxid);
mempool_sequence = pool.GetSequence();
}
UniValue a(UniValue::VARR); UniValue a(UniValue::VARR);
for (const uint256& hash : vtxid) for (const uint256& hash : vtxid)
a.push_back(hash.ToString()); a.push_back(hash.ToString());
return a; if (!include_mempool_sequence) {
return a;
} else {
UniValue o(UniValue::VOBJ);
o.pushKV("txids", a);
o.pushKV("mempool_sequence", mempool_sequence);
return o;
}
} }
} }
@ -566,6 +580,7 @@ static RPCHelpMan getrawmempool()
"\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n", "\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n",
{ {
{"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"}, {"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"},
{"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."},
}, },
{ {
RPCResult{"for verbose = false", RPCResult{"for verbose = false",
@ -578,6 +593,15 @@ static RPCHelpMan getrawmempool()
{ {
{RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()}, {RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()},
}}, }},
RPCResult{"for verbose = false and mempool_sequence = true",
RPCResult::Type::OBJ, "", "",
{
{RPCResult::Type::ARR, "txids", "",
{
{RPCResult::Type::STR_HEX, "", "The transaction id"},
}},
{RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."},
}},
}, },
RPCExamples{ RPCExamples{
HelpExampleCli("getrawmempool", "true") HelpExampleCli("getrawmempool", "true")
@ -589,10 +613,15 @@ static RPCHelpMan getrawmempool()
if (!request.params[0].isNull()) if (!request.params[0].isNull())
fVerbose = request.params[0].get_bool(); fVerbose = request.params[0].get_bool();
bool include_mempool_sequence = false;
if (!request.params[1].isNull()) {
include_mempool_sequence = request.params[1].get_bool();
}
const NodeContext& node = EnsureAnyNodeContext(request.context); const NodeContext& node = EnsureAnyNodeContext(request.context);
const CTxMemPool& mempool = EnsureMemPool(node); const CTxMemPool& mempool = EnsureMemPool(node);
LLMQContext& llmq_ctx = EnsureLLMQContext(node); LLMQContext& llmq_ctx = EnsureLLMQContext(node);
return MempoolToJSON(mempool, llmq_ctx.isman, fVerbose); return MempoolToJSON(mempool, llmq_ctx.isman, fVerbose, include_mempool_sequence);
}, },
}; };
} }

View File

@ -47,7 +47,7 @@ UniValue blockToJSON(BlockManager& blockman, const CBlock& block, const CBlockIn
UniValue MempoolInfoToJSON(const CTxMemPool& pool, llmq::CInstantSendManager& isman); UniValue MempoolInfoToJSON(const CTxMemPool& pool, llmq::CInstantSendManager& isman);
/** Mempool to JSON */ /** Mempool to JSON */
UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose = false); UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose = false, bool include_mempool_sequence = false);
/** Block header to JSON */ /** Block header to JSON */
UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex, llmq::CChainLocksHandler& clhandler, llmq::CInstantSendManager& isman) LOCKS_EXCLUDED(cs_main); UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex, llmq::CChainLocksHandler& clhandler, llmq::CInstantSendManager& isman) LOCKS_EXCLUDED(cs_main);

View File

@ -168,6 +168,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "pruneblockchain", 0, "height" }, { "pruneblockchain", 0, "height" },
{ "keypoolrefill", 0, "newsize" }, { "keypoolrefill", 0, "newsize" },
{ "getrawmempool", 0, "verbose" }, { "getrawmempool", 0, "verbose" },
{ "getrawmempool", 1, "mempool_sequence" },
{ "estimatesmartfee", 0, "conf_target" }, { "estimatesmartfee", 0, "conf_target" },
{ "estimaterawfee", 0, "conf_target" }, { "estimaterawfee", 0, "conf_target" },
{ "estimaterawfee", 1, "threshold" }, { "estimaterawfee", 1, "threshold" },

View File

@ -51,12 +51,12 @@ struct TransactionsDelta final : public CValidationInterface {
explicit TransactionsDelta(std::set<CTransactionRef>& r, std::set<CTransactionRef>& a) explicit TransactionsDelta(std::set<CTransactionRef>& r, std::set<CTransactionRef>& a)
: m_removed{r}, m_added{a} {} : m_removed{r}, m_added{a} {}
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t /* nAcceptTime */) override void TransactionAddedToMempool(const CTransactionRef& tx, int64_t /* nAcceptTime */, uint64_t) override
{ {
Assert(m_added.insert(tx).second); Assert(m_added.insert(tx).second);
} }
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t) override
{ {
Assert(m_removed.insert(tx).second); Assert(m_removed.insert(tx).second);
} }

View File

@ -607,12 +607,16 @@ void CTxMemPool::addUncheckedProTx(indexed_transaction_set::iterator& newit, con
void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
{ {
// We increment mempool sequence value no matter removal reason
// even if not directly reported below.
uint64_t mempool_sequence = GetAndIncrementSequence();
if (reason != MemPoolRemovalReason::BLOCK) { if (reason != MemPoolRemovalReason::BLOCK) {
// Notify clients that a transaction has been removed from the mempool // Notify clients that a transaction has been removed from the mempool
// for any reason except being included in a block. Clients interested // for any reason except being included in a block. Clients interested
// in transactions included in blocks can subscribe to the BlockConnected // in transactions included in blocks can subscribe to the BlockConnected
// notification. // notification.
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason); GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
} }
const uint256 hash = it->GetTx().GetHash(); const uint256 hash = it->GetTx().GetHash();

View File

@ -485,6 +485,11 @@ protected:
mutable double rollingMinimumFeeRate GUARDED_BY(cs); //!< minimum fee to get into the pool, decreases exponentially mutable double rollingMinimumFeeRate GUARDED_BY(cs); //!< minimum fee to get into the pool, decreases exponentially
mutable Epoch m_epoch GUARDED_BY(cs); mutable Epoch m_epoch GUARDED_BY(cs);
// In-memory counter for external mempool tracking purposes.
// This number is incremented once every time a transaction
// is added or removed from the mempool for any reason.
mutable uint64_t m_sequence_number{1};
void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs); void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs);
bool m_is_loaded GUARDED_BY(cs){false}; bool m_is_loaded GUARDED_BY(cs){false};
@ -810,6 +815,15 @@ public:
return (m_unbroadcast_txids.count(txid) != 0); return (m_unbroadcast_txids.count(txid) != 0);
} }
/** Guards this internal counter for external reporting */
uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
return m_sequence_number++;
}
uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
return m_sequence_number;
}
private: private:
/** UpdateForDescendants is used by UpdateTransactionsFromBlock to update /** UpdateForDescendants is used by UpdateTransactionsFromBlock to update
* the descendants for a single transaction that has been added to the * the descendants for a single transaction that has been added to the

View File

@ -889,7 +889,7 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef
if (!Finalize(args, ws)) return MempoolAcceptResult::Failure(ws.m_state); if (!Finalize(args, ws)) return MempoolAcceptResult::Failure(ws.m_state);
const int64_t nAcceptTime = args.m_accept_time; const int64_t nAcceptTime = args.m_accept_time;
GetMainSignals().TransactionAddedToMempool(ptx, nAcceptTime); GetMainSignals().TransactionAddedToMempool(ptx, nAcceptTime, m_pool.GetAndIncrementSequence());
const CTransaction& tx = *ptx; const CTransaction& tx = *ptx;
auto finish = Now<SteadyMilliseconds>(); auto finish = Now<SteadyMilliseconds>();

View File

@ -208,17 +208,17 @@ void CMainSignals::SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, cons
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.SynchronousUpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); }); m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.SynchronousUpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); });
} }
void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) { void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {
auto event = [tx, nAcceptTime, this] { auto event = [tx, nAcceptTime, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, nAcceptTime); }); m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, nAcceptTime, mempool_sequence); });
}; };
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__, ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__,
tx->GetHash().ToString()); tx->GetHash().ToString());
} }
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) { void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
auto event = [tx, reason, this] { auto event = [tx, reason, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason); }); m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); });
}; };
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__, ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__,
tx->GetHash().ToString()); tx->GetHash().ToString());

View File

@ -117,7 +117,8 @@ protected:
* *
* Called on a background thread. * Called on a background thread.
*/ */
virtual void TransactionAddedToMempool(const CTransactionRef &xn, int64_t nAcceptTime) {} virtual void TransactionAddedToMempool(const CTransactionRef &xn, int64_t nAcceptTime, uint64_t mempool_sequence) {}
/** /**
* Notifies listeners of a transaction leaving mempool. * Notifies listeners of a transaction leaving mempool.
* *
@ -149,7 +150,7 @@ protected:
* *
* Called on a background thread. * Called on a background thread.
*/ */
virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {} virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
/** /**
* Notifies listeners of a block being connected. * Notifies listeners of a block being connected.
* Provides a vector of transactions evicted from the mempool as a result. * Provides a vector of transactions evicted from the mempool as a result.
@ -226,8 +227,8 @@ public:
void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload); void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload);
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void SynchronousUpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void SynchronousUpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void TransactionAddedToMempool(const CTransactionRef&, int64_t); void TransactionAddedToMempool(const CTransactionRef&, int64_t, uint64_t mempool_sequence);
void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason); void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence);
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex); void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex);
void BlockDisconnected(const std::shared_ptr<const CBlock> &, const CBlockIndex* pindex); void BlockDisconnected(const std::shared_ptr<const CBlock> &, const CBlockIndex* pindex);
void NotifyTransactionLock(const CTransactionRef &tx, const std::shared_ptr<const llmq::CInstantSendLock>& islock); void NotifyTransactionLock(const CTransactionRef &tx, const std::shared_ptr<const llmq::CInstantSendLock>& islock);

View File

@ -1267,7 +1267,7 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, CWalletTx::Confirmatio
fAnonymizableTallyCachedNonDenom = false; fAnonymizableTallyCachedNonDenom = false;
} }
void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) { void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {
LOCK(cs_wallet); LOCK(cs_wallet);
CWalletTx::Confirmation confirm(CWalletTx::Status::UNCONFIRMED, /* block_height */ 0, {}, /* nIndex */ 0); CWalletTx::Confirmation confirm(CWalletTx::Status::UNCONFIRMED, /* block_height */ 0, {}, /* nIndex */ 0);
WalletBatch batch(GetDatabase()); WalletBatch batch(GetDatabase());
@ -1279,7 +1279,7 @@ void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcce
} }
} }
void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) { void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
if (reason != MemPoolRemovalReason::CONFLICT) { if (reason != MemPoolRemovalReason::CONFLICT) {
LOCK(cs_wallet); LOCK(cs_wallet);
auto it = mapWallet.find(tx->GetHash()); auto it = mapWallet.find(tx->GetHash());
@ -1330,7 +1330,7 @@ void CWallet::blockConnected(const CBlock& block, int height)
WalletBatch batch(GetDatabase()); WalletBatch batch(GetDatabase());
for (size_t index = 0; index < block.vtx.size(); index++) { for (size_t index = 0; index < block.vtx.size(); index++) {
SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height, block_hash, (int)index}, batch); SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height, block_hash, (int)index}, batch);
transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK); transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK, 0 /* mempool_sequence */);
} }
// reset cache to make sure no longer immature coins are included // reset cache to make sure no longer immature coins are included

View File

@ -1078,7 +1078,7 @@ public:
CWalletTx* AddToWallet(CTransactionRef tx, const CWalletTx::Confirmation& confirm, const UpdateWalletTxFn& update_wtx=nullptr, bool fFlushOnClose=true); CWalletTx* AddToWallet(CTransactionRef tx, const CWalletTx::Confirmation& confirm, const UpdateWalletTxFn& update_wtx=nullptr, bool fFlushOnClose=true);
bool LoadToWallet(const uint256& hash, const UpdateWalletTxFn& fill_wtx) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); bool LoadToWallet(const uint256& hash, const UpdateWalletTxFn& fill_wtx) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override;
void blockConnected(const CBlock& block, int height) override; void blockConnected(const CBlock& block, int height) override;
void blockDisconnected(const CBlock& block, int height) override; void blockDisconnected(const CBlock& block, int height) override;
void updatedBlockTip() override; void updatedBlockTip() override;
@ -1100,7 +1100,7 @@ public:
uint256 last_failed_block; uint256 last_failed_block;
}; };
ScanResult ScanForWalletTransactions(const uint256& start_block, int start_height, std::optional<int> max_height, const WalletRescanReserver& reserver, bool fUpdate); ScanResult ScanForWalletTransactions(const uint256& start_block, int start_height, std::optional<int> max_height, const WalletRescanReserver& reserver, bool fUpdate);
void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override; void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void ResendWalletTransactions(); void ResendWalletTransactions();
struct Balance { struct Balance {

View File

@ -28,6 +28,26 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/
return true; return true;
} }
bool CZMQAbstractNotifier::NotifyBlockConnect(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}
bool CZMQAbstractNotifier::NotifyBlockDisconnect(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransactionAcceptance(const CTransaction &/*transaction*/, uint64_t mempool_sequence)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transaction*/, uint64_t mempool_sequence)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransactionLock(const CTransactionRef &/*transaction*/, const std::shared_ptr<const llmq::CInstantSendLock>& /*islock*/) bool CZMQAbstractNotifier::NotifyTransactionLock(const CTransactionRef &/*transaction*/, const std::shared_ptr<const llmq::CInstantSendLock>& /*islock*/)
{ {
return true; return true;

View File

@ -58,9 +58,19 @@ public:
virtual bool Initialize(void *pcontext) = 0; virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0; virtual void Shutdown() = 0;
// Notifies of ConnectTip result, i.e., new active tip only
virtual bool NotifyBlock(const CBlockIndex *pindex); virtual bool NotifyBlock(const CBlockIndex *pindex);
virtual bool NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr<const llmq::CChainLockSig>& clsig); // Notifies of every block connection
virtual bool NotifyBlockConnect(const CBlockIndex *pindex);
// Notifies of every block disconnection
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex);
// Notifies of every mempool acceptance
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence);
// Notifies of every mempool removal, except inclusion in blocks
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence);
// Notifies of transactions added to mempool or appearing in blocks
virtual bool NotifyTransaction(const CTransaction &transaction); virtual bool NotifyTransaction(const CTransaction &transaction);
virtual bool NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr<const llmq::CChainLockSig>& clsig);
virtual bool NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr<const llmq::CInstantSendLock>& islock); virtual bool NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr<const llmq::CInstantSendLock>& islock);
virtual bool NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr<const CGovernanceVote>& vote); virtual bool NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr<const CGovernanceVote>& vote);
virtual bool NotifyGovernanceObject(const std::shared_ptr<const Governance::Object>& object); virtual bool NotifyGovernanceObject(const std::shared_ptr<const Governance::Object>& object);

View File

@ -50,6 +50,7 @@ std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create()
factories["pubrawgovernanceobject"] = CZMQAbstractNotifier::Create<CZMQPublishRawGovernanceObjectNotifier>; factories["pubrawgovernanceobject"] = CZMQAbstractNotifier::Create<CZMQPublishRawGovernanceObjectNotifier>;
factories["pubrawinstantsenddoublespend"] = CZMQAbstractNotifier::Create<CZMQPublishRawInstantSendDoubleSpendNotifier>; factories["pubrawinstantsenddoublespend"] = CZMQAbstractNotifier::Create<CZMQPublishRawInstantSendDoubleSpendNotifier>;
factories["pubrawrecoveredsig"] = CZMQAbstractNotifier::Create<CZMQPublishRawRecoveredSigNotifier>; factories["pubrawrecoveredsig"] = CZMQAbstractNotifier::Create<CZMQPublishRawRecoveredSigNotifier>;
factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers; std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
for (const auto& entry : factories) for (const auto& entry : factories)
@ -157,31 +158,53 @@ void CZMQNotificationInterface::NotifyChainLock(const CBlockIndex *pindex, const
}); });
} }
void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime) void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime, uint64_t mempool_sequence)
{ {
// Used by BlockConnected and BlockDisconnected as well, because they're
// all the same external callback.
const CTransaction& tx = *ptx; const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx); return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
});
}
void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
{
// Called for all non-block inclusion reasons
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
}); });
} }
void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
{ {
for (const CTransactionRef& ptx : pblock->vtx) { for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction added in the block const CTransaction& tx = *ptx;
TransactionAddedToMempool(ptx, 0); TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx);
});
} }
// Next we notify BlockConnect listeners for *all* blocks
TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
return notifier->NotifyBlockConnect(pindexConnected);
});
} }
void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
{ {
for (const CTransactionRef& ptx : pblock->vtx) { for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction removed in block disconnection const CTransaction& tx = *ptx;
TransactionAddedToMempool(ptx, 0); TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx);
});
} }
// Next we notify BlockDisconnect listeners for *all* blocks
TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
return notifier->NotifyBlockDisconnect(pindexDisconnected);
});
} }
void CZMQNotificationInterface::NotifyTransactionLock(const CTransactionRef& tx, const std::shared_ptr<const llmq::CInstantSendLock>& islock) void CZMQNotificationInterface::NotifyTransactionLock(const CTransactionRef& tx, const std::shared_ptr<const llmq::CInstantSendLock>& islock)

View File

@ -26,7 +26,8 @@ protected:
void Shutdown(); void Shutdown();
// CValidationInterface // CValidationInterface
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override;
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override; void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override; void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;

View File

@ -46,6 +46,7 @@ static const char *MSG_RAWGVOTE = "rawgovernancevote";
static const char *MSG_RAWGOBJ = "rawgovernanceobject"; static const char *MSG_RAWGOBJ = "rawgovernanceobject";
static const char *MSG_RAWISCON = "rawinstantsenddoublespend"; static const char *MSG_RAWISCON = "rawinstantsenddoublespend";
static const char *MSG_RAWRECSIG = "rawrecoveredsig"; static const char *MSG_RAWRECSIG = "rawrecoveredsig";
static const char *MSG_SEQUENCE = "sequence";
// Internal function to send multipart message // Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
@ -351,6 +352,53 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
} }
// TODO: Dedup this code to take label char, log string
bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
char data[sizeof(uint256)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(data) - 1] = 'C'; // Block (C)onnect
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}
bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
char data[sizeof(uint256)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(data) - 1] = 'D'; // Block (D)isconnect
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}
bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance
WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}
bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
data[sizeof(uint256)] = 'R'; // Mempool (R)emoval
WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
}
bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr<const llmq::CInstantSendLock>& islock) bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr<const llmq::CInstantSendLock>& islock)
{ {
uint256 hash = transaction->GetHash(); uint256 hash = transaction->GetHash();

View File

@ -108,6 +108,15 @@ public:
bool NotifyTransaction(const CTransaction &transaction) override; bool NotifyTransaction(const CTransaction &transaction) override;
}; };
class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyBlockConnect(const CBlockIndex *pindex) override;
bool NotifyBlockDisconnect(const CBlockIndex *pindex) override;
bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override;
bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override;
};
class CZMQPublishRawTransactionLockNotifier : public CZMQAbstractPublishNotifier class CZMQPublishRawTransactionLockNotifier : public CZMQAbstractPublishNotifier
{ {
public: public:

View File

@ -5,15 +5,26 @@
"""Test the ZMQ notification interface.""" """Test the ZMQ notification interface."""
import struct import struct
from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE, ADDRESS_BCRT1_P2SH_OP_TRUE
from test_framework.blocktools import create_block, create_coinbase
from test_framework.test_framework import BitcoinTestFramework from test_framework.test_framework import BitcoinTestFramework
from test_framework.messages import ( from test_framework.messages import (
dashhash, dashhash,
hash256, hash256,
tx_from_hex,
)
from test_framework.util import (
assert_equal,
assert_raises_rpc_error,
) )
from test_framework.util import assert_equal
from time import sleep from time import sleep
# Test may be skipped and not have zmq installed
try:
import zmq
except ImportError:
pass
def hash256_reversed(byte_str): def hash256_reversed(byte_str):
return hash256(byte_str)[::-1] return hash256(byte_str)[::-1]
@ -26,7 +37,6 @@ class ZMQSubscriber:
self.socket = socket self.socket = socket
self.topic = topic self.topic = topic
import zmq
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
def receive(self): def receive(self):
@ -38,6 +48,22 @@ class ZMQSubscriber:
self.sequence += 1 self.sequence += 1
return body return body
def receive_sequence(self):
topic, body, seq = self.socket.recv_multipart()
# Topic should match the subscriber topic.
assert_equal(topic, self.topic)
# Sequence should be incremental.
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
self.sequence += 1
hash = body[:32].hex()
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
if mempool_sequence is not None:
assert label == "A" or label == "R"
else:
assert label == "D" or label == "C"
return (hash, label, mempool_sequence)
class ZMQTest (BitcoinTestFramework): class ZMQTest (BitcoinTestFramework):
def set_test_params(self): def set_test_params(self):
@ -51,11 +77,16 @@ class ZMQTest (BitcoinTestFramework):
# TODO: drop this check after migration to MiniWallet, see bitcoin/bitcoin#24653 # TODO: drop this check after migration to MiniWallet, see bitcoin/bitcoin#24653
self.skip_if_no_bdb() self.skip_if_no_bdb()
def setup_network(self):
self.disable_mocktime()
super().setup_network()
def run_test(self): def run_test(self):
import zmq
self.ctx = zmq.Context() self.ctx = zmq.Context()
try: try:
self.test_basic() self.test_basic()
self.test_sequence()
self.test_mempool_sync()
self.test_reorg() self.test_reorg()
self.test_multiple_interfaces() self.test_multiple_interfaces()
finally: finally:
@ -64,7 +95,6 @@ class ZMQTest (BitcoinTestFramework):
self.ctx.destroy(linger=None) self.ctx.destroy(linger=None)
def test_basic(self): def test_basic(self):
import zmq
# Invalid zmq arguments don't take down the node, see #17185. # Invalid zmq arguments don't take down the node, see #17185.
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"]) self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
@ -127,6 +157,9 @@ class ZMQTest (BitcoinTestFramework):
txid = hashtx.receive() txid = hashtx.receive()
assert_equal(payment_txid, txid.hex()) assert_equal(payment_txid, txid.hex())
# TODO: Add "R" sequence testing, potentially using txes replaced with
# islocked txes
# Should receive the broadcasted raw transaction. # Should receive the broadcasted raw transaction.
hex = rawtx.receive() hex = rawtx.receive()
assert_equal(payment_txid, hash256_reversed(hex).hex()) assert_equal(payment_txid, hash256_reversed(hex).hex())
@ -155,7 +188,6 @@ class ZMQTest (BitcoinTestFramework):
self.log.info("Skipping reorg test because wallet is disabled") self.log.info("Skipping reorg test because wallet is disabled")
return return
import zmq
address = 'tcp://127.0.0.1:28333' address = 'tcp://127.0.0.1:28333'
services = [b"hashblock", b"hashtx"] services = [b"hashblock", b"hashtx"]
@ -186,8 +218,8 @@ class ZMQTest (BitcoinTestFramework):
assert_equal(hashtx.receive().hex(), payment_txid) assert_equal(hashtx.receive().hex(), payment_txid)
assert_equal(hashtx.receive().hex(), disconnect_cb) assert_equal(hashtx.receive().hex(), disconnect_cb)
# Generate 2 blocks in nodes[1] # Generate 2 blocks in nodes[1] to a different address to ensure split
connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE) connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2SH_OP_TRUE)
# nodes[0] will reorg chain after connecting back nodes[1] # nodes[0] will reorg chain after connecting back nodes[1]
self.connect_nodes(0, 1) self.connect_nodes(0, 1)
@ -213,8 +245,266 @@ class ZMQTest (BitcoinTestFramework):
# And the current tip # And the current tip
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0]) assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
def test_sequence(self):
"""
Sequence zmq notifications give every blockhash and txhash in order
of processing, regardless of IBD, re-orgs, etc.
Format of messages:
<32-byte hash>C : Blockhash connected
<32-byte hash>D : Blockhash disconnected
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
"""
self.log.info("Testing 'sequence' publisher")
address = 'tcp://127.0.0.1:28333'
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
seq = ZMQSubscriber(socket, b'sequence')
self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
# Mempool sequence number starts at 1
seq_num = 1
# Generate 1 block in nodes[0] and receive all notifications
dc_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
# Note: We are not notified of any block transactions, coinbase or mined
assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())
# Generate 2 blocks in nodes[1] to a different address to ensure a chain split
self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2SH_OP_TRUE)
# nodes[0] will reorg chain after connecting back nodes[1]
self.connect_nodes(0, 1)
# Then we receive all block (dis)connect notifications for the 2 block reorg
assert_equal((dc_block, "D", None), seq.receive_sequence())
block_count = self.nodes[1].getblockcount()
assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())
# Rest of test requires wallet functionality
if self.is_wallet_compiled():
self.log.info("Wait for tx from second node")
payment_txid = self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=5.0)
self.sync_all()
self.log.info("Testing sequence notifications with mempool sequence values")
# Should receive the broadcasted txid.
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Doesn't get published when mined, make a block and tx to "flush" the possibility
# though the mempool sequence number does go up by the number of transactions
# removed from the mempool by the block mining it.
mempool_size = len(self.nodes[0].getrawmempool())
c_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
self.sync_all()
# Make sure the number of mined transactions matches the number of txs out of mempool
mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
seq_num += mempool_size_delta
payment_txid_2 = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
self.sync_all()
assert_equal((c_block, "C", None), seq.receive_sequence())
assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Spot check getrawmempool results that they only show up when asked for
assert type(self.nodes[0].getrawmempool()) is list
assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)
self.log.info("Testing reorg notifications")
# Manually invalidate the last block to test mempool re-entry
# N.B. This part could be made more lenient in exact ordering
# since it greatly depends on inner-workings of blocks/mempool
# during "deep" re-orgs. Probably should "re-construct"
# blockchain/mempool state from notifications instead.
block_count = self.nodes[0].getblockcount()
best_hash = self.nodes[0].getbestblockhash()
self.nodes[0].invalidateblock(best_hash)
sleep(2) # Bit of room to make sure transaction things happened
# Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
# of the time they were gathered.
assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num
assert_equal((best_hash, "D", None), seq.receive_sequence())
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Other things may happen but aren't wallet-deterministic so we don't test for them currently
self.nodes[0].reconsiderblock(best_hash)
self.nodes[1].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
self.sync_all()
self.log.info("Evict mempool transaction by block conflict")
orig_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0)
# More to be simply mined
more_tx = []
for _ in range(5):
more_tx.append(self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1))
raw_tx = self.nodes[0].getrawtransaction(orig_txid)
# Mine the tx
block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1))
tx = tx_from_hex(raw_tx)
block.vtx.append(tx)
for txid in more_tx:
tx = tx_from_hex(self.nodes[0].getrawtransaction(txid))
block.vtx.append(tx)
block.hashMerkleRoot = block.calc_merkle_root()
block.solve()
assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
tip = self.nodes[0].getbestblockhash()
assert_equal(int(tip, 16), block.sha256)
orig_txid_2 = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0)
# Flush old notifications until evicted tx original entry
(hash_str, label, mempool_seq) = seq.receive_sequence()
while hash_str != orig_txid:
(hash_str, label, mempool_seq) = seq.receive_sequence()
mempool_seq += 1
# Added original tx
assert_equal(label, "A")
# More transactions to be simply mined
for i in range(len(more_tx)):
assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
mempool_seq += 1
assert_equal((tip, "C", None), seq.receive_sequence())
mempool_seq += len(more_tx)
# Last tx
assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
self.sync_all() # want to make sure we didn't break "consensus" for other tests
def test_mempool_sync(self):
"""
Use sequence notification plus getrawmempool sequence results to "sync mempool"
"""
if not self.is_wallet_compiled():
self.log.info("Skipping mempool sync test")
return
self.log.info("Testing 'mempool sync' usage of sequence notifier")
address = 'tcp://127.0.0.1:28333'
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
seq = ZMQSubscriber(socket, b'sequence')
self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
self.connect_nodes(0, 1)
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
# In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
assert_equal(next_mempool_seq, 1)
# Some transactions have been happening but we aren't consuming zmq notifications yet
# or we lost a ZMQ message somehow and want to start over
txids = []
num_txs = 5
for _ in range(num_txs):
txids.append(self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0))
self.sync_all()
# 1) Consume backlog until we get a mempool sequence number
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
while zmq_mem_seq is None:
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
assert label == "A" or label == "R"
assert hash_str is not None
# 2) We need to "seed" our view of the mempool
mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
mempool_view = set(mempool_snapshot["txids"])
get_raw_seq = mempool_snapshot["mempool_sequence"]
assert_equal(get_raw_seq, 6)
# Snapshot may be too old compared to zmq message we read off latest
while zmq_mem_seq >= get_raw_seq:
sleep(2)
mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
mempool_view = set(mempool_snapshot["txids"])
get_raw_seq = mempool_snapshot["mempool_sequence"]
# Things continue to happen in the "interim" while waiting for snapshot results
for _ in range(num_txs):
txids.append(self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1))
self.sync_all()
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
final_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1)
# 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
while True:
if zmq_mem_seq == get_raw_seq - 1:
break
(hash_str, label, mempool_sequence) = seq.receive_sequence()
if mempool_sequence is not None:
zmq_mem_seq = mempool_sequence
if zmq_mem_seq > get_raw_seq:
raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq))
# 4) Moving forward, we apply the delta to our local view
# remaining txs(5) + 1 block connect + 1 final tx
expected_sequence = get_raw_seq
r_gap = 0
for _ in range(num_txs + 1 + 1):
(hash_str, label, mempool_sequence) = seq.receive_sequence()
if mempool_sequence is not None:
if mempool_sequence != expected_sequence:
# Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
# position in the incoming block message "C"
if label == "R":
assert mempool_sequence > expected_sequence
r_gap += mempool_sequence - expected_sequence
else:
raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}")
if label == "A":
assert hash_str not in mempool_view
mempool_view.add(hash_str)
expected_sequence = mempool_sequence + 1
elif label == "R":
assert hash_str in mempool_view
mempool_view.remove(hash_str)
expected_sequence = mempool_sequence + 1
elif label == "C":
# (Attempt to) remove all txids from known block connects
block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
for txid in block_txids:
if txid in mempool_view:
expected_sequence += 1
mempool_view.remove(txid)
expected_sequence -= r_gap
r_gap = 0
elif label == "D":
# Not useful for mempool tracking per se
continue
else:
raise Exception("Unexpected ZMQ sequence label!")
assert_equal(self.nodes[0].getrawmempool(), [final_txid])
assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)
# 5) If you miss a zmq/mempool sequence number, go back to step (2)
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
def test_multiple_interfaces(self): def test_multiple_interfaces(self):
import zmq
# Set up two subscribers with different addresses # Set up two subscribers with different addresses
subscribers = [] subscribers = []
for i in range(2): for i in range(2):