diff --git a/doc/zmq.md b/doc/zmq.md index bc6ec22a90..ee8fd82430 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -154,9 +154,7 @@ using other means such as firewalling. Note that when the block chain tip changes, a reorganisation may occur and just the tip will be notified. It is up to the subscriber to -retrieve the chain from the last known block to the new tip. Also note -that no notification occurs if the tip was in the active chain - this -is the case after calling invalidateblock RPC. +retrieve the chain from the last known block to the new tip. There are several possibilities that ZMQ notification can get lost during transmission depending on the communication type you are diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 2fec98c1cd..b6b45ffe78 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -12,10 +12,11 @@ from test_framework.test_framework import BitcoinTestFramework from test_framework.messages import dashhash from test_framework.util import ( assert_equal, - connect_nodes, hash256, ) +ADDRESS = "tcp://127.0.0.1:28332" + def dashhash_helper(b): return encode(dashhash(b)[::-1], 'hex_codec').decode('ascii') @@ -47,59 +48,62 @@ class ZMQTest (BitcoinTestFramework): self.skip_if_no_py3_zmq() self.skip_if_no_bitcoind_zmq() - def run_test(self): + def setup_nodes(self): import zmq - self.ctx = zmq.Context() - try: - self.test_basic() - self.test_reorg() - finally: - # Destroy the ZMQ context. - self.log.debug("Destroying ZMQ context") - self.ctx.destroy(linger=None) - def test_basic(self): + # Initialize ZMQ context and socket. # All messages are received in the same socket which means # that this test fails if the publishing order changes. # Note that the publishing order is not defined in the documentation and # is subject to change. - import zmq - address = 'tcp://127.0.0.1:28332' - socket = self.ctx.socket(zmq.SUB) + self.zmq_context = zmq.Context() + socket = self.zmq_context.socket(zmq.SUB) socket.set(zmq.RCVTIMEO, 60000) - socket.connect(address) + socket.connect(ADDRESS) # Subscribe to all available topics. - hashblock = ZMQSubscriber(socket, b"hashblock") - hashtx = ZMQSubscriber(socket, b"hashtx") - rawblock = ZMQSubscriber(socket, b"rawblock") - rawtx = ZMQSubscriber(socket, b"rawtx") + self.hashblock = ZMQSubscriber(socket, b"hashblock") + self.hashtx = ZMQSubscriber(socket, b"hashtx") + self.rawblock = ZMQSubscriber(socket, b"rawblock") + self.rawtx = ZMQSubscriber(socket, b"rawtx") + self.extra_args = [ + ["-zmqpub%s=%s" % (sub.topic.decode(), ADDRESS) for sub in [self.hashblock, self.hashtx, self.rawblock, self.rawtx]], + [] + ] + self.add_nodes(self.num_nodes, self.extra_args) + self.start_nodes() - self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]]) - connect_nodes(self.nodes[0], 1) + def run_test(self): + try: + self._zmq_test() + finally: + # Destroy the ZMQ context. + self.log.debug("Destroying ZMQ context") + self.zmq_context.destroy(linger=None) + + def _zmq_test(self): num_blocks = 5 self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks}) genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE) - self.sync_all() for x in range(num_blocks): # Should receive the coinbase txid. - txid = hashtx.receive() + txid = self.hashtx.receive() # Should receive the coinbase raw transaction. - hex = rawtx.receive() + hex = self.rawtx.receive() assert_equal(hash256(hex), txid) # Should receive the generated block hash. - hash = hashblock.receive().hex() + hash = self.hashblock.receive().hex() assert_equal(genhashes[x], hash) # The block should only have the coinbase txid. assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"]) # Should receive the generated raw block. - block = rawblock.receive() + block = self.rawblock.receive() assert_equal(genhashes[x], dashhash_helper(block[:80])) if self.is_wallet_compiled(): @@ -108,47 +112,23 @@ class ZMQTest (BitcoinTestFramework): self.sync_all() # Should receive the broadcasted txid. - txid = hashtx.receive() + txid = self.hashtx.receive() assert_equal(payment_txid, txid.hex()) # Should receive the broadcasted raw transaction. - hex = rawtx.receive() + hex = self.rawtx.receive() assert_equal(payment_txid, hash256(hex).hex()) self.log.info("Test the getzmqnotifications RPC") assert_equal(self.nodes[0].getzmqnotifications(), [ - {"type": "pubhashblock", "address": address, "hwm": 1000}, - {"type": "pubhashtx", "address": address, "hwm": 1000}, - {"type": "pubrawblock", "address": address, "hwm": 1000}, - {"type": "pubrawtx", "address": address, "hwm": 1000}, + {"type": "pubhashblock", "address": ADDRESS, "hwm": 1000}, + {"type": "pubhashtx", "address": ADDRESS, "hwm": 1000}, + {"type": "pubrawblock", "address": ADDRESS, "hwm": 1000}, + {"type": "pubrawtx", "address": ADDRESS, "hwm": 1000}, ]) assert_equal(self.nodes[1].getzmqnotifications(), []) - def test_reorg(self): - import zmq - address = 'tcp://127.0.0.1:28333' - socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, 60000) - socket.connect(address) - hashblock = ZMQSubscriber(socket, b'hashblock') - - # Should only notify the tip if a reorg occurs - self.restart_node(0, ['-zmqpub%s=%s' % (hashblock.topic.decode(), address)]) - - # Generate 1 block in nodes[0] and receive all notifications - self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) - assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex()) - - # Generate 2 blocks in nodes[1] - self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE) - - # nodes[0] will reorg chain after connecting back nodes[1] - connect_nodes(self.nodes[0], 1) - - # Should receive nodes[1] tip - assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex()) - if __name__ == '__main__': ZMQTest().main()