342 lines
12 KiB
Python
342 lines
12 KiB
Python
import unicodedata
|
|
from hashlib import pbkdf2_hmac
|
|
from secrets import token_bytes
|
|
from sys import platform
|
|
from typing import List, Optional, Tuple
|
|
|
|
import keyring as keyring_main
|
|
import pkg_resources
|
|
from bitstring import BitArray
|
|
from blspy import AugSchemeMPL, G1Element, PrivateKey
|
|
from keyrings.cryptfile.cryptfile import CryptFileKeyring
|
|
|
|
from chia.util.hash import std_hash
|
|
|
|
# Highest slot index that the Keychain reader loops will look at (they scan 0..MAX_KEYS).
MAX_KEYS = 100

# Bind the module-level name `keyring` to the backend used by Keychain.
# On Windows and macOS the import statement itself binds `keyring` to the top-level
# package, whose active backend is then switched; elsewhere the name is assigned directly.
if platform in ("win32", "cygwin"):
    import keyring.backends.Windows

    keyring.set_keyring(keyring.backends.Windows.WinVaultKeyring())
elif platform == "darwin":
    import keyring.backends.macOS

    keyring.set_keyring(keyring.backends.macOS.Keyring())
elif platform == "linux":
    # NOTE(review): the file keyring is unlocked with a fixed, hard-coded key — confirm
    # this is intentional before relying on it for confidentiality.
    keyring = CryptFileKeyring()
    keyring.keyring_key = "your keyring password"  # type: ignore
else:
    # Unknown platform: use whatever backend the keyring package picks by default.
    keyring = keyring_main
|
|
|
|
|
|
def bip39_word_list() -> str:
    """
    Read the "english.txt" resource that sits next to this module and return its
    decoded text (one word per line, as consumed by the callers via splitlines()).
    """
    raw_words = pkg_resources.resource_string(__name__, "english.txt")
    return raw_words.decode()
|
|
|
|
|
|
def generate_mnemonic() -> str:
    """
    Create a fresh mnemonic sentence from 32 bytes produced by secrets.token_bytes.
    """
    return bytes_to_mnemonic(token_bytes(32))
|
|
|
|
|
|
def bytes_to_mnemonic(mnemonic_bytes: bytes) -> str:
    """
    Encode entropy bytes as a space-separated mnemonic sentence.

    The entropy is followed by the leading len(mnemonic_bytes) // 4 bits of its
    std_hash, and the resulting bit string is cut into 11-bit groups; each group is
    the index of a word in the word list.

    Raises ValueError when the input is not 16, 20, 24, 28 or 32 bytes long.
    """
    if len(mnemonic_bytes) not in [16, 20, 24, 28, 32]:
        raise ValueError(
            f"Data length should be one of the following: [16, 20, 24, 28, 32], but it is {len(mnemonic_bytes)}."
        )
    words = bip39_word_list().splitlines()

    checksum_len = len(mnemonic_bytes) // 4
    checksum_bits = BitArray(bytes(std_hash(mnemonic_bytes)))[:checksum_len]
    all_bits = BitArray(mnemonic_bytes) + checksum_bits

    # Entropy plus checksum always divides evenly into 11-bit word indexes.
    assert len(all_bits) % 11 == 0

    return " ".join(words[all_bits[offset : offset + 11].uint] for offset in range(0, len(all_bits), 11))
|
|
|
|
|
|
def bytes_from_mnemonic(mnemonic_str: str) -> bytes:
    """
    Decode a mnemonic sentence back into its entropy bytes, verifying the checksum.

    Words must be separated by exactly one space character; the sentence must have
    12, 15, 18, 21 or 24 words, all of them present in the word list.

    Raises ValueError for a wrong word count, an unknown word, or a checksum mismatch.
    """
    words: List[str] = mnemonic_str.split(" ")
    word_count = len(words)
    if word_count not in [12, 15, 18, 21, 24]:
        raise ValueError("Invalid mnemonic length")

    index_of = {entry: position for position, entry in enumerate(bip39_word_list().splitlines())}
    bits = BitArray()
    for word in words:
        if word not in index_of:
            raise ValueError(f"'{word}' is not in the mnemonic dictionary; may be misspelled")
        # Every word contributes its 11-bit list index.
        bits.append(BitArray(uint=index_of[word], length=11))

    checksum_len: int = word_count // 3
    entropy_len: int = word_count * 11 - checksum_len
    assert len(bits) == word_count * 11
    assert entropy_len % 32 == 0

    entropy_bytes = bits[:entropy_len].bytes
    stored_checksum = bits[entropy_len:]
    expected_checksum = BitArray(std_hash(entropy_bytes))[:checksum_len]

    assert len(stored_checksum) == checksum_len

    if expected_checksum != stored_checksum:
        raise ValueError("Invalid order of mnemonic words")

    return entropy_bytes
