ledger-app-monero/tests/monero_client/monero_cmd.py

531 lines
18 KiB
Python

"""Simple client for the Monero application.

Keys derived from the 12-word test seed "abandon abandon abandon abandon
abandon abandon abandon abandon abandon abandon abandon about":
* view public:
A = 865cbfab852a1d1ccdfc7328e4dac90f78fc2154257d07522e9b79e637326dfa
* spend public:
B = dae41d6b13568fdd71ec3d20c2f614c65fe819f36ca5da8d24df3bd89b2bad9d
* view secret:
a = 0f3fe25d0c6d4c94dde0c0bcc214b233e9c72927f813728b0f01f28f9d5e1201
* spend secret:
b = 3b094ca7218f175e91fa2402b4ae239a2fe8262792a3e718533a1a357a1e4109
* Stagenet address:
5A8FgbMkmG2e3J41sBdjvjaBUyz8qHohsQcGtRf63qEUTMBv
mA45fpp5pSacMdSg7A3b71RejLzB8EkGbfjp5PELVHCRUaE
"""
import struct
from typing import Tuple
from .monero_crypto_cmd import MoneroCryptoCmd
from .monero_types import InsType, Type, SigType
from .crypto.hmac import hmac_sha256
from .exception.device_error import DeviceError
from .utils.varint import encode_varint
from .utils.utils import get_nano_review_instructions
from pathlib import Path
from ragger.navigator import NavInsID, NavIns
# Value passed as the ``cla`` argument of every command sent by this client.
PROTOCOL_VERSION: int = 3
# Parent of the directory containing this module; handed to
# ``navigator.navigate_and_compare`` as its first argument.
TESTS_ROOT_DIR = Path(__file__).parent.parent
class MoneroCmd(MoneroCryptoCmd):
def __init__(self, debug, backend) -> None:
# Build the client: keep a reference to ``backend`` on the instance, then
# delegate the rest of the setup to ``MoneroCryptoCmd.__init__``.
# Note the argument order differs: this constructor takes
# ``(debug, backend)`` while the parent is called with ``(backend, debug)``.
# ``self.backend`` is used by open_tx()/close_tx() to wait for screen changes.
self.backend = backend
MoneroCryptoCmd.__init__(self, backend, debug)
def reset_and_get_version(self,
                          monero_client_version: bytes
                          ) -> Tuple[int, int, int]:
    """Send ``INS_RESET`` and return the version reported by the device.

    :param monero_client_version: raw bytes sent as the command payload.
    :return: ``(major, minor, patch)`` unpacked from the 3-byte response.
    :raises DeviceError: if the status word is not ``0x9000``.
    """
    ins: InsType = InsType.INS_RESET

    self.device.send(cla=PROTOCOL_VERSION,
                     ins=ins,
                     p1=0,
                     p2=0,
                     option=0,
                     payload=monero_client_version)

    sw, response = self.device.recv()  # type: int, bytes

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins)

    assert len(response) == 3
    major, minor, patch = struct.unpack("BBB", response)

    # After a reset the client no longer considers itself in a transaction.
    self.is_in_tx_mode = False

    return major, minor, patch
def set_signature_mode(self, sig_type: SigType) -> int:
    """Send ``INS_SET_SIGNATURE_MODE`` and return the mode the device echoes.

    :param sig_type: requested mode, packed as a single byte in the payload.
    :return: ``SigType.REAL`` when the device answers 1, ``SigType.FAKE``
        when it answers 2.
    :raises DeviceError: if the status word is not ``0x9000``.
    :raises Exception: if the device answers anything other than 1 or 2.
    """
    ins: InsType = InsType.INS_SET_SIGNATURE_MODE
    payload: bytes = struct.pack("B", sig_type)

    self.device.send(cla=PROTOCOL_VERSION,
                     ins=ins,
                     p1=1,
                     p2=0,
                     option=0,
                     payload=payload)

    sw, response = self.device.recv()  # type: int, bytes

    # No screen change expected

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins)

    assert len(response) == 4
    (sig_mode,) = struct.unpack(">I", response)  # big-endian uint32

    if sig_mode not in (1, 2):
        raise Exception("Signature mode should be 1 (real) or 2 (fake).")

    # Both valid values are truthy, so the mode must be compared explicitly;
    # a bare truthiness test would always report REAL.
    return SigType.REAL if sig_mode == 1 else SigType.FAKE
