dash/test/functional/rpc_users.py
MarcoFalke 2972dbb951
Merge #13867: qa: Make extended tests pass on native Windows
fafe73a626 qa: Raise feature_help timeout to 5s (MarcoFalke)
faabd7bc47 qa: Use files for stdout/stderr to support Windows (MarcoFalke)
facb56ffaf qa: Run gen_rpcauth with sys.executable (MarcoFalke)
fada8966c5 qa: Close stdout and stderr file when node stops (MarcoFalke)

Pull request description:

  ### qa: Close stdout and stderr file when node stops

  Since these files are potentially deleted by the test framework for cleanup, they should be closed first. Otherwise this will lead to errors on Windows when the tests finish successfully.

  Side note: After the patch, it is no longer possible to reopen the file on Windows (see https://docs.python.org/3/library/tempfile.html#tempfile.NamedTemporaryFile)

  ### qa: Run gen_rpcauth with sys.executable

  Similar to `test_runner.py`, the `sys.executable` needs to be passed down into subprocesses to pass on native Windows. (Should have no effect on Linux)

  ###   qa: Use files for stdout/stderr to support Windows

  It seems that using PIPE is not supported on Windows. Also, it is easier to just use the files that capture the stdout and stderr within the test node class.

Tree-SHA512: ec675012b10705978606b7fcbdb287c39a8e6e3732aae2fa4041d963a3c6993c6eac6a9a3cbd5479514e7d8017fe74c12235d1ed6fed2e8af8f3c71981e91864
2021-07-06 20:29:31 +03:00

207 lines
7.7 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test multiple RPC users."""
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
get_datadir_path,
str_to_b64str,
)
import os
import http.client
import urllib.parse
import subprocess
from random import SystemRandom
import string
import configparser
import sys
class HTTPBasicsTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
def setup_chain(self):
super().setup_chain()
#Append rpcauth to dash.conf before initialization
rpcauth = "rpcauth=rt:93648e835a54c573682c2eb19f882535$7681e9c5b74bdd85e78166031d2058e1069b3ed7ed967c93fc63abba06f31144"
rpcauth2 = "rpcauth=rt2:f8607b1a88861fac29dfccf9b52ff9f$ff36a0c23c8c62b4846112e50fa888416e94c17bfd4c42f88fd8f55ec6a3137e"
rpcuser = "rpcuser=rpcuser💻"
rpcpassword = "rpcpassword=rpcpassword🔑"
self.user = ''.join(SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(10))
config = configparser.ConfigParser()
config.read_file(open(self.options.configfile))
gen_rpcauth = config['environment']['RPCAUTH']
p = subprocess.Popen([sys.executable, gen_rpcauth, self.user], stdout=subprocess.PIPE, universal_newlines=True)
lines = p.stdout.read().splitlines()
rpcauth3 = lines[1]
self.password = lines[3]
with open(os.path.join(get_datadir_path(self.options.tmpdir, 0), "dash.conf"), 'a', encoding='utf8') as f:
f.write(rpcauth+"\n")
f.write(rpcauth2+"\n")
f.write(rpcauth3+"\n")
with open(os.path.join(get_datadir_path(self.options.tmpdir, 1), "dash.conf"), 'a', encoding='utf8') as f:
f.write(rpcuser+"\n")
f.write(rpcpassword+"\n")
def run_test(self):
##################################################
# Check correctness of the rpcauth config option #
##################################################
url = urllib.parse.urlparse(self.nodes[0].url)
#Old authpair
authpair = url.username + ':' + url.password
#New authpair generated via share/rpcauth tool
password = "cA773lm788buwYe4g4WT+05pKyNruVKjQ25x3n0DQcM="
#Second authpair with different username
password2 = "8/F3uMDw4KSEbw96U3CA1C4X05dkHDN2BPFjTgZW4KI="
authpairnew = "rt:"+password
self.log.info('Correct...')
headers = {"Authorization": "Basic " + str_to_b64str(authpair)}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
resp = conn.getresponse()
assert_equal(resp.status, 200)
conn.close()
#Use new authpair to confirm both work
self.log.info('Correct...')
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
resp = conn.getresponse()
assert_equal(resp.status, 200)
conn.close()
#Wrong login name with rt's password
self.log.info('Wrong...')
authpairnew = "rtwrong:"+password
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
resp = conn.getresponse()
assert_equal(resp.status, 401)
conn.close()
#Wrong password for rt
self.log.info('Wrong...')
authpairnew = "rt:"+password+"wrong"
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
resp = conn.getresponse()
assert_equal(resp.status, 401)
conn.close()
#Correct for rt2
self.log.info('Correct...')
authpairnew = "rt2:"+password2
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
resp = conn.getresponse()
assert_equal(resp.status, 200)
conn.close()
#Wrong password for rt2
self.log.info('Wrong...')
authpairnew = "rt2:"+password2+"wrong"
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
resp = conn.getresponse()
assert_equal(resp.status, 401)
conn.close()
#Correct for randomly generated user
self.log.info('Correct...')
authpairnew = self.user+":"+self.password
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
resp = conn.getresponse()
assert_equal(resp.status, 200)
conn.close()
#Wrong password for randomly generated user
self.log.info('Wrong...')
authpairnew = self.user+":"+self.password+"Wrong"
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
resp = conn.getresponse()
assert_equal(resp.status, 401)
conn.close()
###############################################################
# Check correctness of the rpcuser/rpcpassword config options #
###############################################################
url = urllib.parse.urlparse(self.nodes[1].url)
# rpcuser and rpcpassword authpair
self.log.info('Correct...')
rpcuserauthpair = "rpcuser💻:rpcpassword🔑"
headers = {"Authorization": "Basic " + str_to_b64str(rpcuserauthpair)}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
resp = conn.getresponse()
assert_equal(resp.status, 200)
conn.close()
#Wrong login name with rpcuser's password
rpcuserauthpair = "rpcuserwrong:rpcpassword"
headers = {"Authorization": "Basic " + str_to_b64str(rpcuserauthpair)}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
resp = conn.getresponse()
assert_equal(resp.status, 401)
conn.close()
#Wrong password for rpcuser
self.log.info('Wrong...')
rpcuserauthpair = "rpcuser:rpcpasswordwrong"
headers = {"Authorization": "Basic " + str_to_b64str(rpcuserauthpair)}
conn = http.client.HTTPConnection(url.hostname, url.port)
conn.connect()
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
resp = conn.getresponse()
assert_equal(resp.status, 401)
conn.close()
if __name__ == '__main__':
HTTPBasicsTest ().main ()