#!/usr/bin/env python3
# Copyright (c) 2018-2024 The Dash Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the dash specific ZMQ notification interfaces."""

import configparser
from enum import Enum
import io
import json
import random
import struct
import time

from test_framework.test_framework import DashTestFramework
from test_framework.p2p import P2PInterface
from test_framework.util import assert_equal, assert_raises_rpc_error
from test_framework.messages import (
    CBlock,
    CGovernanceObject,
    CGovernanceVote,
    CInv,
    COutPoint,
    CRecoveredSig,
    CTransaction,
    from_hex,
    hash256,
    msg_clsig,
    msg_inv,
    msg_isdlock,
    msg_tx,
    MSG_TX,
    MSG_TYPE_MASK,
    ser_string,
    uint256_from_str,
    uint256_to_string
)


class ZMQPublisher(Enum):
    """ZMQ topics used by this test.

    Each value is used both as the suffix of the ``-zmqpub<value>`` node
    argument and, encoded to bytes, as the subscription topic.
    """
    hash_chain_lock = "hashchainlock"
    hash_tx_lock = "hashtxlock"
    hash_governance_vote = "hashgovernancevote"
    hash_governance_object = "hashgovernanceobject"
    hash_instantsend_doublespend = "hashinstantsenddoublespend"
    hash_recovered_sig = "hashrecoveredsig"
    raw_chain_lock = "rawchainlock"
    raw_chain_lock_sig = "rawchainlocksig"
    raw_tx_lock = "rawtxlock"
    raw_tx_lock_sig = "rawtxlocksig"
    raw_governance_vote = "rawgovernancevote"
    raw_governance_object = "rawgovernanceobject"
    raw_instantsend_doublespend = "rawinstantsenddoublespend"
    raw_recovered_sig = "rawrecoveredsig"


class ZMQSubscriber:
    """Binds a ZMQ socket to a single topic and reads messages for it."""

    def __init__(self, socket, topic):
        """Subscribe `socket` to `topic` (bytes)."""
        self.socket = socket
        self.topic = topic

        # Imported lazily so the module can be loaded without pyzmq installed.
        import zmq
        self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)

    def receive(self, flags=0):
        """Receive one three-part message and return its body as a BytesIO.

        Asserts that the topic frame equals this subscriber's topic. The
        third frame is received but not inspected.
        """
        topic, body, seq = self.socket.recv_multipart(flags)
        # Topic should match the subscriber topic.
        assert_equal(topic, self.topic)
        return io.BytesIO(body)


class TestP2PConn(P2PInterface):
    """P2P peer that announces islocks/txes via inv and serves them on getdata."""

    def __init__(self):
        super().__init__()
        # Announced objects, keyed by the integer hash placed in the inv.
        self.islocks = {}
        self.txes = {}

    def send_islock(self, islock):
        """Remember `islock` and announce it to the node with an inv."""
        islock_hash = uint256_from_str(hash256(islock.serialize()))
        self.islocks[islock_hash] = islock

        # NOTE(review): 31 is presumably the ISDLOCK inv type - confirm
        # against the inv type constants in test_framework.messages.
        inv = msg_inv([CInv(31, islock_hash)])
        self.send_message(inv)

    def send_tx(self, tx):
        """Remember `tx` and announce it to the node with an inv."""
        tx_hash = uint256_from_str(hash256(tx.serialize()))
        self.txes[tx_hash] = tx

        inv = msg_inv([CInv(MSG_TX, tx_hash)])
        self.send_message(inv)

    def on_getdata(self, message):
        """Answer getdata requests for objects this peer announced earlier."""
        for inv in message.inv:
            inv_type = inv.type & MSG_TYPE_MASK
            # NOTE(review): 30/31 are presumably the ISLOCK/ISDLOCK inv types - confirm.
            if inv_type in (30, 31) and inv.hash in self.islocks:
                self.send_message(self.islocks[inv.hash])
            if inv_type == MSG_TX and inv.hash in self.txes:
                self.send_message(self.txes[inv.hash])