|
|
|
|
|
|
def mnemonic_to_seed(mnemonic: str, passphrase: str) -> bytes:
    """
    Derive a 64-byte seed from a mnemonic sentence and a passphrase.

    Both strings are NFKD-normalized and UTF-8 encoded. The mnemonic is the PBKDF2
    password, the salt is the literal "mnemonic" followed by the passphrase, and
    HMAC-SHA512 is run for 2048 iterations.
    """
    password = unicodedata.normalize("NFKD", mnemonic).encode("utf-8")
    salt = unicodedata.normalize("NFKD", "mnemonic" + passphrase).encode("utf-8")

    derived = pbkdf2_hmac("sha512", password, salt, 2048)

    assert len(derived) == 64
    return derived
|
|
|
|
|
|
class Keychain:
    """
    Stores wallet keys in the module-level `keyring` backend.

    Every key lives in a numbered slot. A slot holds one hex string: the serialized
    G1Element public key followed by the entropy bytes of the mnemonic. The passphrase
    is never stored; private keys are re-derived on demand from the stored entropy and
    a caller-supplied passphrase, and are only accepted when they reproduce the stored
    public key. Entropy is converted to a mnemonic when shown to users.

    Reader methods scan slots 0..MAX_KEYS inclusive; get_all_private_keys returns a
    list of every key that can be recovered with the given passphrases.
    """

    testing: bool
    user: str

    def __init__(self, user: str = "user-chia-1.8", testing: bool = False):
        """
        user: name embedded in the keyring service and slot names.
        testing: when True, separate "-test" service and slot names are used.
        """
        self.testing = testing
        self.user = user

    def _get_service(self) -> str:
        """
        The keychain stores keys under a different service name for tests.
        """
        if self.testing:
            return f"chia-{self.user}-test"
        return f"chia-{self.user}"

    def _get_pk_and_entropy(self, user: str) -> Optional[Tuple[G1Element, bytes]]:
        """
        Returns the keychain contents for a specific 'user' (key slot name). The contents
        include a G1Element and the entropy required to generate the private key.
        Note that generating the actual private key also requires the passphrase.

        Returns None when the slot is missing or holds an empty string.
        """
        read_str = keyring.get_password(self._get_service(), user)
        if not read_str:
            return None
        str_bytes = bytes.fromhex(read_str)
        # Layout: G1Element.SIZE bytes of public key, then the entropy.
        return (
            G1Element.from_bytes(str_bytes[: G1Element.SIZE]),
            str_bytes[G1Element.SIZE :],  # noqa: E203
        )

    def _get_private_key_user(self, index: int) -> str:
        """
        Returns the keychain user string (slot name) for a key index.
        """
        if self.testing:
            return f"wallet-{self.user}-test-{index}"
        return f"wallet-{self.user}-{index}"

    def _get_free_private_key_index(self) -> int:
        """
        Get the index of the first free spot in the keychain.
        """
        # NOTE(review): this scan has no upper bound, while the readers stop at
        # MAX_KEYS; a key written past that index would not be returned by them.
        index = 0
        while self._get_pk_and_entropy(self._get_private_key_user(index)) is not None:
            index += 1
        return index

    def add_private_key(self, mnemonic: str, passphrase: str) -> PrivateKey:
        """
        Adds a private key to the keychain, derived from the given mnemonic and
        passphrase. The keychain itself will store the public key and the entropy
        bytes, but not the passphrase.

        If a key with the same public key fingerprint is already stored, nothing is
        written. The derived PrivateKey is returned in both cases.

        Raises ValueError (from bytes_from_mnemonic) when the mnemonic is invalid.
        """
        # Validate the mnemonic first: it is cheap, and the key stretching below is not.
        entropy = bytes_from_mnemonic(mnemonic)
        key = AugSchemeMPL.key_gen(mnemonic_to_seed(mnemonic, passphrase))
        fingerprint = key.get_g1().get_fingerprint()

        if any(pk.get_fingerprint() == fingerprint for pk in self.get_all_public_keys()):
            # Prevents duplicate add
            return key

        # Only look for a free slot once we know something will be written.
        index = self._get_free_private_key_index()
        keyring.set_password(
            self._get_service(),
            self._get_private_key_user(index),
            bytes(key.get_g1()).hex() + entropy.hex(),
        )
        return key

    def get_first_private_key(self, passphrases: Optional[List[str]] = None) -> Optional[Tuple[PrivateKey, bytes]]:
        """
        Returns the first key in the keychain that has one of the passed in passphrases,
        as a (private key, entropy bytes) tuple, or None if no stored key matches.

        passphrases defaults to a single empty passphrase.
        """
        if passphrases is None:
            passphrases = [""]
        for index in range(MAX_KEYS + 1):
            pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
            if pkent is None:
                continue
            pk, ent = pkent
            mnemonic = bytes_to_mnemonic(ent)  # independent of the passphrase
            for pp in passphrases:
                key = AugSchemeMPL.key_gen(mnemonic_to_seed(mnemonic, pp))
                if key.get_g1() == pk:
                    return (key, ent)
        return None

    def get_private_key_by_fingerprint(
        self, fingerprint: int, passphrases: Optional[List[str]] = None
    ) -> Optional[Tuple[PrivateKey, bytes]]:
        """
        Returns the first private key whose stored public key has the given fingerprint
        and which can be re-derived with one of the passphrases, as a
        (private key, entropy bytes) tuple. Returns None otherwise.

        passphrases defaults to a single empty passphrase.
        """
        if passphrases is None:
            passphrases = [""]
        for index in range(MAX_KEYS + 1):
            pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
            if pkent is None:
                continue
            pk, ent = pkent
            # Compare fingerprints before doing any key derivation.
            if pk.get_fingerprint() != fingerprint:
                continue
            mnemonic = bytes_to_mnemonic(ent)
            for pp in passphrases:
                key = AugSchemeMPL.key_gen(mnemonic_to_seed(mnemonic, pp))
                # A wrong passphrase yields an unrelated key; never hand that back.
                if key.get_g1() == pk:
                    return (key, ent)
        return None

    def get_all_private_keys(self, passphrases: Optional[List[str]] = None) -> List[Tuple[PrivateKey, bytes]]:
        """
        Returns all private keys which can be retrieved, with the given passphrases.
        A tuple of key, and entropy bytes (i.e. mnemonic) is returned for each key.

        passphrases defaults to a single empty passphrase.
        """
        if passphrases is None:
            passphrases = [""]
        all_keys: List[Tuple[PrivateKey, bytes]] = []
        for index in range(MAX_KEYS + 1):
            pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
            if pkent is None:
                continue
            pk, ent = pkent
            mnemonic = bytes_to_mnemonic(ent)  # independent of the passphrase
            for pp in passphrases:
                key = AugSchemeMPL.key_gen(mnemonic_to_seed(mnemonic, pp))
                if key.get_g1() == pk:
                    all_keys.append((key, ent))
        return all_keys

    def get_all_public_keys(self) -> List[G1Element]:
        """
        Returns all public keys.
        """
        all_keys: List[G1Element] = []
        for index in range(MAX_KEYS + 1):
            pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
            if pkent is not None:
                all_keys.append(pkent[0])
        return all_keys

    def get_first_public_key(self) -> Optional[G1Element]:
        """
        Returns the first public key, or None when no slot is occupied.
        """
        for index in range(MAX_KEYS + 1):
            pkent = self._get_pk_and_entropy(self._get_private_key_user(index))
            if pkent is not None:
                return pkent[0]
        return None

    def delete_key_by_fingerprint(self, fingerprint: int):
        """
        Deletes all keys which have the given public key fingerprint.
        """
        for index in range(MAX_KEYS + 1):
            user = self._get_private_key_user(index)
            pkent = self._get_pk_and_entropy(user)
            if pkent is not None and pkent[0].get_fingerprint() == fingerprint:
                keyring.delete_password(self._get_service(), user)

    def delete_all_keys(self):
        """
        Deletes all keys from the keychain.

        Every slot from 0 through MAX_KEYS + 1 is deleted on a best-effort basis, and
        the sweep continues past that point for as long as occupied slots keep turning up.
        """
        service = self._get_service()
        index = 0
        while True:
            user = self._get_private_key_user(index)

            # Look at the raw entry instead of parsing it, so an unparsable slot is
            # still removed rather than skipped.
            try:
                occupied = bool(keyring.get_password(service, user))
            except Exception:
                occupied = False

            try:
                keyring.delete_password(service, user)
            except Exception:
                # Some platforms might throw on no existing key; deletion is best-effort.
                pass

            # Stop when there are no more keys to delete
            if not occupied and index > MAX_KEYS:
                break
            index += 1
|