def open_tx(self) -> Tuple[bytes, bytes, bytes, bytes]:
    """Send ``INS_OPEN_TX`` for account 0 and parse the 224-byte response.

    The response is seven 32-byte fields: the transaction public key,
    then three (value, HMAC) pairs. Each HMAC is checked against
    ``hmac_sha256(value, MoneroCryptoCmd.HMAC_KEY, Type.SCALAR)``.

    :return: ``(tx_pub_key, _tx_priv_key, fake_view_key, fake_spend_key)``.
    :raises DeviceError: if the status word is not ``0x9000``.
    """
    ins: InsType = InsType.INS_OPEN_TX
    # 4 bytes
    account: bytes = struct.pack(">I", 0)

    self.device.send(cla=PROTOCOL_VERSION,
                     ins=ins,
                     p1=1,
                     p2=0,
                     option=0,
                     payload=account)

    sw, response = self.device.recv()  # type: int, bytes

    # Wait for internal backend screen to be up to date before continuing
    self.backend.wait_for_screen_change()

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins)

    self.is_in_tx_mode = True

    assert len(response) == 224

    tx_pub_key: bytes = response[:32]  # R = r.G
    _tx_priv_key: bytes = response[32:64]  # r (encrypted)
    hmac_tx_priv_key: bytes = response[64:96]
    fake_view_key: bytes = response[96:128]
    hmac_fake_view_key: bytes = response[128:160]
    fake_spend_key: bytes = response[160:192]
    hmac_fake_spend_key: bytes = response[192:]

    # Every value returned alongside an HMAC must authenticate.
    for value, tag in ((_tx_priv_key, hmac_tx_priv_key),
                       (fake_view_key, hmac_fake_view_key),
                       (fake_spend_key, hmac_fake_spend_key)):
        assert tag == hmac_sha256(value,
                                  MoneroCryptoCmd.HMAC_KEY,
                                  Type.SCALAR)

    return (tx_pub_key,
            _tx_priv_key,
            fake_view_key,
            fake_spend_key)
def close_tx(self) -> None:
    """Send ``INS_CLOSE_TX`` and mark the client as out of transaction mode.

    :raises DeviceError: if the status word is not ``0x9000``.
    """
    ins: InsType = InsType.INS_CLOSE_TX

    self.device.send(cla=PROTOCOL_VERSION,
                     ins=ins,
                     p1=0,
                     p2=0,
                     option=0)

    sw, response = self.device.recv()  # type: int, bytes

    # Wait for internal backend screen to be up to date before continuing
    self.backend.wait_for_screen_change()

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins)

    self.is_in_tx_mode = False

    # The command carries no response data.
    assert len(response) == 0
