Merge #6231: backport: merge bitcoin#19572, #20953, #20523, #21008, #21310, #22079, #23471, #24218 (zmq backports)

b75e83b298 merge bitcoin#24218: Fix implicit-integer-sign-change (Kittywhiskers Van Gogh)
8ecc22f51f merge bitcoin#23471: Improve ZMQ documentation (Kittywhiskers Van Gogh)
2965093c4a merge bitcoin#22079: Add support to listen on IPv6 addresses (Kittywhiskers Van Gogh)
3ac3714957 merge bitcoin#21310: fix sync-up by matching notification to generated block (Kittywhiskers Van Gogh)
7b0c725c59 merge bitcoin#21008: fix zmq test flakiness, improve speed (Kittywhiskers Van Gogh)
5e87efd04b merge bitcoin#20523: deduplicate 'sequence' publisher message creation/sending (Kittywhiskers Van Gogh)
99c730f0f3 merge bitcoin#20953: dedup zmq test setup code (node restart, topics subscription) (Kittywhiskers Van Gogh)
982c1f03d4 merge bitcoin#19572: Create "sequence" notifier, enabling client-side mempool tracking (Kittywhiskers Van Gogh)
b0b4e0fa7f zmq: Make `g_zmq_notification_interface` a smart pointer (Kittywhiskers Van Gogh)
0a1ffd30b9 zmq: extend appending address to log msg for Dash-specific notifications (Kittywhiskers Van Gogh)

Pull request description:

  ## Additional Information

  * [bitcoin#19572](https://github.com/bitcoin/bitcoin/pull/19572) introduces tests in `interface_zmq.py` that validate newly introduced "sequence" reporting of all message types (`C`, `D`, `R` and `A`). The `R` message type (removed from mempool) is tested by leveraging the RBF mechanism, which isn't present in Dash.

    In order to allow the tests to successfully pass, all fee bumping and RBF-specific code had to be removed from the tests. This test also involves creating a new block to test the `C` message (connected block) that would return a `time-too-new` error and fail unless mocktime has been disabled (which has been done in this test).

  * When backporting [bitcoin#18309](https://github.com/bitcoin/bitcoin/pull/18309) ([dash#5728](https://github.com/dashpay/dash/pull/5728)), Dash-specific ZMQ notifications did not have those changes applied to them and that particular backport was merged upstream *after* [bitcoin#19572](https://github.com/bitcoin/bitcoin/pull/19572), meaning, for completion, the changes from [bitcoin#18309](https://github.com/bitcoin/bitcoin/pull/18309) have been integrated into [bitcoin#19572](https://github.com/bitcoin/bitcoin/pull/19572)'s backport.

    As for the Dash-specific notifications, they have been covered to ensure uniformity in a separate commit.

  * The ZMQ notification interface has been converted to a smart pointer in the interest of safety (this change is eventually done upstream in 8ed4ff8e05d61a8e954d72cebdc2e1d1ab24fb84 ([bitcoin#27125](https://github.com/bitcoin/bitcoin/pull/27125)))

  ## Breaking Changes

  None expected.

  ## Checklist:

  - [x] I have performed a self-review of my own code
  - [x] I have commented my code, particularly in hard-to-understand areas **(note: N/A)**
  - [x] I have added or updated relevant unit/integration/functional/e2e tests
  - [x] I have made corresponding changes to the documentation
  - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_

ACKs for top commit:
  UdjinM6:
    utACK b75e83b298
  PastaPastaPasta:
    utACK b75e83b298
  knst:
    utACK b75e83b298

Tree-SHA512: 9f860d1203bebe0914a5102f101f646873d14754830d651fb91ed0d1285a6c1a58ffc492b07d4768324d94f53171c9a4da974cf4a0b1e5c665979eace289f6f0
This commit is contained in:
pasta 2024-08-30 22:14:04 -05:00
commit 7a9e475c26
No known key found for this signature in database
GPG Key ID: 52527BEDABE87984
26 changed files with 716 additions and 151 deletions

View File

@ -11,7 +11,8 @@
-zmqpubrawtx=tcp://127.0.0.1:28332 \
-zmqpubrawblock=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashblock=tcp://127.0.0.1:28332
-zmqpubhashblock=tcp://127.0.0.1:28332 \
-zmqpubsequence=tcp://127.0.0.1:28332
We use the asyncio library here. `self.handle()` installs itself as a
future at the end of the function. Since it never returns with the event
@ -58,18 +59,14 @@ class ZMQHandler():
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernanceobject")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawinstantsenddoublespend")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
async def handle(self) :
msg = await self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
topic, body, seq = await self.zmqSubSocket.recv_multipart()
sequence = "Unknown"
if len(msg[-1]) == 4:
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)
if len(seq) == 4:
sequence = str(struct.unpack('<I', seq)[-1])
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(body.hex())
@ -118,6 +115,12 @@ class ZMQHandler():
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(body.hex())
elif topic == b"sequence":
hash = body[:32].hex()
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
print('- SEQUENCE ('+sequence+') -')
print(hash, label, mempool_sequence)
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())

View File

@ -77,6 +77,7 @@ Currently, the following notifications are supported:
-zmqpubrawgovernanceobject=address
-zmqpubrawinstantsenddoublespend=address
-zmqpubrawrecoveredsig=address
-zmqpubsequence=address
The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
@ -103,6 +104,7 @@ The option to set the PUB socket's outbound message high water mark
-zmqpubrawgovernanceobjecthwm=n
-zmqpubrawinstantsenddoublespendhwm=n
-zmqpubrawrecoveredsighwm=n
-zmqpubsequencehwm=address
The high water mark value must be an integer greater than or equal to 0.
@ -110,16 +112,44 @@ For instance:
$ dashd -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://192.168.1.2:28332 \
-zmqpubhashblock="tcp://[::1]:28333" \
-zmqpubrawtx=ipc:///tmp/dashd.tx.raw \
-zmqpubhashtxhwm=10000
Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the
notification `-zmqpubhashtx` the topic is `hashtx` (no null
terminator) and the body is the transaction hash (32
bytes).
terminator). These options can also be provided in dash.conf.
These options can also be provided in dash.conf.
The topics are:
`sequence`: the body is structured as the following based on the type of message:
<32-byte hash>C : Blockhash connected
<32-byte hash>D : Blockhash disconnected
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
Where the 8-byte uints correspond to the mempool sequence number.
`rawtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`rawtx`), the second part is the serialized transaction, and the last part is a sequence number (representing the message count to detect lost messages).
| rawtx | <serialized transaction> | <uint32 sequence number in Little Endian>
`hashtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`hashtx`), the second part is the 32-byte transaction hash, and the last part is a sequence number (representing the message count to detect lost messages).
| hashtx | <32-byte transaction hash in Little Endian> | <uint32 sequence number in Little Endian>
`rawblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`rawblock`), the second part is the serialized block, and the last part is a sequence number (representing the message count to detect lost messages).
| rawblock | <serialized block> | <uint32 sequence number in Little Endian>
`hashblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`hashblock`), the second part is the 32-byte block hash, and the last part is a sequence number (representing the message count to detect lost messages).
| hashblock | <32-byte block hash in Little Endian> | <uint32 sequence number in Little Endian>
**_NOTE:_** Note that the 32-byte hashes are in Little Endian and not in the Big Endian format that the RPC interface and block explorers use to display transaction and block hashes.
ZeroMQ endpoint specifiers for TCP (and others) are documented in the
[ZeroMQ API](http://api.zeromq.org/4-0:_start).
@ -143,6 +173,9 @@ Setting the keepalive values appropriately for your operating environment may
improve connectivity in situations where long-lived connections are silently
dropped by network middle boxes.
Also, the socket's ZMQ_IPV6 option is enabled to accept connections from IPv6
hosts as well. If needed, this option has to be set on the client side too.
## Remarks
From the perspective of dashd, the ZeroMQ socket is write-only; PUB
@ -154,13 +187,20 @@ No authentication or authorization is done on connecting clients; it
is assumed that the ZeroMQ port is exposed only to trusted entities,
using other means such as firewalling.
Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip. Also note
that no notification occurs if the tip was in the active chain - this
is the case after calling invalidateblock RPC.
Note that for `*block` topics, when the block chain tip changes,
a reorganisation may occur and just the tip will be notified.
It is up to the subscriber to retrieve the chain from the last known
block to the new tip. Also note that no notification will occur if the tip
was in the active chain--as would be the case after calling invalidateblock RPC.
In contrast, the `sequence` topic publishes all block connections and
disconnections.
There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type you are
using. Dashd appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications.
The `sequence` topic refers specifically to the mempool sequence
number, which is also published along with all mempool events. This
is a different sequence value than in ZMQ itself in order to allow a total
ordering of mempool events to be constructed.

View File

@ -102,7 +102,8 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con
}
}
void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime)
void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime,
uint64_t mempool_sequence)
{
assert(m_cj_ctx && m_llmq_ctx);
@ -111,7 +112,8 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef&
m_cj_ctx->dstxman->TransactionAddedToMempool(ptx);
}
void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason)
void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason,
uint64_t mempool_sequence)
{
assert(m_llmq_ctx);

View File

@ -40,8 +40,9 @@ protected:
void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload) override;
void SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override;
void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) override;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override;
void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason,
uint64_t mempool_sequence) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override;
void NotifyMasternodeListChanged(bool undo, const CDeterministicMNList& oldMNList, const CDeterministicMNListDiff& diff) override;

