matrix-nio/nio/crypto/sas.py

729 lines
24 KiB
Python

# -*- coding: utf-8 -*-
# Copyright © 2019 Damir Jelić <poljar@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import unicode_literals
from builtins import bytes, super
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional, Tuple
from uuid import uuid4
import olm
from future.moves.itertools import zip_longest
from ..api import Api
from ..event_builders import ToDeviceMessage
from ..events import KeyVerificationEvent, KeyVerificationStart
from ..exceptions import LocalProtocolError
from .device import OlmDevice
class SasState(Enum):
"""Short Authentication String enum.
This enum tracks the current state of our verification process.
"""
created = 0
started = 1
accepted = 2
key_received = 3
mac_received = 4
canceled = 5
class Sas(olm.Sas):
"""Matrix Short Authentication String class.
This class implements a state machine to handle device verification using
short authentication strings.
Attributes:
we_started_it (bool): Is true if the verification process was started
by us, otherwise false.
sas_accepted (bool): Is true if we accepted that the short
authentication string matches on both devices.
verified_devices(List[str]): The list of device ids that were verified
during the verification process.
Args:
own_user (str): The user id of our own user.
own_device (str): The device id of our own user.
own_fp_key (str): The fingerprint key of our own device that will
be verified by the other client.
other_olm_device (OlmDevice): The OlmDevice which we would like to
verify.
transaction_id (str, optional): A string that will uniquely identify
this verification process. A random and unique string will be
generated if one isn't provided.
short_auth_string (List[str], optional): A list of valid short
authentication methods that the client would like to allow for this
authentication session. By default the 'emoji' and 'decimal'
methods are allowed.
"""
_sas_method_v1 = "m.sas.v1"
_key_agreement_v1 = "curve25519"
_key_agreement_v2 = "curve25519-hkdf-sha256"
_key_agreeemnt_protocols = [_key_agreement_v1, _key_agreement_v2]
_hash_v1 = "sha256"
_mac_normal = "hkdf-hmac-sha256"
_mac_old = "hmac-sha256"
_mac_v1 = [_mac_normal, _mac_old]
_strings_v1 = ["emoji", "decimal"]
_user_cancel_error = ("m.user", "Canceled by user")
_timeout_error = ("m.timeout", "Timed out")
_txid_error = ("m.unknown_transaction", "Unknown transaction")
_unknonw_method_error = ("m.unknown_method", "Unknown method")
_unexpected_message_error = ("m.unexpected_message", "Unexpected message")
_key_mismatch_error = ("m.key_mismatch", "Key mismatch")
_user_mismatch_error = ("m.user_error", "User mismatch")
_invalid_message_error = ("m.invalid_message", "Invalid message")
_commitment_mismatch_error = (
"m.mismatched_commitment",
"Mismatched commitment",
)
_sas_mismatch_error = (
"m.mismatched_sas",
"Mismatched short authentication string",
)
_max_age = timedelta(minutes=5)
_max_event_timeout = timedelta(minutes=1)
emoji = [
("🐶", "Dog"),
("🐱", "Cat"),
("🦁", "Lion"),
("🐎", "Horse"),
("🦄", "Unicorn"),
("🐷", "Pig"),
("🐘", "Elephant"),
("🐰", "Rabbit"),
("🐼", "Panda"),
("🐓", "Rooster"),
("🐧", "Penguin"),
("🐢", "Turtle"),
("🐟", "Fish"),
("🐙", "Octopus"),
("🦋", "Butterfly"),
("🌷", "Flower"),
("🌳", "Tree"),
("🌵", "Cactus"),
("🍄", "Mushroom"),
("🌏", "Globe"),
("🌙", "Moon"),
("☁️", "Cloud"),
("🔥", "Fire"),
("🍌", "Banana"),
("🍎", "Apple"),
("🍓", "Strawberry"),
("🌽", "Corn"),
("🍕", "Pizza"),
("🎂", "Cake"),
("❤️", "Heart"),
("😀", "Smiley"),
("🤖", "Robot"),
("🎩", "Hat"),
("👓", "Glasses"),
("🔧", "Wrench"),
("🎅", "Santa"),
("👍", "Thumbs up"),
("☂️", "Umbrella"),
("", "Hourglass"),
("", "Clock"),
("🎁", "Gift"),
("💡", "Light Bulb"),
("📕", "Book"),
("✏️", "Pencil"),
("📎", "Paperclip"),
("✂️", "Scissors"),
("🔒", "Lock"),
("🔑", "Key"),
("🔨", "Hammer"),
("☎️", "Telephone"),
("🏁", "Flag"),
("🚂", "Train"),
("🚲", "Bicycle"),
("✈️", "Airplane"),
("🚀", "Rocket"),
("🏆", "Trophy"),
("", "Ball"),
("🎸", "Guitar"),
("🎺", "Trumpet"),
("🔔", "Bell"),
("", "Anchor"),
("🎧", "Headphones"),
("📁", "Folder"),
("📌", "Pin"),
]
def __init__(
self,
own_user: str,
own_device: str,
own_fp_key: str,
other_olm_device: OlmDevice,
transaction_id: str = None,
short_auth_string: Optional[List[str]] = None,
mac_methods: Optional[List[str]] = None,
):
self.own_user = own_user
self.own_device = own_device
self.own_fp_key = own_fp_key
self.other_olm_device = other_olm_device
self.transaction_id = transaction_id or str(uuid4())
self.short_auth_string = short_auth_string or ["emoji", "decimal"]
self.mac_methods = mac_methods or Sas._mac_v1
self.chosen_mac_method = ""
self.key_agreement_protocols = Sas._key_agreeemnt_protocols
self.chosen_key_agreement: Optional[str] = None
self.state = SasState.created
self.we_started_it = True
self.sas_accepted = False
self.commitment = None
self.cancel_reason = ""
self.cancel_code = ""
self.their_sas_key: Optional[str] = None
self.verified_devices: List[str] = []
self.creation_time = datetime.now()
self._last_event_time = self.creation_time
super().__init__()
@classmethod
def from_key_verification_start(
cls, own_user, own_device, own_fp_key, other_olm_device, event
):
# type: (str, str, str, OlmDevice, KeyVerificationStart) -> Sas
"""Create a SAS object from a KeyVerificationStart event.
Args:
own_user (str): The user id of our own user.
own_device (str): The device id of our own user.
own_fp_key (str): The fingerprint key of our own device that will
be verified by the other client.
other_olm_device (OlmDevice): The Olm device of the other user that
should be verified.
event (KeyVerificationStart): The event that we received from the
other device to start the key verification process.
"""
obj = cls(
own_user,
own_device,
own_fp_key,
other_olm_device,
event.transaction_id,
event.short_authentication_string,
event.message_authentication_codes,
)
obj.we_started_it = False
obj.state = SasState.started
string_content = Api.to_canonical_json(event.source["content"])
obj.commitment = olm.sha256(obj.pubkey + string_content)
obj.key_agreement_protocols = event.key_agreement_protocols
if (
Sas._sas_method_v1 != event.method
or (
Sas._key_agreement_v1 not in event.key_agreement_protocols
and Sas._key_agreement_v2 not in event.key_agreement_protocols
)
or Sas._hash_v1 not in event.hashes
or (
Sas._mac_normal not in event.message_authentication_codes
and Sas._mac_old not in event.message_authentication_codes
)
or (
"emoji" not in event.short_authentication_string
and "decimal" not in event.short_authentication_string
)
):
obj.state = SasState.canceled
obj.cancel_code, obj.cancel_reason = obj._unknonw_method_error
return obj
@property
def canceled(self) -> bool:
"""Is the verification request canceled."""
return self.state == SasState.canceled
@property
def timed_out(self) -> bool:
"""Did the verification process time out."""
if self.verified or self.canceled:
return False
now = datetime.now()
if (
now - self.creation_time >= self._max_age
or now - self._last_event_time >= self._max_event_timeout
):
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._timeout_error
return True
return False
@property
def verified(self) -> bool:
"""Is the device verified and the request done."""
return self.state == SasState.mac_received and self.sas_accepted
def set_their_pubkey(self, pubkey: str):
self.their_sas_key = pubkey
super().set_their_pubkey(pubkey)
def accept_sas(self):
"""Accept the short authentication string."""
if self.state == SasState.canceled:
raise LocalProtocolError(
"Key verification process was canceled "
"can't accept short authentication "
"string"
)
if not self.other_key_set:
raise LocalProtocolError(
"Other public key isn't set yet, can't "
"generate nor accept a short "
"authentication string."
)
self.sas_accepted = True
def reject_sas(self):
"""Reject the authentication string."""
if not self.other_key_set:
raise LocalProtocolError(
"Other public key isn't set yet, can't "
"generate nor reject a short "
"authentication string."
)
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._sas_mismatch_error
def cancel(self):
"""Cancel the authentication process."""
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._user_cancel_error
def _check_commitment(self, key: str):
assert self.commitment
calculated_commitment = olm.sha256(
key + Api.to_canonical_json(self.start_verification().content)
)
return self.commitment == calculated_commitment
def _grouper(self, iterable, n, fillvalue=None):
"""Collect data into fixed-length chunks or blocks."""
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
@property
def _extra_info_v1(self) -> str:
device = self.other_olm_device
tx_id = self.transaction_id
our_info = f"{self.own_user}{self.own_device}"
their_info = f"{device.user_id}{device.device_id}"
if self.we_started_it:
return f"MATRIX_KEY_VERIFICATION_SAS{our_info}{their_info}{tx_id}"
else:
return f"MATRIX_KEY_VERIFICATION_SAS{their_info}{our_info}{tx_id}"
@property
def _extra_info_v2(self) -> str:
device = self.other_olm_device
tx_id = self.transaction_id
assert self.their_sas_key
our_info = f"{self.own_user}|{self.own_device}|{self.pubkey}"
their_info = f"{device.user_id}|{device.device_id}|{self.their_sas_key}"
if self.we_started_it:
return f"MATRIX_KEY_VERIFICATION_SAS|{our_info}|{their_info}|{tx_id}"
else:
return f"MATRIX_KEY_VERIFICATION_SAS|{their_info}|{our_info}|{tx_id}"
@property
def _extra_info(self) -> str:
if self.chosen_key_agreement == Sas._key_agreement_v1:
return self._extra_info_v1
elif self.chosen_key_agreement == Sas._key_agreement_v2:
return self._extra_info_v2
raise ValueError(f"Unknown key agreement protocol {self.chosen_key_agreement}")
def get_emoji(self) -> List[Tuple[str, str]]:
"""Get the emoji short authentication string.
Returns a list of tuples that contain the emoji and the description of
the emoji of the short authentication string.
"""
return self._generate_emoji(self._extra_info)
def get_decimals(self) -> Tuple[int, ...]:
"""Get the decimal short authentication string.
Returns a tuple that contains three 4 digit integer numbers that
represent the short authentication string.
"""
return self._generate_decimals(self._extra_info)
def _generate_emoji(self, extra_info: str) -> List[Tuple[str, str]]:
"""Create a list of emojies from our shared secret."""
generated_bytes = self.generate_bytes(extra_info, 6)
number = "".join([format(x, "08b") for x in bytes(generated_bytes)])
return [
self.emoji[int(x, 2)]
for x in map("".join, list(self._grouper(number[:42], 6)))
]
def _generate_decimals(self, extra_info: str) -> Tuple[int, ...]:
"""Create a decimal number from our shared secret."""
generated_bytes = self.generate_bytes(extra_info, 5)
number = "".join([format(x, "08b") for x in bytes(generated_bytes)])
return tuple(
int(x, 2) + 1000 for x in map("".join, list(self._grouper(number[:-1], 13)))
)
def start_verification(self) -> ToDeviceMessage:
"""Create a content dictionary to start the verification."""
if not self.we_started_it:
raise LocalProtocolError(
"Verification was not started by us, "
"can't send start verification message."
)
if self.state == SasState.canceled:
raise LocalProtocolError(
"SAS verification was canceled, "
"can't send start verification message."
)
content = {
"from_device": self.own_device,
"method": self._sas_method_v1,
"transaction_id": self.transaction_id,
"key_agreement_protocols": Sas._key_agreeemnt_protocols,
"hashes": [self._hash_v1],
"message_authentication_codes": self._mac_v1,
"short_authentication_string": self._strings_v1,
}
message = ToDeviceMessage(
"m.key.verification.start",
self.other_olm_device.user_id,
self.other_olm_device.id,
content,
)
return message
def accept_verification(self) -> ToDeviceMessage:
"""Create a content dictionary to accept the verification offer."""
if self.we_started_it:
raise LocalProtocolError(
"Verification was started by us, can't " "accept offer."
)
if self.state == SasState.canceled:
raise LocalProtocolError(
"SAS verification was canceled, can't " "accept offer."
)
sas_methods = []
if "emoji" in self.short_auth_string:
sas_methods.append("emoji")
if "decimal" in self.short_auth_string:
sas_methods.append("decimal")
if self._mac_normal in self.mac_methods:
self.chosen_mac_method = self._mac_normal
else:
self.chosen_mac_method = self._mac_old
if Sas._key_agreement_v2 in self.key_agreement_protocols:
self.chosen_key_agreement = Sas._key_agreement_v2
else:
self.chosen_key_agreement = Sas._key_agreement_v1
content = {
"transaction_id": self.transaction_id,
"key_agreement_protocol": self.chosen_key_agreement,
"hash": self._hash_v1,
"message_authentication_code": self.chosen_mac_method,
"short_authentication_string": sas_methods,
"commitment": self.commitment,
}
message = ToDeviceMessage(
"m.key.verification.accept",
self.other_olm_device.user_id,
self.other_olm_device.id,
content,
)
return message
def share_key(self) -> ToDeviceMessage:
"""Create a dictionary containing our public key."""
if self.state == SasState.canceled:
raise LocalProtocolError(
"SAS verification was canceled, can't " "share our public key."
)
content = {"transaction_id": self.transaction_id, "key": self.pubkey}
message = ToDeviceMessage(
"m.key.verification.key",
self.other_olm_device.user_id,
self.other_olm_device.id,
content,
)
return message
def get_mac(self) -> ToDeviceMessage:
"""Create a dictionary containing our MAC."""
if not self.sas_accepted:
raise LocalProtocolError("SAS string wasn't yet accepted")
if self.state == SasState.canceled:
raise LocalProtocolError(
"SAS verification was canceled, can't " "generate MAC."
)
key_id = f"ed25519:{self.own_device}"
assert self.chosen_mac_method
if self.chosen_mac_method == self._mac_normal:
calculate_mac = self.calculate_mac
elif self.chosen_mac_method == self._mac_old:
calculate_mac = self.calculate_mac_long_kdf
info = (
"MATRIX_KEY_VERIFICATION_MAC"
f"{self.own_user}{self.own_device}"
f"{self.other_olm_device.user_id}{self.other_olm_device.id}{self.transaction_id}"
)
mac = {key_id: calculate_mac(self.own_fp_key, info + key_id)}
content = {
"mac": mac,
"keys": calculate_mac(key_id, info + "KEY_IDS"),
"transaction_id": self.transaction_id,
}
message = ToDeviceMessage(
"m.key.verification.mac",
self.other_olm_device.user_id,
self.other_olm_device.id,
content,
)
return message
def get_cancellation(self) -> ToDeviceMessage:
"""Create a dictionary containing our verification cancellation."""
if self.state != SasState.canceled:
raise LocalProtocolError("Sas process isn't canceled.")
assert self.cancel_code
assert self.cancel_reason
content = {
"code": self.cancel_code,
"reason": self.cancel_reason,
"transaction_id": self.transaction_id,
}
message = ToDeviceMessage(
"m.key.verification.cancel",
self.other_olm_device.user_id,
self.other_olm_device.id,
content,
)
return message
def _event_ok(self, event: KeyVerificationEvent):
if self.state == SasState.canceled:
return False
if event.transaction_id != self.transaction_id:
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._txid_error
return False
if self.other_olm_device.user_id != event.sender:
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._user_mismatch_error
return False
return True
def receive_accept_event(self, event):
"""Receive a KeyVerificationAccept event."""
if not self._event_ok(event):
return
if self.state != SasState.created:
self.state = SasState.canceled
(
self.cancel_code,
self.cancel_reason,
) = Sas._unexpected_message_error
return
if (
event.key_agreement_protocol not in Sas._key_agreeemnt_protocols
or event.hash != Sas._hash_v1
or event.message_authentication_code not in Sas._mac_v1
or (
"emoji" not in event.short_authentication_string
and "decimal" not in event.short_authentication_string
)
):
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = Sas._unknonw_method_error
return
self.commitment = event.commitment
self.chosen_mac_method = event.message_authentication_code
self.chosen_key_agreement = event.key_agreement_protocol
self.short_auth_string = event.short_authentication_string
self.state = SasState.accepted
def receive_key_event(self, event):
"""Receive a KeyVerificationKey event."""
if self.other_key_set or (
(self.state != SasState.started) and (self.state != SasState.accepted)
):
self.state = SasState.canceled
(
self.cancel_code,
self.cancel_reason,
) = self._unexpected_message_error
return
if not self._event_ok(event):
return
if self.we_started_it:
if not self._check_commitment(event.key):
self.state = SasState.canceled
(
self.cancel_code,
self.cancel_reason,
) = self._commitment_mismatch_error
return
self.set_their_pubkey(event.key)
self.state = SasState.key_received
def receive_mac_event(self, event):
"""Receive a KeyVerificationMac event.
Args:
event (KeyVerificationMac): The MAC event that was received for
this SAS session.
"""
if self.verified:
return
if not self._event_ok(event):
return
if self.state != SasState.key_received:
self.state = SasState.canceled
(
self.cancel_code,
self.cancel_reason,
) = Sas._unexpected_message_error
return
info = (
f"MATRIX_KEY_VERIFICATION_MAC{self.other_olm_device.user_id}{self.other_olm_device.id}"
f"{self.own_user}{self.own_device}{self.transaction_id}"
)
key_ids = ",".join(sorted(event.mac.keys()))
assert self.chosen_mac_method
if self.chosen_mac_method == self._mac_normal:
calculate_mac = self.calculate_mac
elif self.chosen_mac_method == self._mac_old:
calculate_mac = self.calculate_mac_long_kdf
if event.keys != calculate_mac(key_ids, info + "KEY_IDS"):
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._key_mismatch_error
return
for key_id, key_mac in event.mac.items():
try:
key_type, device_id = key_id.split(":", 2)
except ValueError:
self.state = SasState.canceled
(
self.cancel_code,
self.cancel_reason,
) = self._invalid_message_error
return
if key_type != "ed25519":
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._key_mismatch_error
return
if device_id != self.other_olm_device.id:
continue
other_fp_key = self.other_olm_device.ed25519
if key_mac != calculate_mac(other_fp_key, info + key_id):
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._key_mismatch_error
return
self.verified_devices.append(device_id)
if not self.verified_devices:
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._key_mismatch_error
self.state = SasState.mac_received