#!/usr/bin/env python2 # Copyright (c) 2014-2015 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. # # Test addressindex generation and fetching # import time from test_framework.test_framework import BitcoinTestFramework from test_framework.util import * from test_framework.script import * from test_framework.mininode import * import binascii class AddressIndexTest(BitcoinTestFramework): def setup_chain(self): print("Initializing test directory "+self.options.tmpdir) initialize_chain_clean(self.options.tmpdir, 4) def setup_network(self): self.nodes = [] # Nodes 0/1 are "wallet" nodes self.nodes.append(start_node(0, self.options.tmpdir, ["-debug"])) self.nodes.append(start_node(1, self.options.tmpdir, ["-debug", "-addressindex"])) # Nodes 2/3 are used for testing self.nodes.append(start_node(2, self.options.tmpdir, ["-debug", "-addressindex"])) self.nodes.append(start_node(3, self.options.tmpdir, ["-debug", "-addressindex"])) connect_nodes(self.nodes[0], 1) connect_nodes(self.nodes[0], 2) connect_nodes(self.nodes[0], 3) self.is_network_split = False self.sync_all() def run_test(self): print "Mining blocks..." self.nodes[0].generate(105) self.sync_all() chain_height = self.nodes[1].getblockcount() assert_equal(chain_height, 105) assert_equal(self.nodes[1].getbalance(), 0) assert_equal(self.nodes[2].getbalance(), 0) # Check that balances are correct balance0 = self.nodes[1].getaddressbalance("2N2JD6wb56AfK4tfmM6PwdVmoYk2dCKf4Br") assert_equal(balance0["balance"], 0) # Check p2pkh and p2sh address indexes print "Testing p2pkh and p2sh address index..." txid0 = self.nodes[0].sendtoaddress("mo9ncXisMeAoXwqcV5EWuyncbmCcQN4rVs", 10) self.nodes[0].generate(1) txidb0 = self.nodes[0].sendtoaddress("2N2JD6wb56AfK4tfmM6PwdVmoYk2dCKf4Br", 10) self.nodes[0].generate(1) txid1 = self.nodes[0].sendtoaddress("mo9ncXisMeAoXwqcV5EWuyncbmCcQN4rVs", 15) self.nodes[0].generate(1) txidb1 = self.nodes[0].sendtoaddress("2N2JD6wb56AfK4tfmM6PwdVmoYk2dCKf4Br", 15) self.nodes[0].generate(1) txid2 = self.nodes[0].sendtoaddress("mo9ncXisMeAoXwqcV5EWuyncbmCcQN4rVs", 20) self.nodes[0].generate(1) txidb2 = self.nodes[0].sendtoaddress("2N2JD6wb56AfK4tfmM6PwdVmoYk2dCKf4Br", 20) self.nodes[0].generate(1) self.sync_all() txids = self.nodes[1].getaddresstxids("mo9ncXisMeAoXwqcV5EWuyncbmCcQN4rVs") assert_equal(len(txids), 3) assert_equal(txids[0], txid0) assert_equal(txids[1], txid1) assert_equal(txids[2], txid2) txidsb = self.nodes[1].getaddresstxids("2N2JD6wb56AfK4tfmM6PwdVmoYk2dCKf4Br") assert_equal(len(txidsb), 3) assert_equal(txidsb[0], txidb0) assert_equal(txidsb[1], txidb1) assert_equal(txidsb[2], txidb2) # Check that limiting by height works chain_height = self.nodes[1].getblockcount() height_txids = self.nodes[1].getaddresstxids({ "addresses": ["2N2JD6wb56AfK4tfmM6PwdVmoYk2dCKf4Br"], "start": 111, "end": 111 }) assert_equal(len(height_txids), 1) assert_equal(height_txids[0], txidb2) # Check that multiple addresses works multitxids = self.nodes[1].getaddresstxids({"addresses": ["2N2JD6wb56AfK4tfmM6PwdVmoYk2dCKf4Br", "mo9ncXisMeAoXwqcV5EWuyncbmCcQN4rVs"]}) assert_equal(len(multitxids), 6) assert_equal(multitxids[0], txid0) assert_equal(multitxids[1], txidb0) assert_equal(multitxids[2], txid1) assert_equal(multitxids[3], txidb1) assert_equal(multitxids[4], txid2) assert_equal(multitxids[5], txidb2) # Check that balances are correct balance0 = self.nodes[1].getaddressbalance("2N2JD6wb56AfK4tfmM6PwdVmoYk2dCKf4Br") assert_equal(balance0["balance"], 45 * 100000000) # Check that outputs with the same address will only return one txid print "Testing for txid uniqueness..." addressHash = "6349a418fc4578d10a372b54b45c280cc8c4382f".decode("hex") scriptPubKey = CScript([OP_HASH160, addressHash, OP_EQUAL]) unspent = self.nodes[0].listunspent() tx = CTransaction() tx.vin = [CTxIn(COutPoint(int(unspent[0]["txid"], 16), unspent[0]["vout"]))] tx.vout = [CTxOut(10, scriptPubKey), CTxOut(11, scriptPubKey)] tx.rehash() signed_tx = self.nodes[0].signrawtransaction(binascii.hexlify(tx.serialize()).decode("utf-8")) sent_txid = self.nodes[0].sendrawtransaction(signed_tx["hex"], True) self.nodes[0].generate(1) self.sync_all() txidsmany = self.nodes[1].getaddresstxids("2N2JD6wb56AfK4tfmM6PwdVmoYk2dCKf4Br") assert_equal(len(txidsmany), 4) assert_equal(txidsmany[3], sent_txid) # Check that balances are correct print "Testing balances..." balance0 = self.nodes[1].getaddressbalance("2N2JD6wb56AfK4tfmM6PwdVmoYk2dCKf4Br") assert_equal(balance0["balance"], 45 * 100000000 + 21) # Check that balances are correct after spending print "Testing balances after spending..." privkey2 = "cSdkPxkAjA4HDr5VHgsebAPDEh9Gyub4HK8UJr2DFGGqKKy4K5sG" address2 = "mgY65WSfEmsyYaYPQaXhmXMeBhwp4EcsQW" addressHash2 = "0b2f0a0c31bfe0406b0ccc1381fdbe311946dadc".decode("hex") scriptPubKey2 = CScript([OP_DUP, OP_HASH160, addressHash2, OP_EQUALVERIFY, OP_CHECKSIG]) self.nodes[0].importprivkey(privkey2) unspent = self.nodes[0].listunspent() tx = CTransaction() tx.vin = [CTxIn(COutPoint(int(unspent[0]["txid"], 16), unspent[0]["vout"]))] amount = unspent[0]["amount"] * 100000000 tx.vout = [CTxOut(amount, scriptPubKey2)] tx.rehash() signed_tx = self.nodes[0].signrawtransaction(binascii.hexlify(tx.serialize()).decode("utf-8")) spending_txid = self.nodes[0].sendrawtransaction(signed_tx["hex"], True) self.nodes[0].generate(1) self.sync_all() balance1 = self.nodes[1].getaddressbalance(address2) assert_equal(balance1["balance"], amount) tx = CTransaction() tx.vin = [CTxIn(COutPoint(int(spending_txid, 16), 0))] send_amount = 1 * 100000000 + 12840 change_amount = amount - send_amount - 10000 tx.vout = [CTxOut(change_amount, scriptPubKey2), CTxOut(send_amount, scriptPubKey)] tx.rehash() signed_tx = self.nodes[0].signrawtransaction(binascii.hexlify(tx.serialize()).decode("utf-8")) sent_txid = self.nodes[0].sendrawtransaction(signed_tx["hex"], True) self.nodes[0].generate(1) self.sync_all() balance2 = self.nodes[1].getaddressbalance(address2) assert_equal(balance2["balance"], change_amount) # Check that deltas are returned correctly deltas = self.nodes[1].getaddressdeltas({"addresses": [address2], "start": 0, "end": 200}) balance3 = 0 for delta in deltas: balance3 += delta["satoshis"] assert_equal(balance3, change_amount) # Check that deltas can be returned from range of block heights deltas = self.nodes[1].getaddressdeltas({"addresses": [address2], "start": 113, "end": 113}) assert_equal(len(deltas), 1) # Check that unspent outputs can be queried print "Testing utxos..." utxos = self.nodes[1].getaddressutxos({"addresses": [address2]}) assert_equal(len(utxos), 1) assert_equal(utxos[0]["satoshis"], change_amount) # Check that indexes will be updated with a reorg print "Testing reorg..." best_hash = self.nodes[0].getbestblockhash() self.nodes[0].invalidateblock(best_hash) self.nodes[1].invalidateblock(best_hash) self.nodes[2].invalidateblock(best_hash) self.nodes[3].invalidateblock(best_hash) self.sync_all() balance4 = self.nodes[1].getaddressbalance(address2) assert_equal(balance4, balance1) utxos2 = self.nodes[1].getaddressutxos({"addresses": [address2]}) assert_equal(len(utxos2), 1) assert_equal(utxos2[0]["satoshis"], 5000000000) # Check sorting of utxos self.nodes[2].generate(150) txidsort1 = self.nodes[2].sendtoaddress(address2, 50) self.nodes[2].generate(1) txidsort2 = self.nodes[2].sendtoaddress(address2, 50) self.nodes[2].generate(1) self.sync_all() utxos3 = self.nodes[1].getaddressutxos({"addresses": [address2]}) assert_equal(len(utxos3), 3) assert_equal(utxos3[0]["height"], 114) assert_equal(utxos3[1]["height"], 264) assert_equal(utxos3[2]["height"], 265) # Check mempool indexing print "Testing mempool indexing..." privKey3 = "cVfUn53hAbRrDEuMexyfgDpZPhF7KqXpS8UZevsyTDaugB7HZ3CD" address3 = "mw4ynwhS7MmrQ27hr82kgqu7zryNDK26JB" addressHash3 = "aa9872b5bbcdb511d89e0e11aa27da73fd2c3f50".decode("hex") scriptPubKey3 = CScript([OP_DUP, OP_HASH160, addressHash3, OP_EQUALVERIFY, OP_CHECKSIG]) unspent = self.nodes[2].listunspent() tx = CTransaction() tx.vin = [CTxIn(COutPoint(int(unspent[0]["txid"], 16), unspent[0]["vout"]))] amount = unspent[0]["amount"] * 100000000 tx.vout = [CTxOut(amount, scriptPubKey3)] tx.rehash() signed_tx = self.nodes[2].signrawtransaction(binascii.hexlify(tx.serialize()).decode("utf-8")) memtxid1 = self.nodes[2].sendrawtransaction(signed_tx["hex"], True) time.sleep(2) tx2 = CTransaction() tx2.vin = [CTxIn(COutPoint(int(unspent[1]["txid"], 16), unspent[1]["vout"]))] amount = unspent[1]["amount"] * 100000000 tx2.vout = [CTxOut(amount, scriptPubKey3)] tx2.rehash() signed_tx2 = self.nodes[2].signrawtransaction(binascii.hexlify(tx2.serialize()).decode("utf-8")) memtxid2 = self.nodes[2].sendrawtransaction(signed_tx2["hex"], True) time.sleep(2) mempool = self.nodes[2].getaddressmempool({"addresses": [address3]}) assert_equal(len(mempool), 2) assert_equal(mempool[0]["txid"], memtxid1) assert_equal(mempool[1]["txid"], memtxid2) self.nodes[2].generate(1); self.sync_all(); mempool2 = self.nodes[2].getaddressmempool({"addresses": [address3]}) assert_equal(len(mempool2), 0) print "Passed\n" if __name__ == '__main__': AddressIndexTest().main()