View File

@ -360,9 +360,8 @@ void PrepareShutdown(NodeContext& node)
#if ENABLE_ZMQ
if (g_zmq_notification_interface) {
UnregisterValidationInterface(g_zmq_notification_interface);
delete g_zmq_notification_interface;
g_zmq_notification_interface = nullptr;
UnregisterValidationInterface(g_zmq_notification_interface.get());
g_zmq_notification_interface.reset();
}
#endif
@ -646,6 +645,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlock=<address>", "Enable publish raw transaction (locked via InstantSend) in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlocksig=<address>", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashchainlockhwm=<n>", strprintf("Set publish hash chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashgovernanceobjecthwm=<n>", strprintf("Set publish hash governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
@ -664,6 +664,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlockhwm=<n>", strprintf("Set publish raw transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlocksighwm=<n>", strprintf("Set publish raw transaction lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashchainlock=<address>");
@ -683,6 +684,7 @@ void SetupServerArgs(NodeContext& node)
hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubrawtxlock=<address>");
hidden_args.emplace_back("-zmqpubrawtxlocksig=<address>");
hidden_args.emplace_back("-zmqpubsequence=<n>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashchainlockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashgovernanceobjecthwm=<n>");
@ -701,6 +703,7 @@ void SetupServerArgs(NodeContext& node)
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlocksighwm=<n>");
hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
#endif
argsman.AddArg("-checkblockindex", strprintf("Do a consistency check for the block tree, and occasionally. (default: %u, regtest: %u)", defaultChainParams->DefaultConsistencyChecks(), regtestChainParams->DefaultConsistencyChecks()), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
@ -1736,7 +1739,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
g_zmq_notification_interface = CZMQNotificationInterface::Create();
if (g_zmq_notification_interface) {
RegisterValidationInterface(g_zmq_notification_interface);
RegisterValidationInterface(g_zmq_notification_interface.get());
}
#endif

View File

@ -258,8 +258,8 @@ public:
{
public:
virtual ~Notifications() {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
virtual void blockConnected(const CBlock& block, int height) {}
virtual void blockDisconnected(const CBlock& block, int height) {}
virtual void updatedBlockTip() {}

View File

@ -611,13 +611,13 @@ public:
explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications)
: m_notifications(std::move(notifications)) {}
virtual ~NotificationsProxy() = default;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override
{
m_notifications->transactionAddedToMempool(tx, nAcceptTime);
m_notifications->transactionAddedToMempool(tx, nAcceptTime, mempool_sequence);
}
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
{
m_notifications->transactionRemovedFromMempool(tx, reason);
m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence);
}
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override
{
@ -997,7 +997,7 @@ public:
if (!m_node.mempool) return;
LOCK2(::cs_main, m_node.mempool->cs);
for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) {
notifications.transactionAddedToMempool(entry.GetSharedTx(), 0);
notifications.transactionAddedToMempool(entry.GetSharedTx(), /* nAcceptTime = */ 0, /* mempool_sequence = */ 0);
}
}
NodeContext& m_node;

View File

@ -532,9 +532,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool
info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash()));
}
UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose)
UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose, bool include_mempool_sequence)
{
if (verbose) {
if (include_mempool_sequence) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values.");
}
LOCK(pool.cs);
UniValue o(UniValue::VOBJ);
for (const CTxMemPoolEntry& e : pool.mapTx) {
@ -548,14 +551,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman,
}
return o;
} else {
uint64_t mempool_sequence;
std::vector<uint256> vtxid;
pool.queryHashes(vtxid);
{
LOCK(pool.cs);
pool.queryHashes(vtxid);
mempool_sequence = pool.GetSequence();
}
UniValue a(UniValue::VARR);
for (const uint256& hash : vtxid)
a.push_back(hash.ToString());
return a;
if (!include_mempool_sequence) {
return a;
} else {
UniValue o(UniValue::VOBJ);
o.pushKV("txids", a);
o.pushKV("mempool_sequence", mempool_sequence);
return o;
}
}
}
@ -566,6 +580,7 @@ static RPCHelpMan getrawmempool()
"\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n",
{
{"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"},
{"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."},
},
{
RPCResult{"for verbose = false",
@ -578,6 +593,15 @@ static RPCHelpMan getrawmempool()
{
{RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()},
}},
RPCResult{"for verbose = false and mempool_sequence = true",
RPCResult::Type::OBJ, "", "",
{
{RPCResult::Type::ARR, "txids", "",
{
{RPCResult::Type::STR_HEX, "", "The transaction id"},
}},
{RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."},
}},
},
RPCExamples{
HelpExampleCli("getrawmempool", "true")
@ -589,10 +613,15 @@ static RPCHelpMan getrawmempool()
if (!request.params[0].isNull())
fVerbose = request.params[0].get_bool();
bool include_mempool_sequence = false;
if (!request.params[1].isNull()) {
include_mempool_sequence = request.params[1].get_bool();
}
const NodeContext& node = EnsureAnyNodeContext(request.context);
const CTxMemPool& mempool = EnsureMemPool(node);
LLMQContext& llmq_ctx = EnsureLLMQContext(node);
return MempoolToJSON(mempool, llmq_ctx.isman, fVerbose);
return MempoolToJSON(mempool, llmq_ctx.isman, fVerbose, include_mempool_sequence);
},
};
}

