mirror of
https://github.com/dashpay/dash.git
synced 2024-12-25 12:02:48 +01:00
* [tests] Remove mininode periodic (half-hour) ping messages * [tests] Tidy up mininode Add docstrings and renames some methods. Also removes the redundant NodeConn.readable() method override. * [tests] Move only: move NodeConnCB below NodeConn This is required since NodeConnCB will inherit from NodeConn after the next commit. * [tests] Make NodeConnCB a subclass of NodeConn This makes NodeConnCB a subclass of NodeConn, and removes the need for the client code to know anything about the implementation details of NodeConnCB. NodeConn can now be swapped out for any other implementation of a low-level connection without changing client code. * [tests] Move version message sending from NodeConn to NodeConnCB This commit moves the logic that sends a version message on connection from NodeConn to NodeConnCB. NodeConn should not be aware of the semantics or meaning of the P2P payloads. * remove witness Signed-off-by: Pasta <pasta@dashboost.org> * Fix 11712 Co-authored-by: John Newbery <john@johnnewbery.com> Co-authored-by: UdjinM6 <UdjinM6@users.noreply.github.com>
This commit is contained in:
parent
72bf8cc3a0
commit
52bf616cc4
@ -69,7 +69,7 @@ class AssumeValidTest(BitcoinTestFramework):
|
||||
def send_blocks_until_disconnected(self, p2p_conn):
|
||||
"""Keep sending blocks to the node until we're disconnected."""
|
||||
for i in range(len(self.blocks)):
|
||||
if not p2p_conn.connection:
|
||||
if p2p_conn.state != "connected":
|
||||
break
|
||||
try:
|
||||
p2p_conn.send_message(msg_block(self.blocks[i]))
|
||||
|
@ -20,7 +20,7 @@ class TestNode(NodeConnCB):
|
||||
super().__init__()
|
||||
self.last_mnlistdiff = None
|
||||
|
||||
def on_mnlistdiff(self, conn, message):
|
||||
def on_mnlistdiff(self, message):
|
||||
self.last_mnlistdiff = message
|
||||
|
||||
def wait_for_mnlistdiff(self, timeout=30):
|
||||
|
@ -50,14 +50,14 @@ class BaseNode(NodeConnCB):
|
||||
# Stores a dictionary of all blocks received
|
||||
self.block_receive_map = defaultdict(int)
|
||||
|
||||
def on_block(self, conn, message):
|
||||
def on_block(self, message):
|
||||
"""Override the standard on_block callback
|
||||
|
||||
Store the hash of a received block in the dictionary."""
|
||||
message.block.calc_sha256()
|
||||
self.block_receive_map[message.block.sha256] += 1
|
||||
|
||||
def on_inv(self, conn, message):
|
||||
def on_inv(self, message):
|
||||
"""Override the standard on_inv callback"""
|
||||
pass
|
||||
|
||||
|
@ -38,7 +38,7 @@ class TestNode(NodeConnCB):
|
||||
inv = msg_inv([CInv(30, hash)])
|
||||
self.send_message(inv)
|
||||
|
||||
def on_getdata(self, conn, message):
|
||||
def on_getdata(self, message):
|
||||
for inv in message.inv:
|
||||
if inv.hash in self.clsigs:
|
||||
self.send_message(self.clsigs[inv.hash])
|
||||
|
@ -22,10 +22,10 @@ class TestNode(NodeConnCB):
|
||||
super().__init__()
|
||||
self.block_receive_map = defaultdict(int)
|
||||
|
||||
def on_inv(self, conn, message):
|
||||
def on_inv(self, message):
|
||||
pass
|
||||
|
||||
def on_block(self, conn, message):
|
||||
def on_block(self, message):
|
||||
message.block.calc_sha256()
|
||||
self.block_receive_map[message.block.sha256] += 1
|
||||
|
||||
|
@ -9,7 +9,7 @@ from test_framework.mininode import *
|
||||
class BaseNode(NodeConnCB):
|
||||
nServices = 0
|
||||
firstAddrnServices = 0
|
||||
def on_version(self, conn, message):
|
||||
def on_version(self, message):
|
||||
self.nServices = message.nServices
|
||||
|
||||
class NodeNetworkLimitedTest(BitcoinTestFramework):
|
||||
|
@ -22,21 +22,21 @@ class TestNode(NodeConnCB):
|
||||
# so we can eg wait until a particular block is announced.
|
||||
self.announced_blockhashes = set()
|
||||
|
||||
def on_sendcmpct(self, conn, message):
|
||||
def on_sendcmpct(self, message):
|
||||
self.last_sendcmpct.append(message)
|
||||
|
||||
def on_cmpctblock(self, conn, message):
|
||||
def on_cmpctblock(self, message):
|
||||
self.block_announced = True
|
||||
self.last_message["cmpctblock"].header_and_shortids.header.calc_sha256()
|
||||
self.announced_blockhashes.add(self.last_message["cmpctblock"].header_and_shortids.header.sha256)
|
||||
|
||||
def on_headers(self, conn, message):
|
||||
def on_headers(self, message):
|
||||
self.block_announced = True
|
||||
for x in self.last_message["headers"].headers:
|
||||
x.calc_sha256()
|
||||
self.announced_blockhashes.add(x.sha256)
|
||||
|
||||
def on_inv(self, conn, message):
|
||||
def on_inv(self, message):
|
||||
for x in self.last_message["inv"].inv:
|
||||
if x.type == 2:
|
||||
self.block_announced = True
|
||||
@ -57,7 +57,7 @@ class TestNode(NodeConnCB):
|
||||
msg = msg_getheaders()
|
||||
msg.locator.vHave = locator
|
||||
msg.hashstop = hashstop
|
||||
self.connection.send_message(msg)
|
||||
self.send_message(msg)
|
||||
|
||||
def send_header_for_blocks(self, new_blocks):
|
||||
headers_message = msg_headers()
|
||||
@ -83,7 +83,7 @@ class TestNode(NodeConnCB):
|
||||
This is used when we want to send a message into the node that we expect
|
||||
will get us disconnected, eg an invalid block."""
|
||||
self.send_message(message)
|
||||
wait_until(lambda: not self.connected, timeout=timeout, lock=mininode_lock)
|
||||
wait_until(lambda: self.state != "connected", timeout=timeout, lock=mininode_lock)
|
||||
|
||||
class CompactBlocksTest(BitcoinTestFramework):
|
||||
def set_test_params(self):
|
||||
|
@ -27,42 +27,41 @@ class CLazyNode(NodeConnCB):
|
||||
self.unexpected_msg = True
|
||||
self.log.info("should not have received message: %s" % message.command)
|
||||
|
||||
def on_open(self, conn):
|
||||
self.connected = True
|
||||
def on_open(self):
|
||||
self.ever_connected = True
|
||||
|
||||
def on_version(self, conn, message): self.bad_message(message)
|
||||
def on_verack(self, conn, message): self.bad_message(message)
|
||||
def on_reject(self, conn, message): self.bad_message(message)
|
||||
def on_inv(self, conn, message): self.bad_message(message)
|
||||
def on_addr(self, conn, message): self.bad_message(message)
|
||||
def on_getdata(self, conn, message): self.bad_message(message)
|
||||
def on_getblocks(self, conn, message): self.bad_message(message)
|
||||
def on_tx(self, conn, message): self.bad_message(message)
|
||||
def on_block(self, conn, message): self.bad_message(message)
|
||||
def on_getaddr(self, conn, message): self.bad_message(message)
|
||||
def on_headers(self, conn, message): self.bad_message(message)
|
||||
def on_getheaders(self, conn, message): self.bad_message(message)
|
||||
def on_ping(self, conn, message): self.bad_message(message)
|
||||
def on_mempool(self, conn): self.bad_message(message)
|
||||
def on_pong(self, conn, message): self.bad_message(message)
|
||||
def on_sendheaders(self, conn, message): self.bad_message(message)
|
||||
def on_sendcmpct(self, conn, message): self.bad_message(message)
|
||||
def on_cmpctblock(self, conn, message): self.bad_message(message)
|
||||
def on_getblocktxn(self, conn, message): self.bad_message(message)
|
||||
def on_blocktxn(self, conn, message): self.bad_message(message)
|
||||
def on_version(self, message): self.bad_message(message)
|
||||
def on_verack(self, message): self.bad_message(message)
|
||||
def on_reject(self, message): self.bad_message(message)
|
||||
def on_inv(self, message): self.bad_message(message)
|
||||
def on_addr(self, message): self.bad_message(message)
|
||||
def on_getdata(self, message): self.bad_message(message)
|
||||
def on_getblocks(self, message): self.bad_message(message)
|
||||
def on_tx(self, message): self.bad_message(message)
|
||||
def on_block(self, message): self.bad_message(message)
|
||||
def on_getaddr(self, message): self.bad_message(message)
|
||||
def on_headers(self, message): self.bad_message(message)
|
||||
def on_getheaders(self, message): self.bad_message(message)
|
||||
def on_ping(self, message): self.bad_message(message)
|
||||
def on_mempool(self, message): self.bad_message(message)
|
||||
def on_pong(self, message): self.bad_message(message)
|
||||
def on_sendheaders(self, message): self.bad_message(message)
|
||||
def on_sendcmpct(self, message): self.bad_message(message)
|
||||
def on_cmpctblock(self, message): self.bad_message(message)
|
||||
def on_getblocktxn(self, message): self.bad_message(message)
|
||||
def on_blocktxn(self, message): self.bad_message(message)
|
||||
|
||||
# Node that never sends a version. We'll use this to send a bunch of messages
|
||||
# anyway, and eventually get disconnected.
|
||||
class CNodeNoVersionBan(CLazyNode):
|
||||
# send a bunch of veracks without sending a message. This should get us disconnected.
|
||||
# NOTE: implementation-specific check here. Remove if bitcoind ban behavior changes
|
||||
def on_open(self, conn):
|
||||
super().on_open(conn)
|
||||
def on_open(self):
|
||||
super().on_open()
|
||||
for i in range(banscore):
|
||||
self.send_message(msg_verack())
|
||||
|
||||
def on_reject(self, conn, message): pass
|
||||
def on_reject(self, message): pass
|
||||
|
||||
# Node that never sends a version. This one just sits idle and hopes to receive
|
||||
# any message (it shouldn't!)
|
||||
@ -76,15 +75,15 @@ class CNodeNoVerackIdle(CLazyNode):
|
||||
self.version_received = False
|
||||
super().__init__()
|
||||
|
||||
def on_reject(self, conn, message): pass
|
||||
def on_verack(self, conn, message): pass
|
||||
def on_reject(self, message): pass
|
||||
def on_verack(self, message): pass
|
||||
# When version is received, don't reply with a verack. Instead, see if the
|
||||
# node will give us a message that it shouldn't. This is not an exhaustive
|
||||
# list!
|
||||
def on_version(self, conn, message):
|
||||
def on_version(self, message):
|
||||
self.version_received = True
|
||||
conn.send_message(msg_ping())
|
||||
conn.send_message(msg_getaddr())
|
||||
self.send_message(msg_ping())
|
||||
self.send_message(msg_getaddr())
|
||||
|
||||
class P2PLeakTest(BitcoinTestFramework):
|
||||
def set_test_params(self):
|
||||
@ -110,7 +109,7 @@ class P2PLeakTest(BitcoinTestFramework):
|
||||
time.sleep(5)
|
||||
|
||||
#This node should have been banned
|
||||
assert not no_version_bannode.connected
|
||||
assert no_version_bannode.state != "connected"
|
||||
|
||||
self.nodes[0].disconnect_p2ps()
|
||||
|
||||
|
@ -28,7 +28,7 @@ from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.util import *
|
||||
|
||||
class TestNode(NodeConnCB):
|
||||
def on_version(self, conn, message):
|
||||
def on_version(self, message):
|
||||
# Don't send a verack in response
|
||||
pass
|
||||
|
||||
|
@ -24,7 +24,7 @@ WARN_UNKNOWN_RULES_ACTIVE = "unknown new rules activated (versionbit {})".format
|
||||
VB_PATTERN = re.compile("^Warning.*versionbit")
|
||||
|
||||
class TestNode(NodeConnCB):
|
||||
def on_inv(self, conn, message):
|
||||
def on_inv(self, message):
|
||||
pass
|
||||
|
||||
class VersionBitsWarningTest(BitcoinTestFramework):
|
||||
|
@ -112,6 +112,7 @@ DIRECT_FETCH_RESPONSE_TIME = 0.05
|
||||
class BaseNode(NodeConnCB):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self.block_announced = False
|
||||
self.last_blockhash_announced = None
|
||||
|
||||
@ -120,18 +121,18 @@ class BaseNode(NodeConnCB):
|
||||
msg = msg_getdata()
|
||||
for x in block_hashes:
|
||||
msg.inv.append(CInv(2, x))
|
||||
self.connection.send_message(msg)
|
||||
self.send_message(msg)
|
||||
|
||||
def send_get_headers(self, locator, hashstop):
|
||||
msg = msg_getheaders()
|
||||
msg.locator.vHave = locator
|
||||
msg.hashstop = hashstop
|
||||
self.connection.send_message(msg)
|
||||
self.send_message(msg)
|
||||
|
||||
def send_block_inv(self, blockhash):
|
||||
msg = msg_inv()
|
||||
msg.inv = [CInv(2, blockhash)]
|
||||
self.connection.send_message(msg)
|
||||
self.send_message(msg)
|
||||
|
||||
def send_header_for_blocks(self, new_blocks):
|
||||
headers_message = msg_headers()
|
||||
@ -154,11 +155,11 @@ class BaseNode(NodeConnCB):
|
||||
test_function = lambda: self.last_blockhash_announced == block_hash
|
||||
wait_until(test_function, timeout=timeout, lock=mininode_lock)
|
||||
|
||||
def on_inv(self, conn, message):
|
||||
def on_inv(self, message):
|
||||
self.block_announced = True
|
||||
self.last_blockhash_announced = message.inv[-1].hash
|
||||
|
||||
def on_headers(self, conn, message):
|
||||
def on_headers(self, message):
|
||||
if len(message.headers):
|
||||
self.block_announced = True
|
||||
message.headers[-1].calc_sha256()
|
||||
|
@ -22,10 +22,10 @@ import logging
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
|
||||
from test_framework.messages import *
|
||||
from test_framework.util import wait_until
|
||||
|
||||
MSG_TX = 1
|
||||
MSG_BLOCK = 2
|
||||
@ -75,15 +75,210 @@ MAGIC_BYTES = {
|
||||
"devnet": b"\xe2\xca\xff\xce", # devnet
|
||||
}
|
||||
|
||||
class NodeConnCB():
|
||||
"""Callback and helper functions for P2P connection to a bitcoind node.
|
||||
class NodeConn(asyncore.dispatcher):
|
||||
"""A low-level connection object to a node's P2P interface.
|
||||
|
||||
This class is responsible for:
|
||||
|
||||
- opening and closing the TCP connection to the node
|
||||
- reading bytes from and writing bytes to the socket
|
||||
- deserializing and serializing the P2P message header
|
||||
- logging messages as they are sent and received
|
||||
|
||||
This class contains no logic for handing the P2P message payloads. It must be
|
||||
sub-classed and the on_message() callback overridden.
|
||||
|
||||
TODO: rename this class P2PConnection."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(map=mininode_socket_map)
|
||||
|
||||
def peer_connect(self, dstaddr, dstport, net="regtest", devnet_name=None):
|
||||
self.dstaddr = dstaddr
|
||||
self.dstport = dstport
|
||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
self.sendbuf = b""
|
||||
self.recvbuf = b""
|
||||
self.state = "connecting"
|
||||
self.network = net
|
||||
self.devnet_name = devnet_name
|
||||
self.disconnect = False
|
||||
|
||||
logger.debug('Connecting to Dash Node: %s:%d' % (self.dstaddr, self.dstport))
|
||||
|
||||
try:
|
||||
self.connect((dstaddr, dstport))
|
||||
except:
|
||||
self.handle_close()
|
||||
|
||||
def peer_disconnect(self):
|
||||
# Connection could have already been closed by other end.
|
||||
if self.state == "connected":
|
||||
self.disconnect_node()
|
||||
|
||||
# Connection and disconnection methods
|
||||
|
||||
def handle_connect(self):
|
||||
"""asyncore callback when a connection is opened."""
|
||||
if self.state != "connected":
|
||||
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
|
||||
self.state = "connected"
|
||||
self.on_open()
|
||||
|
||||
def handle_close(self):
|
||||
"""asyncore callback when a connection is closed."""
|
||||
logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport))
|
||||
self.state = "closed"
|
||||
self.recvbuf = b""
|
||||
self.sendbuf = b""
|
||||
try:
|
||||
self.close()
|
||||
except:
|
||||
pass
|
||||
self.on_close()
|
||||
|
||||
def disconnect_node(self):
|
||||
"""Disconnect the p2p connection.
|
||||
|
||||
Called by the test logic thread. Causes the p2p connection
|
||||
to be disconnected on the next iteration of the asyncore loop."""
|
||||
self.disconnect = True
|
||||
|
||||
# Socket read methods
|
||||
|
||||
def handle_read(self):
|
||||
"""asyncore callback when data is read from the socket."""
|
||||
t = self.recv(8192)
|
||||
if len(t) > 0:
|
||||
self.recvbuf += t
|
||||
self._on_data()
|
||||
|
||||
def _on_data(self):
|
||||
"""Try to read P2P messages from the recv buffer.
|
||||
|
||||
This method reads data from the buffer in a loop. It deserializes,
|
||||
parses and verifies the P2P header, then passes the P2P payload to
|
||||
the on_message callback for processing."""
|
||||
try:
|
||||
while True:
|
||||
if len(self.recvbuf) < 4:
|
||||
return
|
||||
if self.recvbuf[:4] != MAGIC_BYTES[self.network]:
|
||||
raise ValueError("got garbage %s" % repr(self.recvbuf))
|
||||
if len(self.recvbuf) < 4 + 12 + 4 + 4:
|
||||
return
|
||||
command = self.recvbuf[4:4+12].split(b"\x00", 1)[0]
|
||||
msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0]
|
||||
checksum = self.recvbuf[4+12+4:4+12+4+4]
|
||||
if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen:
|
||||
return
|
||||
msg = self.recvbuf[4+12+4+4:4+12+4+4+msglen]
|
||||
th = sha256(msg)
|
||||
h = sha256(th)
|
||||
if checksum != h[:4]:
|
||||
raise ValueError("got bad checksum " + repr(self.recvbuf))
|
||||
self.recvbuf = self.recvbuf[4+12+4+4+msglen:]
|
||||
if command not in MESSAGEMAP:
|
||||
raise ValueError("Received unknown command from %s:%d: '%s' %s" % (self.dstaddr, self.dstport, command, repr(msg)))
|
||||
if MESSAGEMAP[command] is None:
|
||||
# Command is known but we don't want/need to handle it
|
||||
continue
|
||||
f = BytesIO(msg)
|
||||
t = MESSAGEMAP[command]()
|
||||
t.deserialize(f)
|
||||
self._log_message("receive", t)
|
||||
self.on_message(t)
|
||||
except Exception as e:
|
||||
logger.exception('Error reading message:', repr(e))
|
||||
raise
|
||||
|
||||
def on_message(self, message):
|
||||
"""Callback for processing a P2P payload. Must be overridden by derived class."""
|
||||
raise NotImplementedError
|
||||
|
||||
# Socket write methods
|
||||
|
||||
def writable(self):
|
||||
"""asyncore method to determine whether the handle_write() callback should be called on the next loop."""
|
||||
with mininode_lock:
|
||||
pre_connection = self.state == "connecting"
|
||||
length = len(self.sendbuf)
|
||||
return (length > 0 or pre_connection)
|
||||
|
||||
def handle_write(self):
|
||||
"""asyncore callback when data should be written to the socket."""
|
||||
with mininode_lock:
|
||||
# asyncore does not expose socket connection, only the first read/write
|
||||
# event, thus we must check connection manually here to know when we
|
||||
# actually connect
|
||||
if self.state == "connecting":
|
||||
self.handle_connect()
|
||||
if not self.writable():
|
||||
return
|
||||
|
||||
try:
|
||||
sent = self.send(self.sendbuf)
|
||||
except:
|
||||
self.handle_close()
|
||||
return
|
||||
self.sendbuf = self.sendbuf[sent:]
|
||||
|
||||
def send_message(self, message, pushbuf=False):
|
||||
"""Send a P2P message over the socket.
|
||||
|
||||
This method takes a P2P payload, builds the P2P header and adds
|
||||
the message to the send buffer to be sent over the socket."""
|
||||
if self.state != "connected" and not pushbuf:
|
||||
raise IOError('Not connected, no pushbuf')
|
||||
self._log_message("send", message)
|
||||
command = message.command
|
||||
data = message.serialize()
|
||||
tmsg = MAGIC_BYTES[self.network]
|
||||
tmsg += command
|
||||
tmsg += b"\x00" * (12 - len(command))
|
||||
tmsg += struct.pack("<I", len(data))
|
||||
th = sha256(data)
|
||||
h = sha256(th)
|
||||
tmsg += h[:4]
|
||||
tmsg += data
|
||||
with mininode_lock:
|
||||
if (len(self.sendbuf) == 0 and not pushbuf):
|
||||
try:
|
||||
sent = self.send(tmsg)
|
||||
self.sendbuf = tmsg[sent:]
|
||||
except BlockingIOError:
|
||||
self.sendbuf = tmsg
|
||||
else:
|
||||
self.sendbuf += tmsg
|
||||
|
||||
# Class utility methods
|
||||
|
||||
def _log_message(self, direction, msg):
|
||||
"""Logs a message being sent or received over the connection."""
|
||||
if direction == "send":
|
||||
log_message = "Send message to "
|
||||
elif direction == "receive":
|
||||
log_message = "Received message from "
|
||||
log_message += "%s:%d: %s" % (self.dstaddr, self.dstport, repr(msg)[:500])
|
||||
if len(log_message) > 500:
|
||||
log_message += "... (msg truncated)"
|
||||
logger.debug(log_message)
|
||||
|
||||
|
||||
class NodeConnCB(NodeConn):
|
||||
"""A high-level P2P interface class for communicating with a Bitcoin node.
|
||||
|
||||
This class provides high-level callbacks for processing P2P message
|
||||
payloads, as well as convenience methods for interacting with the
|
||||
node over P2P.
|
||||
|
||||
Individual testcases should subclass this and override the on_* methods
|
||||
if they want to alter message handling behaviour."""
|
||||
if they want to alter message handling behaviour.
|
||||
|
||||
TODO: rename this class P2PInterface"""
|
||||
def __init__(self):
|
||||
# Track whether we have a P2P connection open to the node
|
||||
self.connected = False
|
||||
self.connection = None
|
||||
super().__init__()
|
||||
|
||||
# Track number of messages of each type received and the most recent
|
||||
# message of each type
|
||||
@ -93,9 +288,27 @@ class NodeConnCB():
|
||||
# A count of the number of ping messages we've sent to the node
|
||||
self.ping_counter = 1
|
||||
|
||||
# The network services received from the peer
|
||||
self.nServices = 0
|
||||
|
||||
def peer_connect(self, *args, services=NODE_NETWORK, send_version=True, **kwargs):
|
||||
super().peer_connect(*args, **kwargs)
|
||||
|
||||
if send_version:
|
||||
# Send a version msg
|
||||
vt = msg_version()
|
||||
vt.nServices = services
|
||||
vt.addrTo.ip = self.dstaddr
|
||||
vt.addrTo.port = self.dstport
|
||||
vt.addrFrom.ip = "0.0.0.0"
|
||||
vt.addrFrom.port = 0
|
||||
if self.network == "devnet" and self.devnet_name is not None:
|
||||
vt.strSubVer = MY_SUBVERSION_DEVNET % self.devnet_name.encode()
|
||||
self.send_message(vt, True)
|
||||
|
||||
# Message receiving methods
|
||||
|
||||
def deliver(self, conn, message):
|
||||
def on_message(self, message):
|
||||
"""Receive message and dispatch message to appropriate callback.
|
||||
|
||||
We keep a count of how many of each message type has been received
|
||||
@ -105,70 +318,65 @@ class NodeConnCB():
|
||||
command = message.command.decode('ascii')
|
||||
self.message_count[command] += 1
|
||||
self.last_message[command] = message
|
||||
getattr(self, 'on_' + command)(conn, message)
|
||||
getattr(self, 'on_' + command)(message)
|
||||
except:
|
||||
print("ERROR delivering %s (%s)" % (repr(message),
|
||||
sys.exc_info()[0]))
|
||||
print("ERROR delivering %s (%s)" % (repr(message), sys.exc_info()[0]))
|
||||
raise
|
||||
|
||||
# Callback methods. Can be overridden by subclasses in individual test
|
||||
# cases to provide custom message handling behaviour.
|
||||
|
||||
def on_open(self, conn):
|
||||
self.connected = True
|
||||
def on_open(self):
|
||||
pass
|
||||
|
||||
def on_close(self, conn):
|
||||
self.connected = False
|
||||
self.connection = None
|
||||
def on_close(self):
|
||||
pass
|
||||
|
||||
def on_addr(self, conn, message): pass
|
||||
def on_block(self, conn, message): pass
|
||||
def on_blocktxn(self, conn, message): pass
|
||||
def on_cmpctblock(self, conn, message): pass
|
||||
def on_feefilter(self, conn, message): pass
|
||||
def on_getaddr(self, conn, message): pass
|
||||
def on_getblocks(self, conn, message): pass
|
||||
def on_getblocktxn(self, conn, message): pass
|
||||
def on_getdata(self, conn, message): pass
|
||||
def on_getheaders(self, conn, message): pass
|
||||
def on_headers(self, conn, message): pass
|
||||
def on_mempool(self, conn): pass
|
||||
def on_pong(self, conn, message): pass
|
||||
def on_reject(self, conn, message): pass
|
||||
def on_sendcmpct(self, conn, message): pass
|
||||
def on_sendheaders(self, conn, message): pass
|
||||
def on_tx(self, conn, message): pass
|
||||
def on_addr(self, message): pass
|
||||
def on_block(self, message): pass
|
||||
def on_blocktxn(self, message): pass
|
||||
def on_cmpctblock(self, message): pass
|
||||
def on_feefilter(self, message): pass
|
||||
def on_getaddr(self, message): pass
|
||||
def on_getblocks(self, message): pass
|
||||
def on_getblocktxn(self, message): pass
|
||||
def on_getdata(self, message): pass
|
||||
def on_getheaders(self, message): pass
|
||||
def on_headers(self, message): pass
|
||||
def on_mempool(self, message): pass
|
||||
def on_pong(self, message): pass
|
||||
def on_reject(self, message): pass
|
||||
def on_sendcmpct(self, message): pass
|
||||
def on_sendheaders(self, message): pass
|
||||
def on_tx(self, message): pass
|
||||
|
||||
def on_inv(self, conn, message):
|
||||
def on_inv(self, message):
|
||||
want = msg_getdata()
|
||||
for i in message.inv:
|
||||
if i.type != 0:
|
||||
want.inv.append(i)
|
||||
if len(want.inv):
|
||||
conn.send_message(want)
|
||||
self.send_message(want)
|
||||
|
||||
def on_ping(self, conn, message):
|
||||
conn.send_message(msg_pong(message.nonce))
|
||||
def on_ping(self, message):
|
||||
self.send_message(msg_pong(message.nonce))
|
||||
|
||||
def on_mnlistdiff(self, conn, message): pass
|
||||
def on_clsig(self, conn, message): pass
|
||||
def on_islock(self, conn, message): pass
|
||||
def on_mnlistdiff(self, message): pass
|
||||
def on_clsig(self, message): pass
|
||||
def on_islock(self, message): pass
|
||||
|
||||
def on_verack(self, conn, message):
|
||||
def on_verack(self, message):
|
||||
self.verack_received = True
|
||||
|
||||
def on_version(self, conn, message):
|
||||
def on_version(self, message):
|
||||
assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format(message.nVersion, MIN_VERSION_SUPPORTED)
|
||||
conn.send_message(msg_verack())
|
||||
conn.nServices = message.nServices
|
||||
self.send_message(msg_verack())
|
||||
self.nServices = message.nServices
|
||||
|
||||
# Connection helper methods
|
||||
|
||||
def add_connection(self, conn):
|
||||
self.connection = conn
|
||||
|
||||
def wait_for_disconnect(self, timeout=60):
|
||||
test_function = lambda: not self.connected
|
||||
test_function = lambda: self.state != "connected"
|
||||
wait_until(test_function, timeout=timeout, lock=mininode_lock)
|
||||
|
||||
# Message receiving helper methods
|
||||
@ -212,12 +420,6 @@ class NodeConnCB():
|
||||
|
||||
# Message sending helper functions
|
||||
|
||||
def send_message(self, message):
|
||||
if self.connection:
|
||||
self.connection.send_message(message)
|
||||
else:
|
||||
logger.error("Cannot send message. No connection to node!")
|
||||
|
||||
def send_and_ping(self, message):
|
||||
self.send_message(message)
|
||||
self.sync_with_ping()
|
||||
@ -229,185 +431,6 @@ class NodeConnCB():
|
||||
wait_until(test_function, timeout=timeout, lock=mininode_lock)
|
||||
self.ping_counter += 1
|
||||
|
||||
class NodeConn(asyncore.dispatcher):
|
||||
"""The actual NodeConn class
|
||||
|
||||
This class provides an interface for a p2p connection to a specified node."""
|
||||
|
||||
def __init__(self, dstaddr, dstport, callback, net="regtest", services=NODE_NETWORK, send_version=True, devnet_name=None):
|
||||
asyncore.dispatcher.__init__(self, map=mininode_socket_map)
|
||||
self.dstaddr = dstaddr
|
||||
self.dstport = dstport
|
||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
self.sendbuf = b""
|
||||
self.recvbuf = b""
|
||||
self.last_sent = 0
|
||||
self.state = "connecting"
|
||||
self.network = net
|
||||
self.devnet_name = devnet_name
|
||||
self.cb = callback
|
||||
self.disconnect = False
|
||||
self.nServices = 0
|
||||
|
||||
if send_version:
|
||||
# stuff version msg into sendbuf
|
||||
vt = msg_version()
|
||||
vt.nServices = services
|
||||
vt.addrTo.ip = self.dstaddr
|
||||
vt.addrTo.port = self.dstport
|
||||
vt.addrFrom.ip = "0.0.0.0"
|
||||
vt.addrFrom.port = 0
|
||||
vt.strSubVer = MY_SUBVERSION
|
||||
if self.network == "devnet" and self.devnet_name is not None:
|
||||
vt.strSubVer = MY_SUBVERSION_DEVNET % self.devnet_name.encode()
|
||||
self.send_message(vt, True)
|
||||
|
||||
logger.debug('Connecting to Dash Node: %s:%d' % (self.dstaddr, self.dstport))
|
||||
|
||||
try:
|
||||
self.connect((dstaddr, dstport))
|
||||
except:
|
||||
self.handle_close()
|
||||
|
||||
# Connection and disconnection methods
|
||||
|
||||
def handle_connect(self):
|
||||
if self.state != "connected":
|
||||
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
|
||||
self.state = "connected"
|
||||
self.cb.on_open(self)
|
||||
|
||||
def handle_close(self):
|
||||
logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport))
|
||||
self.state = "closed"
|
||||
self.recvbuf = b""
|
||||
self.sendbuf = b""
|
||||
try:
|
||||
self.close()
|
||||
except:
|
||||
pass
|
||||
self.cb.on_close(self)
|
||||
|
||||
def disconnect_node(self):
|
||||
""" Disconnect the p2p connection.
|
||||
|
||||
Called by the test logic thread. Causes the p2p connection
|
||||
to be disconnected on the next iteration of the asyncore loop."""
|
||||
self.disconnect = True
|
||||
|
||||
# Socket read methods
|
||||
|
||||
def readable(self):
|
||||
return True
|
||||
|
||||
def handle_read(self):
|
||||
t = self.recv(8192)
|
||||
if len(t) > 0:
|
||||
self.recvbuf += t
|
||||
self.got_data()
|
||||
|
||||
def got_data(self):
|
||||
try:
|
||||
while True:
|
||||
if len(self.recvbuf) < 4:
|
||||
return
|
||||
if self.recvbuf[:4] != MAGIC_BYTES[self.network]:
|
||||
raise ValueError("got garbage %s" % repr(self.recvbuf))
|
||||
if len(self.recvbuf) < 4 + 12 + 4 + 4:
|
||||
return
|
||||
command = self.recvbuf[4:4+12].split(b"\x00", 1)[0]
|
||||
msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0]
|
||||
checksum = self.recvbuf[4+12+4:4+12+4+4]
|
||||
if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen:
|
||||
return
|
||||
msg = self.recvbuf[4+12+4+4:4+12+4+4+msglen]
|
||||
th = sha256(msg)
|
||||
h = sha256(th)
|
||||
if checksum != h[:4]:
|
||||
raise ValueError("got bad checksum " + repr(self.recvbuf))
|
||||
self.recvbuf = self.recvbuf[4+12+4+4+msglen:]
|
||||
if command not in MESSAGEMAP:
|
||||
raise ValueError("Received unknown command from %s:%d: '%s' %s" % (self.dstaddr, self.dstport, command, repr(msg)))
|
||||
if MESSAGEMAP[command] is None:
|
||||
# Command is known but we don't want/need to handle it
|
||||
continue
|
||||
f = BytesIO(msg)
|
||||
t = MESSAGEMAP[command]()
|
||||
t.deserialize(f)
|
||||
self.got_message(t)
|
||||
except Exception as e:
|
||||
logger.exception('Error reading message:', repr(e))
|
||||
raise
|
||||
|
||||
def got_message(self, message):
|
||||
if self.last_sent + 30 * 60 < time.time():
|
||||
self.send_message(MESSAGEMAP[b'ping']())
|
||||
self._log_message("receive", message)
|
||||
self.cb.deliver(self, message)
|
||||
|
||||
# Socket write methods
|
||||
|
||||
def writable(self):
|
||||
with mininode_lock:
|
||||
pre_connection = self.state == "connecting"
|
||||
length = len(self.sendbuf)
|
||||
return (length > 0 or pre_connection)
|
||||
|
||||
def handle_write(self):
|
||||
with mininode_lock:
|
||||
# asyncore does not expose socket connection, only the first read/write
|
||||
# event, thus we must check connection manually here to know when we
|
||||
# actually connect
|
||||
if self.state == "connecting":
|
||||
self.handle_connect()
|
||||
if not self.writable():
|
||||
return
|
||||
|
||||
try:
|
||||
sent = self.send(self.sendbuf)
|
||||
except:
|
||||
self.handle_close()
|
||||
return
|
||||
self.sendbuf = self.sendbuf[sent:]
|
||||
|
||||
def send_message(self, message, pushbuf=False):
|
||||
if self.state != "connected" and not pushbuf:
|
||||
raise IOError('Not connected, no pushbuf')
|
||||
self._log_message("send", message)
|
||||
command = message.command
|
||||
data = message.serialize()
|
||||
tmsg = MAGIC_BYTES[self.network]
|
||||
tmsg += command
|
||||
tmsg += b"\x00" * (12 - len(command))
|
||||
tmsg += struct.pack("<I", len(data))
|
||||
th = sha256(data)
|
||||
h = sha256(th)
|
||||
tmsg += h[:4]
|
||||
tmsg += data
|
||||
with mininode_lock:
|
||||
if (len(self.sendbuf) == 0 and not pushbuf):
|
||||
try:
|
||||
sent = self.send(tmsg)
|
||||
self.sendbuf = tmsg[sent:]
|
||||
except BlockingIOError:
|
||||
self.sendbuf = tmsg
|
||||
else:
|
||||
self.sendbuf += tmsg
|
||||
self.last_sent = time.time()
|
||||
|
||||
# Class utility methods
|
||||
|
||||
def _log_message(self, direction, msg):
|
||||
if direction == "send":
|
||||
log_message = "Send message to "
|
||||
elif direction == "receive":
|
||||
log_message = "Received message from "
|
||||
log_message += "%s:%d: %s" % (self.dstaddr, self.dstport, repr(msg)[:500])
|
||||
if len(log_message) > 500:
|
||||
log_message += "... (msg truncated)"
|
||||
logger.debug(log_message)
|
||||
|
||||
|
||||
# Keep our own socket map for asyncore, so that we can track disconnects
|
||||
# ourselves (to workaround an issue with closing an asyncore socket when
|
||||
@ -476,7 +499,7 @@ class P2PDataStore(NodeConnCB):
|
||||
self.tx_store = {}
|
||||
self.getdata_requests = []
|
||||
|
||||
def on_getdata(self, conn, message):
|
||||
def on_getdata(self, message):
|
||||
"""Check for the tx/block in our stores and if found, reply with an inv message."""
|
||||
for inv in message.inv:
|
||||
self.getdata_requests.append(inv.hash)
|
||||
@ -487,7 +510,7 @@ class P2PDataStore(NodeConnCB):
|
||||
else:
|
||||
logger.debug('getdata message type {} received.'.format(hex(inv.type)))
|
||||
|
||||
def on_getheaders(self, conn, message):
|
||||
def on_getheaders(self, message):
|
||||
"""Search back through our block store for the locator, and reply with a headers message if found."""
|
||||
|
||||
locator, hash_stop = message.locator, message.hashstop
|
||||
@ -519,7 +542,7 @@ class P2PDataStore(NodeConnCB):
|
||||
if response is not None:
|
||||
self.send_message(response)
|
||||
|
||||
def on_reject(self, conn, message):
|
||||
def on_reject(self, message):
|
||||
"""Store reject reason and code for testing."""
|
||||
self.reject_code_received = message.code
|
||||
self.reject_reason_received = message.reason
|
||||
|
@ -15,7 +15,6 @@ import subprocess
|
||||
import time
|
||||
|
||||
from .authproxy import JSONRPCException
|
||||
from .mininode import NodeConn
|
||||
from .util import (
|
||||
assert_equal,
|
||||
delete_cookie_file,
|
||||
@ -190,7 +189,7 @@ class TestNode():
|
||||
self.encryptwallet(passphrase)
|
||||
self.wait_until_stopped()
|
||||
|
||||
def add_p2p_connection(self, p2p_conn, **kwargs):
|
||||
def add_p2p_connection(self, p2p_conn, *args, **kwargs):
|
||||
"""Add a p2p connection to the node.
|
||||
|
||||
This method adds the p2p connection to the self.p2ps list and also
|
||||
@ -199,9 +198,9 @@ class TestNode():
|
||||
kwargs['dstport'] = p2p_port(self.index)
|
||||
if 'dstaddr' not in kwargs:
|
||||
kwargs['dstaddr'] = '127.0.0.1'
|
||||
|
||||
p2p_conn.peer_connect(*args, **kwargs)
|
||||
self.p2ps.append(p2p_conn)
|
||||
kwargs.update({'callback': p2p_conn})
|
||||
p2p_conn.add_connection(NodeConn(**kwargs))
|
||||
|
||||
return p2p_conn
|
||||
|
||||
@ -217,10 +216,8 @@ class TestNode():
|
||||
def disconnect_p2ps(self):
|
||||
"""Close all p2p connections to the node."""
|
||||
for p in self.p2ps:
|
||||
# Connection could have already been closed by other end.
|
||||
if p.connection is not None:
|
||||
p.connection.disconnect_node()
|
||||
self.p2ps = []
|
||||
p.peer_disconnect()
|
||||
del self.p2ps[:]
|
||||
|
||||
class TestNodeCLIAttr:
|
||||
def __init__(self, cli, command):
|
||||
|
Loading…
Reference in New Issue
Block a user