mirror of
https://github.com/dashpay/dash.git
synced 2024-12-25 12:02:48 +01:00
e8bf39f2dc
Co-authored-by: UdjinM6 <UdjinM6@users.noreply.github.com>
297 lines
15 KiB
Python
Executable File
297 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Copyright (c) 2018 The Bitcoin Core developers
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|
"""Test the Partially Signed Transaction RPCs.
|
|
"""
|
|
|
|
from decimal import Decimal
|
|
from test_framework.test_framework import BitcoinTestFramework
|
|
from test_framework.util import (
|
|
assert_equal,
|
|
assert_greater_than,
|
|
assert_raises_rpc_error,
|
|
find_output
|
|
)
|
|
|
|
import json
|
|
import os
|
|
|
|
# Create one-input, one-output, no-fee transaction:
|
|
class PSBTTest(BitcoinTestFramework):
|
|
|
|
def set_test_params(self):
|
|
self.setup_clean_chain = False
|
|
self.num_nodes = 3
|
|
# TODO: remove -txindex. Currently required for getrawtransaction call.
|
|
self.extra_args = [
|
|
["-txindex"],
|
|
["-txindex"],
|
|
["-txindex"]
|
|
]
|
|
self.supports_cli = False
|
|
|
|
def skip_test_if_missing_module(self):
|
|
self.skip_if_no_wallet()
|
|
|
|
def run_test(self):
|
|
# Create and fund a raw tx for sending 10 DASH
|
|
psbtx1 = self.nodes[0].walletcreatefundedpsbt([], {self.nodes[2].getnewaddress():10})['psbt']
|
|
|
|
# Node 1 should not be able to add anything to it but still return the psbtx same as before
|
|
psbtx = self.nodes[1].walletprocesspsbt(psbtx1)['psbt']
|
|
assert_equal(psbtx1, psbtx)
|
|
|
|
# Sign the transaction and send
|
|
signed_tx = self.nodes[0].walletprocesspsbt(psbtx)['psbt']
|
|
final_tx = self.nodes[0].finalizepsbt(signed_tx)['hex']
|
|
self.nodes[0].sendrawtransaction(final_tx)
|
|
|
|
# Create p2sh and p2pkh addresses
|
|
pubkey0 = self.nodes[0].getaddressinfo(self.nodes[0].getnewaddress())['pubkey']
|
|
pubkey1 = self.nodes[1].getaddressinfo(self.nodes[1].getnewaddress())['pubkey']
|
|
pubkey2 = self.nodes[2].getaddressinfo(self.nodes[2].getnewaddress())['pubkey']
|
|
p2sh = self.nodes[1].addmultisigaddress(2, [pubkey0, pubkey1, pubkey2])['address']
|
|
p2pkh = self.nodes[1].getnewaddress()
|
|
|
|
# fund those addresses
|
|
rawtx = self.nodes[0].createrawtransaction([], {p2sh:10, p2pkh:10})
|
|
rawtx = self.nodes[0].fundrawtransaction(rawtx, {"changePosition":2})
|
|
signed_tx = self.nodes[0].signrawtransactionwithwallet(rawtx['hex'])['hex']
|
|
txid = self.nodes[0].sendrawtransaction(signed_tx)
|
|
self.nodes[0].generate(6)
|
|
self.sync_all()
|
|
|
|
# Find the output pos
|
|
p2sh_pos = -1
|
|
p2pkh_pos = -1
|
|
decoded = self.nodes[0].decoderawtransaction(signed_tx)
|
|
for out in decoded['vout']:
|
|
if out['scriptPubKey']['addresses'][0] == p2sh:
|
|
p2sh_pos = out['n']
|
|
elif out['scriptPubKey']['addresses'][0] == p2pkh:
|
|
p2pkh_pos = out['n']
|
|
|
|
# spend single key from node 1
|
|
rawtx = self.nodes[1].walletcreatefundedpsbt([{"txid":txid,"vout":p2pkh_pos}], {self.nodes[1].getnewaddress():9.99})['psbt']
|
|
walletprocesspsbt_out = self.nodes[1].walletprocesspsbt(rawtx)
|
|
assert_equal(walletprocesspsbt_out['complete'], True)
|
|
self.nodes[1].sendrawtransaction(self.nodes[1].finalizepsbt(walletprocesspsbt_out['psbt'])['hex'])
|
|
|
|
# feeRate of 0.1 DASH / KB produces a total fee slightly below -maxtxfee (~0.06650000):
|
|
res = self.nodes[1].walletcreatefundedpsbt([{"txid":txid,"vout":p2pkh_pos},{"txid":txid,"vout":p2sh_pos},{"txid":txid,"vout":p2pkh_pos}], {self.nodes[1].getnewaddress():29.99}, 0, {"feeRate": 0.1})
|
|
assert_greater_than(res["fee"], 0.06)
|
|
assert_greater_than(0.07, res["fee"])
|
|
|
|
# feeRate of 10 DASH / KB produces a total fee well above -maxtxfee
|
|
# previously this was silently capped at -maxtxfee
|
|
assert_raises_rpc_error(-4, "Fee exceeds maximum configured by -maxtxfee", self.nodes[1].walletcreatefundedpsbt, [{"txid":txid,"vout":p2pkh_pos},{"txid":txid,"vout":p2sh_pos},{"txid":txid,"vout":p2pkh_pos}], {self.nodes[1].getnewaddress():29.99}, 0, {"feeRate": 10})
|
|
|
|
# partially sign multisig things with node 1
|
|
psbtx = self.nodes[1].walletcreatefundedpsbt([{"txid":txid,"vout":p2sh_pos}], {self.nodes[1].getnewaddress():9.99})['psbt']
|
|
walletprocesspsbt_out = self.nodes[1].walletprocesspsbt(psbtx)
|
|
psbtx = walletprocesspsbt_out['psbt']
|
|
assert_equal(walletprocesspsbt_out['complete'], False)
|
|
|
|
# partially sign with node 2. This should be complete and sendable
|
|
walletprocesspsbt_out = self.nodes[2].walletprocesspsbt(psbtx)
|
|
assert_equal(walletprocesspsbt_out['complete'], True)
|
|
self.nodes[2].sendrawtransaction(self.nodes[2].finalizepsbt(walletprocesspsbt_out['psbt'])['hex'])
|
|
|
|
# check that walletprocesspsbt fails to decode a non-psbt
|
|
rawtx = self.nodes[1].createrawtransaction([{"txid":txid,"vout":p2pkh_pos}], {self.nodes[1].getnewaddress():9.99})
|
|
assert_raises_rpc_error(-22, "TX decode failed", self.nodes[1].walletprocesspsbt, rawtx)
|
|
|
|
# Convert a non-psbt to psbt and make sure we can decode it
|
|
rawtx = self.nodes[0].createrawtransaction([], {self.nodes[1].getnewaddress():10})
|
|
rawtx = self.nodes[0].fundrawtransaction(rawtx)
|
|
new_psbt = self.nodes[0].converttopsbt(rawtx['hex'])
|
|
self.nodes[0].decodepsbt(new_psbt)
|
|
|
|
# Make sure that a psbt with signatures cannot be converted
|
|
signedtx = self.nodes[0].signrawtransactionwithwallet(rawtx['hex'])
|
|
assert_raises_rpc_error(-22, "Inputs must not have scriptSigs", self.nodes[0].converttopsbt, signedtx['hex'])
|
|
assert_raises_rpc_error(-22, "Inputs must not have scriptSigs", self.nodes[0].converttopsbt, signedtx['hex'], False)
|
|
# Unless we allow it to convert and strip signatures
|
|
self.nodes[0].converttopsbt(signedtx['hex'], True)
|
|
|
|
# Explicitly allow converting non-empty txs
|
|
new_psbt = self.nodes[0].converttopsbt(rawtx['hex'])
|
|
self.nodes[0].decodepsbt(new_psbt)
|
|
|
|
# Create outputs to nodes 1 and 2
|
|
node1_addr = self.nodes[1].getnewaddress()
|
|
node2_addr = self.nodes[2].getnewaddress()
|
|
txid1 = self.nodes[0].sendtoaddress(node1_addr, 13)
|
|
txid2 = self.nodes[0].sendtoaddress(node2_addr, 13)
|
|
blockhash = self.nodes[0].generate(6)[0]
|
|
self.sync_all()
|
|
vout1 = find_output(self.nodes[1], txid1, 13, blockhash=blockhash)
|
|
vout2 = find_output(self.nodes[2], txid2, 13, blockhash=blockhash)
|
|
|
|
# Create a psbt spending outputs from nodes 1 and 2
|
|
psbt_orig = self.nodes[0].createpsbt([{"txid":txid1, "vout":vout1}, {"txid":txid2, "vout":vout2}], {self.nodes[0].getnewaddress():25.999})
|
|
|
|
# Update psbts, should only have data for one input and not the other
|
|
psbt1 = self.nodes[1].walletprocesspsbt(psbt_orig)['psbt']
|
|
psbt1_decoded = self.nodes[0].decodepsbt(psbt1)
|
|
assert psbt1_decoded['inputs'][0] and not psbt1_decoded['inputs'][1]
|
|
psbt2 = self.nodes[2].walletprocesspsbt(psbt_orig)['psbt']
|
|
psbt2_decoded = self.nodes[0].decodepsbt(psbt2)
|
|
assert not psbt2_decoded['inputs'][0] and psbt2_decoded['inputs'][1]
|
|
|
|
# Combine, finalize, and send the psbts
|
|
combined = self.nodes[0].combinepsbt([psbt1, psbt2])
|
|
finalized = self.nodes[0].finalizepsbt(combined)['hex']
|
|
self.nodes[0].sendrawtransaction(finalized)
|
|
self.nodes[0].generate(6)
|
|
self.sync_all()
|
|
|
|
# BIP 174 Test Vectors
|
|
|
|
# Check that unknown values are just passed through
|
|
unknown_psbt = "cHNidP8BAD8CAAAAAf//////////////////////////////////////////AAAAAAD/////AQAAAAAAAAAAA2oBAAAAAAAACg8BAgMEBQYHCAkPAQIDBAUGBwgJCgsMDQ4PAAA="
|
|
unknown_out = self.nodes[0].walletprocesspsbt(unknown_psbt)['psbt']
|
|
assert_equal(unknown_psbt, unknown_out)
|
|
|
|
# Open the data file
|
|
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/rpc_psbt.json'), encoding='utf-8') as f:
|
|
d = json.load(f)
|
|
invalids = d['invalid']
|
|
valids = d['valid']
|
|
creators = d['creator']
|
|
signers = d['signer']
|
|
combiners = d['combiner']
|
|
finalizers = d['finalizer']
|
|
extractors = d['extractor']
|
|
|
|
# Invalid PSBTs
|
|
for invalid in invalids:
|
|
assert_raises_rpc_error(-22, "TX decode failed", self.nodes[0].decodepsbt, invalid)
|
|
|
|
# Valid PSBTs
|
|
for valid in valids:
|
|
self.nodes[0].decodepsbt(valid)
|
|
|
|
# Creator Tests
|
|
for creator in creators:
|
|
created_tx = self.nodes[0].createpsbt(creator['inputs'], creator['outputs'])
|
|
assert_equal(created_tx, creator['result'])
|
|
|
|
# Signer tests
|
|
for i, signer in enumerate(signers):
|
|
self.nodes[2].createwallet("wallet{}".format(i))
|
|
wrpc = self.nodes[2].get_wallet_rpc("wallet{}".format(i))
|
|
for key in signer['privkeys']:
|
|
wrpc.importprivkey(key)
|
|
signed_tx = wrpc.walletprocesspsbt(signer['psbt'])['psbt']
|
|
assert_equal(signed_tx, signer['result'])
|
|
|
|
# Combiner test
|
|
for combiner in combiners:
|
|
combined = self.nodes[2].combinepsbt(combiner['combine'])
|
|
assert_equal(combined, combiner['result'])
|
|
|
|
# Empty combiner test
|
|
assert_raises_rpc_error(-8, "Parameter 'txs' cannot be empty", self.nodes[0].combinepsbt, [])
|
|
|
|
# Finalizer test
|
|
for finalizer in finalizers:
|
|
finalized = self.nodes[2].finalizepsbt(finalizer['finalize'], False)['psbt']
|
|
assert_equal(finalized, finalizer['result'])
|
|
|
|
# Extractor test
|
|
for extractor in extractors:
|
|
extracted = self.nodes[2].finalizepsbt(extractor['extract'], True)['hex']
|
|
assert_equal(extracted, extractor['result'])
|
|
|
|
# Test that psbts with p2pkh outputs are created properly
|
|
p2pkh = self.nodes[0].getnewaddress()
|
|
psbt = self.nodes[1].walletcreatefundedpsbt([], [{p2pkh : 1}], 0, {"includeWatching" : True}, True)
|
|
self.nodes[0].decodepsbt(psbt['psbt'])
|
|
|
|
# Test decoding error: invalid base64
|
|
assert_raises_rpc_error(-22, "TX decode failed invalid base64", self.nodes[0].decodepsbt, ";definitely not base64;")
|
|
|
|
# Send to all types of addresses
|
|
addr1 = self.nodes[1].getnewaddress()
|
|
txid1 = self.nodes[0].sendtoaddress(addr1, 11)
|
|
vout1 = find_output(self.nodes[0], txid1, 11)
|
|
addr2 = self.nodes[1].getnewaddress()
|
|
txid2 = self.nodes[0].sendtoaddress(addr2, 11)
|
|
vout2 = find_output(self.nodes[0], txid2, 11)
|
|
addr3 = self.nodes[1].getnewaddress()
|
|
txid3 = self.nodes[0].sendtoaddress(addr3, 11)
|
|
vout3 = find_output(self.nodes[0], txid3, 11)
|
|
self.sync_all()
|
|
|
|
def test_psbt_input_keys(psbt_input, keys):
|
|
"""Check that the psbt input has only the expected keys."""
|
|
assert_equal(set(keys), set(psbt_input.keys()))
|
|
|
|
# Create a PSBT. None of the inputs are filled initially
|
|
psbt = self.nodes[1].createpsbt([{"txid":txid1, "vout":vout1},{"txid":txid2, "vout":vout2},{"txid":txid3, "vout":vout3}], {self.nodes[0].getnewaddress():32.999})
|
|
decoded = self.nodes[1].decodepsbt(psbt)
|
|
test_psbt_input_keys(decoded['inputs'][0], [])
|
|
test_psbt_input_keys(decoded['inputs'][1], [])
|
|
test_psbt_input_keys(decoded['inputs'][2], [])
|
|
|
|
# Update a PSBT with UTXOs from the node
|
|
# No inputs should be filled because they are non-witness
|
|
updated = self.nodes[1].utxoupdatepsbt(psbt)
|
|
decoded = self.nodes[1].decodepsbt(updated)
|
|
test_psbt_input_keys(decoded['inputs'][1], [])
|
|
test_psbt_input_keys(decoded['inputs'][2], [])
|
|
|
|
# Try again, now while providing descriptors
|
|
descs = [self.nodes[1].getaddressinfo(addr)['desc'] for addr in [addr1,addr2,addr3]]
|
|
updated = self.nodes[1].utxoupdatepsbt(psbt, descs)
|
|
decoded = self.nodes[1].decodepsbt(updated)
|
|
test_psbt_input_keys(decoded['inputs'][0], [])
|
|
test_psbt_input_keys(decoded['inputs'][1], [])
|
|
test_psbt_input_keys(decoded['inputs'][2], [])
|
|
|
|
# Two PSBTs with a common input should not be joinable
|
|
psbt1 = self.nodes[1].createpsbt([{"txid":txid1, "vout":vout1}], {self.nodes[0].getnewaddress():Decimal('10.999')})
|
|
assert_raises_rpc_error(-8, "exists in multiple PSBTs", self.nodes[1].joinpsbts, [psbt1, updated])
|
|
|
|
# Join two distinct PSBTs
|
|
addr4 = self.nodes[1].getnewaddress()
|
|
txid4 = self.nodes[0].sendtoaddress(addr4, 5)
|
|
vout4 = find_output(self.nodes[0], txid4, 5)
|
|
self.nodes[0].generate(6)
|
|
self.sync_all()
|
|
psbt2 = self.nodes[1].createpsbt([{"txid":txid4, "vout":vout4}], {self.nodes[0].getnewaddress():Decimal('4.999')})
|
|
psbt2 = self.nodes[1].walletprocesspsbt(psbt2)['psbt']
|
|
psbt2_decoded = self.nodes[0].decodepsbt(psbt2)
|
|
assert "final_scriptwitness" not in psbt2_decoded['inputs'][0] and "final_scriptSig" in psbt2_decoded['inputs'][0]
|
|
joined = self.nodes[0].joinpsbts([psbt, psbt2])
|
|
joined_decoded = self.nodes[0].decodepsbt(joined)
|
|
assert len(joined_decoded['inputs']) == 4 and len(joined_decoded['outputs']) == 2 and "final_scriptwitness" not in joined_decoded['inputs'][3] and "final_scriptSig" not in joined_decoded['inputs'][3]
|
|
|
|
# Newly created PSBT needs UTXOs and updating
|
|
addr = self.nodes[1].getnewaddress()
|
|
txid = self.nodes[0].sendtoaddress(addr, 7)
|
|
self.nodes[0].generate(6)
|
|
self.sync_all()
|
|
vout = find_output(self.nodes[0], txid, 7)
|
|
psbt = self.nodes[1].createpsbt([{"txid":txid, "vout":vout}], {self.nodes[0].getnewaddress():Decimal('6.999')})
|
|
analyzed = self.nodes[0].analyzepsbt(psbt)
|
|
assert not analyzed['inputs'][0]['has_utxo'] and not analyzed['inputs'][0]['is_final'] and analyzed['inputs'][0]['next'] == 'updater' and analyzed['next'] == 'updater'
|
|
|
|
# After update with wallet, only needs signing
|
|
updated = self.nodes[1].walletprocesspsbt(psbt, False, 'ALL', True)['psbt']
|
|
analyzed = self.nodes[0].analyzepsbt(updated)
|
|
assert analyzed['inputs'][0]['has_utxo'] and not analyzed['inputs'][0]['is_final'] and analyzed['inputs'][0]['next'] == 'signer' and analyzed['next'] == 'signer'
|
|
|
|
# Check fee and size things
|
|
assert analyzed['fee'] == Decimal('0.00100000') and analyzed['estimated_vsize'] == 191 and analyzed['estimated_feerate'] == Decimal('0.00523560')
|
|
|
|
# After signing and finalizing, needs extracting
|
|
signed = self.nodes[1].walletprocesspsbt(updated)['psbt']
|
|
analyzed = self.nodes[0].analyzepsbt(signed)
|
|
assert analyzed['inputs'][0]['has_utxo'] and analyzed['inputs'][0]['is_final'] and analyzed['next'] == 'extractor'
|
|
|
|
if __name__ == '__main__':
|
|
PSBTTest().main()
|