View File

@ -47,7 +47,7 @@ UniValue blockToJSON(BlockManager& blockman, const CBlock& block, const CBlockIn
UniValue MempoolInfoToJSON(const CTxMemPool& pool, llmq::CInstantSendManager& isman);
/** Mempool to JSON */
UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose = false);
UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose = false, bool include_mempool_sequence = false);
/** Block header to JSON */
UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex, llmq::CChainLocksHandler& clhandler, llmq::CInstantSendManager& isman) LOCKS_EXCLUDED(cs_main);

View File

@ -168,6 +168,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "pruneblockchain", 0, "height" },
{ "keypoolrefill", 0, "newsize" },
{ "getrawmempool", 0, "verbose" },
{ "getrawmempool", 1, "mempool_sequence" },
{ "estimatesmartfee", 0, "conf_target" },
{ "estimaterawfee", 0, "conf_target" },
{ "estimaterawfee", 1, "threshold" },

View File

@ -51,12 +51,12 @@ struct TransactionsDelta final : public CValidationInterface {
explicit TransactionsDelta(std::set<CTransactionRef>& r, std::set<CTransactionRef>& a)
: m_removed{r}, m_added{a} {}
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t /* nAcceptTime */) override
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t /* nAcceptTime */, uint64_t) override
{
Assert(m_added.insert(tx).second);
}
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t) override
{
Assert(m_removed.insert(tx).second);
}

View File

@ -665,12 +665,16 @@ void CTxMemPool::addUncheckedProTx(indexed_transaction_set::iterator& newit, con
void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
{
// We increment mempool sequence value no matter removal reason
// even if not directly reported below.
uint64_t mempool_sequence = GetAndIncrementSequence();
if (reason != MemPoolRemovalReason::BLOCK) {
// Notify clients that a transaction has been removed from the mempool
// for any reason except being included in a block. Clients interested
// in transactions included in blocks can subscribe to the BlockConnected
// notification.
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason);
GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
}
const uint256 hash = it->GetTx().GetHash();

View File

@ -486,6 +486,11 @@ protected:
mutable double rollingMinimumFeeRate GUARDED_BY(cs); //!< minimum fee to get into the pool, decreases exponentially
mutable Epoch m_epoch GUARDED_BY(cs);
// In-memory counter for external mempool tracking purposes.
// This number is incremented once every time a transaction
// is added or removed from the mempool for any reason.
mutable uint64_t m_sequence_number{1};
void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs);
bool m_is_loaded GUARDED_BY(cs){false};
@ -852,6 +857,15 @@ public:
return (m_unbroadcast_txids.count(txid) != 0);
}
/** Guards this internal counter for external reporting */
uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
return m_sequence_number++;
}
uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
return m_sequence_number;
}
private:
/** UpdateForDescendants is used by UpdateTransactionsFromBlock to update
* the descendants for a single transaction that has been added to the

View File

@ -889,7 +889,7 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef
if (!Finalize(args, ws)) return MempoolAcceptResult::Failure(ws.m_state);
const int64_t nAcceptTime = args.m_accept_time;
GetMainSignals().TransactionAddedToMempool(ptx, nAcceptTime);
GetMainSignals().TransactionAddedToMempool(ptx, nAcceptTime, m_pool.GetAndIncrementSequence());
const CTransaction& tx = *ptx;
auto finish = Now<SteadyMilliseconds>();

View File

@ -208,17 +208,17 @@ void CMainSignals::SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, cons
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.SynchronousUpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); });
}
void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) {
auto event = [tx, nAcceptTime, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, nAcceptTime); });
void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {
auto event = [tx, nAcceptTime, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, nAcceptTime, mempool_sequence); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__,
tx->GetHash().ToString());
}
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {
auto event = [tx, reason, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason); });
void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
auto event = [tx, reason, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__,
tx->GetHash().ToString());

View File

@ -117,7 +117,8 @@ protected:
*
* Called on a background thread.
*/
virtual void TransactionAddedToMempool(const CTransactionRef &xn, int64_t nAcceptTime) {}
virtual void TransactionAddedToMempool(const CTransactionRef &xn, int64_t nAcceptTime, uint64_t mempool_sequence) {}
/**
* Notifies listeners of a transaction leaving mempool.
*
@ -149,7 +150,7 @@ protected:
*
* Called on a background thread.
*/
virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
/**
* Notifies listeners of a block being connected.
* Provides a vector of transactions evicted from the mempool as a result.
@ -226,8 +227,8 @@ public:
void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload);
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void SynchronousUpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void TransactionAddedToMempool(const CTransactionRef&, int64_t);
void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason);
void TransactionAddedToMempool(const CTransactionRef&, int64_t, uint64_t mempool_sequence);
void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence);
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex);
void BlockDisconnected(const std::shared_ptr<const CBlock> &, const CBlockIndex* pindex);
void NotifyTransactionLock(const CTransactionRef &tx, const std::shared_ptr<const llmq::CInstantSendLock>& islock);

View File

@ -1267,7 +1267,7 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, CWalletTx::Confirmatio
fAnonymizableTallyCachedNonDenom = false;
}
void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) {
void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {
LOCK(cs_wallet);
CWalletTx::Confirmation confirm(CWalletTx::Status::UNCONFIRMED, /* block_height */ 0, {}, /* nIndex */ 0);
WalletBatch batch(GetDatabase());
@ -1279,7 +1279,7 @@ void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcce
}
}
void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {
void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
if (reason != MemPoolRemovalReason::CONFLICT) {
LOCK(cs_wallet);
auto it = mapWallet.find(tx->GetHash());
@ -1330,7 +1330,7 @@ void CWallet::blockConnected(const CBlock& block, int height)
WalletBatch batch(GetDatabase());
for (size_t index = 0; index < block.vtx.size(); index++) {
SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height, block_hash, (int)index}, batch);
transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK);
transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK, 0 /* mempool_sequence */);
}
// reset cache to make sure no longer immature coins are included

