mirror of
https://github.com/dashpay/dash.git
synced 2024-12-26 04:22:55 +01:00
merge bitcoin#21008: fix zmq test flakiness, improve speed
This commit is contained in:
parent
5e87efd04b
commit
7b0c725c59
@ -33,28 +33,31 @@ def dashhash_reversed(byte_str):
|
|||||||
|
|
||||||
class ZMQSubscriber:
|
class ZMQSubscriber:
|
||||||
def __init__(self, socket, topic):
|
def __init__(self, socket, topic):
|
||||||
self.sequence = 0
|
self.sequence = None # no sequence number received yet
|
||||||
self.socket = socket
|
self.socket = socket
|
||||||
self.topic = topic
|
self.topic = topic
|
||||||
|
|
||||||
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
|
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
|
||||||
|
|
||||||
def receive(self):
|
# Receive message from publisher and verify that topic and sequence match
|
||||||
|
def _receive_from_publisher_and_check(self):
|
||||||
topic, body, seq = self.socket.recv_multipart()
|
topic, body, seq = self.socket.recv_multipart()
|
||||||
# Topic should match the subscriber topic.
|
# Topic should match the subscriber topic.
|
||||||
assert_equal(topic, self.topic)
|
assert_equal(topic, self.topic)
|
||||||
# Sequence should be incremental.
|
# Sequence should be incremental.
|
||||||
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
|
received_seq = struct.unpack('<I', seq)[-1]
|
||||||
|
if self.sequence is None:
|
||||||
|
self.sequence = received_seq
|
||||||
|
else:
|
||||||
|
assert_equal(received_seq, self.sequence)
|
||||||
self.sequence += 1
|
self.sequence += 1
|
||||||
return body
|
return body
|
||||||
|
|
||||||
|
def receive(self):
|
||||||
|
return self._receive_from_publisher_and_check()
|
||||||
|
|
||||||
def receive_sequence(self):
|
def receive_sequence(self):
|
||||||
topic, body, seq = self.socket.recv_multipart()
|
body = self._receive_from_publisher_and_check()
|
||||||
# Topic should match the subscriber topic.
|
|
||||||
assert_equal(topic, self.topic)
|
|
||||||
# Sequence should be incremental.
|
|
||||||
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
|
|
||||||
self.sequence += 1
|
|
||||||
hash = body[:32].hex()
|
hash = body[:32].hex()
|
||||||
label = chr(body[32])
|
label = chr(body[32])
|
||||||
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
|
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
|
||||||
@ -70,6 +73,9 @@ class ZMQTest (BitcoinTestFramework):
|
|||||||
self.num_nodes = 2
|
self.num_nodes = 2
|
||||||
if self.is_wallet_compiled():
|
if self.is_wallet_compiled():
|
||||||
self.requires_wallet = True
|
self.requires_wallet = True
|
||||||
|
# This test isn't testing txn relay/timing, so set whitelist on the
|
||||||
|
# peers for instant txn relay. This speeds up the test run time 2-3x.
|
||||||
|
self.extra_args = [["-whitelist=noban@127.0.0.1"]] * self.num_nodes
|
||||||
|
|
||||||
def skip_test_if_missing_module(self):
|
def skip_test_if_missing_module(self):
|
||||||
self.skip_if_no_py3_zmq()
|
self.skip_if_no_py3_zmq()
|
||||||
@ -96,23 +102,46 @@ class ZMQTest (BitcoinTestFramework):
|
|||||||
|
|
||||||
# Restart node with the specified zmq notifications enabled, subscribe to
|
# Restart node with the specified zmq notifications enabled, subscribe to
|
||||||
# all of them and return the corresponding ZMQSubscriber objects.
|
# all of them and return the corresponding ZMQSubscriber objects.
|
||||||
def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False):
|
def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True):
|
||||||
subscribers = []
|
subscribers = []
|
||||||
for topic, address in services:
|
for topic, address in services:
|
||||||
socket = self.ctx.socket(zmq.SUB)
|
socket = self.ctx.socket(zmq.SUB)
|
||||||
socket.set(zmq.RCVTIMEO, recv_timeout*1000)
|
|
||||||
subscribers.append(ZMQSubscriber(socket, topic.encode()))
|
subscribers.append(ZMQSubscriber(socket, topic.encode()))
|
||||||
|
|
||||||
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services])
|
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services] +
|
||||||
|
self.extra_args[0])
|
||||||
if connect_nodes:
|
|
||||||
self.connect_nodes(0, 1)
|
|
||||||
|
|
||||||
for i, sub in enumerate(subscribers):
|
for i, sub in enumerate(subscribers):
|
||||||
sub.socket.connect(services[i][1])
|
sub.socket.connect(services[i][1])
|
||||||
|
|
||||||
# Relax so that the subscribers are ready before publishing zmq messages
|
# Ensure that all zmq publisher notification interfaces are ready by
|
||||||
sleep(0.2)
|
# running the following "sync up" procedure:
|
||||||
|
# 1. Generate a block on the node
|
||||||
|
# 2. Try to receive a notification on all subscribers
|
||||||
|
# 3. If all subscribers get a message within the timeout (1 second),
|
||||||
|
# we are done, otherwise repeat starting from step 1
|
||||||
|
for sub in subscribers:
|
||||||
|
sub.socket.set(zmq.RCVTIMEO, 1000)
|
||||||
|
while True:
|
||||||
|
self.nodes[0].generate(1)
|
||||||
|
recv_failed = False
|
||||||
|
for sub in subscribers:
|
||||||
|
try:
|
||||||
|
sub.receive()
|
||||||
|
except zmq.error.Again:
|
||||||
|
self.log.debug("Didn't receive sync-up notification, trying again.")
|
||||||
|
recv_failed = True
|
||||||
|
if not recv_failed:
|
||||||
|
self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
|
||||||
|
break
|
||||||
|
|
||||||
|
# set subscriber's desired timeout for the test
|
||||||
|
for sub in subscribers:
|
||||||
|
sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)
|
||||||
|
|
||||||
|
self.connect_nodes(0, 1)
|
||||||
|
if sync_blocks:
|
||||||
|
self.sync_blocks()
|
||||||
|
|
||||||
return subscribers
|
return subscribers
|
||||||
|
|
||||||
@ -123,9 +152,7 @@ class ZMQTest (BitcoinTestFramework):
|
|||||||
self.zmq_context = zmq.Context()
|
self.zmq_context = zmq.Context()
|
||||||
|
|
||||||
address = 'tcp://127.0.0.1:28332'
|
address = 'tcp://127.0.0.1:28332'
|
||||||
subs = self.setup_zmq_test(
|
subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
|
||||||
[(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]],
|
|
||||||
connect_nodes=True)
|
|
||||||
|
|
||||||
hashblock = subs[0]
|
hashblock = subs[0]
|
||||||
hashtx = subs[1]
|
hashtx = subs[1]
|
||||||
@ -203,6 +230,7 @@ class ZMQTest (BitcoinTestFramework):
|
|||||||
hashblock, hashtx = self.setup_zmq_test(
|
hashblock, hashtx = self.setup_zmq_test(
|
||||||
[(topic, address) for topic in ["hashblock", "hashtx"]],
|
[(topic, address) for topic in ["hashblock", "hashtx"]],
|
||||||
recv_timeout=2) # 2 second timeout to check end of notifications
|
recv_timeout=2) # 2 second timeout to check end of notifications
|
||||||
|
self.disconnect_nodes(0, 1)
|
||||||
|
|
||||||
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
|
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
|
||||||
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
|
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
|
||||||
@ -251,6 +279,7 @@ class ZMQTest (BitcoinTestFramework):
|
|||||||
"""
|
"""
|
||||||
self.log.info("Testing 'sequence' publisher")
|
self.log.info("Testing 'sequence' publisher")
|
||||||
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
|
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
|
||||||
|
self.disconnect_nodes(0, 1)
|
||||||
|
|
||||||
# Mempool sequence number starts at 1
|
# Mempool sequence number starts at 1
|
||||||
seq_num = 1
|
seq_num = 1
|
||||||
@ -385,7 +414,7 @@ class ZMQTest (BitcoinTestFramework):
|
|||||||
return
|
return
|
||||||
|
|
||||||
self.log.info("Testing 'mempool sync' usage of sequence notifier")
|
self.log.info("Testing 'mempool sync' usage of sequence notifier")
|
||||||
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True)
|
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
|
||||||
|
|
||||||
# In-memory counter, should always start at 1
|
# In-memory counter, should always start at 1
|
||||||
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
|
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
|
||||||
@ -483,10 +512,13 @@ class ZMQTest (BitcoinTestFramework):
|
|||||||
|
|
||||||
def test_multiple_interfaces(self):
|
def test_multiple_interfaces(self):
|
||||||
# Set up two subscribers with different addresses
|
# Set up two subscribers with different addresses
|
||||||
|
# (note that after the reorg test, syncing would fail due to different
|
||||||
|
# chain lengths on node0 and node1; for this test we only need node0, so
|
||||||
|
# we can disable syncing blocks on the setup)
|
||||||
subscribers = self.setup_zmq_test([
|
subscribers = self.setup_zmq_test([
|
||||||
("hashblock", "tcp://127.0.0.1:28334"),
|
("hashblock", "tcp://127.0.0.1:28334"),
|
||||||
("hashblock", "tcp://127.0.0.1:28335"),
|
("hashblock", "tcp://127.0.0.1:28335"),
|
||||||
])
|
], sync_blocks=False)
|
||||||
|
|
||||||
# Generate 1 block in nodes[0] and receive all notifications
|
# Generate 1 block in nodes[0] and receive all notifications
|
||||||
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
|
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
|
||||||
|
Loading…
Reference in New Issue
Block a user