dash/test/functional/interface_zmq.py
Wladimir J. van der Laan 8adbc386a5
Merge #13051: qa: Normalize executable location
fa811b0 qa: Normalize executable location (MarcoFalke)

Pull request description:

  This removes the need to override the executable locations by just reading them from the config file. Besides making the code easier to read, running individual tests on Windows is now possible by default (without providing further command line arguments).

  Note: Of course, it is still possible to manually specify the location through the `BITCOIND` environment variable, e.g. `bitcoin-qt`.

Tree-SHA512: bee6d22246796242d747120ca18aaab089f73067de213c9111182561985c5912228a0b0f7f9eec025ecfdb44db031f15652f30d67c489d481c995bb3232a7ac7
2020-07-22 12:12:46 -05:00

125 lines
4.5 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the ZMQ notification interface."""
import configparser
import struct
from codecs import encode
from test_framework.mininode import dashhash
from test_framework.test_framework import BitcoinTestFramework, SkipTest
from test_framework.util import (assert_equal,
bytes_to_hex_str,
hash256,
)
def dashhash_helper(b):
    """Return ``dashhash(b)`` as a byte-reversed, hex-encoded ASCII string.

    The digest returned by ``dashhash`` is reversed byte-wise before being
    hex-encoded; the result is a ``str``.
    """
    reversed_digest = dashhash(b)[::-1]
    hex_bytes = encode(reversed_digest, 'hex_codec')
    return hex_bytes.decode('ascii')
class ZMQSubscriber:
    """Track one topic subscription on a ZMQ socket.

    Each instance subscribes ``socket`` to ``topic`` and keeps its own
    counter of the sequence number it expects on the next message.
    """

    def __init__(self, socket, topic):
        """Subscribe ``socket`` to ``topic`` and start the expected sequence at 0."""
        self.sequence = 0
        self.socket = socket
        self.topic = topic

        # Imported here rather than at module level so that merely importing
        # this file does not require the zmq package.
        import zmq
        self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)

    def receive(self):
        """Receive one three-part message and return its body.

        Asserts that the message topic equals this subscriber's topic and
        that its sequence number equals the expected counter, then advances
        the counter.
        """
        msg_topic, payload, raw_seq = self.socket.recv_multipart()

        # Topic should match the subscriber topic.
        assert_equal(msg_topic, self.topic)

        # Sequence should be incremental ('<I' yields exactly one integer).
        (seq_number,) = struct.unpack('<I', raw_seq)
        assert_equal(seq_number, self.sequence)

        self.sequence += 1
        return payload
class ZMQTest(BitcoinTestFramework):
    """Functional test of the ZMQ notification interface.

    Node 0 is started with a ``-zmqpub<topic>`` option for each of the
    hashblock, hashtx, rawblock and rawtx topics, all pointing at the same
    local TCP address; node 1 is started without extra arguments. A single
    SUB socket subscribed to all four topics receives the notifications.
    """

    def set_test_params(self):
        """Run the test with two nodes."""
        self.num_nodes = 2

    def setup_nodes(self):
        """Create the ZMQ subscriber socket, then add and start the nodes.

        Raises:
            SkipTest: if the ``zmq`` module cannot be imported, or if the
                build configuration does not report ``ENABLE_ZMQ``.
        """
        # Try to import python3-zmq. Skip this test if the import fails.
        try:
            import zmq
        except ImportError:
            raise SkipTest("python3-zmq module not available.")

        # Check that dash has been built with ZMQ enabled.
        config = configparser.ConfigParser()
        # Read through a context manager so the file handle is closed
        # deterministically instead of being left to the garbage collector.
        with open(self.options.configfile) as config_file:
            config.read_file(config_file)

        if not config["components"].getboolean("ENABLE_ZMQ"):
            raise SkipTest("dashd has not been built with zmq enabled.")

        # Initialize ZMQ context and socket.
        # All messages are received in the same socket which means
        # that this test fails if the publishing order changes.
        # Note that the publishing order is not defined in the documentation and
        # is subject to change.
        address = "tcp://127.0.0.1:28332"
        self.zmq_context = zmq.Context()
        socket = self.zmq_context.socket(zmq.SUB)
        # Receive timeout (RCVTIMEO is expressed in milliseconds) so a missing
        # notification fails the test instead of blocking forever.
        socket.set(zmq.RCVTIMEO, 60000)
        socket.connect(address)

        # Subscribe to all available topics.
        self.hashblock = ZMQSubscriber(socket, b"hashblock")
        self.hashtx = ZMQSubscriber(socket, b"hashtx")
        self.rawblock = ZMQSubscriber(socket, b"rawblock")
        self.rawtx = ZMQSubscriber(socket, b"rawtx")

        # Node 0 publishes every subscribed topic to the shared address;
        # node 1 gets no extra arguments.
        subscribers = [self.hashblock, self.hashtx, self.rawblock, self.rawtx]
        self.extra_args = [
            ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in subscribers],
            [],
        ]
        self.add_nodes(self.num_nodes, self.extra_args)
        self.start_nodes()

    def run_test(self):
        """Run the ZMQ checks and always destroy the ZMQ context afterwards."""
        try:
            self._zmq_test()
        finally:
            # Destroy the ZMQ context.
            self.log.debug("Destroying ZMQ context")
            self.zmq_context.destroy(linger=None)

    def _zmq_test(self):
        """Check the notifications for generated blocks and a wallet payment.

        The receive calls below are made in the order hashtx, rawtx,
        hashblock, rawblock for each generated block, followed by hashtx and
        rawtx for a transaction sent from node 1.
        """
        num_blocks = 5
        self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks})
        genhashes = self.nodes[0].generate(num_blocks)
        self.sync_all()

        for x in range(num_blocks):
            # Should receive the coinbase txid.
            txid = self.hashtx.receive()

            # Should receive the coinbase raw transaction.
            raw_tx = self.rawtx.receive()
            assert_equal(hash256(raw_tx), txid)

            # Should receive the generated block hash.
            block_hash = bytes_to_hex_str(self.hashblock.receive())
            assert_equal(genhashes[x], block_hash)
            # The block should only have the coinbase txid.
            assert_equal([bytes_to_hex_str(txid)], self.nodes[1].getblock(block_hash)["tx"])

            # Should receive the generated raw block. Only its first 80 bytes
            # are hashed (presumably the serialized block header).
            block = self.rawblock.receive()
            assert_equal(genhashes[x], dashhash_helper(block[:80]))

        self.log.info("Wait for tx from second node")
        payment_txid = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
        self.sync_all()

        # Should receive the broadcasted txid.
        txid = self.hashtx.receive()
        assert_equal(payment_txid, bytes_to_hex_str(txid))

        # Should receive the broadcasted raw transaction.
        raw_tx = self.rawtx.receive()
        assert_equal(payment_txid, bytes_to_hex_str(hash256(raw_tx)))
# Script entry point: build the test object and call its main() method when
# this file is executed directly.
if __name__ == "__main__":
    ZMQTest().main()