#!/usr/bin/env python3 # Copyright (c) 2018-2021 The Dash Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the dash specific ZMQ notification interfaces.""" import configparser from enum import Enum import io import json import random import struct import time try: import zmq finally: pass from test_framework.test_framework import DashTestFramework from test_framework.mininode import P2PInterface from test_framework.util import assert_equal, assert_raises_rpc_error from test_framework.messages import ( CBlock, CGovernanceObject, CGovernanceVote, CInv, COutPoint, CRecoveredSig, CTransaction, FromHex, hash256, msg_clsig, msg_inv, msg_isdlock, msg_tx, ser_string, uint256_from_str, uint256_to_string ) class ZMQPublisher(Enum): hash_chain_lock = "hashchainlock" hash_tx_lock = "hashtxlock" hash_governance_vote = "hashgovernancevote" hash_governance_object = "hashgovernanceobject" hash_instantsend_doublespend = "hashinstantsenddoublespend" hash_recovered_sig = "hashrecoveredsig" raw_chain_lock = "rawchainlock" raw_chain_lock_sig = "rawchainlocksig" raw_tx_lock = "rawtxlock" raw_tx_lock_sig = "rawtxlocksig" raw_governance_vote = "rawgovernancevote" raw_governance_object = "rawgovernanceobject" raw_instantsend_doublespend = "rawinstantsenddoublespend" raw_recovered_sig = "rawrecoveredsig" class TestP2PConn(P2PInterface): def __init__(self): super().__init__() self.islocks = {} self.txes = {} def send_islock(self, islock): hash = uint256_from_str(hash256(islock.serialize())) self.islocks[hash] = islock inv = msg_inv([CInv(30, hash)]) self.send_message(inv) def send_tx(self, tx): hash = uint256_from_str(hash256(tx.serialize())) self.txes[hash] = tx inv = msg_inv([CInv(30, hash)]) self.send_message(inv) def on_getdata(self, message): for inv in message.inv: if inv.hash in self.islocks: self.send_message(self.islocks[inv.hash]) if inv.hash in self.txes: self.send_message(self.txes[inv.hash]) class DashZMQTest (DashTestFramework): def set_test_params(self): # That's where the zmq publisher will listen for subscriber self.address = "tcp://127.0.0.1:28333" # node0 creates all available ZMQ publisher node0_extra_args = ["-zmqpub%s=%s" % (pub.value, self.address) for pub in ZMQPublisher] node0_extra_args.append("-whitelist=127.0.0.1") node0_extra_args.append("-watchquorums") # have to watch quorums to receive recsigs and trigger zmq self.set_dash_test_params(4, 3, fast_dip3_enforcement=True, extra_args=[node0_extra_args, [], [], []]) def skip_test_if_missing_module(self): self.skip_if_no_py3_zmq() self.skip_if_no_bitcoind_zmq() self.skip_if_no_wallet() def run_test(self): # Check that dashd has been built with ZMQ enabled. config = configparser.ConfigParser() config.read_file(open(self.options.configfile)) try: # Setup the ZMQ subscriber socket self.zmq_context = zmq.Context() self.socket = self.zmq_context.socket(zmq.SUB) self.socket.connect(self.address) # Initialize the network self.activate_dip8() self.nodes[0].spork("SPORK_17_QUORUM_DKG_ENABLED", 0) self.wait_for_sporks_same() # Create an LLMQ for testing self.quorum_type = 100 # llmq_test self.quorum_hash = self.mine_quorum() self.sync_blocks() self.wait_for_chainlocked_block_all_nodes(self.nodes[0].getbestblockhash()) # Wait a moment to avoid subscribing to recovered sig in the test before the one from the chainlock # has been sent which leads to test failure. time.sleep(1) # Test all dash related ZMQ publisher self.test_recovered_signature_publishers() self.test_chainlock_publishers() self.test_instantsend_publishers() self.test_governance_publishers() self.test_getzmqnotifications() finally: # Destroy the ZMQ context. self.log.debug("Destroying ZMQ context") self.zmq_context.destroy(linger=None) def subscribe(self, publishers): # Subscribe to a list of ZMQPublishers for pub in publishers: self.socket.subscribe(pub.value) def unsubscribe(self, publishers): # Unsubscribe from a list of ZMQPublishers for pub in publishers: self.socket.unsubscribe(pub.value) def receive(self, publisher, flags=0): # Receive a ZMQ message and validate it's sent from the correct ZMQPublisher topic, body, seq = self.socket.recv_multipart(flags) # Topic should match the publisher value assert_equal(topic.decode(), publisher.value) return io.BytesIO(body) def test_recovered_signature_publishers(self): def validate_recovered_sig(request_id, msg_hash): # Make sure the recovered sig exists by RPC rpc_recovered_sig = self.get_recovered_sig(request_id, msg_hash) # Validate hashrecoveredsig zmq_recovered_sig_hash = self.receive(ZMQPublisher.hash_recovered_sig).read(32).hex() assert_equal(zmq_recovered_sig_hash, msg_hash) # Validate rawrecoveredsig zmq_recovered_sig_raw = CRecoveredSig() zmq_recovered_sig_raw.deserialize(self.receive(ZMQPublisher.raw_recovered_sig)) assert_equal(zmq_recovered_sig_raw.llmqType, rpc_recovered_sig['llmqType']) assert_equal(uint256_to_string(zmq_recovered_sig_raw.quorumHash), rpc_recovered_sig['quorumHash']) assert_equal(uint256_to_string(zmq_recovered_sig_raw.id), rpc_recovered_sig['id']) assert_equal(uint256_to_string(zmq_recovered_sig_raw.msgHash), rpc_recovered_sig['msgHash']) assert_equal(zmq_recovered_sig_raw.sig.hex(), rpc_recovered_sig['sig']) recovered_sig_publishers = [ ZMQPublisher.hash_recovered_sig, ZMQPublisher.raw_recovered_sig ] self.log.info("Testing %d recovered signature publishers" % len(recovered_sig_publishers)) # Subscribe to recovered signature messages self.subscribe(recovered_sig_publishers) # Generate a ChainLock and make sure this leads to valid recovered sig ZMQ messages rpc_last_block_hash = self.nodes[0].generate(1)[0] self.wait_for_chainlocked_block_all_nodes(rpc_last_block_hash) height = self.nodes[0].getblockcount() rpc_request_id = hash256(ser_string(b"clsig") + struct.pack("