#!/usr/bin/env python3 # Copyright (c) 2015-2016 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the ZMQ notification interface.""" import struct from codecs import encode from test_framework.test_framework import ( BitcoinTestFramework, skip_if_no_bitcoind_zmq, skip_if_no_py3_zmq) from test_framework.messages import dashhash from test_framework.util import (assert_equal, hash256, ) ADDRESS = "tcp://127.0.0.1:28332" def dashhash_helper(b): return encode(dashhash(b)[::-1], 'hex_codec').decode('ascii') class ZMQSubscriber: def __init__(self, socket, topic): self.sequence = 0 self.socket = socket self.topic = topic import zmq self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) def receive(self): topic, body, seq = self.socket.recv_multipart() # Topic should match the subscriber topic. assert_equal(topic, self.topic) # Sequence should be incremental. assert_equal(struct.unpack('