def gen_txout_keys(self,
                   _tx_priv_key: bytes,
                   tx_pub_key: bytes,
                   dst_pub_view_key: bytes,
                   dst_pub_spend_key: bytes,
                   output_index: int,
                   is_change_addr: bool,
                   is_subaddress: bool) -> Tuple[bytes, bytes]:
    """Send ``INS_GEN_TXOUT_KEYS`` and return the amount key and output key.

    The 96-byte response is split into ``_ak_amount`` (bytes 0-31), its
    HMAC (bytes 32-63, verified with ``Type.AMOUNT_KEY``) and the output
    ephemeral public key (bytes 64-95).

    :param _tx_priv_key: value sent together with its HMAC (``Type.SCALAR``).
    :param tx_pub_key: sent as-is after the HMAC.
    :param dst_pub_view_key: destination public view key.
    :param dst_pub_spend_key: destination public spend key.
    :param output_index: packed as a big-endian 32-bit integer.
    :param is_change_addr: encoded as a single ``0x01``/``0x00`` byte.
    :param is_subaddress: encoded as a single ``0x01``/``0x00`` byte.
    :return: ``(_ak_amount, out_ephemeral_pub_key)``.
    :raises DeviceError: if the status word is not ``0x9000``.
    """
    ins: InsType = InsType.INS_GEN_TXOUT_KEYS

    payload: bytes = b"".join((
        struct.pack('>I', 0),  # tx_version
        _tx_priv_key,  # r (encrypted)
        hmac_sha256(_tx_priv_key,
                    MoneroCryptoCmd.HMAC_KEY,
                    Type.SCALAR),  # hmac
        tx_pub_key,  # R
        dst_pub_view_key,  # A_out
        dst_pub_spend_key,  # B_out
        struct.pack('>I', output_index),
        b"\x01" if is_change_addr else b"\x00",
        b"\x01" if is_subaddress else b"\x00",
        b"\x00" * 33,  # additional_txkeys
        b"\x00" * 33,  # use_view_tags
    ))

    self.device.send(cla=PROTOCOL_VERSION,
                     ins=ins,
                     p1=0,
                     p2=0,
                     option=0,
                     payload=payload)

    sw, response = self.device.recv()  # type: int, bytes

    # No screen change expected

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins)

    assert len(response) == 96

    _ak_amount = response[:32]
    hmac_ak_amount = response[32:64]
    out_ephemeral_pub_key = response[64:96]

    assert hmac_ak_amount == hmac_sha256(_ak_amount,
                                         MoneroCryptoCmd.HMAC_KEY,
                                         Type.AMOUNT_KEY)

    return _ak_amount, out_ephemeral_pub_key
def prefix_hash_init(self, backend, test_name, firmware, navigator, version: int, timelock: int) -> None:
    """Send ``INS_PREFIX_HASH`` with P1=1 (init) and drive the review screens.

    The command is sent asynchronously; while it is pending, ``navigator``
    walks the review screens and compares them with the snapshots stored
    under ``test_name + "_hash_init"``.

    :param backend: not used by this method.
    :param test_name: prefix of the snapshot directory name.
    :param firmware: object whose ``device`` string selects the navigation.
    :param navigator: object providing ``navigate_and_compare``.
    :param version: varint-encoded into the payload.
    :param timelock: varint-encoded into the payload after ``version``.
    :raises DeviceError: if the status word is not ``0x9000``.
    """
    ins: InsType = InsType.INS_PREFIX_HASH

    payload: bytes = b"".join([
        encode_varint(version),
        encode_varint(timelock)
    ])

    if firmware.device.startswith("nano"):
        # "nanos" also matches this prefix and uses the same instructions,
        # so no separate branch is needed for it.
        instructions = get_nano_review_instructions(1)
    else:
        instructions = [
            NavIns(NavInsID.SWIPE_CENTER_TO_LEFT)
        ]

    with self.device.send_async(cla=PROTOCOL_VERSION,
                                ins=ins,
                                p1=1,
                                p2=0,
                                option=0,
                                payload=payload):
        navigator.navigate_and_compare(TESTS_ROOT_DIR,
                                       test_name + "_hash_init",
                                       instructions)

    # The response is read once the async exchange context has exited.
    sw, response = self.device.async_response()  # type: int, bytes

    # Screen change already waited in navigate_and_compare() above

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins, message="P1=1 (init)")

    assert len(response) == 0