class DashZMQTest (DashTestFramework):
    """Functional test for the Dash specific ZMQ publishers."""

    def set_test_params(self):
        """Configure the nodes; node0 enables every publisher in ZMQPublisher."""
        # That's where the zmq publisher will listen for subscriber
        self.address = "tcp://127.0.0.1:28331"
        # node0 creates all available ZMQ publisher
        node0_extra_args = ["-zmqpub%s=%s" % (pub.value, self.address) for pub in ZMQPublisher]
        node0_extra_args.append("-whitelist=127.0.0.1")
        node0_extra_args.append("-watchquorums")  # have to watch quorums to receive recsigs and trigger zmq

        num_nodes = 5
        # One independent list per node: `[[]] * n` would put the *same* list
        # object in every slot, so mutating one node's args would change all.
        extra_args = [[] for _ in range(num_nodes)]
        extra_args[0] = node0_extra_args
        self.set_dash_test_params(num_nodes, 4, fast_dip3_enforcement=True, extra_args=extra_args)
        self.set_dash_llmq_test_params(4, 4)

    def skip_test_if_missing_module(self):
        """Skip the test unless pyzmq, a ZMQ-enabled daemon and the wallet are available."""
        self.skip_if_no_py3_zmq()
        self.skip_if_no_bitcoind_zmq()
        self.skip_if_no_wallet()

    def run_test(self):
        """Bring the network up, then run the publisher tests.

        The ZMQ context is always destroyed on exit, whether the test body
        passes or raises.
        """
        self.subscribers = {}
        # Check that dashd has been built with ZMQ enabled.
        # NOTE(review): the parsed config is not inspected afterwards; the
        # actual skip happens in skip_test_if_missing_module().
        config = configparser.ConfigParser()
        with open(self.options.configfile) as config_file:
            config.read_file(config_file)
        import zmq

        # Setup the ZMQ subscriber context
        # Created before the `try` so the `finally` clause can never run
        # against a missing `self.zmq_context` and mask the real error.
        self.zmq_context = zmq.Context()
        try:
            # Initialize the network
            self.activate_dip8()
            self.nodes[0].sporkupdate("SPORK_17_QUORUM_DKG_ENABLED", 0)
            self.wait_for_sporks_same()
            self.activate_v19(expected_activation_height=900)
            self.log.info("Activated v19 at height:" + str(self.nodes[0].getblockcount()))
            self.move_to_next_cycle()
            self.log.info("Cycle H height:" + str(self.nodes[0].getblockcount()))
            self.move_to_next_cycle()
            self.log.info("Cycle H+C height:" + str(self.nodes[0].getblockcount()))
            self.move_to_next_cycle()
            self.log.info("Cycle H+2C height:" + str(self.nodes[0].getblockcount()))

            self.mine_cycle_quorum(llmq_type_name='llmq_test_dip0024', llmq_type=103)

            self.sync_blocks()
            self.wait_for_chainlocked_block_all_nodes(self.nodes[0].getbestblockhash())
            # Wait a moment to avoid subscribing to recovered sig in the test before the one from the chainlock
            # has been sent which leads to test failure.
            time.sleep(1)
            # Test all dash related ZMQ publisher
            #self.test_recovered_signature_publishers()
            self.test_chainlock_publishers()
            self.test_governance_publishers()
            self.test_getzmqnotifications()
            self.test_instantsend_publishers()
            self.wait_for_chainlocked_block_all_nodes(self.nodes[0].getbestblockhash())
            self.test_instantsend_publishers()
            # At this point, we need to move forward 3 cycles (3 x 24 blocks) so the first 3 quarters can be created (without DKG sessions)
            self.move_to_next_cycle()
            self.test_instantsend_publishers()
            self.move_to_next_cycle()
            self.test_instantsend_publishers()
            self.move_to_next_cycle()
            self.test_instantsend_publishers()
            self.mine_cycle_quorum()
            self.test_instantsend_publishers()
        finally:
            # Destroy the ZMQ context.
            self.log.debug("Destroying ZMQ context")
            self.zmq_context.destroy(linger=None)

    def generate_blocks(self, num_blocks):
        """Mine `num_blocks` on node0 and sync node0 plus every masternode node."""
        nodes = [self.nodes[0]] + [mn.node for mn in self.mninfo]
        self.nodes[0].generate(num_blocks)
        self.sync_blocks(nodes)

    def subscribe(self, publishers):
        """Open one SUB socket and register a ZMQSubscriber per publisher on it."""
        import zmq
        # Setup the ZMQ subscriber socket
        socket = self.zmq_context.socket(zmq.SUB)
        # Receive timeout, so a missing notification fails instead of hanging.
        socket.set(zmq.RCVTIMEO, 60000)
        socket.connect(self.address)
        # Subscribe to a list of ZMQPublishers
        for pub in publishers:
            self.subscribers[pub] = ZMQSubscriber(socket, pub.value.encode())

    def unsubscribe(self, publishers):
        """Drop the ZMQSubscriber entries for `publishers`.

        NOTE(review): this only removes the bookkeeping entries; the
        underlying socket is neither unsubscribed nor closed here and is
        only torn down when the context is destroyed in run_test().
        """
        # Unsubscribe from a list of ZMQPublishers
        for pub in publishers:
            del self.subscribers[pub]

    def test_recovered_signature_publishers(self):

        def validate_recovered_sig(request_id, msg_hash):
            # Make sure the recovered sig exists by RPC
            self.wait_for_recovered_sig(request_id, msg_hash)
            rpc_recovered_sig = self.mninfo[0].node.quorum('getrecsig', 103, request_id, msg_hash)
            # Validate hashrecoveredsig
            zmq_recovered_sig_hash = self.subscribers[ZMQPublisher.hash_recovered_sig].receive().read(32).hex()
            assert_equal(zmq_recovered_sig_hash, msg_hash)
            # Validate rawrecoveredsig
            zmq_recovered_sig_raw = CRecoveredSig()
            zmq_recovered_sig_raw.deserialize(self.subscribers[ZMQPublisher.raw_recovered_sig].receive())
            assert_equal(zmq_recovered_sig_raw.llmqType, rpc_recovered_sig['llmqType'])
            assert_equal(uint256_to_string(zmq_recovered_sig_raw.quorumHash), rpc_recovered_sig['quorumHash'])
            assert_equal(uint256_to_string(zmq_recovered_sig_raw.id), rpc_recovered_sig['id'])
            assert_equal(uint256_to_string(zmq_recovered_sig_raw.msgHash), rpc_recovered_sig['msgHash'])
            assert_equal(zmq_recovered_sig_raw.sig.hex(), rpc_recovered_sig['sig'])

        recovered_sig_publishers = [
            ZMQPublisher.hash_recovered_sig,
            ZMQPublisher.raw_recovered_sig
        ]
        self.log.info("Testing %d recovered signature publishers" % len(recovered_sig_publishers))
        # Subscribe to recovered signature messages
self.subscribe(recovered_sig_publishers) # Generate a ChainLock and make sure this leads to valid recovered sig ZMQ messages rpc_last_block_hash = self.nodes[0].generate(1)[0] self.wait_for_chainlocked_block_all_nodes(rpc_last_block_hash) height = self.nodes[0].getblockcount() rpc_request_id = hash256(ser_string(b"clsig") + struct.pack("