#!/usr/bin/env python3 # Copyright (c) 2010 ArtForz -- public domain half-a-node # Copyright (c) 2012 Jeff Garzik # Copyright (c) 2010-2016 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Dash P2P network half-a-node. This python code was modified from ArtForz' public domain half-a-node, as found in the mini-node branch of http://github.com/jgarzik/pynode. NodeConn: an object which manages p2p connectivity to a bitcoin node NodeConnCB: a base class that describes the interface for receiving callbacks with network messages from a NodeConn CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: data structures that should map to corresponding structures in bitcoin/primitives msg_block, msg_tx, msg_headers, etc.: data structures that represent network messages ser_*, deser_*: functions that handle serialization/deserialization """ import asyncore from collections import namedtuple from codecs import encode from collections import defaultdict import copy import hashlib from io import BytesIO import logging import random import socket import struct import sys import time import threading from test_framework.siphash import siphash256 from test_framework.util import hex_str_to_bytes, bytes_to_hex_str, wait_until import dash_hash BIP0031_VERSION = 60000 MY_VERSION = 70214 # MIN_PEER_PROTO_VERSION MY_SUBVERSION = b"/python-mininode-tester:0.0.3/" MY_RELAY = 1 # from version 70001 onwards, fRelay should be appended to version messages (BIP37) MAX_INV_SZ = 50000 MAX_BLOCK_SIZE = 1000000 COIN = 100000000 # 1 btc in satoshis NODE_NETWORK = (1 << 0) NODE_GETUTXO = (1 << 1) NODE_BLOOM = (1 << 2) logger = logging.getLogger("TestFramework.mininode") # Keep our own socket map for asyncore, so that we can track disconnects # ourselves (to workaround an issue with closing an asyncore socket when # using select) mininode_socket_map = dict() # One lock for synchronizing all data access between the networking thread (see # NetworkThread below) and the thread running the test logic. For simplicity, # NodeConn acquires this lock whenever delivering a message to a NodeConnCB, # and whenever adding anything to the send buffer (in send_message()). This # lock should be acquired in the thread running the test logic to synchronize # access to any data shared with the NodeConnCB or NodeConn. mininode_lock = threading.RLock() # Serialization/deserialization tools def sha256(s): return hashlib.new('sha256', s).digest() def hash256(s): return sha256(sha256(s)) def dashhash(s): return dash_hash.getPoWHash(s) def ser_compact_size(l): r = b"" if l < 253: r = struct.pack("B", l) elif l < 0x10000: r = struct.pack(">= 32 return rs def uint256_from_str(s): r = 0 t = struct.unpack("> 24) & 0xFF v = (c & 0xFFFFFF) << (8 * (nbytes - 3)) return v def deser_vector(f, c): nit = deser_compact_size(f) r = [] for i in range(nit): t = c() t.deserialize(f) r.append(t) return r def ser_vector(l): r = ser_compact_size(len(l)) for i in l: r += i.serialize() return r def deser_uint256_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_uint256(f) r.append(t) return r def ser_uint256_vector(l): r = ser_compact_size(len(l)) for i in l: r += ser_uint256(i) return r def deser_string_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_string(f) r.append(t) return r def ser_string_vector(l): r = ser_compact_size(len(l)) for sv in l: r += ser_string(sv) return r def deser_int_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = struct.unpack("H", f.read(2))[0] def serialize(self): r = b"" r += socket.inet_pton(socket.AF_INET6, self.ip) r += struct.pack(">H", self.port) return r def __repr__(self): return "CService(ip=%s port=%i)" % (self.ip, self.port) class CAddress(object): def __init__(self): self.nServices = 1 self.pchReserved = b"\x00" * 10 + b"\xff" * 2 self.ip = "0.0.0.0" self.port = 0 def deserialize(self, f): self.nServices = struct.unpack("H", f.read(2))[0] def serialize(self): r = b"" r += struct.pack("H", self.port) return r def __repr__(self): return "CAddress(nServices=%i ip=%s port=%i)" % (self.nServices, self.ip, self.port) class CInv(object): typemap = { 0: "Error", 1: "TX", 2: "Block", 20: "CompactBlock" } def __init__(self, t=0, h=0): self.type = t self.hash = h def deserialize(self, f): self.type = struct.unpack("> 16) & 0xffff self.vin = deser_vector(f, CTxIn) self.vout = deser_vector(f, CTxOut) self.nLockTime = struct.unpack(" 21000000 * COIN: return False return True def __repr__(self): return "CTransaction(nVersion=%i vin=%s vout=%s nLockTime=%i)" \ % (self.nVersion, repr(self.vin), repr(self.vout), self.nLockTime) class CBlockHeader(object): def __init__(self, header=None): if header is None: self.set_null() else: self.nVersion = header.nVersion self.hashPrevBlock = header.hashPrevBlock self.hashMerkleRoot = header.hashMerkleRoot self.nTime = header.nTime self.nBits = header.nBits self.nNonce = header.nNonce self.sha256 = header.sha256 self.hash = header.hash self.calc_sha256() def set_null(self): self.nVersion = 1 self.hashPrevBlock = 0 self.hashMerkleRoot = 0 self.nTime = 0 self.nBits = 0 self.nNonce = 0 self.sha256 = None self.hash = None def deserialize(self, f): self.nVersion = struct.unpack(" 1: newhashes = [] for i in range(0, len(hashes), 2): i2 = min(i+1, len(hashes)-1) newhashes.append(hash256(hashes[i] + hashes[i2])) hashes = newhashes return uint256_from_str(hashes[0]) def calc_merkle_root(self): hashes = [] for tx in self.vtx: tx.calc_sha256() hashes.append(ser_uint256(tx.sha256)) return self.get_merkle_root(hashes) def is_valid(self): self.calc_sha256() target = uint256_from_compact(self.nBits) if self.sha256 > target: return False for tx in self.vtx: if not tx.is_valid(): return False if self.calc_merkle_root() != self.hashMerkleRoot: return False return True def solve(self): self.rehash() target = uint256_from_compact(self.nBits) while self.sha256 > target: self.nNonce += 1 self.rehash() def __repr__(self): return "CBlock(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x vtx=%s)" \ % (self.nVersion, self.hashPrevBlock, self.hashMerkleRoot, time.ctime(self.nTime), self.nBits, self.nNonce, repr(self.vtx)) class PrefilledTransaction(object): def __init__(self, index=0, tx = None): self.index = index self.tx = tx def deserialize(self, f): self.index = deser_compact_size(f) self.tx = CTransaction() self.tx.deserialize(f) def serialize(self): r = b"" r += ser_compact_size(self.index) r += self.tx.serialize() return r def __repr__(self): return "PrefilledTransaction(index=%d, tx=%s)" % (self.index, repr(self.tx)) # This is what we send on the wire, in a cmpctblock message. class P2PHeaderAndShortIDs(object): def __init__(self): self.header = CBlockHeader() self.nonce = 0 self.shortids_length = 0 self.shortids = [] self.prefilled_txn_length = 0 self.prefilled_txn = [] def deserialize(self, f): self.header.deserialize(f) self.nonce = struct.unpack("= 2: self.merkleRootQuorums = deser_uint256(f) def serialize(self): r = b"" r += struct.pack("= 2: r += ser_uint256(self.merkleRootQuorums) return r class CSimplifiedMNListEntry(object): def __init__(self): self.set_null() def set_null(self): self.proRegTxHash = 0 self.confirmedHash = 0 self.service = CService() self.pubKeyOperator = b'\\x0' * 48 self.keyIDVoting = 0 self.isValid = False def deserialize(self, f): self.proRegTxHash = deser_uint256(f) self.confirmedHash = deser_uint256(f) self.service.deserialize(f) self.pubKeyOperator = f.read(48) self.keyIDVoting = f.read(20) self.isValid = struct.unpack("= 106: self.addrFrom = CAddress() self.addrFrom.deserialize(f) self.nNonce = struct.unpack("= 209: self.nStartingHeight = struct.unpack("= 70001: # Relay field is optional for version 70001 onwards try: self.nRelay = struct.unpack(" class msg_headers(object): command = b"headers" def __init__(self, headers=None): self.headers = headers if headers is not None else [] def deserialize(self, f): # comment in dashd indicates these should be deserialized as blocks blocks = deser_vector(f, CBlock) for x in blocks: self.headers.append(CBlockHeader(x)) def serialize(self): blocks = [CBlock(x) for x in self.headers] return ser_vector(blocks) def __repr__(self): return "msg_headers(headers=%s)" % repr(self.headers) class msg_reject(object): command = b"reject" REJECT_MALFORMED = 1 def __init__(self): self.message = b"" self.code = 0 self.reason = b"" self.data = 0 def deserialize(self, f): self.message = deser_string(f) self.code = struct.unpack(" BIP0031_VERSION: conn.send_message(msg_pong(message.nonce)) def on_mnlistdiff(self, conn, message): pass def on_clsig(self, conn, message): pass def on_islock(self, conn, message): pass def on_verack(self, conn, message): conn.ver_recv = conn.ver_send self.verack_received = True def on_version(self, conn, message): if message.nVersion >= 209: conn.send_message(msg_verack()) conn.ver_send = min(MY_VERSION, message.nVersion) if message.nVersion < 209: conn.ver_recv = conn.ver_send conn.nServices = message.nServices # Connection helper methods def add_connection(self, conn): self.connection = conn def wait_for_disconnect(self, timeout=60): test_function = lambda: not self.connected wait_until(test_function, timeout=timeout, lock=mininode_lock) # Message receiving helper methods def wait_for_block(self, blockhash, timeout=60): test_function = lambda: self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_getdata(self, timeout=60): test_function = lambda: self.last_message.get("getdata") wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_getheaders(self, timeout=60): test_function = lambda: self.last_message.get("getheaders") wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_inv(self, expected_inv, timeout=60): """Waits for an INV message and checks that the first inv object in the message was as expected.""" if len(expected_inv) > 1: raise NotImplementedError("wait_for_inv() will only verify the first inv object") test_function = lambda: self.last_message.get("inv") and \ self.last_message["inv"].inv[0].type == expected_inv[0].type and \ self.last_message["inv"].inv[0].hash == expected_inv[0].hash wait_until(test_function, timeout=timeout, lock=mininode_lock) def wait_for_verack(self, timeout=60): test_function = lambda: self.message_count["verack"] wait_until(test_function, timeout=timeout, lock=mininode_lock) # Message sending helper functions def send_message(self, message): if self.connection: self.connection.send_message(message) else: logger.error("Cannot send message. No connection to node!") def send_and_ping(self, message): self.send_message(message) self.sync_with_ping() # Sync up with the node def sync_with_ping(self, timeout=60): self.send_message(msg_ping(nonce=self.ping_counter)) test_function = lambda: self.last_message.get("pong") and self.last_message["pong"].nonce == self.ping_counter wait_until(test_function, timeout=timeout, lock=mininode_lock) self.ping_counter += 1 # The actual NodeConn class # This class provides an interface for a p2p connection to a specified node class NodeConn(asyncore.dispatcher): messagemap = { b"version": msg_version, b"verack": msg_verack, b"addr": msg_addr, b"inv": msg_inv, b"getdata": msg_getdata, b"getblocks": msg_getblocks, b"tx": msg_tx, b"block": msg_block, b"getaddr": msg_getaddr, b"ping": msg_ping, b"pong": msg_pong, b"headers": msg_headers, b"getheaders": msg_getheaders, b"reject": msg_reject, b"mempool": msg_mempool, b"sendheaders": msg_sendheaders, b"sendcmpct": msg_sendcmpct, b"cmpctblock": msg_cmpctblock, b"getblocktxn": msg_getblocktxn, b"blocktxn": msg_blocktxn, b"mnlistdiff": msg_mnlistdiff, b"clsig": msg_clsig, b"islock": msg_islock, b"senddsq": None, b"qsendrecsigs": None, b"spork": None, b"govsync": None, b"qfcommit": None, } MAGIC_BYTES = { "mainnet": b"\xbf\x0c\x6b\xbd", # mainnet "testnet3": b"\xce\xe2\xca\xff", # testnet3 "regtest": b"\xfc\xc1\xb7\xdc", # regtest "devnet": b"\xe2\xca\xff\xce", # devnet } def __init__(self, dstaddr, dstport, rpc, callback, net="regtest", services=NODE_NETWORK, send_version=True): asyncore.dispatcher.__init__(self, map=mininode_socket_map) self.dstaddr = dstaddr self.dstport = dstport self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.sendbuf = b"" self.recvbuf = b"" self.ver_send = 209 self.ver_recv = 209 self.last_sent = 0 self.state = "connecting" self.network = net self.cb = callback self.disconnect = False self.nServices = 0 if send_version: # stuff version msg into sendbuf vt = msg_version() vt.nServices = services vt.addrTo.ip = self.dstaddr vt.addrTo.port = self.dstport vt.addrFrom.ip = "0.0.0.0" vt.addrFrom.port = 0 self.send_message(vt, True) logger.info('Connecting to Dash Node: %s:%d' % (self.dstaddr, self.dstport)) try: self.connect((dstaddr, dstport)) except: self.handle_close() self.rpc = rpc def handle_connect(self): if self.state != "connected": logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport)) self.state = "connected" self.cb.on_open(self) def handle_close(self): logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport)) self.state = "closed" self.recvbuf = b"" self.sendbuf = b"" try: self.close() except: pass self.cb.on_close(self) def handle_read(self): t = self.recv(8192) if len(t) > 0: self.recvbuf += t self.got_data() def readable(self): return True def writable(self): with mininode_lock: pre_connection = self.state == "connecting" length = len(self.sendbuf) return (length > 0 or pre_connection) def handle_write(self): with mininode_lock: # asyncore does not expose socket connection, only the first read/write # event, thus we must check connection manually here to know when we # actually connect if self.state == "connecting": self.handle_connect() if not self.writable(): return try: sent = self.send(self.sendbuf) except: self.handle_close() return self.sendbuf = self.sendbuf[sent:] def got_data(self): try: while True: if len(self.recvbuf) < 4: return if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: raise ValueError("got garbage %s" % repr(self.recvbuf)) if self.ver_recv < 209: if len(self.recvbuf) < 4 + 12 + 4: return command = self.recvbuf[4:4+12].split(b"\x00", 1)[0] msglen = struct.unpack("= 209: th = sha256(data) h = sha256(th) tmsg += h[:4] tmsg += data with mininode_lock: self.sendbuf += tmsg self.last_sent = time.time() def got_message(self, message): if message.command == b"version": if message.nVersion <= BIP0031_VERSION: self.messagemap[b'ping'] = msg_ping_prebip31 if self.last_sent + 30 * 60 < time.time(): self.send_message(self.messagemap[b'ping']()) self._log_message("receive", message) self.cb.deliver(self, message) def _log_message(self, direction, msg): if direction == "send": log_message = "Send message to " elif direction == "receive": log_message = "Received message from " log_message += "%s:%d: %s" % (self.dstaddr, self.dstport, repr(msg)[:500]) if len(log_message) > 500: log_message += "... (msg truncated)" logger.debug(log_message) def disconnect_node(self): self.disconnect = True class NetworkThread(threading.Thread): def __init__(self): super().__init__(name="NetworkThread") def run(self): while mininode_socket_map: # We check for whether to disconnect outside of the asyncore # loop to workaround the behavior of asyncore when using # select disconnected = [] for fd, obj in mininode_socket_map.items(): if obj.disconnect: disconnected.append(obj) [ obj.handle_close() for obj in disconnected ] asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) def network_thread_start(): """Start the network thread.""" # Only one network thread may run at a time assert not network_thread_running() NetworkThread().start() def network_thread_running(): """Return whether the network thread is running.""" return any([thread.name == "NetworkThread" for thread in threading.enumerate()]) def network_thread_join(timeout=10): """Wait timeout seconds for the network thread to terminate. Throw if the network thread doesn't terminate in timeout seconds.""" network_threads = [thread for thread in threading.enumerate() if thread.name == "NetworkThread"] assert len(network_threads) <= 1 for thread in network_threads: thread.join(timeout) assert not thread.is_alive() # An exception we can raise if we detect a potential disconnect # (p2p or rpc) before the test is complete class EarlyDisconnectError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value)