View File

@ -1078,7 +1078,7 @@ public:
CWalletTx* AddToWallet(CTransactionRef tx, const CWalletTx::Confirmation& confirm, const UpdateWalletTxFn& update_wtx=nullptr, bool fFlushOnClose=true);
bool LoadToWallet(const uint256& hash, const UpdateWalletTxFn& fill_wtx) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override;
void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override;
void blockConnected(const CBlock& block, int height) override;
void blockDisconnected(const CBlock& block, int height) override;
void updatedBlockTip() override;
@ -1100,7 +1100,7 @@ public:
uint256 last_failed_block;
};
ScanResult ScanForWalletTransactions(const uint256& start_block, int start_height, std::optional<int> max_height, const WalletRescanReserver& reserver, bool fUpdate);
void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override;
void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void ResendWalletTransactions();
struct Balance {

View File

@ -28,6 +28,26 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/
return true;
}
bool CZMQAbstractNotifier::NotifyBlockConnect(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}
bool CZMQAbstractNotifier::NotifyBlockDisconnect(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransactionAcceptance(const CTransaction &/*transaction*/, uint64_t mempool_sequence)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transaction*/, uint64_t mempool_sequence)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransactionLock(const CTransactionRef &/*transaction*/, const std::shared_ptr<const llmq::CInstantSendLock>& /*islock*/)
{
return true;

View File

@ -58,9 +58,19 @@ public:
virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0;
// Notifies of ConnectTip result, i.e., new active tip only
virtual bool NotifyBlock(const CBlockIndex *pindex);
virtual bool NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr<const llmq::CChainLockSig>& clsig);
// Notifies of every block connection
virtual bool NotifyBlockConnect(const CBlockIndex *pindex);
// Notifies of every block disconnection
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex);
// Notifies of every mempool acceptance
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence);
// Notifies of every mempool removal, except inclusion in blocks
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence);
// Notifies of transactions added to mempool or appearing in blocks
virtual bool NotifyTransaction(const CTransaction &transaction);
virtual bool NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr<const llmq::CChainLockSig>& clsig);
virtual bool NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr<const llmq::CInstantSendLock>& islock);
virtual bool NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr<const CGovernanceVote>& vote);
virtual bool NotifyGovernanceObject(const std::shared_ptr<const Governance::Object>& object);

View File

@ -29,7 +29,7 @@ std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotif
return result;
}
CZMQNotificationInterface* CZMQNotificationInterface::Create()
std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create()
{
std::map<std::string, CZMQNotifierFactory> factories;
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
@ -50,6 +50,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
factories["pubrawgovernanceobject"] = CZMQAbstractNotifier::Create<CZMQPublishRawGovernanceObjectNotifier>;
factories["pubrawinstantsenddoublespend"] = CZMQAbstractNotifier::Create<CZMQPublishRawInstantSendDoubleSpendNotifier>;
factories["pubrawrecoveredsig"] = CZMQAbstractNotifier::Create<CZMQPublishRawRecoveredSigNotifier>;
factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
for (const auto& entry : factories)
@ -71,7 +72,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
notificationInterface->notifiers = std::move(notifiers);
if (notificationInterface->Initialize()) {
return notificationInterface.release();
return notificationInterface;
}
}
@ -157,31 +158,53 @@ void CZMQNotificationInterface::NotifyChainLock(const CBlockIndex *pindex, const
});
}
void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime)
void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime, uint64_t mempool_sequence)
{
// Used by BlockConnected and BlockDisconnected as well, because they're
// all the same external callback.
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx);
TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
});
}
void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
{
// Called for all non-block inclusion reasons
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
});
}
void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
{
for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction added in the block
TransactionAddedToMempool(ptx, 0);
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx);
});
}
// Next we notify BlockConnect listeners for *all* blocks
TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
return notifier->NotifyBlockConnect(pindexConnected);
});
}
void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
{
for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction removed in block disconnection
TransactionAddedToMempool(ptx, 0);
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
return notifier->NotifyTransaction(tx);
});
}
// Next we notify BlockDisconnect listeners for *all* blocks
TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
return notifier->NotifyBlockDisconnect(pindexDisconnected);
});
}
void CZMQNotificationInterface::NotifyTransactionLock(const CTransactionRef& tx, const std::shared_ptr<const llmq::CInstantSendLock>& islock)
@ -219,4 +242,4 @@ void CZMQNotificationInterface::NotifyRecoveredSig(const std::shared_ptr<const l
});
}
CZMQNotificationInterface* g_zmq_notification_interface = nullptr;
std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;

View File

