mirror of
https://github.com/dashpay/dash.git
synced 2024-12-25 20:12:57 +01:00
Adds helper functions to NodeConnCB
This commit adds some helper functions to NodeConnCB which are useful for many tests: - NodeConnCB now keeps track of the number of each message type that it's received and the most recent message of each type. Many tests assert on the most recent block, tx or reject message. - NodeConnCB now keeps track of its connection state by setting a connected boolean in on_open() and on_close() - NodeConnCB now has wait_for_block, wait_for_getdata, wait_for_getheaders, wait_for_inv and wait_for_verack methods I have updated the individual test cases to make sure that there are no namespace problems that cause them to fail with these new definitions. Future commits will remove the duplicate code.
This commit is contained in:
parent
2584925077
commit
52e15aa4d0
@ -44,15 +44,6 @@ class TestNode(NodeConnCB):
|
|||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
self.block_receive_map[message.block.sha256] = 1
|
self.block_receive_map[message.block.sha256] = 1
|
||||||
|
|
||||||
# Spin until verack message is received from the node.
|
|
||||||
# We use this to signal that our test can begin. This
|
|
||||||
# is called from the testing thread, so it needs to acquire
|
|
||||||
# the global lock.
|
|
||||||
def wait_for_verack(self):
|
|
||||||
def veracked():
|
|
||||||
return self.verack_received
|
|
||||||
return wait_until(veracked, timeout=10)
|
|
||||||
|
|
||||||
def wait_for_disconnect(self):
|
def wait_for_disconnect(self):
|
||||||
def disconnected():
|
def disconnected():
|
||||||
return self.peer_disconnected
|
return self.peer_disconnected
|
||||||
|
@ -70,17 +70,6 @@ class TestNode(NodeConnCB):
|
|||||||
def on_getdata(self, conn, message):
|
def on_getdata(self, conn, message):
|
||||||
self.last_getdata = message
|
self.last_getdata = message
|
||||||
|
|
||||||
# Spin until verack message is received from the node.
|
|
||||||
# We use this to signal that our test can begin. This
|
|
||||||
# is called from the testing thread, so it needs to acquire
|
|
||||||
# the global lock.
|
|
||||||
def wait_for_verack(self):
|
|
||||||
while True:
|
|
||||||
with mininode_lock:
|
|
||||||
if self.verack_received:
|
|
||||||
return
|
|
||||||
time.sleep(0.05)
|
|
||||||
|
|
||||||
# Wrapper for the NodeConn's send_message function
|
# Wrapper for the NodeConn's send_message function
|
||||||
def send_message(self, message):
|
def send_message(self, message):
|
||||||
self.connection.send_message(message)
|
self.connection.send_message(message)
|
||||||
|
@ -23,6 +23,7 @@ class CLazyNode(NodeConnCB):
|
|||||||
self.connection = None
|
self.connection = None
|
||||||
self.unexpected_msg = False
|
self.unexpected_msg = False
|
||||||
self.connected = False
|
self.connected = False
|
||||||
|
self.ever_connected = False
|
||||||
|
|
||||||
def add_connection(self, conn):
|
def add_connection(self, conn):
|
||||||
self.connection = conn
|
self.connection = conn
|
||||||
@ -36,6 +37,7 @@ class CLazyNode(NodeConnCB):
|
|||||||
|
|
||||||
def on_open(self, conn):
|
def on_open(self, conn):
|
||||||
self.connected = True
|
self.connected = True
|
||||||
|
self.ever_connected = True
|
||||||
|
|
||||||
def on_version(self, conn, message): self.bad_message(message)
|
def on_version(self, conn, message): self.bad_message(message)
|
||||||
def on_verack(self, conn, message): self.bad_message(message)
|
def on_verack(self, conn, message): self.bad_message(message)
|
||||||
@ -121,7 +123,9 @@ class P2PLeakTest(BitcoinTestFramework):
|
|||||||
|
|
||||||
NetworkThread().start() # Start up network handling in another thread
|
NetworkThread().start() # Start up network handling in another thread
|
||||||
|
|
||||||
assert(wait_until(lambda: no_version_bannode.connected and no_version_idlenode.connected and no_verack_idlenode.version_received, timeout=10))
|
assert wait_until(lambda: no_version_bannode.ever_connected, timeout=10)
|
||||||
|
assert wait_until(lambda: no_version_idlenode.ever_connected, timeout=10)
|
||||||
|
assert wait_until(lambda: no_verack_idlenode.version_received, timeout=10)
|
||||||
|
|
||||||
# Mine a block and make sure that it's not sent to the connected nodes
|
# Mine a block and make sure that it's not sent to the connected nodes
|
||||||
self.nodes[0].generate(1)
|
self.nodes[0].generate(1)
|
||||||
@ -130,7 +134,7 @@ class P2PLeakTest(BitcoinTestFramework):
|
|||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
#This node should have been banned
|
#This node should have been banned
|
||||||
assert(no_version_bannode.connection.state == "closed")
|
assert not no_version_bannode.connected
|
||||||
|
|
||||||
[conn.disconnect_node() for conn in connections]
|
[conn.disconnect_node() for conn in connections]
|
||||||
|
|
||||||
|
@ -38,15 +38,6 @@ class TestNode(NodeConnCB):
|
|||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
self.block_receive_map[message.block.sha256] = 1
|
self.block_receive_map[message.block.sha256] = 1
|
||||||
|
|
||||||
# Spin until verack message is received from the node.
|
|
||||||
# We use this to signal that our test can begin. This
|
|
||||||
# is called from the testing thread, so it needs to acquire
|
|
||||||
# the global lock.
|
|
||||||
def wait_for_verack(self):
|
|
||||||
def veracked():
|
|
||||||
return self.verack_received
|
|
||||||
return wait_until(veracked, timeout=10)
|
|
||||||
|
|
||||||
def wait_for_disconnect(self):
|
def wait_for_disconnect(self):
|
||||||
def disconnected():
|
def disconnected():
|
||||||
return self.peer_disconnected
|
return self.peer_disconnected
|
||||||
|
@ -192,9 +192,7 @@ class TestManager(object):
|
|||||||
return wait_until(disconnected, timeout=10)
|
return wait_until(disconnected, timeout=10)
|
||||||
|
|
||||||
def wait_for_verack(self):
|
def wait_for_verack(self):
|
||||||
def veracked():
|
[node.wait_for_verack() for node in self.test_nodes]
|
||||||
return all(node.verack_received for node in self.test_nodes)
|
|
||||||
return wait_until(veracked, timeout=10)
|
|
||||||
|
|
||||||
def wait_for_pings(self, counter):
|
def wait_for_pings(self, counter):
|
||||||
def received_pongs():
|
def received_pongs():
|
||||||
|
@ -20,21 +20,22 @@ msg_block, msg_tx, msg_headers, etc.:
|
|||||||
ser_*, deser_*: functions that handle serialization/deserialization
|
ser_*, deser_*: functions that handle serialization/deserialization
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import struct
|
|
||||||
import socket
|
|
||||||
import asyncore
|
import asyncore
|
||||||
import time
|
|
||||||
import sys
|
|
||||||
import random
|
|
||||||
from .util import hex_str_to_bytes, bytes_to_hex_str
|
|
||||||
from io import BytesIO
|
|
||||||
from codecs import encode
|
from codecs import encode
|
||||||
import hashlib
|
from collections import defaultdict
|
||||||
from threading import RLock
|
|
||||||
from threading import Thread
|
|
||||||
import logging
|
|
||||||
import copy
|
import copy
|
||||||
|
import hashlib
|
||||||
|
from io import BytesIO
|
||||||
|
import logging
|
||||||
|
import random
|
||||||
|
import socket
|
||||||
|
import struct
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from threading import RLock, Thread
|
||||||
|
|
||||||
from test_framework.siphash import siphash256
|
from test_framework.siphash import siphash256
|
||||||
|
from test_framework.util import hex_str_to_bytes, bytes_to_hex_str
|
||||||
|
|
||||||
BIP0031_VERSION = 60000
|
BIP0031_VERSION = 60000
|
||||||
MY_VERSION = 70014 # past bip-31 for ping/pong
|
MY_VERSION = 70014 # past bip-31 for ping/pong
|
||||||
@ -1465,30 +1466,57 @@ class msg_witness_blocktxn(msg_blocktxn):
|
|||||||
r += self.block_transactions.serialize(with_witness=True)
|
r += self.block_transactions.serialize(with_witness=True)
|
||||||
return r
|
return r
|
||||||
|
|
||||||
# This is what a callback should look like for NodeConn
|
|
||||||
# Reimplement the on_* functions to provide handling for events
|
|
||||||
class NodeConnCB(object):
|
class NodeConnCB(object):
|
||||||
|
"""Callback and helper functions for P2P connection to a bitcoind node.
|
||||||
|
|
||||||
|
Individual testcases should subclass this and override the on_* methods
|
||||||
|
if they want to alter message handling behaviour.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.verack_received = False
|
# Track whether we have a P2P connection open to the node
|
||||||
|
self.connected = False
|
||||||
|
self.connection = None
|
||||||
|
|
||||||
|
# Track number of messages of each type received and the most recent
|
||||||
|
# message of each type
|
||||||
|
self.message_count = defaultdict(int)
|
||||||
|
self.last_message = {}
|
||||||
|
|
||||||
|
# A count of the number of ping messages we've sent to the node
|
||||||
|
self.ping_counter = 1
|
||||||
|
|
||||||
# deliver_sleep_time is helpful for debugging race conditions in p2p
|
# deliver_sleep_time is helpful for debugging race conditions in p2p
|
||||||
# tests; it causes message delivery to sleep for the specified time
|
# tests; it causes message delivery to sleep for the specified time
|
||||||
# before acquiring the global lock and delivering the next message.
|
# before acquiring the global lock and delivering the next message.
|
||||||
self.deliver_sleep_time = None
|
self.deliver_sleep_time = None
|
||||||
|
|
||||||
# Remember the services our peer has advertised
|
# Remember the services our peer has advertised
|
||||||
self.peer_services = None
|
self.peer_services = None
|
||||||
self.connection = None
|
|
||||||
self.ping_counter = 1
|
# Message receiving methods
|
||||||
self.last_pong = msg_pong()
|
|
||||||
|
|
||||||
def deliver(self, conn, message):
|
def deliver(self, conn, message):
|
||||||
|
"""Receive message and dispatch message to appropriate callback.
|
||||||
|
|
||||||
|
We keep a count of how many of each message type has been received
|
||||||
|
and the most recent message of each type.
|
||||||
|
|
||||||
|
Optionally waits for deliver_sleep_time before dispatching message.
|
||||||
|
"""
|
||||||
|
|
||||||
deliver_sleep = self.get_deliver_sleep_time()
|
deliver_sleep = self.get_deliver_sleep_time()
|
||||||
if deliver_sleep is not None:
|
if deliver_sleep is not None:
|
||||||
time.sleep(deliver_sleep)
|
time.sleep(deliver_sleep)
|
||||||
with mininode_lock:
|
with mininode_lock:
|
||||||
try:
|
try:
|
||||||
getattr(self, 'on_' + message.command.decode('ascii'))(conn, message)
|
command = message.command.decode('ascii')
|
||||||
|
self.message_count[command] += 1
|
||||||
|
self.last_message[command] = message
|
||||||
|
getattr(self, 'on_' + command)(conn, message)
|
||||||
except:
|
except:
|
||||||
logger.exception("ERROR delivering %s" % repr(message))
|
print("ERROR delivering %s (%s)" % (repr(message),
|
||||||
|
sys.exc_info()[0]))
|
||||||
|
|
||||||
def set_deliver_sleep_time(self, value):
|
def set_deliver_sleep_time(self, value):
|
||||||
with mininode_lock:
|
with mininode_lock:
|
||||||
@ -1498,14 +1526,20 @@ class NodeConnCB(object):
|
|||||||
with mininode_lock:
|
with mininode_lock:
|
||||||
return self.deliver_sleep_time
|
return self.deliver_sleep_time
|
||||||
|
|
||||||
# Callbacks which can be overridden by subclasses
|
# Callback methods. Can be overridden by subclasses in individual test
|
||||||
#################################################
|
# cases to provide custom message handling behaviour.
|
||||||
|
|
||||||
|
def on_open(self, conn):
|
||||||
|
self.connected = True
|
||||||
|
|
||||||
|
def on_close(self, conn):
|
||||||
|
self.connected = False
|
||||||
|
self.connection = None
|
||||||
|
|
||||||
def on_addr(self, conn, message): pass
|
def on_addr(self, conn, message): pass
|
||||||
def on_alert(self, conn, message): pass
|
def on_alert(self, conn, message): pass
|
||||||
def on_block(self, conn, message): pass
|
def on_block(self, conn, message): pass
|
||||||
def on_blocktxn(self, conn, message): pass
|
def on_blocktxn(self, conn, message): pass
|
||||||
def on_close(self, conn): pass
|
|
||||||
def on_cmpctblock(self, conn, message): pass
|
def on_cmpctblock(self, conn, message): pass
|
||||||
def on_feefilter(self, conn, message): pass
|
def on_feefilter(self, conn, message): pass
|
||||||
def on_getaddr(self, conn, message): pass
|
def on_getaddr(self, conn, message): pass
|
||||||
@ -1515,7 +1549,7 @@ class NodeConnCB(object):
|
|||||||
def on_getheaders(self, conn, message): pass
|
def on_getheaders(self, conn, message): pass
|
||||||
def on_headers(self, conn, message): pass
|
def on_headers(self, conn, message): pass
|
||||||
def on_mempool(self, conn): pass
|
def on_mempool(self, conn): pass
|
||||||
def on_open(self, conn): pass
|
def on_pong(self, conn, message): pass
|
||||||
def on_reject(self, conn, message): pass
|
def on_reject(self, conn, message): pass
|
||||||
def on_sendcmpct(self, conn, message): pass
|
def on_sendcmpct(self, conn, message): pass
|
||||||
def on_sendheaders(self, conn, message): pass
|
def on_sendheaders(self, conn, message): pass
|
||||||
@ -1533,9 +1567,6 @@ class NodeConnCB(object):
|
|||||||
if conn.ver_send > BIP0031_VERSION:
|
if conn.ver_send > BIP0031_VERSION:
|
||||||
conn.send_message(msg_pong(message.nonce))
|
conn.send_message(msg_pong(message.nonce))
|
||||||
|
|
||||||
def on_pong(self, conn, message):
|
|
||||||
self.last_pong = message
|
|
||||||
|
|
||||||
def on_verack(self, conn, message):
|
def on_verack(self, conn, message):
|
||||||
conn.ver_recv = conn.ver_send
|
conn.ver_recv = conn.ver_send
|
||||||
self.verack_received = True
|
self.verack_received = True
|
||||||
@ -1548,15 +1579,53 @@ class NodeConnCB(object):
|
|||||||
conn.ver_recv = conn.ver_send
|
conn.ver_recv = conn.ver_send
|
||||||
conn.nServices = message.nServices
|
conn.nServices = message.nServices
|
||||||
|
|
||||||
# Helper functions
|
# Connection helper methods
|
||||||
##################
|
|
||||||
|
|
||||||
def add_connection(self, conn):
|
def add_connection(self, conn):
|
||||||
self.connection = conn
|
self.connection = conn
|
||||||
|
|
||||||
# Wrapper for the NodeConn's send_message function
|
def wait_for_disconnect(self, timeout=60):
|
||||||
|
test_function = lambda: not self.connected
|
||||||
|
assert wait_until(test_function, timeout=timeout)
|
||||||
|
|
||||||
|
# Message receiving helper methods
|
||||||
|
|
||||||
|
def sync(self, test_function, timeout=60):
|
||||||
|
while timeout > 0:
|
||||||
|
with mininode_lock:
|
||||||
|
if test_function():
|
||||||
|
return
|
||||||
|
time.sleep(0.05)
|
||||||
|
timeout -= 0.05
|
||||||
|
raise AssertionError("Sync failed to complete")
|
||||||
|
|
||||||
|
def wait_for_block(self, blockhash, timeout=60):
|
||||||
|
test_function = lambda: self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash
|
||||||
|
self.sync(test_function, timeout)
|
||||||
|
|
||||||
|
def wait_for_getdata(self, timeout=60):
|
||||||
|
test_function = lambda: self.last_message.get("getdata")
|
||||||
|
self.sync(test_function, timeout)
|
||||||
|
|
||||||
|
def wait_for_getheaders(self, timeout=60):
|
||||||
|
test_function = lambda: self.last_message.get("getheaders")
|
||||||
|
self.sync(test_function, timeout)
|
||||||
|
|
||||||
|
def wait_for_inv(self, expected_inv, timeout=60):
|
||||||
|
test_function = lambda: self.last_message.get("inv") and self.last_message["inv"] != expected_inv
|
||||||
|
self.sync(test_function, timeout)
|
||||||
|
|
||||||
|
def wait_for_verack(self, timeout=60):
|
||||||
|
test_function = lambda: self.message_count["verack"]
|
||||||
|
self.sync(test_function, timeout)
|
||||||
|
|
||||||
|
# Message sending helper functions
|
||||||
|
|
||||||
def send_message(self, message):
|
def send_message(self, message):
|
||||||
self.connection.send_message(message)
|
if self.connection:
|
||||||
|
self.connection.send_message(message)
|
||||||
|
else:
|
||||||
|
logger.error("Cannot send message. No connection to node!")
|
||||||
|
|
||||||
def send_and_ping(self, message):
|
def send_and_ping(self, message):
|
||||||
self.send_message(message)
|
self.send_message(message)
|
||||||
@ -1564,28 +1633,15 @@ class NodeConnCB(object):
|
|||||||
|
|
||||||
# Sync up with the node
|
# Sync up with the node
|
||||||
def sync_with_ping(self, timeout=60):
|
def sync_with_ping(self, timeout=60):
|
||||||
def received_pong():
|
|
||||||
return (self.last_pong.nonce == self.ping_counter)
|
|
||||||
self.send_message(msg_ping(nonce=self.ping_counter))
|
self.send_message(msg_ping(nonce=self.ping_counter))
|
||||||
success = wait_until(received_pong, timeout=timeout)
|
test_function = lambda: self.last_message.get("pong") and self.last_message["pong"].nonce == self.ping_counter
|
||||||
|
success = wait_until(test_function, timeout = timeout)
|
||||||
if not success:
|
if not success:
|
||||||
logger.error("sync_with_ping failed!")
|
logger.error("sync_with_ping failed!")
|
||||||
raise AssertionError("sync_with_ping failed!")
|
raise AssertionError("sync_with_ping failed!")
|
||||||
self.ping_counter += 1
|
self.ping_counter += 1
|
||||||
|
|
||||||
return success
|
return success
|
||||||
|
|
||||||
# Spin until verack message is received from the node.
|
|
||||||
# Tests may want to use this as a signal that the test can begin.
|
|
||||||
# This can be called from the testing thread, so it needs to acquire the
|
|
||||||
# global lock.
|
|
||||||
def wait_for_verack(self):
|
|
||||||
while True:
|
|
||||||
with mininode_lock:
|
|
||||||
if self.verack_received:
|
|
||||||
return
|
|
||||||
time.sleep(0.05)
|
|
||||||
|
|
||||||
# The actual NodeConn class
|
# The actual NodeConn class
|
||||||
# This class provides an interface for a p2p connection to a specified node
|
# This class provides an interface for a p2p connection to a specified node
|
||||||
class NodeConn(asyncore.dispatcher):
|
class NodeConn(asyncore.dispatcher):
|
||||||
|
Loading…
Reference in New Issue
Block a user