def prefix_hash_update(self, index: int, payload: bytes, is_last: bool) -> bytes:
    """Send one ``INS_PREFIX_HASH`` chunk with P1=2 (update).

    :param index: passed as P2.
    :param payload: chunk of data to send.
    :param is_last: when false, ``option`` is ``0x80``; when true it is 0.
    :return: the 32-byte response for the last chunk, empty bytes otherwise.
    :raises DeviceError: if the status word is not ``0x9000``.
    """
    ins: InsType = InsType.INS_PREFIX_HASH

    self.device.send(cla=PROTOCOL_VERSION,
                     ins=ins,
                     p1=2,
                     p2=index,
                     option=0 if is_last else 0x80,
                     payload=payload)

    sw, response = self.device.recv()  # type: int, bytes

    # No screen change expected

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins, message="P1=2 (update)")

    # Only the final chunk produces data.
    expected_len = 32 if is_last else 0
    assert len(response) == expected_len

    return response
def gen_commitment_mask(self, _ak_amount: bytes) -> bytes:
    """Send ``INS_GEN_COMMITMENT_MASK`` and return the 32-byte mask.

    :param _ak_amount: sent followed by its HMAC (``Type.AMOUNT_KEY``).
    :return: the 32-byte response.
    :raises DeviceError: if the status word is not ``0x9000``.
    """
    ins: InsType = InsType.INS_GEN_COMMITMENT_MASK

    payload: bytes = b"".join([
        _ak_amount,
        hmac_sha256(_ak_amount,
                    MoneroCryptoCmd.HMAC_KEY,
                    Type.AMOUNT_KEY)
    ])

    self.device.send(cla=PROTOCOL_VERSION,
                     ins=ins,
                     p1=0,
                     p2=0,
                     option=0,
                     payload=payload)

    sw, response = self.device.recv()  # type: int, bytes

    # No screen change expected

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins)

    assert len(response) == 32

    return response  # mask
def blind(self,
          _ak_amount: bytes,
          mask: bytes,
          amount: int,
          is_short: bool) -> Tuple[bytes, bytes]:
    """Send ``INS_BLIND`` and return the blinded mask and amount.

    :param _ak_amount: sent followed by its HMAC (``Type.AMOUNT_KEY``).
    :param mask: appended to the payload as-is.
    :param amount: encoded as a 32-byte big-endian integer.
    :param is_short: when true, ``option`` is 2 instead of 0.
    :return: ``(blinded_mask, blinded_amount)``; note the response carries
        them in the opposite order (amount first, mask second).
    :raises DeviceError: if the status word is not ``0x9000``.
    """
    ins: InsType = InsType.INS_BLIND

    payload: bytes = b"".join([
        _ak_amount,
        hmac_sha256(_ak_amount,
                    MoneroCryptoCmd.HMAC_KEY,
                    Type.AMOUNT_KEY),
        mask,
        amount.to_bytes(32, byteorder="big")
    ])

    self.device.send(cla=PROTOCOL_VERSION,
                     ins=ins,
                     p1=0,
                     p2=0,
                     option=2 if is_short else 0,
                     payload=payload)

    sw, response = self.device.recv()  # type: int, bytes

    # No screen change expected

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins)

    assert len(response) == 64

    blinded_amount = response[:32]
    blinded_mask = response[32:]

    return blinded_mask, blinded_amount
def unblind(self,
            _ak_amount: bytes,
            blinded_mask: bytes,
            blinded_amount: bytes,
            is_short: bool) -> Tuple[bytes, bytes]:
    """Send ``INS_UNBLIND`` and return the recovered mask and amount.

    :param _ak_amount: sent followed by its HMAC (``Type.AMOUNT_KEY``).
    :param blinded_mask: appended to the payload before the amount.
    :param blinded_amount: appended last to the payload.
    :param is_short: when true, ``option`` is 2 instead of 0.
    :return: ``(mask, amount)``; note the response carries them in the
        opposite order (amount first, mask second).
    :raises DeviceError: if the status word is not ``0x9000``.
    """
    ins: InsType = InsType.INS_UNBLIND

    payload: bytes = b"".join([
        _ak_amount,
        hmac_sha256(_ak_amount,
                    MoneroCryptoCmd.HMAC_KEY,
                    Type.AMOUNT_KEY),
        blinded_mask,
        blinded_amount
    ])

    self.device.send(cla=PROTOCOL_VERSION,
                     ins=ins,
                     p1=0,
                     p2=0,
                     option=2 if is_short else 0,
                     payload=payload)

    sw, response = self.device.recv()  # type: int, bytes

    # No screen change expected

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins)

    assert len(response) == 64

    amount = response[:32]
    mask = response[32:]

    return mask, amount
