Merge #16404: qa: Test ZMQ notification after chain reorg

abdfc5e89b687f73de4ab97e924c29cc27e71c15 qa: Test ZMQ notification after chain reorg (João Barbosa)
aa2622a726bc0f02152d79c888a332694678a989 qa: Refactor ZMQ test (João Barbosa)
6bc1ff915dd495f05985d3402a34dbfc3b6a08b4 doc: Add note regarding ZMQ block notification (João Barbosa)

Pull request description:

Top commit has no ACKs.

Tree-SHA512: b93237adc8c84b3aa72ccc28097090eabcb006cf408083218bebf6fec703bd0de2ded80b6879e77096872e14ba9402a6d3f923b146a54d4c4e41dcb862c3e765
This commit is contained in:
MarcoFalke 2019-08-26 09:09:30 -04:00 committed by Vijay Das Manikpuri
parent 3d8be2d355
commit cf43f40fb4
No known key found for this signature in database
GPG Key ID: DB1D81B01DB7C46E
2 changed files with 59 additions and 37 deletions

View File

@ -154,7 +154,9 @@ using other means such as firewalling.
Note that when the block chain tip changes, a reorganisation may occur Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip. retrieve the chain from the last known block to the new tip. Also note
that no notification occurs if the tip was in the active chain - this
is the case after calling invalidateblock RPC.
There are several possibilities that ZMQ notification can get lost There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type you are during transmission depending on the communication type you are

View File

@ -12,11 +12,10 @@ from test_framework.test_framework import BitcoinTestFramework
from test_framework.messages import dashhash from test_framework.messages import dashhash
from test_framework.util import ( from test_framework.util import (
assert_equal, assert_equal,
connect_nodes,
hash256, hash256,
) )
ADDRESS = "tcp://127.0.0.1:28332"
def dashhash_helper(b): def dashhash_helper(b):
return encode(dashhash(b)[::-1], 'hex_codec').decode('ascii') return encode(dashhash(b)[::-1], 'hex_codec').decode('ascii')
@ -48,62 +47,59 @@ class ZMQTest (BitcoinTestFramework):
self.skip_if_no_py3_zmq() self.skip_if_no_py3_zmq()
self.skip_if_no_bitcoind_zmq() self.skip_if_no_bitcoind_zmq()
def setup_nodes(self): def run_test(self):
import zmq import zmq
self.ctx = zmq.Context()
try:
self.test_basic()
self.test_reorg()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
self.ctx.destroy(linger=None)
# Initialize ZMQ context and socket. def test_basic(self):
# All messages are received in the same socket which means # All messages are received in the same socket which means
# that this test fails if the publishing order changes. # that this test fails if the publishing order changes.
# Note that the publishing order is not defined in the documentation and # Note that the publishing order is not defined in the documentation and
# is subject to change. # is subject to change.
self.zmq_context = zmq.Context() import zmq
socket = self.zmq_context.socket(zmq.SUB) address = 'tcp://127.0.0.1:28332'
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000) socket.set(zmq.RCVTIMEO, 60000)
socket.connect(ADDRESS) socket.connect(address)
# Subscribe to all available topics. # Subscribe to all available topics.
self.hashblock = ZMQSubscriber(socket, b"hashblock") hashblock = ZMQSubscriber(socket, b"hashblock")
self.hashtx = ZMQSubscriber(socket, b"hashtx") hashtx = ZMQSubscriber(socket, b"hashtx")
self.rawblock = ZMQSubscriber(socket, b"rawblock") rawblock = ZMQSubscriber(socket, b"rawblock")
self.rawtx = ZMQSubscriber(socket, b"rawtx") rawtx = ZMQSubscriber(socket, b"rawtx")
self.extra_args = [
["-zmqpub%s=%s" % (sub.topic.decode(), ADDRESS) for sub in [self.hashblock, self.hashtx, self.rawblock, self.rawtx]],
[]
]
self.add_nodes(self.num_nodes, self.extra_args)
self.start_nodes()
def run_test(self): self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]])
try: connect_nodes(self.nodes[0], 1)
self._zmq_test()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
self.zmq_context.destroy(linger=None)
def _zmq_test(self):
num_blocks = 5 num_blocks = 5
self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks}) self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks})
genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE) genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
self.sync_all() self.sync_all()
for x in range(num_blocks): for x in range(num_blocks):
# Should receive the coinbase txid. # Should receive the coinbase txid.
txid = self.hashtx.receive() txid = hashtx.receive()
# Should receive the coinbase raw transaction. # Should receive the coinbase raw transaction.
hex = self.rawtx.receive() hex = rawtx.receive()
assert_equal(hash256(hex), txid) assert_equal(hash256(hex), txid)
# Should receive the generated block hash. # Should receive the generated block hash.
hash = self.hashblock.receive().hex() hash = hashblock.receive().hex()
assert_equal(genhashes[x], hash) assert_equal(genhashes[x], hash)
# The block should only have the coinbase txid. # The block should only have the coinbase txid.
assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"]) assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"])
# Should receive the generated raw block. # Should receive the generated raw block.
block = self.rawblock.receive() block = rawblock.receive()
assert_equal(genhashes[x], dashhash_helper(block[:80])) assert_equal(genhashes[x], dashhash_helper(block[:80]))
if self.is_wallet_compiled(): if self.is_wallet_compiled():
@ -112,23 +108,47 @@ class ZMQTest (BitcoinTestFramework):
self.sync_all() self.sync_all()
# Should receive the broadcasted txid. # Should receive the broadcasted txid.
txid = self.hashtx.receive() txid = hashtx.receive()
assert_equal(payment_txid, txid.hex()) assert_equal(payment_txid, txid.hex())
# Should receive the broadcasted raw transaction. # Should receive the broadcasted raw transaction.
hex = self.rawtx.receive() hex = rawtx.receive()
assert_equal(payment_txid, hash256(hex).hex()) assert_equal(payment_txid, hash256(hex).hex())
self.log.info("Test the getzmqnotifications RPC") self.log.info("Test the getzmqnotifications RPC")
assert_equal(self.nodes[0].getzmqnotifications(), [ assert_equal(self.nodes[0].getzmqnotifications(), [
{"type": "pubhashblock", "address": ADDRESS, "hwm": 1000}, {"type": "pubhashblock", "address": address, "hwm": 1000},
{"type": "pubhashtx", "address": ADDRESS, "hwm": 1000}, {"type": "pubhashtx", "address": address, "hwm": 1000},
{"type": "pubrawblock", "address": ADDRESS, "hwm": 1000}, {"type": "pubrawblock", "address": address, "hwm": 1000},
{"type": "pubrawtx", "address": ADDRESS, "hwm": 1000}, {"type": "pubrawtx", "address": address, "hwm": 1000},
]) ])
assert_equal(self.nodes[1].getzmqnotifications(), []) assert_equal(self.nodes[1].getzmqnotifications(), [])
def test_reorg(self):
import zmq
address = 'tcp://127.0.0.1:28333'
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
socket.connect(address)
hashblock = ZMQSubscriber(socket, b'hashblock')
# Should only notify the tip if a reorg occurs
self.restart_node(0, ['-zmqpub%s=%s' % (hashblock.topic.decode(), address)])
# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
# Generate 2 blocks in nodes[1]
self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE)
# nodes[0] will reorg chain after connecting back nodes[1]
connect_nodes(self.nodes[0], 1)
# Should receive nodes[1] tip
assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())
if __name__ == '__main__': if __name__ == '__main__':
ZMQTest().main() ZMQTest().main()