dash/test/functional/p2p_permissions.py
MarcoFalke a8820d894f Merge #19474: doc: Use precise permission flags where possible
fab558612278909df93bdf88f5727b04f13aef0f doc: Use precise permission flags where possible (MarcoFalke)

Pull request description:

  Instead of mentioning the all-encompassing `-whitelist*` settings, change the docs to mention the exact permission flag that will influence the behaviour.

  This is needed because in the future, the too-broad `-whitelist*` settings (they either include *all* permission flags or apply to *all* peers) might be deprecated to require the permission flags to be enumerated.

  Alternatively, in the future there could be an RPC to set the net permission flags on an existing connection, in which case the `-whitelist*` terminology is of no help.

ACKs for top commit:
  jnewbery:
    reACK fab558612278909df93bdf88f5727b04f13aef0f
  fjahr:
    Code review ACK fab558612278909df93bdf88f5727b04f13aef0f
  jonatack:
    ACK fab558612278909df93bdf88f5727b04f13aef0f

Tree-SHA512: c7dea3e577d90103bb2b0ffab7b7c8640b388932a3a880f69e2b70747fc9213dc1f437085671fd54c902ec2a578458b8a2fae6dbe076642fb88efbf9fa9e679c
2023-01-19 23:37:39 -06:00

172 lines
6.7 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright (c) 2015-2018 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test p2p permission message.
Test that permissions are correctly calculated and applied
"""
from test_framework.address import ADDRESS_BCRT1_P2SH_OP_TRUE
from test_framework.messages import (
CTransaction,
FromHex,
)
from test_framework.mininode import P2PDataStore
from test_framework.script import (
CScript,
OP_TRUE,
)
from test_framework.test_node import ErrorMatch
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
p2p_port,
wait_until,
)
class P2PPermissionsTests(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.setup_clean_chain = True
def run_test(self):
self.check_tx_relay()
self.checkpermission(
# default permissions (no specific permissions)
["-whitelist=127.0.0.1"],
["relay", "noban", "mempool"],
True)
self.checkpermission(
# no permission (even with forcerelay)
["-whitelist=@127.0.0.1", "-whitelistforcerelay=1"],
[],
False)
self.checkpermission(
# relay permission removed (no specific permissions)
["-whitelist=127.0.0.1", "-whitelistrelay=0"],
["noban", "mempool"],
True)
self.checkpermission(
# forcerelay and relay permission added
# Legacy parameter interaction which set whitelistrelay to true
# if whitelistforcerelay is true
["-whitelist=127.0.0.1", "-whitelistforcerelay"],
["forcerelay", "relay", "noban", "mempool"],
True)
# Let's make sure permissions are merged correctly
# For this, we need to use whitebind instead of bind
# by modifying the configuration file.
ip_port = "127.0.0.1:{}".format(p2p_port(1))
self.replaceinconfig(1, "bind=127.0.0.1", "whitebind=bloomfilter,forcerelay@" + ip_port)
self.checkpermission(
["-whitelist=noban@127.0.0.1"],
# Check parameter interaction forcerelay should activate relay
["noban", "bloomfilter", "forcerelay", "relay"],
False)
self.replaceinconfig(1, "whitebind=bloomfilter,forcerelay@" + ip_port, "bind=127.0.0.1")
self.checkpermission(
# legacy whitelistrelay should be ignored
["-whitelist=noban,mempool@127.0.0.1", "-whitelistrelay"],
["noban", "mempool"],
False)
self.checkpermission(
# legacy whitelistforcerelay should be ignored
["-whitelist=noban,mempool@127.0.0.1", "-whitelistforcerelay"],
["noban", "mempool"],
False)
self.checkpermission(
# missing mempool permission to be considered legacy whitelisted
["-whitelist=noban@127.0.0.1"],
["noban"],
False)
self.checkpermission(
# all permission added
["-whitelist=all@127.0.0.1"],
["forcerelay", "noban", "mempool", "bloomfilter", "relay"],
False)
self.stop_node(1)
self.nodes[1].assert_start_raises_init_error(["-whitelist=oopsie@127.0.0.1"], "Invalid P2P permission", match=ErrorMatch.PARTIAL_REGEX)
self.nodes[1].assert_start_raises_init_error(["-whitelist=noban@127.0.0.1:230"], "Invalid netmask specified in", match=ErrorMatch.PARTIAL_REGEX)
self.nodes[1].assert_start_raises_init_error(["-whitebind=noban@127.0.0.1/10"], "Cannot resolve -whitebind address", match=ErrorMatch.PARTIAL_REGEX)
def check_tx_relay(self):
block_op_true = self.nodes[0].getblock(self.nodes[0].generatetoaddress(100, ADDRESS_BCRT1_P2SH_OP_TRUE)[0])
self.sync_all()
self.log.debug("Create a connection from a forcerelay peer that rebroadcasts raw txs")
# A python mininode is needed to send the raw transaction directly. If a full node was used, it could only
# rebroadcast via the inv-getdata mechanism. However, even for forcerelay connections, a full node would
# currently not request a txid that is already in the mempool.
self.restart_node(1, extra_args=["-whitelist=forcerelay@127.0.0.1"])
p2p_rebroadcast_wallet = self.nodes[1].add_p2p_connection(P2PDataStore())
self.log.debug("Send a tx from the wallet initially")
tx = FromHex(
CTransaction(),
self.nodes[0].createrawtransaction(
inputs=[{
'txid': block_op_true['tx'][0],
'vout': 0,
}], outputs=[{
ADDRESS_BCRT1_P2SH_OP_TRUE: 5,
}]),
)
tx.vin[0].scriptSig = CScript([CScript([OP_TRUE])])
txid = tx.rehash()
self.log.debug("Wait until tx is in node[1]'s mempool")
p2p_rebroadcast_wallet.send_txs_and_test([tx], self.nodes[1])
self.log.debug("Check that node[1] will send the tx to node[0] even though it is already in the mempool")
self.connect_nodes(1, 0)
def in_mempool():
self.bump_mocktime(1)
return txid in self.nodes[0].getrawmempool()
with self.nodes[1].assert_debug_log(["Force relaying tx {} from peer=0".format(txid)]):
p2p_rebroadcast_wallet.send_txs_and_test([tx], self.nodes[1])
wait_until(in_mempool)
self.log.debug("Check that node[1] will not send an invalid tx to node[0]")
tx.vout[0].nValue += 1
txid = tx.rehash()
p2p_rebroadcast_wallet.send_txs_and_test(
[tx],
self.nodes[1],
success=False,
reject_reason='Not relaying non-mempool transaction {} from forcerelay peer=0'.format(txid),
)
def checkpermission(self, args, expectedPermissions, whitelisted):
self.restart_node(1, args)
self.connect_nodes(0, 1)
peerinfo = self.nodes[1].getpeerinfo()[0]
assert_equal(peerinfo['whitelisted'], whitelisted)
assert_equal(len(expectedPermissions), len(peerinfo['permissions']))
for p in expectedPermissions:
if not p in peerinfo['permissions']:
raise AssertionError("Expected permissions %r is not granted." % p)
def replaceinconfig(self, nodeid, old, new):
with open(self.nodes[nodeid].bitcoinconf, encoding="utf8") as f:
newText = f.read().replace(old, new)
with open(self.nodes[nodeid].bitcoinconf, 'w', encoding="utf8") as f:
f.write(newText)
if __name__ == '__main__':
P2PPermissionsTests().main()