def validate_prehash_init(self,
                          backend,
                          test_name,
                          firmware,
                          navigator,
                          index: int,
                          txntype: int,
                          txnfee: int) -> None:
    """Send ``INS_VALIDATE`` with P1=1 (init).

    The command is sent asynchronously. On devices whose name starts with
    ``"nano"`` the review screens are navigated and compared with the
    snapshots under ``test_name + "_prehash_init"``; on other devices no
    navigation is performed.

    :param backend: not used by this method.
    :param test_name: prefix of the snapshot directory name.
    :param firmware: object whose ``device`` string selects the navigation.
    :param navigator: object providing ``navigate_and_compare``.
    :param index: passed as P2.
    :param txntype: packed as a single byte at the start of the payload.
    :param txnfee: varint-encoded after ``txntype``.
    :raises DeviceError: if the status word is not ``0x9000``.
    """
    ins: InsType = InsType.INS_VALIDATE
    # txntype is skipped in the app
    payload: bytes = struct.pack("B", txntype) + encode_varint(txnfee)

    # "nanos" also matches this prefix and uses the same instructions.
    is_nano = firmware.device.startswith("nano")
    # Navigation instructions are only needed (and only used) on Nano.
    instructions = get_nano_review_instructions(1) if is_nano else None

    with self.device.send_async(cla=PROTOCOL_VERSION,
                                ins=ins,
                                p1=1,
                                p2=index,
                                option=0,
                                payload=payload):
        if is_nano:
            navigator.navigate_and_compare(TESTS_ROOT_DIR,
                                           test_name + "_prehash_init",
                                           instructions)

    # The response is read once the async exchange context has exited.
    sw, response = self.device.async_response()  # type: int, bytes

    # On Nano, the screen change was already waited for in
    # navigate_and_compare() above.

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins, message="P1=1 (init)")

    assert len(response) == 0
