dash/test/functional/example_test.py

216 lines
8.6 KiB
Python
Raw Normal View History

2017-06-15 21:29:53 +02:00
#!/usr/bin/env python3
# Copyright (c) 2017 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""An example functional test
The module-level docstring should include a high-level description of
what the test is doing. It's the first thing people see when they open
the file and should give the reader information about *what* the test
is testing and *how* it's being tested
"""
# Imports should be in PEP8 ordering (std library first, then third party
# libraries then local imports).
from collections import defaultdict
# Avoid wildcard * imports if possible
from test_framework.blocktools import (create_block, create_coinbase)
from test_framework.mininode import (
CInv,
NetworkThread,
P2PInterface,
2017-06-15 21:29:53 +02:00
mininode_lock,
msg_block,
msg_getdata,
NODE_NETWORK,
2017-06-15 21:29:53 +02:00
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
connect_nodes,
2017-08-16 18:17:34 +02:00
wait_until,
2017-06-15 21:29:53 +02:00
)
# P2PInterface is a class containing callbacks to be executed when a P2P
# message is received from the node-under-test. Subclass P2PInterface and
2017-06-15 21:29:53 +02:00
# override the on_*() methods if you need custom behaviour.
class BaseNode(P2PInterface):
2017-06-15 21:29:53 +02:00
def __init__(self):
"""Initialize the P2PInterface
2017-06-15 21:29:53 +02:00
Used to inialize custom properties for the Node that aren't
included by default in the base class. Be aware that the P2PInterface
2017-06-15 21:29:53 +02:00
base class already stores a counter for each P2P message type and the
last received message of each type, which should be sufficient for the
needs of most tests.
Call super().__init__() first for standard initialization and then
initialize custom properties."""
super().__init__()
# Stores a dictionary of all blocks received
self.block_receive_map = defaultdict(int)
def on_block(self, message):
2017-06-15 21:29:53 +02:00
"""Override the standard on_block callback
Store the hash of a received block in the dictionary."""
message.block.calc_sha256()
self.block_receive_map[message.block.sha256] += 1
def on_inv(self, message):
2017-08-10 21:35:29 +02:00
"""Override the standard on_inv callback"""
pass
2017-06-15 21:29:53 +02:00
def custom_function():
"""Do some custom behaviour
If this function is more generally useful for other tests, consider
moving it to a module in test_framework."""
# self.log.info("running custom_function") # Oops! Can't run self.log outside the BitcoinTestFramework
pass
class ExampleTest(BitcoinTestFramework):
# Each functional test is a subclass of the BitcoinTestFramework class.
# Override the set_test_params(), add_options(), setup_chain(), setup_network()
2017-06-15 21:29:53 +02:00
# and setup_nodes() methods to customize the test setup as required.
def set_test_params(self):
"""Override test parameters for your individual test.
This method must be overridden and num_nodes must be exlicitly set."""
2017-06-15 21:29:53 +02:00
self.setup_clean_chain = True
self.num_nodes = 3
# Use self.extra_args to change command-line arguments for the nodes
self.extra_args = [[], ["-logips"], []]
2017-09-01 20:24:30 +02:00
# self.log.info("I've finished set_test_params") # Oops! Can't run self.log before run_test()
2017-06-15 21:29:53 +02:00
# Use add_options() to add specific command-line options for your test.
# In practice this is not used very much, since the tests are mostly written
# to be run in automated environments without command-line options.
# def add_options()
# pass
# Use setup_chain() to customize the node data directories. In practice
# this is not used very much since the default behaviour is almost always
# fine
# def setup_chain():
# pass
def setup_network(self):
"""Setup the test network topology
Often you won't need to override this, since the standard network topology
(linear: node0 <-> node1 <-> node2 <-> ...) is fine for most tests.
If you do override this method, remember to start the nodes, assign
them to self.nodes, connect them and then sync."""
self.setup_nodes()
# In this test, we're not connecting node2 to node0 or node1. Calls to
# sync_all() should not include node2, since we're not expecting it to
# sync.
connect_nodes(self.nodes[0], 1)
self.sync_all([self.nodes[0:1]])
# Use setup_nodes() to customize the node start behaviour (for example if
# you don't want to start all nodes at the start of the test).
# def setup_nodes():
# pass
def custom_method(self):
"""Do some custom behaviour for this test
Define it in a method here because you're going to use it repeatedly.
If you think it's useful in general, consider moving it to the base
BitcoinTestFramework class so other tests can use it."""
self.log.info("Running custom_method")
def run_test(self):
"""Main test logic"""
# Create a P2P connection to one of the nodes
self.nodes[0].add_p2p_connection(BaseNode())
2017-06-15 21:29:53 +02:00
# Start up network handling in another thread. This needs to be called
# after the P2P connections have been created.
NetworkThread().start()
# wait_for_verack ensures that the P2P connection is fully up.
self.nodes[0].p2p.wait_for_verack()
2017-06-15 21:29:53 +02:00
# Generating a block on one of the nodes will get us out of IBD
blocks = [int(self.nodes[0].generate(nblocks=1)[0], 16)]
self.sync_all([self.nodes[0:1]])
# Notice above how we called an RPC by calling a method with the same
# name on the node object. Notice also how we used a keyword argument
# to specify a named RPC argument. Neither of those are defined on the
# node object. Instead there's some __getattr__() magic going on under
# the covers to dispatch unrecognised attribute calls to the RPC
# interface.
# Logs are nice. Do plenty of them. They can be used in place of comments for
# breaking the test into sub-sections.
self.log.info("Starting test!")
self.log.info("Calling a custom function")
custom_function()
self.log.info("Calling a custom method")
self.custom_method()
self.log.info("Create some blocks")
self.tip = int(self.nodes[0].getbestblockhash(), 16)
self.block_time = self.nodes[0].getblock(self.nodes[0].getbestblockhash())['time'] + 1
height = 1
for i in range(10):
# Use the mininode and blocktools functionality to manually build a block
# Calling the generate() rpc is easier, but this allows us to exactly
# control the blocks and transactions.
block = create_block(self.tip, create_coinbase(height), self.block_time)
block.solve()
block_message = msg_block(block)
# Send message is used to send a P2P message to the node over our P2PInterface
self.nodes[0].p2p.send_message(block_message)
2017-06-15 21:29:53 +02:00
self.tip = block.sha256
blocks.append(self.tip)
self.block_time += 1
height += 1
self.log.info("Wait for node1 to reach current tip (height 11) using RPC")
self.nodes[1].waitforblockheight(11)
self.log.info("Connect node2 and node1")
connect_nodes(self.nodes[1], 2)
self.log.info("Add P2P connection to node2")
self.nodes[2].add_p2p_connection(BaseNode())
self.nodes[2].p2p.wait_for_verack()
2017-06-15 21:29:53 +02:00
2017-08-16 00:24:39 +02:00
self.log.info("Wait for node2 reach current tip. Test that it has propagated all the blocks to us")
2017-06-15 21:29:53 +02:00
2017-08-10 21:35:29 +02:00
getdata_request = msg_getdata()
2017-06-15 21:29:53 +02:00
for block in blocks:
getdata_request.inv.append(CInv(2, block))
self.nodes[2].p2p.send_message(getdata_request)
2017-06-15 21:29:53 +02:00
# wait_until() will loop until a predicate condition is met. Use it to test properties of the
# P2PInterface objects.
wait_until(lambda: sorted(blocks) == sorted(list(self.nodes[2].p2p.block_receive_map.keys())), timeout=5, lock=mininode_lock)
2017-06-15 21:29:53 +02:00
self.log.info("Check that each block was received only once")
# The network thread uses a global lock on data access to the P2PConnection objects when sending and receiving
# messages. The test thread should acquire the global lock before accessing any P2PConnection data to avoid locking
2017-06-15 21:29:53 +02:00
# and synchronization issues. Note wait_until() acquires this global lock when testing the predicate.
with mininode_lock:
for block in self.nodes[2].p2p.block_receive_map.values():
2017-06-15 21:29:53 +02:00
assert_equal(block, 1)
if __name__ == '__main__':
ExampleTest().main()