@ -19,14 +19,15 @@ public:
std::list<const CZMQAbstractNotifier*> GetActiveNotifiers() const;
static CZMQNotificationInterface* Create();
static std::unique_ptr<CZMQNotificationInterface> Create();
protected:
bool Initialize();
void Shutdown();
// CValidationInterface
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override;
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
@ -44,6 +45,6 @@ private:
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
};
extern CZMQNotificationInterface* g_zmq_notification_interface;
extern std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H

View File

@ -6,6 +6,7 @@
#include <chain.h>
#include <chainparams.h>
#include <netbase.h>
#include <node/blockstorage.h>
#include <streams.h>
#include <validation.h>
@ -23,6 +24,7 @@
#include <cstdarg>
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <utility>
@ -46,6 +48,7 @@ static const char *MSG_RAWGVOTE = "rawgovernancevote";
static const char *MSG_RAWGOBJ = "rawgovernanceobject";
static const char *MSG_RAWISCON = "rawinstantsenddoublespend";
static const char *MSG_RAWRECSIG = "rawrecoveredsig";
static const char *MSG_SEQUENCE = "sequence";
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
@ -90,6 +93,20 @@ static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
return 0;
}
static bool IsZMQAddressIPV6(const std::string &zmq_address)
{
const std::string tcp_prefix = "tcp://";
const size_t tcp_index = zmq_address.rfind(tcp_prefix);
const size_t colon_index = zmq_address.rfind(":");
if (tcp_index == 0 && colon_index != std::string::npos) {
const std::string ip = zmq_address.substr(tcp_prefix.length(), colon_index - tcp_prefix.length());
CNetAddr addr;
LookupHost(ip, addr, false);
if (addr.IsIPv6()) return true;
}
return false;
}
bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
{
assert(!psocket);
@ -124,6 +141,15 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
return false;
}
// On some systems (e.g. OpenBSD) the ZMQ_IPV6 must not be enabled, if the address to bind isn't IPv6
const int enable_ipv6 { IsZMQAddressIPV6(address) ? 1 : 0};
rc = zmq_setsockopt(psocket, ZMQ_IPV6, &enable_ipv6, sizeof(enable_ipv6));
if (rc != 0) {
zmqError("Failed to set ZMQ_IPV6");
zmq_close(psocket);
return false;
}
rc = zmq_bind(psocket, address.c_str());
if (rc != 0)
{
@ -200,16 +226,17 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
uint8_t data[32];
for (unsigned int i = 0; i < 32; i++) {
data[31 - i] = hash.begin()[i];
}
return SendZmqMessage(MSG_HASHBLOCK, data, 32);
}
bool CZMQPublishHashChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr<const llmq::CChainLockSig>& clsig)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashchainlock %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashchainlock %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
@ -220,16 +247,17 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
uint8_t data[32];
for (unsigned int i = 0; i < 32; i++) {
data[31 - i] = hash.begin()[i];
}
return SendZmqMessage(MSG_HASHTX, data, 32);
}
bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr<const llmq::CInstantSendLock>& islock)
{
uint256 hash = transaction->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtxlock %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashtxlock %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
@ -239,7 +267,7 @@ bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransa
bool CZMQPublishHashGovernanceVoteNotifier::NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr<const CGovernanceVote>& vote)
{
uint256 hash = vote->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernancevote %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernancevote %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
@ -249,7 +277,7 @@ bool CZMQPublishHashGovernanceVoteNotifier::NotifyGovernanceVote(const CDetermin
bool CZMQPublishHashGovernanceObjectNotifier::NotifyGovernanceObject(const std::shared_ptr<const Governance::Object>& object)
{
uint256 hash = object->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernanceobject %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernanceobject %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
@ -259,7 +287,7 @@ bool CZMQPublishHashGovernanceObjectNotifier::NotifyGovernanceObject(const std::
bool CZMQPublishHashInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpendAttempt(const CTransactionRef& currentTx, const CTransactionRef& previousTx)
{
uint256 currentHash = currentTx->GetHash(), previousHash = previousTx->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashinstantsenddoublespend %s conflicts against %s\n", currentHash.ToString(), previousHash.ToString());
LogPrint(BCLog::ZMQ, "zmq: Publish hashinstantsenddoublespend %s conflicts against %s to %s\n", currentHash.ToString(), previousHash.ToString(), this->address);
char dataCurrentHash[32], dataPreviousHash[32];
for (unsigned int i = 0; i < 32; i++) {
dataCurrentHash[31 - i] = currentHash.begin()[i];
@ -271,7 +299,7 @@ bool CZMQPublishHashInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpend
bool CZMQPublishHashRecoveredSigNotifier::NotifyRecoveredSig(const std::shared_ptr<const llmq::CRecoveredSig> &sig)
{
LogPrint(BCLog::ZMQ, "zmq: Publish hashrecoveredsig %s\n", sig->getMsgHash().ToString());
LogPrint(BCLog::ZMQ, "zmq: Publish hashrecoveredsig %s to %s\n", sig->getMsgHash().ToString(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = sig->getMsgHash().begin()[i];
@ -301,7 +329,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
bool CZMQPublishRawChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr<const llmq::CChainLockSig>& clsig)
{
LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlock %s\n", pindex->GetBlockHash().GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
@ -322,7 +350,7 @@ bool CZMQPublishRawChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex,
bool CZMQPublishRawChainLockSigNotifier::NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr<const llmq::CChainLockSig>& clsig)
{
LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlocksig %s\n", pindex->GetBlockHash().GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlocksig %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
@ -351,6 +379,47 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}
// Helper function to send a 'sequence' topic message with the following structure:
// <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional)
static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional<uint64_t> sequence = {})
{
unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)];
for (unsigned int i = 0; i < sizeof(hash); ++i) {
data[sizeof(hash) - 1 - i] = hash.begin()[i];
}
data[sizeof(hash)] = label;
if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence);
return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label));
}
bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C');
}
bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D');
}
bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence);
}
bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
}
bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr<const llmq::CInstantSendLock>& islock)
{
uint256 hash = transaction->GetHash();
@ -373,7 +442,7 @@ bool CZMQPublishRawTransactionLockSigNotifier::NotifyTransactionLock(const CTran
bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr<const CGovernanceVote>& vote)
{
uint256 nHash = vote->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, vote = %d\n", nHash.ToString(), this->address, vote->ToString(tip_mn_list));
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject %s with vote %d to %s\n", nHash.ToString(), vote->ToString(tip_mn_list), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << *vote;
return SendZmqMessage(MSG_RAWGVOTE, &(*ss.begin()), ss.size());
@ -382,7 +451,7 @@ bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const CDetermini
bool CZMQPublishRawGovernanceObjectNotifier::NotifyGovernanceObject(const std::shared_ptr<const Governance::Object>& govobj)
{
uint256 nHash = govobj->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, type = %d\n", nHash.ToString(), this->address, ToUnderlying(govobj->type));
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject %s with type %d to %s\n", nHash.ToString(), ToUnderlying(govobj->type), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << *govobj;
return SendZmqMessage(MSG_RAWGOBJ, &(*ss.begin()), ss.size());
@ -390,7 +459,7 @@ bool CZMQPublishRawGovernanceObjectNotifier::NotifyGovernanceObject(const std::s
bool CZMQPublishRawInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpendAttempt(const CTransactionRef& currentTx, const CTransactionRef& previousTx)
{
LogPrint(BCLog::ZMQ, "zmq: Publish rawinstantsenddoublespend %s conflicts with %s\n", currentTx->GetHash().ToString(), previousTx->GetHash().ToString());
LogPrint(BCLog::ZMQ, "zmq: Publish rawinstantsenddoublespend %s conflicts with %s to %s\n", currentTx->GetHash().ToString(), previousTx->GetHash().ToString(), this->address);
CDataStream ssCurrent(SER_NETWORK, PROTOCOL_VERSION), ssPrevious(SER_NETWORK, PROTOCOL_VERSION);
ssCurrent << *currentTx;
ssPrevious << *previousTx;
@ -400,7 +469,7 @@ bool CZMQPublishRawInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpendA
bool CZMQPublishRawRecoveredSigNotifier::NotifyRecoveredSig(const std::shared_ptr<const llmq::CRecoveredSig>& sig)
{
LogPrint(BCLog::ZMQ, "zmq: Publish rawrecoveredsig %s\n", sig->getMsgHash().ToString());
LogPrint(BCLog::ZMQ, "zmq: Publish rawrecoveredsig %s to %s\n", sig->getMsgHash().ToString(), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << *sig;

View File

@ -108,6 +108,15 @@ public:
bool NotifyTransaction(const CTransaction &transaction) override;
};
class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyBlockConnect(const CBlockIndex *pindex) override;
bool NotifyBlockDisconnect(const CBlockIndex *pindex) override;
bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override;
bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override;
};
class CZMQPublishRawTransactionLockNotifier : public CZMQAbstractPublishNotifier
{
public:

View File

@ -5,15 +5,27 @@
"""Test the ZMQ notification interface."""
import struct
from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE
from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE, ADDRESS_BCRT1_P2SH_OP_TRUE
from test_framework.blocktools import create_block, create_coinbase
from test_framework.test_framework import BitcoinTestFramework
from test_framework.messages import (
dashhash,
hash256,
tx_from_hex,
)
from test_framework.util import assert_equal
from test_framework.util import (
assert_equal,
assert_raises_rpc_error,
)
from test_framework.netutil import test_ipv6_local
from time import sleep
# Test may be skipped and not have zmq installed
try:
import zmq
except ImportError:
pass
def hash256_reversed(byte_str):
return hash256(byte_str)[::-1]
@ -22,28 +34,74 @@ def dashhash_reversed(byte_str):
class ZMQSubscriber:
def __init__(self, socket, topic):
self.sequence = 0
self.sequence = None # no sequence number received yet
self.socket = socket
self.topic = topic
import zmq
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
def receive(self):
# Receive message from publisher and verify that topic and sequence match
def _receive_from_publisher_and_check(self):
topic, body, seq = self.socket.recv_multipart()
# Topic should match the subscriber topic.
assert_equal(topic, self.topic)
# Sequence should be incremental.
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
received_seq = struct.unpack('<I', seq)[-1]
if self.sequence is None:
self.sequence = received_seq
else:
assert_equal(received_seq, self.sequence)
self.sequence += 1
return body
def receive(self):
return self._receive_from_publisher_and_check()
def receive_sequence(self):
body = self._receive_from_publisher_and_check()
hash = body[:32].hex()
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
if mempool_sequence is not None:
assert label == "A" or label == "R"
else:
assert label == "D" or label == "C"
return (hash, label, mempool_sequence)
class ZMQTestSetupBlock:
"""Helper class for setting up a ZMQ test via the "sync up" procedure.
Generates a block on the specified node on instantiation and provides a
method to check whether a ZMQ notification matches, i.e. the event was
caused by this generated block. Assumes that a notification either contains
the generated block's hash, it's (coinbase) transaction id, the raw block or
raw transaction data.
"""
def __init__(self, node):
self.block_hash = node.generate(1)[0]
coinbase = node.getblock(self.block_hash, 2)['tx'][0]
self.tx_hash = coinbase['txid']
self.raw_tx = coinbase['hex']
self.raw_block = node.getblock(self.block_hash, 0)
def caused_notification(self, notification):
return (
self.block_hash in notification
or self.tx_hash in notification
or self.raw_block in notification
or self.raw_tx in notification
)
class ZMQTest (BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
if self.is_wallet_compiled():
self.requires_wallet = True
# This test isn't testing txn relay/timing, so set whitelist on the
# peers for instant txn relay. This speeds up the test run time 2-3x.
self.extra_args = [["-whitelist=noban@127.0.0.1"]] * self.num_nodes
def skip_test_if_missing_module(self):
self.skip_if_no_py3_zmq()
@ -51,48 +109,86 @@ class ZMQTest (BitcoinTestFramework):
# TODO: drop this check after migration to MiniWallet, see bitcoin/bitcoin#24653
self.skip_if_no_bdb()
def setup_network(self):
self.disable_mocktime()
super().setup_network()
def run_test(self):
import zmq
self.ctx = zmq.Context()
try:
self.test_basic()
self.test_sequence()
self.test_mempool_sync()
self.test_reorg()
self.test_multiple_interfaces()
self.test_ipv6()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
self.ctx.destroy(linger=None)
# Restart node with the specified zmq notifications enabled, subscribe to
# all of them and return the corresponding ZMQSubscriber objects.
def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True, ipv6=False):
subscribers = []
for topic, address in services:
socket = self.ctx.socket(zmq.SUB)
if ipv6:
socket.setsockopt(zmq.IPV6, 1)
subscribers.append(ZMQSubscriber(socket, topic.encode()))
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services] +
self.extra_args[0])
for i, sub in enumerate(subscribers):
sub.socket.connect(services[i][1])
# Ensure that all zmq publisher notification interfaces are ready by
# running the following "sync up" procedure:
# 1. Generate a block on the node
# 2. Try to receive the corresponding notification on all subscribers
# 3. If all subscribers get the message within the timeout (1 second),
# we are done, otherwise repeat starting from step 1
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, 1000)
while True:
test_block = ZMQTestSetupBlock(self.nodes[0])
recv_failed = False
for sub in subscribers:
try:
while not test_block.caused_notification(sub.receive().hex()):
self.log.debug("Ignoring sync-up notification for previously generated block.")
except zmq.error.Again:
self.log.debug("Didn't receive sync-up notification, trying again.")
recv_failed = True
if not recv_failed:
self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
break
# set subscriber's desired timeout for the test
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)
self.connect_nodes(0, 1)
if sync_blocks:
self.sync_blocks()
return subscribers
def test_basic(self):
import zmq
# Invalid zmq arguments don't take down the node, see #17185.
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
self.zmq_context = zmq.Context()
address = 'tcp://127.0.0.1:28332'
sockets = []
subs = []
services = [b"hashblock", b"hashtx", b"rawblock", b"rawtx"]
for service in services:
sockets.append(self.ctx.socket(zmq.SUB))
sockets[-1].set(zmq.RCVTIMEO, 60000)
subs.append(ZMQSubscriber(sockets[-1], service))
subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
# Subscribe to all available topics.
hashblock = subs[0]
hashtx = subs[1]
rawblock = subs[2]
rawtx = subs[3]
self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]])
self.connect_nodes(0, 1)
for socket in sockets:
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
num_blocks = 5
self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks})
genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
@ -127,6 +223,9 @@ class ZMQTest (BitcoinTestFramework):
txid = hashtx.receive()
assert_equal(payment_txid, txid.hex())
# TODO: Add "R" sequence testing, potentially using txes replaced with
# islocked txes
# Should receive the broadcasted raw transaction.
hex = rawtx.receive()
assert_equal(payment_txid, hash256_reversed(hex).hex())
@ -155,28 +254,13 @@ class ZMQTest (BitcoinTestFramework):
self.log.info("Skipping reorg test because wallet is disabled")
return
import zmq
address = 'tcp://127.0.0.1:28333'
services = [b"hashblock", b"hashtx"]
sockets = []
subs = []
for service in services:
sockets.append(self.ctx.socket(zmq.SUB))
# 2 second timeout to check end of notifications
sockets[-1].set(zmq.RCVTIMEO, 2000)
subs.append(ZMQSubscriber(sockets[-1], service))
# Subscribe to all available topics.
hashblock = subs[0]
hashtx = subs[1]
# Should only notify the tip if a reorg occurs
self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx]])
for socket in sockets:
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
hashblock, hashtx = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx"]],
recv_timeout=2) # 2 second timeout to check end of notifications
self.disconnect_nodes(0, 1)
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
@ -186,8 +270,8 @@ class ZMQTest (BitcoinTestFramework):
assert_equal(hashtx.receive().hex(), payment_txid)
assert_equal(hashtx.receive().hex(), disconnect_cb)
# Generate 2 blocks in nodes[1]
connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE)
# Generate 2 blocks in nodes[1] to a different address to ensure split
connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2SH_OP_TRUE)
# nodes[0] will reorg chain after connecting back nodes[1]
self.connect_nodes(0, 1)
@ -213,29 +297,281 @@ class ZMQTest (BitcoinTestFramework):
# And the current tip
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
def test_sequence(self):
"""
Sequence zmq notifications give every blockhash and txhash in order
of processing, regardless of IBD, re-orgs, etc.
Format of messages:
<32-byte hash>C : Blockhash connected
<32-byte hash>D : Blockhash disconnected
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
"""
self.log.info("Testing 'sequence' publisher")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
self.disconnect_nodes(0, 1)
# Mempool sequence number starts at 1
seq_num = 1
# Generate 1 block in nodes[0] and receive all notifications
dc_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
# Note: We are not notified of any block transactions, coinbase or mined
assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())
# Generate 2 blocks in nodes[1] to a different address to ensure a chain split
self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2SH_OP_TRUE)
# nodes[0] will reorg chain after connecting back nodes[1]
self.connect_nodes(0, 1)
# Then we receive all block (dis)connect notifications for the 2 block reorg
assert_equal((dc_block, "D", None), seq.receive_sequence())
block_count = self.nodes[1].getblockcount()
assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())
# Rest of test requires wallet functionality
if self.is_wallet_compiled():
self.log.info("Wait for tx from second node")
payment_txid = self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=5.0)
self.sync_all()
self.log.info("Testing sequence notifications with mempool sequence values")
# Should receive the broadcasted txid.
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Doesn't get published when mined, make a block and tx to "flush" the possibility
# though the mempool sequence number does go up by the number of transactions
# removed from the mempool by the block mining it.
mempool_size = len(self.nodes[0].getrawmempool())
c_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
self.sync_all()
# Make sure the number of mined transactions matches the number of txs out of mempool
mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
seq_num += mempool_size_delta
payment_txid_2 = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
self.sync_all()
assert_equal((c_block, "C", None), seq.receive_sequence())
assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Spot check getrawmempool results that they only show up when asked for
assert type(self.nodes[0].getrawmempool()) is list
assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)
self.log.info("Testing reorg notifications")
# Manually invalidate the last block to test mempool re-entry
# N.B. This part could be made more lenient in exact ordering
# since it greatly depends on inner-workings of blocks/mempool
# during "deep" re-orgs. Probably should "re-construct"
# blockchain/mempool state from notifications instead.
block_count = self.nodes[0].getblockcount()
best_hash = self.nodes[0].getbestblockhash()
self.nodes[0].invalidateblock(best_hash)
sleep(2) # Bit of room to make sure transaction things happened
# Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
# of the time they were gathered.
assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num
assert_equal((best_hash, "D", None), seq.receive_sequence())
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
seq_num += 1
# Other things may happen but aren't wallet-deterministic so we don't test for them currently
self.nodes[0].reconsiderblock(best_hash)
self.nodes[1].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
self.sync_all()
self.log.info("Evict mempool transaction by block conflict")
orig_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0)
# More to be simply mined
more_tx = []
for _ in range(5):
more_tx.append(self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1))
raw_tx = self.nodes[0].getrawtransaction(orig_txid)
# Mine the tx
block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1))
tx = tx_from_hex(raw_tx)
block.vtx.append(tx)
for txid in more_tx:
tx = tx_from_hex(self.nodes[0].getrawtransaction(txid))
block.vtx.append(tx)
block.hashMerkleRoot = block.calc_merkle_root()
block.solve()
assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
tip = self.nodes[0].getbestblockhash()
assert_equal(int(tip, 16), block.sha256)
orig_txid_2 = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0)
# Flush old notifications until evicted tx original entry
(hash_str, label, mempool_seq) = seq.receive_sequence()
while hash_str != orig_txid:
(hash_str, label, mempool_seq) = seq.receive_sequence()
mempool_seq += 1
# Added original tx
assert_equal(label, "A")
# More transactions to be simply mined
for i in range(len(more_tx)):
assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
mempool_seq += 1
assert_equal((tip, "C", None), seq.receive_sequence())
mempool_seq += len(more_tx)
# Last tx
assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
mempool_seq += 1
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
self.sync_all() # want to make sure we didn't break "consensus" for other tests
def test_mempool_sync(self):
"""
Use sequence notification plus getrawmempool sequence results to "sync mempool"
"""
if not self.is_wallet_compiled():
self.log.info("Skipping mempool sync test")
return
self.log.info("Testing 'mempool sync' usage of sequence notifier")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
# In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
assert_equal(next_mempool_seq, 1)
# Some transactions have been happening but we aren't consuming zmq notifications yet
# or we lost a ZMQ message somehow and want to start over
txids = []
num_txs = 5
for _ in range(num_txs):
txids.append(self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0))
self.sync_all()
# 1) Consume backlog until we get a mempool sequence number
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
while zmq_mem_seq is None:
(hash_str, label, zmq_mem_seq) = seq.receive_sequence()
assert label == "A" or label == "R"
assert hash_str is not None
# 2) We need to "seed" our view of the mempool
mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
mempool_view = set(mempool_snapshot["txids"])
get_raw_seq = mempool_snapshot["mempool_sequence"]
assert_equal(get_raw_seq, 6)
# Snapshot may be too old compared to zmq message we read off latest
while zmq_mem_seq >= get_raw_seq:
sleep(2)
mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
mempool_view = set(mempool_snapshot["txids"])
get_raw_seq = mempool_snapshot["mempool_sequence"]
# Things continue to happen in the "interim" while waiting for snapshot results
for _ in range(num_txs):
txids.append(self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1))
self.sync_all()
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
final_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1)
# 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
while True:
if zmq_mem_seq == get_raw_seq - 1:
break
(hash_str, label, mempool_sequence) = seq.receive_sequence()
if mempool_sequence is not None:
zmq_mem_seq = mempool_sequence
if zmq_mem_seq > get_raw_seq:
raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq))
# 4) Moving forward, we apply the delta to our local view
# remaining txs(5) + 1 block connect + 1 final tx
expected_sequence = get_raw_seq
r_gap = 0
for _ in range(num_txs + 1 + 1):
(hash_str, label, mempool_sequence) = seq.receive_sequence()
if mempool_sequence is not None:
if mempool_sequence != expected_sequence:
# Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
# position in the incoming block message "C"
if label == "R":
assert mempool_sequence > expected_sequence
r_gap += mempool_sequence - expected_sequence
else:
raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}")
if label == "A":
assert hash_str not in mempool_view
mempool_view.add(hash_str)
expected_sequence = mempool_sequence + 1
elif label == "R":
assert hash_str in mempool_view
mempool_view.remove(hash_str)
expected_sequence = mempool_sequence + 1
elif label == "C":
# (Attempt to) remove all txids from known block connects
block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
for txid in block_txids:
if txid in mempool_view:
expected_sequence += 1
mempool_view.remove(txid)
expected_sequence -= r_gap
r_gap = 0
elif label == "D":
# Not useful for mempool tracking per se
continue
else:
raise Exception("Unexpected ZMQ sequence label!")
assert_equal(self.nodes[0].getrawmempool(), [final_txid])
assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)
# 5) If you miss a zmq/mempool sequence number, go back to step (2)
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
def test_multiple_interfaces(self):
import zmq
# Set up two subscribers with different addresses
subscribers = []
for i in range(2):
address = 'tcp://127.0.0.1:%d' % (28334 + i)
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
hashblock = ZMQSubscriber(socket, b"hashblock")
socket.connect(address)
subscribers.append({'address': address, 'hashblock': hashblock})
self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers])
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
# (note that after the reorg test, syncing would fail due to different
# chain lengths on node0 and node1; for this test we only need node0, so
# we can disable syncing blocks on the setup)
subscribers = self.setup_zmq_test([
("hashblock", "tcp://127.0.0.1:28334"),
("hashblock", "tcp://127.0.0.1:28335"),
], sync_blocks=False)
# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
# Should receive the same block hash on both subscribers
assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex())
assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex())
assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex())
def test_ipv6(self):
if not test_ipv6_local():
self.log.info("Skipping IPv6 test, because IPv6 is not supported.")
return
self.log.info("Testing IPv6")
# Set up subscriber using IPv6 loopback address
subscribers = self.setup_zmq_test([
("hashblock", "tcp://[::1]:28332")
], ipv6=True)
# Generate 1 block in nodes[0]
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
# Should receive the same block hash
assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
if __name__ == '__main__':

View File

@ -75,7 +75,6 @@ implicit-integer-sign-change:txmempool.cpp
implicit-integer-sign-change:util/strencodings.cpp
implicit-integer-sign-change:util/strencodings.h
implicit-integer-sign-change:validation.cpp
implicit-integer-sign-change:zmq/zmqpublishnotifier.cpp
implicit-signed-integer-truncation,implicit-integer-sign-change:chain.h
implicit-signed-integer-truncation,implicit-integer-sign-change:test/skiplist_tests.cpp
implicit-signed-integer-truncation:addrman.cpp