def validate_prehash_update(self,
backend,
test_name,
firmware,
navigator,
index: int,
is_short: bool,
is_change_addr: bool,
is_subaddress: bool,
dst_pub_view_key: bytes,
dst_pub_spend_key: bytes,
_ak_amount: bytes,
commitment: bytes,
blinded_mask: bytes,
blinded_amount: bytes,
is_last: bool) -> None:
# Send INS_VALIDATE with P1=2 (update) asynchronously and drive the review
# screens, comparing them with the snapshots under
# test_name + "_prehash_update". P2 is `index`; `option` combines 0x80
# (when not the last output) with 0x02 (when is_short).
# Raises DeviceError when the status-word check below fails.
ins: InsType = InsType.INS_VALIDATE
# Payload layout: two flag bytes, the destination keys, the amount key and
# its HMAC (Type.AMOUNT_KEY), then commitment, blinded mask, blinded amount.
payload: bytes = b"".join((
b"\x01" if is_subaddress else b"\x00",
b"\x01" if is_change_addr else b"\x00",
dst_pub_view_key,
dst_pub_spend_key,
_ak_amount,
hmac_sha256(_ak_amount,
MoneroCryptoCmd.HMAC_KEY,
Type.AMOUNT_KEY),
commitment,
blinded_mask,
blinded_amount
))
# Navigation depends on the device: 7 review steps on "nanos", 3 on other
# "nano*" devices, and an explicit swipe/tap/confirm sequence otherwise.
if firmware.device == "nanos":
instructions = get_nano_review_instructions(7)
elif firmware.device.startswith("nano"):
instructions = get_nano_review_instructions(3)
else:
instructions = [
NavIns(NavInsID.SWIPE_CENTER_TO_LEFT),
NavIns(NavInsID.USE_CASE_REVIEW_TAP),
NavIns(NavInsID.USE_CASE_REVIEW_CONFIRM)
]
# NOTE(review): indentation was lost in this copy, so it is unclear whether
# the wait below runs for every device or only in the `else` branch above
# -- confirm against the upstream file before re-indenting.
backend.wait_for_text_on_screen("Processing")
with self.device.send_async(cla=PROTOCOL_VERSION,
ins=ins,
p1=2,
p2=index,
option=(0 if is_last else 0x80) | (
0x02 if is_short else 0),
payload=payload):
navigator.navigate_and_compare(TESTS_ROOT_DIR,
test_name + "_prehash_update",
instructions,
screen_change_after_last_instruction=False, timeout=10000)
# presumably read after the `with` block has exited -- TODO confirm
sw, response = self.device.async_response() # type: int, bytes
# Screen change already waited in navigate_and_compare() above
# NOTE(review): `not sw & 0x9000` is `not (sw & 0x9000)`; it only rejects
# status words with bits 15 and 12 both clear, so e.g. 0x9001 passes.
# An equality test against 0x9000 is probably what was intended.
if not sw & 0x9000:
raise DeviceError(error_code=sw, ins=ins, message="P1=2 (update)")
assert len(response) == 0
def validate_prehash_finalize(self,
                              index: int,
                              is_short: bool,
                              is_change_addr: bool,
                              is_subaddress: bool,
                              dst_pub_view_key: bytes,
                              dst_pub_spend_key: bytes,
                              _ak_amount: bytes,
                              commitment: bytes,
                              blinded_mask: bytes,
                              blinded_amount: bytes,
                              is_last: bool) -> None:
    """Send ``INS_VALIDATE`` with P1=3 (finalize).

    The payload has the same layout as the update step: two flag bytes,
    the destination keys, the amount key with its HMAC
    (``Type.AMOUNT_KEY``), then commitment, blinded mask and blinded
    amount. The 32-byte response is length-checked but not returned.

    :param index: passed as P2.
    :param is_short: adds ``0x02`` to ``option`` when true.
    :param is_change_addr: encoded as the second flag byte.
    :param is_subaddress: encoded as the first flag byte.
    :param dst_pub_view_key: destination public view key.
    :param dst_pub_spend_key: destination public spend key.
    :param _ak_amount: amount key, sent followed by its HMAC.
    :param commitment: appended to the payload as-is.
    :param blinded_mask: appended to the payload as-is.
    :param blinded_amount: appended to the payload as-is.
    :param is_last: when false, ``0x80`` is set in ``option``.
    :raises DeviceError: if the status word is not ``0x9000``.
    """
    ins: InsType = InsType.INS_VALIDATE

    payload: bytes = b"".join((
        b"\x01" if is_subaddress else b"\x00",
        b"\x01" if is_change_addr else b"\x00",
        dst_pub_view_key,
        dst_pub_spend_key,
        _ak_amount,
        hmac_sha256(_ak_amount,
                    MoneroCryptoCmd.HMAC_KEY,
                    Type.AMOUNT_KEY),
        commitment,
        blinded_mask,
        blinded_amount
    ))

    more_flag = 0 if is_last else 0x80
    short_flag = 0x02 if is_short else 0

    self.device.send(cla=PROTOCOL_VERSION,
                     ins=ins,
                     p1=3,
                     p2=index,
                     option=more_flag | short_flag,
                     payload=payload)

    sw, response = self.device.recv()  # type: int, bytes

    # No screen change expected

    # Compare for equality: ``sw & 0x9000`` is also non-zero for status
    # words such as 0x9001 or 0x5515, which would be mistaken for success.
    if sw != 0x9000:
        raise DeviceError(error_code=sw, ins=ins, message="P1=3 (finalize)")

    assert len(response) == 32