mirror of
https://github.com/dashpay/dash.git
synced 2024-12-25 20:12:57 +01:00
Merge #15826: Pure python EC
b67978529a Add comments to Python ECDSA implementation (John Newbery) 8c7b9324ca Pure python EC (Pieter Wuille) Pull request description: This removes the dependency on OpenSSL for the interaction tests, by providing a pure-Python toy implementation of secp256k1. ACKs for commit b67978: jnewbery: utACK b67978529ad02fc2665f2362418dc53db2e25e17 Tree-SHA512: 181445eb08b316c46937b80dc10aa50d103ab1fdddaf834896c0ea22204889f7b13fd33cbcbd00ddba15f7e4686fe0d9f8e8bb4c0ad0e9587490c90be83966dc
This commit is contained in:
parent
4bf6dc07d4
commit
790c9e784b
@ -32,7 +32,7 @@ Start three nodes:
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from test_framework.blocktools import (create_block, create_coinbase)
|
from test_framework.blocktools import (create_block, create_coinbase)
|
||||||
from test_framework.key import CECKey
|
from test_framework.key import ECKey
|
||||||
from test_framework.messages import (
|
from test_framework.messages import (
|
||||||
CBlockHeader,
|
CBlockHeader,
|
||||||
COutPoint,
|
COutPoint,
|
||||||
@ -105,9 +105,9 @@ class AssumeValidTest(BitcoinTestFramework):
|
|||||||
self.blocks = []
|
self.blocks = []
|
||||||
|
|
||||||
# Get a pubkey for the coinbase TXO
|
# Get a pubkey for the coinbase TXO
|
||||||
coinbase_key = CECKey()
|
coinbase_key = ECKey()
|
||||||
coinbase_key.set_secretbytes(b"horsebattery")
|
coinbase_key.generate()
|
||||||
coinbase_pubkey = coinbase_key.get_pubkey()
|
coinbase_pubkey = coinbase_key.get_pubkey().get_bytes()
|
||||||
|
|
||||||
# Create the first block with a coinbase output to our key
|
# Create the first block with a coinbase output to our key
|
||||||
height = 1
|
height = 1
|
||||||
|
@ -7,7 +7,7 @@ import copy
|
|||||||
import struct
|
import struct
|
||||||
|
|
||||||
from test_framework.blocktools import create_block, create_coinbase, create_tx_with_script, get_legacy_sigopcount_block
|
from test_framework.blocktools import create_block, create_coinbase, create_tx_with_script, get_legacy_sigopcount_block
|
||||||
from test_framework.key import CECKey
|
from test_framework.key import ECKey
|
||||||
from test_framework.messages import (
|
from test_framework.messages import (
|
||||||
CBlock,
|
CBlock,
|
||||||
COIN,
|
COIN,
|
||||||
@ -85,9 +85,9 @@ class FullBlockTest(BitcoinTestFramework):
|
|||||||
self.bootstrap_p2p() # Add one p2p connection to the node
|
self.bootstrap_p2p() # Add one p2p connection to the node
|
||||||
|
|
||||||
self.block_heights = {}
|
self.block_heights = {}
|
||||||
self.coinbase_key = CECKey()
|
self.coinbase_key = ECKey()
|
||||||
self.coinbase_key.set_secretbytes(b"horsebattery")
|
self.coinbase_key.generate()
|
||||||
self.coinbase_pubkey = self.coinbase_key.get_pubkey()
|
self.coinbase_pubkey = self.coinbase_key.get_pubkey().get_bytes()
|
||||||
self.tip = None
|
self.tip = None
|
||||||
self.blocks = {}
|
self.blocks = {}
|
||||||
self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
|
self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
|
||||||
@ -481,7 +481,7 @@ class FullBlockTest(BitcoinTestFramework):
|
|||||||
tx.vin.append(CTxIn(COutPoint(b39.vtx[i].sha256, 0), b''))
|
tx.vin.append(CTxIn(COutPoint(b39.vtx[i].sha256, 0), b''))
|
||||||
# Note: must pass the redeem_script (not p2sh_script) to the signature hash function
|
# Note: must pass the redeem_script (not p2sh_script) to the signature hash function
|
||||||
(sighash, err) = SignatureHash(redeem_script, tx, 1, SIGHASH_ALL)
|
(sighash, err) = SignatureHash(redeem_script, tx, 1, SIGHASH_ALL)
|
||||||
sig = self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL]))
|
sig = self.coinbase_key.sign_ecdsa(sighash) + bytes(bytearray([SIGHASH_ALL]))
|
||||||
scriptSig = CScript([sig, redeem_script])
|
scriptSig = CScript([sig, redeem_script])
|
||||||
|
|
||||||
tx.vin[1].scriptSig = scriptSig
|
tx.vin[1].scriptSig = scriptSig
|
||||||
@ -1225,7 +1225,7 @@ class FullBlockTest(BitcoinTestFramework):
|
|||||||
tx.vin[0].scriptSig = CScript()
|
tx.vin[0].scriptSig = CScript()
|
||||||
return
|
return
|
||||||
(sighash, err) = SignatureHash(spend_tx.vout[0].scriptPubKey, tx, 0, SIGHASH_ALL)
|
(sighash, err) = SignatureHash(spend_tx.vout[0].scriptPubKey, tx, 0, SIGHASH_ALL)
|
||||||
tx.vin[0].scriptSig = CScript([self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL]))])
|
tx.vin[0].scriptSig = CScript([self.coinbase_key.sign_ecdsa(sighash) + bytes(bytearray([SIGHASH_ALL]))])
|
||||||
|
|
||||||
def create_and_sign_transaction(self, spend_tx, value, script=CScript([OP_TRUE])):
|
def create_and_sign_transaction(self, spend_tx, value, script=CScript([OP_TRUE])):
|
||||||
tx = self.create_tx(spend_tx, 0, value, script)
|
tx = self.create_tx(spend_tx, 0, value, script)
|
||||||
|
@ -1,216 +1,386 @@
|
|||||||
# Copyright (c) 2011 Sam Rushing
|
# Copyright (c) 2019 Pieter Wuille
|
||||||
"""ECC secp256k1 OpenSSL wrapper.
|
# Distributed under the MIT software license, see the accompanying
|
||||||
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
"""Test-only secp256k1 elliptic curve implementation
|
||||||
|
|
||||||
WARNING: This module does not mlock() secrets; your private keys may end up on
|
WARNING: This code is slow, uses bad randomness, does not properly protect
|
||||||
disk in swap! Use with caution!
|
keys, and is trivially vulnerable to side channel attacks. Do not use for
|
||||||
|
anything but tests."""
|
||||||
|
import random
|
||||||
|
|
||||||
This file is modified from python-bitcoinlib.
|
def modinv(a, n):
|
||||||
"""
|
"""Compute the modular inverse of a modulo n
|
||||||
|
|
||||||
import ctypes
|
See https://en.wikipedia.org/wiki/Extended_Euclidean_algorithm#Modular_integers.
|
||||||
import ctypes.util
|
"""
|
||||||
import hashlib
|
t1, t2 = 0, 1
|
||||||
|
r1, r2 = n, a
|
||||||
|
while r2 != 0:
|
||||||
|
q = r1 // r2
|
||||||
|
t1, t2 = t2, t1 - q * t2
|
||||||
|
r1, r2 = r2, r1 - q * r2
|
||||||
|
if r1 > 1:
|
||||||
|
return None
|
||||||
|
if t1 < 0:
|
||||||
|
t1 += n
|
||||||
|
return t1
|
||||||
|
|
||||||
ssl = ctypes.cdll.LoadLibrary(ctypes.util.find_library ('ssl') or 'libeay32')
|
def jacobi_symbol(n, k):
|
||||||
|
"""Compute the Jacobi symbol of n modulo k
|
||||||
|
|
||||||
ssl.BN_new.restype = ctypes.c_void_p
|
See http://en.wikipedia.org/wiki/Jacobi_symbol
|
||||||
ssl.BN_new.argtypes = []
|
|
||||||
|
|
||||||
ssl.BN_bin2bn.restype = ctypes.c_void_p
|
For our application k is always prime, so this is the same as the Legendre symbol."""
|
||||||
ssl.BN_bin2bn.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p]
|
assert k > 0 and k & 1, "jacobi symbol is only defined for positive odd k"
|
||||||
|
n %= k
|
||||||
|
t = 0
|
||||||
|
while n != 0:
|
||||||
|
while n & 1 == 0:
|
||||||
|
n >>= 1
|
||||||
|
r = k & 7
|
||||||
|
t ^= (r == 3 or r == 5)
|
||||||
|
n, k = k, n
|
||||||
|
t ^= (n & k & 3 == 3)
|
||||||
|
n = n % k
|
||||||
|
if k == 1:
|
||||||
|
return -1 if t else 1
|
||||||
|
return 0
|
||||||
|
|
||||||
ssl.BN_CTX_free.restype = None
|
def modsqrt(a, p):
|
||||||
ssl.BN_CTX_free.argtypes = [ctypes.c_void_p]
|
"""Compute the square root of a modulo p when p % 4 = 3.
|
||||||
|
|
||||||
ssl.BN_CTX_new.restype = ctypes.c_void_p
|
The Tonelli-Shanks algorithm can be used. See https://en.wikipedia.org/wiki/Tonelli-Shanks_algorithm
|
||||||
ssl.BN_CTX_new.argtypes = []
|
|
||||||
|
|
||||||
ssl.ECDH_compute_key.restype = ctypes.c_int
|
Limiting this function to only work for p % 4 = 3 means we don't need to
|
||||||
ssl.ECDH_compute_key.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p]
|
iterate through the loop. The highest n such that p - 1 = 2^n Q with Q odd
|
||||||
|
is n = 1. Therefore Q = (p-1)/2 and sqrt = a^((Q+1)/2) = a^((p+1)/4)
|
||||||
|
|
||||||
ssl.ECDSA_sign.restype = ctypes.c_int
|
secp256k1's is defined over field of size 2**256 - 2**32 - 977, which is 3 mod 4.
|
||||||
ssl.ECDSA_sign.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
|
"""
|
||||||
|
if p % 4 != 3:
|
||||||
|
raise NotImplementedError("modsqrt only implemented for p % 4 = 3")
|
||||||
|
sqrt = pow(a, (p + 1)//4, p)
|
||||||
|
if pow(sqrt, 2, p) == a % p:
|
||||||
|
return sqrt
|
||||||
|
return None
|
||||||
|
|
||||||
ssl.ECDSA_verify.restype = ctypes.c_int
|
class EllipticCurve:
|
||||||
ssl.ECDSA_verify.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p]
|
def __init__(self, p, a, b):
|
||||||
|
"""Initialize elliptic curve y^2 = x^3 + a*x + b over GF(p)."""
|
||||||
|
self.p = p
|
||||||
|
self.a = a % p
|
||||||
|
self.b = b % p
|
||||||
|
|
||||||
ssl.EC_KEY_free.restype = None
|
def affine(self, p1):
|
||||||
ssl.EC_KEY_free.argtypes = [ctypes.c_void_p]
|
"""Convert a Jacobian point tuple p1 to affine form, or None if at infinity.
|
||||||
|
|
||||||
ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p
|
An affine point is represented as the Jacobian (x, y, 1)"""
|
||||||
ssl.EC_KEY_new_by_curve_name.argtypes = [ctypes.c_int]
|
x1, y1, z1 = p1
|
||||||
|
if z1 == 0:
|
||||||
|
return None
|
||||||
|
inv = modinv(z1, self.p)
|
||||||
|
inv_2 = (inv**2) % self.p
|
||||||
|
inv_3 = (inv_2 * inv) % self.p
|
||||||
|
return ((inv_2 * x1) % self.p, (inv_3 * y1) % self.p, 1)
|
||||||
|
|
||||||
ssl.EC_KEY_get0_group.restype = ctypes.c_void_p
|
def negate(self, p1):
|
||||||
ssl.EC_KEY_get0_group.argtypes = [ctypes.c_void_p]
|
"""Negate a Jacobian point tuple p1."""
|
||||||
|
x1, y1, z1 = p1
|
||||||
|
return (x1, (self.p - y1) % self.p, z1)
|
||||||
|
|
||||||
ssl.EC_KEY_get0_public_key.restype = ctypes.c_void_p
|
def on_curve(self, p1):
|
||||||
ssl.EC_KEY_get0_public_key.argtypes = [ctypes.c_void_p]
|
"""Determine whether a Jacobian tuple p is on the curve (and not infinity)"""
|
||||||
|
x1, y1, z1 = p1
|
||||||
|
z2 = pow(z1, 2, self.p)
|
||||||
|
z4 = pow(z2, 2, self.p)
|
||||||
|
return z1 != 0 and (pow(x1, 3, self.p) + self.a * x1 * z4 + self.b * z2 * z4 - pow(y1, 2, self.p)) % self.p == 0
|
||||||
|
|
||||||
ssl.EC_KEY_set_private_key.restype = ctypes.c_int
|
def is_x_coord(self, x):
|
||||||
ssl.EC_KEY_set_private_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
|
"""Test whether x is a valid X coordinate on the curve."""
|
||||||
|
x_3 = pow(x, 3, self.p)
|
||||||
|
return jacobi_symbol(x_3 + self.a * x + self.b, self.p) != -1
|
||||||
|
|
||||||
ssl.EC_KEY_set_conv_form.restype = None
|
def lift_x(self, x):
|
||||||
ssl.EC_KEY_set_conv_form.argtypes = [ctypes.c_void_p, ctypes.c_int]
|
"""Given an X coordinate on the curve, return a corresponding affine point."""
|
||||||
|
x_3 = pow(x, 3, self.p)
|
||||||
|
v = x_3 + self.a * x + self.b
|
||||||
|
y = modsqrt(v, self.p)
|
||||||
|
if y is None:
|
||||||
|
return None
|
||||||
|
return (x, y, 1)
|
||||||
|
|
||||||
ssl.EC_KEY_set_public_key.restype = ctypes.c_int
|
def double(self, p1):
|
||||||
ssl.EC_KEY_set_public_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
|
"""Double a Jacobian tuple p1
|
||||||
|
|
||||||
ssl.i2o_ECPublicKey.restype = ctypes.c_void_p
|
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Doubling"""
|
||||||
ssl.i2o_ECPublicKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
|
x1, y1, z1 = p1
|
||||||
|
if z1 == 0:
|
||||||
|
return (0, 1, 0)
|
||||||
|
y1_2 = (y1**2) % self.p
|
||||||
|
y1_4 = (y1_2**2) % self.p
|
||||||
|
x1_2 = (x1**2) % self.p
|
||||||
|
s = (4*x1*y1_2) % self.p
|
||||||
|
m = 3*x1_2
|
||||||
|
if self.a:
|
||||||
|
m += self.a * pow(z1, 4, self.p)
|
||||||
|
m = m % self.p
|
||||||
|
x2 = (m**2 - 2*s) % self.p
|
||||||
|
y2 = (m*(s - x2) - 8*y1_4) % self.p
|
||||||
|
z2 = (2*y1*z1) % self.p
|
||||||
|
return (x2, y2, z2)
|
||||||
|
|
||||||
ssl.EC_POINT_new.restype = ctypes.c_void_p
|
def add_mixed(self, p1, p2):
|
||||||
ssl.EC_POINT_new.argtypes = [ctypes.c_void_p]
|
"""Add a Jacobian tuple p1 and an affine tuple p2
|
||||||
|
|
||||||
ssl.EC_POINT_free.restype = None
|
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition (with affine point)"""
|
||||||
ssl.EC_POINT_free.argtypes = [ctypes.c_void_p]
|
x1, y1, z1 = p1
|
||||||
|
x2, y2, z2 = p2
|
||||||
|
assert(z2 == 1)
|
||||||
|
# Adding to the point at infinity is a no-op
|
||||||
|
if z1 == 0:
|
||||||
|
return p2
|
||||||
|
z1_2 = (z1**2) % self.p
|
||||||
|
z1_3 = (z1_2 * z1) % self.p
|
||||||
|
u2 = (x2 * z1_2) % self.p
|
||||||
|
s2 = (y2 * z1_3) % self.p
|
||||||
|
if x1 == u2:
|
||||||
|
if (y1 != s2):
|
||||||
|
# p1 and p2 are inverses. Return the point at infinity.
|
||||||
|
return (0, 1, 0)
|
||||||
|
# p1 == p2. The formulas below fail when the two points are equal.
|
||||||
|
return self.double(p1)
|
||||||
|
h = u2 - x1
|
||||||
|
r = s2 - y1
|
||||||
|
h_2 = (h**2) % self.p
|
||||||
|
h_3 = (h_2 * h) % self.p
|
||||||
|
u1_h_2 = (x1 * h_2) % self.p
|
||||||
|
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
|
||||||
|
y3 = (r*(u1_h_2 - x3) - y1*h_3) % self.p
|
||||||
|
z3 = (h*z1) % self.p
|
||||||
|
return (x3, y3, z3)
|
||||||
|
|
||||||
ssl.EC_POINT_mul.restype = ctypes.c_int
|
def add(self, p1, p2):
|
||||||
ssl.EC_POINT_mul.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
|
"""Add two Jacobian tuples p1 and p2
|
||||||
|
|
||||||
# this specifies the curve used with ECDSA.
|
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition"""
|
||||||
NID_secp256k1 = 714 # from openssl/obj_mac.h
|
x1, y1, z1 = p1
|
||||||
|
x2, y2, z2 = p2
|
||||||
|
# Adding the point at infinity is a no-op
|
||||||
|
if z1 == 0:
|
||||||
|
return p2
|
||||||
|
if z2 == 0:
|
||||||
|
return p1
|
||||||
|
# Adding an Affine to a Jacobian is more efficient since we save field multiplications and squarings when z = 1
|
||||||
|
if z1 == 1:
|
||||||
|
return self.add_mixed(p2, p1)
|
||||||
|
if z2 == 1:
|
||||||
|
return self.add_mixed(p1, p2)
|
||||||
|
z1_2 = (z1**2) % self.p
|
||||||
|
z1_3 = (z1_2 * z1) % self.p
|
||||||
|
z2_2 = (z2**2) % self.p
|
||||||
|
z2_3 = (z2_2 * z2) % self.p
|
||||||
|
u1 = (x1 * z2_2) % self.p
|
||||||
|
u2 = (x2 * z1_2) % self.p
|
||||||
|
s1 = (y1 * z2_3) % self.p
|
||||||
|
s2 = (y2 * z1_3) % self.p
|
||||||
|
if u1 == u2:
|
||||||
|
if (s1 != s2):
|
||||||
|
# p1 and p2 are inverses. Return the point at infinity.
|
||||||
|
return (0, 1, 0)
|
||||||
|
# p1 == p2. The formulas below fail when the two points are equal.
|
||||||
|
return self.double(p1)
|
||||||
|
h = u2 - u1
|
||||||
|
r = s2 - s1
|
||||||
|
h_2 = (h**2) % self.p
|
||||||
|
h_3 = (h_2 * h) % self.p
|
||||||
|
u1_h_2 = (u1 * h_2) % self.p
|
||||||
|
x3 = (r**2 - h_3 - 2*u1_h_2) % self.p
|
||||||
|
y3 = (r*(u1_h_2 - x3) - s1*h_3) % self.p
|
||||||
|
z3 = (h*z1*z2) % self.p
|
||||||
|
return (x3, y3, z3)
|
||||||
|
|
||||||
|
def mul(self, ps):
|
||||||
|
"""Compute a (multi) point multiplication
|
||||||
|
|
||||||
|
ps is a list of (Jacobian tuple, scalar) pairs.
|
||||||
|
"""
|
||||||
|
r = (0, 1, 0)
|
||||||
|
for i in range(255, -1, -1):
|
||||||
|
r = self.double(r)
|
||||||
|
for (p, n) in ps:
|
||||||
|
if ((n >> i) & 1):
|
||||||
|
r = self.add(r, p)
|
||||||
|
return r
|
||||||
|
|
||||||
|
SECP256K1 = EllipticCurve(2**256 - 2**32 - 977, 0, 7)
|
||||||
|
SECP256K1_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798, 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8, 1)
|
||||||
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
|
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
|
||||||
SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2
|
SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2
|
||||||
|
|
||||||
# Thx to Sam Devlin for the ctypes magic 64-bit fix.
|
class ECPubKey():
|
||||||
def _check_result(val, func, args):
|
"""A secp256k1 public key"""
|
||||||
if val == 0:
|
|
||||||
raise ValueError
|
|
||||||
else:
|
|
||||||
return ctypes.c_void_p (val)
|
|
||||||
|
|
||||||
ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p
|
|
||||||
ssl.EC_KEY_new_by_curve_name.errcheck = _check_result
|
|
||||||
|
|
||||||
class CECKey():
|
|
||||||
"""Wrapper around OpenSSL's EC_KEY"""
|
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.k = ssl.EC_KEY_new_by_curve_name(NID_secp256k1)
|
"""Construct an uninitialized public key"""
|
||||||
|
self.valid = False
|
||||||
|
|
||||||
def __del__(self):
|
def set(self, data):
|
||||||
if ssl:
|
"""Construct a public key from a serialization in compressed or uncompressed format"""
|
||||||
ssl.EC_KEY_free(self.k)
|
if (len(data) == 65 and data[0] == 0x04):
|
||||||
self.k = None
|
p = (int.from_bytes(data[1:33], 'big'), int.from_bytes(data[33:65], 'big'), 1)
|
||||||
|
self.valid = SECP256K1.on_curve(p)
|
||||||
def set_secretbytes(self, secret):
|
if self.valid:
|
||||||
priv_key = ssl.BN_bin2bn(secret, 32, ssl.BN_new())
|
self.p = p
|
||||||
group = ssl.EC_KEY_get0_group(self.k)
|
self.compressed = False
|
||||||
pub_key = ssl.EC_POINT_new(group)
|
elif (len(data) == 33 and (data[0] == 0x02 or data[0] == 0x03)):
|
||||||
ctx = ssl.BN_CTX_new()
|
x = int.from_bytes(data[1:33], 'big')
|
||||||
if not ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx):
|
if SECP256K1.is_x_coord(x):
|
||||||
raise ValueError("Could not derive public key from the supplied secret.")
|
p = SECP256K1.lift_x(x)
|
||||||
ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx)
|
# if the oddness of the y co-ord isn't correct, find the other
|
||||||
ssl.EC_KEY_set_private_key(self.k, priv_key)
|
# valid y
|
||||||
ssl.EC_KEY_set_public_key(self.k, pub_key)
|
if (p[1] & 1) != (data[0] & 1):
|
||||||
ssl.EC_POINT_free(pub_key)
|
p = SECP256K1.negate(p)
|
||||||
ssl.BN_CTX_free(ctx)
|
self.p = p
|
||||||
return self.k
|
self.valid = True
|
||||||
|
self.compressed = True
|
||||||
def set_privkey(self, key):
|
else:
|
||||||
self.mb = ctypes.create_string_buffer(key)
|
self.valid = False
|
||||||
return ssl.d2i_ECPrivateKey(ctypes.byref(self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key))
|
|
||||||
|
|
||||||
def set_pubkey(self, key):
|
|
||||||
self.mb = ctypes.create_string_buffer(key)
|
|
||||||
return ssl.o2i_ECPublicKey(ctypes.byref(self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key))
|
|
||||||
|
|
||||||
def get_privkey(self):
|
|
||||||
size = ssl.i2d_ECPrivateKey(self.k, 0)
|
|
||||||
mb_pri = ctypes.create_string_buffer(size)
|
|
||||||
ssl.i2d_ECPrivateKey(self.k, ctypes.byref(ctypes.pointer(mb_pri)))
|
|
||||||
return mb_pri.raw
|
|
||||||
|
|
||||||
def get_pubkey(self):
|
|
||||||
size = ssl.i2o_ECPublicKey(self.k, 0)
|
|
||||||
mb = ctypes.create_string_buffer(size)
|
|
||||||
ssl.i2o_ECPublicKey(self.k, ctypes.byref(ctypes.pointer(mb)))
|
|
||||||
return mb.raw
|
|
||||||
|
|
||||||
def get_raw_ecdh_key(self, other_pubkey):
|
|
||||||
ecdh_keybuffer = ctypes.create_string_buffer(32)
|
|
||||||
r = ssl.ECDH_compute_key(ctypes.pointer(ecdh_keybuffer), 32,
|
|
||||||
ssl.EC_KEY_get0_public_key(other_pubkey.k),
|
|
||||||
self.k, 0)
|
|
||||||
if r != 32:
|
|
||||||
raise Exception('CKey.get_ecdh_key(): ECDH_compute_key() failed')
|
|
||||||
return ecdh_keybuffer.raw
|
|
||||||
|
|
||||||
def get_ecdh_key(self, other_pubkey, kdf=lambda k: hashlib.sha256(k).digest()):
|
|
||||||
# FIXME: be warned it's not clear what the kdf should be as a default
|
|
||||||
r = self.get_raw_ecdh_key(other_pubkey)
|
|
||||||
return kdf(r)
|
|
||||||
|
|
||||||
def sign(self, hash, low_s = True):
|
|
||||||
# FIXME: need unit tests for below cases
|
|
||||||
if not isinstance(hash, bytes):
|
|
||||||
raise TypeError('Hash must be bytes instance; got %r' % hash.__class__)
|
|
||||||
if len(hash) != 32:
|
|
||||||
raise ValueError('Hash must be exactly 32 bytes long')
|
|
||||||
|
|
||||||
sig_size0 = ctypes.c_uint32()
|
|
||||||
sig_size0.value = ssl.ECDSA_size(self.k)
|
|
||||||
mb_sig = ctypes.create_string_buffer(sig_size0.value)
|
|
||||||
result = ssl.ECDSA_sign(0, hash, len(hash), mb_sig, ctypes.byref(sig_size0), self.k)
|
|
||||||
assert 1 == result
|
|
||||||
assert mb_sig.raw[0] == 0x30
|
|
||||||
assert mb_sig.raw[1] == sig_size0.value - 2
|
|
||||||
total_size = mb_sig.raw[1]
|
|
||||||
assert mb_sig.raw[2] == 2
|
|
||||||
r_size = mb_sig.raw[3]
|
|
||||||
assert mb_sig.raw[4 + r_size] == 2
|
|
||||||
s_size = mb_sig.raw[5 + r_size]
|
|
||||||
s_value = int.from_bytes(mb_sig.raw[6+r_size:6+r_size+s_size], byteorder='big')
|
|
||||||
if (not low_s) or s_value <= SECP256K1_ORDER_HALF:
|
|
||||||
return mb_sig.raw[:sig_size0.value]
|
|
||||||
else:
|
else:
|
||||||
low_s_value = SECP256K1_ORDER - s_value
|
self.valid = False
|
||||||
low_s_bytes = (low_s_value).to_bytes(33, byteorder='big')
|
|
||||||
while len(low_s_bytes) > 1 and low_s_bytes[0] == 0 and low_s_bytes[1] < 0x80:
|
|
||||||
low_s_bytes = low_s_bytes[1:]
|
|
||||||
new_s_size = len(low_s_bytes)
|
|
||||||
new_total_size_byte = (total_size + new_s_size - s_size).to_bytes(1,byteorder='big')
|
|
||||||
new_s_size_byte = (new_s_size).to_bytes(1,byteorder='big')
|
|
||||||
return b'\x30' + new_total_size_byte + mb_sig.raw[2:5+r_size] + new_s_size_byte + low_s_bytes
|
|
||||||
|
|
||||||
def verify(self, hash, sig):
|
|
||||||
"""Verify a DER signature"""
|
|
||||||
return ssl.ECDSA_verify(0, hash, len(hash), sig, len(sig), self.k) == 1
|
|
||||||
|
|
||||||
|
|
||||||
class CPubKey(bytes):
|
|
||||||
"""An encapsulated public key
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
|
|
||||||
is_valid - Corresponds to CPubKey.IsValid()
|
|
||||||
is_fullyvalid - Corresponds to CPubKey.IsFullyValid()
|
|
||||||
is_compressed - Corresponds to CPubKey.IsCompressed()
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __new__(cls, buf, _cec_key=None):
|
|
||||||
self = super(CPubKey, cls).__new__(cls, buf)
|
|
||||||
if _cec_key is None:
|
|
||||||
_cec_key = CECKey()
|
|
||||||
self._cec_key = _cec_key
|
|
||||||
self.is_fullyvalid = _cec_key.set_pubkey(self) != 0
|
|
||||||
return self
|
|
||||||
|
|
||||||
@property
|
|
||||||
def is_valid(self):
|
|
||||||
return len(self) > 0
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_compressed(self):
|
def is_compressed(self):
|
||||||
return len(self) == 33
|
return self.compressed
|
||||||
|
|
||||||
def verify(self, hash, sig):
|
@property
|
||||||
return self._cec_key.verify(hash, sig)
|
def is_valid(self):
|
||||||
|
return self.valid
|
||||||
|
|
||||||
def __str__(self):
|
def get_bytes(self):
|
||||||
return repr(self)
|
assert(self.valid)
|
||||||
|
p = SECP256K1.affine(self.p)
|
||||||
|
if p is None:
|
||||||
|
return None
|
||||||
|
if self.compressed:
|
||||||
|
return bytes([0x02 + (p[1] & 1)]) + p[0].to_bytes(32, 'big')
|
||||||
|
else:
|
||||||
|
return bytes([0x04]) + p[0].to_bytes(32, 'big') + p[1].to_bytes(32, 'big')
|
||||||
|
|
||||||
def __repr__(self):
|
def verify_ecdsa(self, sig, msg, low_s=True):
|
||||||
return '%s(%s)' % (self.__class__.__name__, super(CPubKey, self).__repr__())
|
"""Verify a strictly DER-encoded ECDSA signature against this pubkey.
|
||||||
|
|
||||||
|
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
|
||||||
|
ECDSA verifier algorithm"""
|
||||||
|
assert(self.valid)
|
||||||
|
|
||||||
|
# Extract r and s from the DER formatted signature. Return false for
|
||||||
|
# any DER encoding errors.
|
||||||
|
if (sig[1] + 2 != len(sig)):
|
||||||
|
return False
|
||||||
|
if (len(sig) < 4):
|
||||||
|
return False
|
||||||
|
if (sig[0] != 0x30):
|
||||||
|
return False
|
||||||
|
if (sig[2] != 0x02):
|
||||||
|
return False
|
||||||
|
rlen = sig[3]
|
||||||
|
if (len(sig) < 6 + rlen):
|
||||||
|
return False
|
||||||
|
if rlen < 1 or rlen > 33:
|
||||||
|
return False
|
||||||
|
if sig[4] >= 0x80:
|
||||||
|
return False
|
||||||
|
if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)):
|
||||||
|
return False
|
||||||
|
r = int.from_bytes(sig[4:4+rlen], 'big')
|
||||||
|
if (sig[4+rlen] != 0x02):
|
||||||
|
return False
|
||||||
|
slen = sig[5+rlen]
|
||||||
|
if slen < 1 or slen > 33:
|
||||||
|
return False
|
||||||
|
if (len(sig) != 6 + rlen + slen):
|
||||||
|
return False
|
||||||
|
if sig[6+rlen] >= 0x80:
|
||||||
|
return False
|
||||||
|
if (slen > 1 and (sig[6+rlen] == 0) and not (sig[7+rlen] & 0x80)):
|
||||||
|
return False
|
||||||
|
s = int.from_bytes(sig[6+rlen:6+rlen+slen], 'big')
|
||||||
|
|
||||||
|
# Verify that r and s are within the group order
|
||||||
|
if r < 1 or s < 1 or r >= SECP256K1_ORDER or s >= SECP256K1_ORDER:
|
||||||
|
return False
|
||||||
|
if low_s and s >= SECP256K1_ORDER_HALF:
|
||||||
|
return False
|
||||||
|
z = int.from_bytes(msg, 'big')
|
||||||
|
|
||||||
|
# Run verifier algorithm on r, s
|
||||||
|
w = modinv(s, SECP256K1_ORDER)
|
||||||
|
u1 = z*w % SECP256K1_ORDER
|
||||||
|
u2 = r*w % SECP256K1_ORDER
|
||||||
|
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, u1), (self.p, u2)]))
|
||||||
|
if R is None or R[0] != r:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
class ECKey():
|
||||||
|
"""A secp256k1 private key"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.valid = False
|
||||||
|
|
||||||
|
def set(self, secret, compressed):
|
||||||
|
"""Construct a private key object with given 32-byte secret and compressed flag."""
|
||||||
|
assert(len(secret) == 32)
|
||||||
|
secret = int.from_bytes(secret, 'big')
|
||||||
|
self.valid = (secret > 0 and secret < SECP256K1_ORDER)
|
||||||
|
if self.valid:
|
||||||
|
self.secret = secret
|
||||||
|
self.compressed = compressed
|
||||||
|
|
||||||
|
def generate(self, compressed=True):
|
||||||
|
"""Generate a random private key (compressed or uncompressed)."""
|
||||||
|
self.set(random.randrange(1, SECP256K1_ORDER).to_bytes(32, 'big'), compressed)
|
||||||
|
|
||||||
|
def get_bytes(self):
|
||||||
|
"""Retrieve the 32-byte representation of this key."""
|
||||||
|
assert(self.valid)
|
||||||
|
return self.secret.to_bytes(32, 'big')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_valid(self):
|
||||||
|
return self.valid
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_compressed(self):
|
||||||
|
return self.compressed
|
||||||
|
|
||||||
|
def get_pubkey(self):
|
||||||
|
"""Compute an ECPubKey object for this secret key."""
|
||||||
|
assert(self.valid)
|
||||||
|
ret = ECPubKey()
|
||||||
|
p = SECP256K1.mul([(SECP256K1_G, self.secret)])
|
||||||
|
ret.p = p
|
||||||
|
ret.valid = True
|
||||||
|
ret.compressed = self.compressed
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def sign_ecdsa(self, msg, low_s=True):
|
||||||
|
"""Construct a DER-encoded ECDSA signature with this key.
|
||||||
|
|
||||||
|
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
|
||||||
|
ECDSA signer algorithm."""
|
||||||
|
assert(self.valid)
|
||||||
|
z = int.from_bytes(msg, 'big')
|
||||||
|
# Note: no RFC6979, but a simple random nonce (some tests rely on distinct transactions for the same operation)
|
||||||
|
k = random.randrange(1, SECP256K1_ORDER)
|
||||||
|
R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)]))
|
||||||
|
r = R[0] % SECP256K1_ORDER
|
||||||
|
s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER
|
||||||
|
if low_s and s > SECP256K1_ORDER_HALF:
|
||||||
|
s = SECP256K1_ORDER - s
|
||||||
|
# Represent in DER format. The byte representations of r and s have
|
||||||
|
# length rounded up (255 bits becomes 32 bytes and 256 bits becomes 33
|
||||||
|
# bytes).
|
||||||
|
rb = r.to_bytes((r.bit_length() + 8) // 8, 'big')
|
||||||
|
sb = s.to_bytes((s.bit_length() + 8) // 8, 'big')
|
||||||
|
return b'\x30' + bytes([4 + len(rb) + len(sb), 2, len(rb)]) + rb + bytes([2, len(sb)]) + sb
|
||||||
|
Loading…
Reference in New Issue
Block a user