mirror of
https://github.com/dashpay/dash.git
synced 2024-12-30 14:25:53 +01:00
4ae641ca18
67d6ee1
remove redundant tests in p2p-segwit.py (Johnson Lau)9260085
test segwit uncompressed key fixes (Johnson Lau)248f3a7
Fix ismine and addwitnessaddress: no uncompressed keys in segwit (Pieter Wuille)b811124
[qa] Add tests for uncompressed pubkeys in segwit (Suhas Daftuar)9f0397a
Make test framework produce lowS signatures (Johnson Lau)4c0c25a
Require compressed keys in segwit as policy and disable signing with uncompressed keys for segwit scripts (Johnson Lau)3ade2f6
Add standard limits for P2WSH with tests (Johnson Lau)
227 lines
8.0 KiB
Python
227 lines
8.0 KiB
Python
# Copyright (c) 2011 Sam Rushing
|
|
"""ECC secp256k1 OpenSSL wrapper.
|
|
|
|
WARNING: This module does not mlock() secrets; your private keys may end up on
|
|
disk in swap! Use with caution!
|
|
|
|
This file is modified from python-bitcoinlib.
|
|
"""
|
|
|
|
import ctypes
|
|
import ctypes.util
|
|
import hashlib
|
|
|
|
ssl = ctypes.cdll.LoadLibrary(ctypes.util.find_library ('ssl') or 'libeay32')
|
|
|
|
ssl.BN_new.restype = ctypes.c_void_p
|
|
ssl.BN_new.argtypes = []
|
|
|
|
ssl.BN_bin2bn.restype = ctypes.c_void_p
|
|
ssl.BN_bin2bn.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p]
|
|
|
|
ssl.BN_CTX_free.restype = None
|
|
ssl.BN_CTX_free.argtypes = [ctypes.c_void_p]
|
|
|
|
ssl.BN_CTX_new.restype = ctypes.c_void_p
|
|
ssl.BN_CTX_new.argtypes = []
|
|
|
|
ssl.ECDH_compute_key.restype = ctypes.c_int
|
|
ssl.ECDH_compute_key.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p]
|
|
|
|
ssl.ECDSA_sign.restype = ctypes.c_int
|
|
ssl.ECDSA_sign.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
|
|
|
|
ssl.ECDSA_verify.restype = ctypes.c_int
|
|
ssl.ECDSA_verify.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p]
|
|
|
|
ssl.EC_KEY_free.restype = None
|
|
ssl.EC_KEY_free.argtypes = [ctypes.c_void_p]
|
|
|
|
ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p
|
|
ssl.EC_KEY_new_by_curve_name.argtypes = [ctypes.c_int]
|
|
|
|
ssl.EC_KEY_get0_group.restype = ctypes.c_void_p
|
|
ssl.EC_KEY_get0_group.argtypes = [ctypes.c_void_p]
|
|
|
|
ssl.EC_KEY_get0_public_key.restype = ctypes.c_void_p
|
|
ssl.EC_KEY_get0_public_key.argtypes = [ctypes.c_void_p]
|
|
|
|
ssl.EC_KEY_set_private_key.restype = ctypes.c_int
|
|
ssl.EC_KEY_set_private_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
|
|
|
|
ssl.EC_KEY_set_conv_form.restype = None
|
|
ssl.EC_KEY_set_conv_form.argtypes = [ctypes.c_void_p, ctypes.c_int]
|
|
|
|
ssl.EC_KEY_set_public_key.restype = ctypes.c_int
|
|
ssl.EC_KEY_set_public_key.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
|
|
|
|
ssl.i2o_ECPublicKey.restype = ctypes.c_void_p
|
|
ssl.i2o_ECPublicKey.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
|
|
|
|
ssl.EC_POINT_new.restype = ctypes.c_void_p
|
|
ssl.EC_POINT_new.argtypes = [ctypes.c_void_p]
|
|
|
|
ssl.EC_POINT_free.restype = None
|
|
ssl.EC_POINT_free.argtypes = [ctypes.c_void_p]
|
|
|
|
ssl.EC_POINT_mul.restype = ctypes.c_int
|
|
ssl.EC_POINT_mul.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
|
|
|
|
# this specifies the curve used with ECDSA.
|
|
NID_secp256k1 = 714 # from openssl/obj_mac.h
|
|
|
|
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
|
|
SECP256K1_ORDER_HALF = SECP256K1_ORDER // 2
|
|
|
|
# Thx to Sam Devlin for the ctypes magic 64-bit fix.
|
|
def _check_result(val, func, args):
|
|
if val == 0:
|
|
raise ValueError
|
|
else:
|
|
return ctypes.c_void_p (val)
|
|
|
|
ssl.EC_KEY_new_by_curve_name.restype = ctypes.c_void_p
|
|
ssl.EC_KEY_new_by_curve_name.errcheck = _check_result
|
|
|
|
class CECKey():
|
|
"""Wrapper around OpenSSL's EC_KEY"""
|
|
|
|
POINT_CONVERSION_COMPRESSED = 2
|
|
POINT_CONVERSION_UNCOMPRESSED = 4
|
|
|
|
def __init__(self):
|
|
self.k = ssl.EC_KEY_new_by_curve_name(NID_secp256k1)
|
|
|
|
def __del__(self):
|
|
if ssl:
|
|
ssl.EC_KEY_free(self.k)
|
|
self.k = None
|
|
|
|
def set_secretbytes(self, secret):
|
|
priv_key = ssl.BN_bin2bn(secret, 32, ssl.BN_new())
|
|
group = ssl.EC_KEY_get0_group(self.k)
|
|
pub_key = ssl.EC_POINT_new(group)
|
|
ctx = ssl.BN_CTX_new()
|
|
if not ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx):
|
|
raise ValueError("Could not derive public key from the supplied secret.")
|
|
ssl.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx)
|
|
ssl.EC_KEY_set_private_key(self.k, priv_key)
|
|
ssl.EC_KEY_set_public_key(self.k, pub_key)
|
|
ssl.EC_POINT_free(pub_key)
|
|
ssl.BN_CTX_free(ctx)
|
|
return self.k
|
|
|
|
def set_privkey(self, key):
|
|
self.mb = ctypes.create_string_buffer(key)
|
|
return ssl.d2i_ECPrivateKey(ctypes.byref(self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key))
|
|
|
|
def set_pubkey(self, key):
|
|
self.mb = ctypes.create_string_buffer(key)
|
|
return ssl.o2i_ECPublicKey(ctypes.byref(self.k), ctypes.byref(ctypes.pointer(self.mb)), len(key))
|
|
|
|
def get_privkey(self):
|
|
size = ssl.i2d_ECPrivateKey(self.k, 0)
|
|
mb_pri = ctypes.create_string_buffer(size)
|
|
ssl.i2d_ECPrivateKey(self.k, ctypes.byref(ctypes.pointer(mb_pri)))
|
|
return mb_pri.raw
|
|
|
|
def get_pubkey(self):
|
|
size = ssl.i2o_ECPublicKey(self.k, 0)
|
|
mb = ctypes.create_string_buffer(size)
|
|
ssl.i2o_ECPublicKey(self.k, ctypes.byref(ctypes.pointer(mb)))
|
|
return mb.raw
|
|
|
|
def get_raw_ecdh_key(self, other_pubkey):
|
|
ecdh_keybuffer = ctypes.create_string_buffer(32)
|
|
r = ssl.ECDH_compute_key(ctypes.pointer(ecdh_keybuffer), 32,
|
|
ssl.EC_KEY_get0_public_key(other_pubkey.k),
|
|
self.k, 0)
|
|
if r != 32:
|
|
raise Exception('CKey.get_ecdh_key(): ECDH_compute_key() failed')
|
|
return ecdh_keybuffer.raw
|
|
|
|
def get_ecdh_key(self, other_pubkey, kdf=lambda k: hashlib.sha256(k).digest()):
|
|
# FIXME: be warned it's not clear what the kdf should be as a default
|
|
r = self.get_raw_ecdh_key(other_pubkey)
|
|
return kdf(r)
|
|
|
|
def sign(self, hash, low_s = True):
|
|
# FIXME: need unit tests for below cases
|
|
if not isinstance(hash, bytes):
|
|
raise TypeError('Hash must be bytes instance; got %r' % hash.__class__)
|
|
if len(hash) != 32:
|
|
raise ValueError('Hash must be exactly 32 bytes long')
|
|
|
|
sig_size0 = ctypes.c_uint32()
|
|
sig_size0.value = ssl.ECDSA_size(self.k)
|
|
mb_sig = ctypes.create_string_buffer(sig_size0.value)
|
|
result = ssl.ECDSA_sign(0, hash, len(hash), mb_sig, ctypes.byref(sig_size0), self.k)
|
|
assert 1 == result
|
|
assert mb_sig.raw[0] == 0x30
|
|
assert mb_sig.raw[1] == sig_size0.value - 2
|
|
total_size = mb_sig.raw[1]
|
|
assert mb_sig.raw[2] == 2
|
|
r_size = mb_sig.raw[3]
|
|
assert mb_sig.raw[4 + r_size] == 2
|
|
s_size = mb_sig.raw[5 + r_size]
|
|
s_value = int.from_bytes(mb_sig.raw[6+r_size:6+r_size+s_size], byteorder='big')
|
|
if (not low_s) or s_value <= SECP256K1_ORDER_HALF:
|
|
return mb_sig.raw[:sig_size0.value]
|
|
else:
|
|
low_s_value = SECP256K1_ORDER - s_value
|
|
low_s_bytes = (low_s_value).to_bytes(33, byteorder='big')
|
|
while len(low_s_bytes) > 1 and low_s_bytes[0] == 0 and low_s_bytes[1] < 0x80:
|
|
low_s_bytes = low_s_bytes[1:]
|
|
new_s_size = len(low_s_bytes)
|
|
new_total_size_byte = (total_size + new_s_size - s_size).to_bytes(1,byteorder='big')
|
|
new_s_size_byte = (new_s_size).to_bytes(1,byteorder='big')
|
|
return b'\x30' + new_total_size_byte + mb_sig.raw[2:5+r_size] + new_s_size_byte + low_s_bytes
|
|
|
|
def verify(self, hash, sig):
|
|
"""Verify a DER signature"""
|
|
return ssl.ECDSA_verify(0, hash, len(hash), sig, len(sig), self.k) == 1
|
|
|
|
def set_compressed(self, compressed):
|
|
if compressed:
|
|
form = self.POINT_CONVERSION_COMPRESSED
|
|
else:
|
|
form = self.POINT_CONVERSION_UNCOMPRESSED
|
|
ssl.EC_KEY_set_conv_form(self.k, form)
|
|
|
|
|
|
class CPubKey(bytes):
|
|
"""An encapsulated public key
|
|
|
|
Attributes:
|
|
|
|
is_valid - Corresponds to CPubKey.IsValid()
|
|
is_fullyvalid - Corresponds to CPubKey.IsFullyValid()
|
|
is_compressed - Corresponds to CPubKey.IsCompressed()
|
|
"""
|
|
|
|
def __new__(cls, buf, _cec_key=None):
|
|
self = super(CPubKey, cls).__new__(cls, buf)
|
|
if _cec_key is None:
|
|
_cec_key = CECKey()
|
|
self._cec_key = _cec_key
|
|
self.is_fullyvalid = _cec_key.set_pubkey(self) != 0
|
|
return self
|
|
|
|
@property
|
|
def is_valid(self):
|
|
return len(self) > 0
|
|
|
|
@property
|
|
def is_compressed(self):
|
|
return len(self) == 33
|
|
|
|
def verify(self, hash, sig):
|
|
return self._cec_key.verify(hash, sig)
|
|
|
|
def __str__(self):
|
|
return repr(self)
|
|
|
|
def __repr__(self):
|
|
return '%s(%s)' % (self.__class__.__name__, super(CPubKey, self).__repr__())
|
|
|