mirror of https://github.com/poljar/matrix-nio.git
1500 lines
51 KiB
Python
1500 lines
51 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright © 2018, 2019 Damir Jelić <poljar@termina.org.uk>
|
|
#
|
|
# Permission to use, copy, modify, and/or distribute this software for
|
|
# any purpose with or without fee is hereby granted, provided that the
|
|
# above copyright notice and this permission notice appear in all copies.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
|
|
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
|
|
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
|
|
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
|
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass, field
|
|
from functools import wraps
|
|
from typing import (
|
|
Any,
|
|
Awaitable,
|
|
Callable,
|
|
Coroutine,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
Set,
|
|
Tuple,
|
|
Type,
|
|
Union,
|
|
)
|
|
|
|
from logbook import Logger
|
|
|
|
from ..crypto import ENCRYPTION_ENABLED, DeviceStore, OlmDevice, OutgoingKeyRequest
|
|
from ..events import (
|
|
AccountDataEvent,
|
|
BadEvent,
|
|
BadEventType,
|
|
EphemeralEvent,
|
|
Event,
|
|
MegolmEvent,
|
|
PresenceEvent,
|
|
RoomEncryptionEvent,
|
|
RoomKeyRequest,
|
|
RoomKeyRequestCancellation,
|
|
RoomMemberEvent,
|
|
ToDeviceEvent,
|
|
UnknownBadEvent,
|
|
)
|
|
from ..exceptions import EncryptionError, LocalProtocolError, MembersSyncError
|
|
from ..log import logger_group
|
|
from ..responses import (
|
|
ErrorResponse,
|
|
JoinedMembersResponse,
|
|
KeysClaimResponse,
|
|
KeysQueryResponse,
|
|
KeysUploadResponse,
|
|
LoginResponse,
|
|
LogoutResponse,
|
|
PresenceGetResponse,
|
|
RegisterResponse,
|
|
Response,
|
|
RoomContextResponse,
|
|
RoomForgetResponse,
|
|
RoomGetEventResponse,
|
|
RoomInfo,
|
|
RoomKeyRequestResponse,
|
|
RoomMessagesResponse,
|
|
ShareGroupSessionResponse,
|
|
SyncResponse,
|
|
ToDeviceResponse,
|
|
)
|
|
from ..rooms import MatrixInvitedRoom, MatrixRoom
|
|
|
|
if ENCRYPTION_ENABLED:
|
|
from ..crypto import Olm
|
|
from ..store import DefaultStore, MatrixStore, SqliteMemoryStore
|
|
|
|
from ..event_builders import ToDeviceMessage
|
|
|
|
if False:
|
|
from ..crypto import Sas
|
|
|
|
try:
|
|
from json.decoder import JSONDecodeError
|
|
except ImportError:
|
|
JSONDecodeError = ValueError # type: ignore
|
|
|
|
|
|
logger = Logger("nio.client")
|
|
logger_group.add_logger(logger)
|
|
|
|
|
|
def logged_in(func):
|
|
@wraps(func)
|
|
def wrapper(self, *args, **kwargs):
|
|
if not self.logged_in:
|
|
raise LocalProtocolError("Not logged in.")
|
|
return func(self, *args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
def store_loaded(fn):
|
|
@wraps(fn)
|
|
def inner(self, *args, **kwargs):
|
|
if not self.store or not self.olm:
|
|
raise LocalProtocolError("Matrix store and olm account is not loaded.")
|
|
return fn(self, *args, **kwargs)
|
|
|
|
return inner
|
|
|
|
|
|
@dataclass
|
|
class ClientCallback:
|
|
"""nio internal callback class."""
|
|
|
|
func: Callable = field()
|
|
filter: Union[Tuple[Type, ...], Type, None] = None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ClientConfig:
|
|
"""nio client configuration.
|
|
|
|
Attributes:
|
|
store (MatrixStore, optional): The store that should be used for state
|
|
storage.
|
|
store_name (str, optional): Filename that should be used for the
|
|
store.
|
|
encryption_enabled (bool, optional): Should end to end encryption be
|
|
used.
|
|
pickle_key (str, optional): A passphrase that will be used to encrypt
|
|
end to end encryption keys.
|
|
store_sync_tokens (bool, optional): Should the client store and restore
|
|
sync tokens.
|
|
custom_headers (Dict[str, str]): A dictionary of custom http headers.
|
|
|
|
Raises an ImportWarning if encryption_enabled is true but the dependencies
|
|
for encryption aren't installed.
|
|
|
|
"""
|
|
|
|
store: Optional[Type["MatrixStore"]] = DefaultStore if ENCRYPTION_ENABLED else None
|
|
|
|
encryption_enabled: bool = ENCRYPTION_ENABLED
|
|
|
|
store_name: str = ""
|
|
pickle_key: str = "DEFAULT_KEY"
|
|
store_sync_tokens: bool = False
|
|
custom_headers: Dict[str, str] = None
|
|
|
|
def __post_init__(self):
|
|
if not ENCRYPTION_ENABLED and self.encryption_enabled:
|
|
raise ImportWarning(
|
|
"Encryption is enabled in the client "
|
|
"configuration but dependencies for E2E "
|
|
"encryption aren't installed."
|
|
)
|
|
|
|
|
|
class Client:
|
|
"""Matrix no-IO client.
|
|
|
|
Attributes:
|
|
access_token (str): Token authorizing the user with the server. Is set
|
|
after logging in.
|
|
user_id (str): The full mxid of the current user. This is set after
|
|
logging in.
|
|
next_batch (str): The current sync token.
|
|
rooms (Dict[str, MatrixRoom)): A dictionary containing a mapping of room
|
|
ids to MatrixRoom objects. All the rooms a user is joined to will be
|
|
here after a sync.
|
|
invited_rooms (Dict[str, MatrixInvitedRoom)): A dictionary containing
|
|
a mapping of room ids to MatrixInvitedRoom objects. All the rooms
|
|
a user is invited to will be here after a sync.
|
|
|
|
Args:
|
|
user (str): User that will be used to log in.
|
|
device_id (str, optional): An unique identifier that distinguishes
|
|
this client instance. If not set the server will provide one after
|
|
log in.
|
|
store_dir (str, optional): The directory that should be used for state
|
|
storeage.
|
|
config (ClientConfig, optional): Configuration for the client.
|
|
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
user: str,
|
|
device_id: Optional[str] = None,
|
|
store_path: Optional[str] = "",
|
|
config: Optional[ClientConfig] = None,
|
|
):
|
|
self.user = user
|
|
self.device_id = device_id
|
|
self.store_path = store_path
|
|
self.olm: Optional["Olm"] = None
|
|
self.store: Optional["MatrixStore"] = None
|
|
self.config = config or ClientConfig()
|
|
|
|
self.user_id = ""
|
|
# TODO Turn this into a optional string.
|
|
self.access_token = "" # type: str
|
|
self.next_batch = ""
|
|
self.loaded_sync_token = ""
|
|
|
|
self.rooms: Dict[str, MatrixRoom] = dict()
|
|
self.invited_rooms: Dict[str, MatrixInvitedRoom] = dict()
|
|
self.encrypted_rooms: Set[str] = set()
|
|
|
|
self.event_callbacks: List[ClientCallback] = []
|
|
self.ephemeral_callbacks: List[ClientCallback] = []
|
|
self.to_device_callbacks: List[ClientCallback] = []
|
|
self.presence_callbacks: List[ClientCallback] = []
|
|
self.global_account_data_callbacks: List[ClientCallback] = []
|
|
self.room_account_data_callbacks: List[ClientCallback] = []
|
|
|
|
@property
|
|
def logged_in(self) -> bool:
|
|
"""Check if we are logged in.
|
|
|
|
Returns True if the client is logged in to the server, False otherwise.
|
|
"""
|
|
return bool(self.access_token)
|
|
|
|
@property # type: ignore
|
|
@store_loaded
|
|
def device_store(self) -> DeviceStore:
|
|
"""Store containing known devices.
|
|
|
|
Returns a ``DeviceStore`` holding all known olm devices.
|
|
"""
|
|
assert self.olm
|
|
return self.olm.device_store
|
|
|
|
@property # type: ignore
|
|
@store_loaded
|
|
def olm_account_shared(self) -> bool:
|
|
"""Check if the clients Olm account is shared with the server.
|
|
|
|
Returns True if the Olm account is shared, False otherwise.
|
|
"""
|
|
assert self.olm
|
|
return self.olm.account.shared
|
|
|
|
@property
|
|
def users_for_key_query(self) -> Set[str]:
|
|
"""Users for whom we should make a key query."""
|
|
if not self.olm:
|
|
return set()
|
|
|
|
return self.olm.users_for_key_query
|
|
|
|
@property
|
|
def should_upload_keys(self) -> bool:
|
|
"""Check if the client should upload encryption keys.
|
|
|
|
Returns True if encryption keys need to be uploaded, false otherwise.
|
|
"""
|
|
if not self.olm:
|
|
return False
|
|
|
|
return self.olm.should_upload_keys
|
|
|
|
@property
|
|
def should_query_keys(self) -> bool:
|
|
"""Check if the client should make a key query call to the server.
|
|
|
|
Returns True if a key query is necessary, false otherwise.
|
|
"""
|
|
if not self.olm:
|
|
return False
|
|
|
|
return self.olm.should_query_keys
|
|
|
|
@property
|
|
def should_claim_keys(self) -> bool:
|
|
"""Check if the client should claim one-time keys for some users.
|
|
|
|
This should be periodically checked and if true a keys claim request
|
|
should be made with the return value of a
|
|
`get_users_for_key_claiming()` call as the payload.
|
|
|
|
Keys need to be claimed for various reasons. Every time we need to send
|
|
an encrypted message to a device and we don't have a working Olm
|
|
session with them we need to claim one-time keys to create a new Olm
|
|
session.
|
|
|
|
Returns True if a key query is necessary, false otherwise.
|
|
"""
|
|
if not self.olm:
|
|
return False
|
|
|
|
return bool(self.olm.wedged_devices or self.olm.key_request_devices_no_session)
|
|
|
|
@property
|
|
def outgoing_key_requests(self) -> Dict[str, OutgoingKeyRequest]:
|
|
"""Our active key requests that we made."""
|
|
return self.olm.outgoing_key_requests if self.olm else dict()
|
|
|
|
@property
|
|
def key_verifications(self):
|
|
# type () -> Dict[str, Sas]
|
|
"""Key verifications that the client is participating in."""
|
|
return self.olm.key_verifications if self.olm else dict()
|
|
|
|
@property
|
|
def outgoing_to_device_messages(self) -> List[ToDeviceMessage]:
|
|
"""To-device messages that we need to send out."""
|
|
return self.olm.outgoing_to_device_messages if self.olm else []
|
|
|
|
def get_active_sas(self, user_id, device_id):
|
|
# type (str, str) -> Optional[Sas]
|
|
"""Find a non-canceled SAS verification object for the provided user.
|
|
|
|
Args:
|
|
user_id (str): The user for which we should find a SAS verification
|
|
object.
|
|
device_id (str): The device_id for which we should find the SAS
|
|
verification object.
|
|
|
|
Returns the object if it's found, otherwise None.
|
|
"""
|
|
if not self.olm:
|
|
return None
|
|
|
|
return self.olm.get_active_sas(user_id, device_id)
|
|
|
|
def load_store(self):
|
|
"""Load the session store and olm account.
|
|
|
|
If the SqliteMemoryStore is set as the store a store path isn't
|
|
required, if no store path is provided and a store class that requires
|
|
a path is used this method will be a no op.
|
|
|
|
This method does nothing if the store is already loaded.
|
|
|
|
Raises LocalProtocolError if a store class, user_id and device_id are
|
|
not set.
|
|
"""
|
|
if self.store:
|
|
return
|
|
|
|
if not self.user_id:
|
|
raise LocalProtocolError("User id is not set")
|
|
|
|
if not self.device_id:
|
|
raise LocalProtocolError("Device id is not set")
|
|
|
|
if not self.config.store:
|
|
raise LocalProtocolError("No store class was provided in the config.")
|
|
|
|
if self.config.encryption_enabled:
|
|
if self.config.store is SqliteMemoryStore:
|
|
self.store = self.config.store(
|
|
self.user_id,
|
|
self.device_id,
|
|
self.config.pickle_key,
|
|
)
|
|
else:
|
|
if not self.store_path:
|
|
return
|
|
|
|
self.store = self.config.store(
|
|
self.user_id,
|
|
self.device_id,
|
|
self.store_path,
|
|
self.config.pickle_key,
|
|
self.config.store_name,
|
|
)
|
|
assert self.store
|
|
|
|
self.olm = Olm(self.user_id, self.device_id, self.store)
|
|
self.encrypted_rooms = self.store.load_encrypted_rooms()
|
|
|
|
if self.config.store_sync_tokens:
|
|
self.loaded_sync_token = self.store.load_sync_token()
|
|
|
|
def restore_login(
|
|
self,
|
|
user_id: str,
|
|
device_id: str,
|
|
access_token: str,
|
|
):
|
|
"""Restore a previous login to the homeserver.
|
|
|
|
Args:
|
|
user_id (str): The full mxid of the current user.
|
|
device_id (str): An unique identifier that distinguishes
|
|
this client instance.
|
|
access_token (str): Token authorizing the user with the server.
|
|
"""
|
|
self.user_id = user_id
|
|
self.device_id = device_id
|
|
self.access_token = access_token
|
|
if ENCRYPTION_ENABLED:
|
|
self.load_store()
|
|
|
|
def room_contains_unverified(self, room_id: str) -> bool:
|
|
"""Check if a room contains unverified devices.
|
|
|
|
Args:
|
|
room_id (str): Room id of the room that should be checked.
|
|
|
|
Returns True if the room contains unverified devices, false otherwise.
|
|
Returns False if no Olm session is loaded or if the room isn't
|
|
encrypted.
|
|
"""
|
|
try:
|
|
room = self.rooms[room_id]
|
|
except KeyError:
|
|
raise LocalProtocolError(f"No room found with room id {room_id}")
|
|
|
|
if not room.encrypted:
|
|
return False
|
|
|
|
if not self.olm:
|
|
return False
|
|
|
|
for user in room.users:
|
|
if not self.olm.user_fully_verified(user):
|
|
return True
|
|
|
|
return False
|
|
|
|
def _invalidate_session_for_member_event(self, room_id: str):
|
|
if not self.olm:
|
|
return
|
|
self.invalidate_outbound_session(room_id)
|
|
|
|
@store_loaded
|
|
def invalidate_outbound_session(self, room_id: str):
|
|
"""Explicitely remove encryption keys for a room.
|
|
|
|
Args:
|
|
room_id (str): Room id for the room the encryption keys should be
|
|
removed.
|
|
"""
|
|
assert self.olm
|
|
session = self.olm.outbound_group_sessions.pop(room_id, None)
|
|
|
|
# There is no need to invalidate the session if it was never
|
|
# shared, put it back where it was.
|
|
if session and not session.shared:
|
|
self.olm.outbound_group_sessions[room_id] = session
|
|
elif session:
|
|
logger.info(f"Invalidating session for {room_id}")
|
|
|
|
def _invalidate_outbound_sessions(self, device):
|
|
# type: (OlmDevice) -> None
|
|
assert self.olm
|
|
|
|
for room in self.rooms.values():
|
|
if device.user_id in room.users:
|
|
self.invalidate_outbound_session(room.room_id)
|
|
|
|
@store_loaded
|
|
def verify_device(self, device: OlmDevice) -> bool:
|
|
"""Mark a device as verified.
|
|
|
|
A device needs to be either trusted/ignored or blacklisted to either
|
|
share room encryption keys with it or not.
|
|
This method adds the device to the trusted devices and enables sharing
|
|
room encryption keys with it.
|
|
|
|
Args:
|
|
device (OlmDevice): The device which should be added to the trust
|
|
list.
|
|
|
|
Returns true if the device was verified, false if it was already
|
|
verified.
|
|
"""
|
|
assert self.olm
|
|
|
|
changed = self.olm.verify_device(device)
|
|
if changed:
|
|
self._invalidate_outbound_sessions(device)
|
|
|
|
return changed
|
|
|
|
@store_loaded
|
|
def unverify_device(self, device: OlmDevice) -> bool:
|
|
"""Unmark a device as verified.
|
|
|
|
This method removes the device from the trusted devices and disables
|
|
sharing room encryption keys with it. It also invalidates any
|
|
encryption keys for rooms that the device takes part of.
|
|
|
|
Args:
|
|
device (OlmDevice): The device which should be added to the trust
|
|
list.
|
|
|
|
Returns true if the device was unverified, false if it was already
|
|
unverified.
|
|
"""
|
|
assert self.olm
|
|
|
|
changed = self.olm.unverify_device(device)
|
|
if changed:
|
|
self._invalidate_outbound_sessions(device)
|
|
|
|
return changed
|
|
|
|
@store_loaded
|
|
def blacklist_device(self, device: OlmDevice) -> bool:
|
|
"""Mark a device as blacklisted.
|
|
|
|
Devices on the blacklist will not receive room encryption keys and
|
|
therefore won't be able to decrypt messages coming from this client.
|
|
|
|
Args:
|
|
device (OlmDevice): The device which should be added to the
|
|
blacklist.
|
|
|
|
Returns true if the device was added, false if it was on the blacklist
|
|
already.
|
|
"""
|
|
assert self.olm
|
|
|
|
changed = self.olm.blacklist_device(device)
|
|
if changed:
|
|
self._invalidate_outbound_sessions(device)
|
|
|
|
return changed
|
|
|
|
@store_loaded
|
|
def unblacklist_device(self, device: OlmDevice) -> bool:
|
|
"""Unmark a device as blacklisted.
|
|
|
|
Args:
|
|
device (OlmDevice): The device which should be removed from the
|
|
blacklist.
|
|
|
|
Returns true if the device was removed, false if it wasn't on the
|
|
blacklist and no removal happened.
|
|
"""
|
|
assert self.olm
|
|
|
|
changed = self.olm.unblacklist_device(device)
|
|
if changed:
|
|
self._invalidate_outbound_sessions(device)
|
|
|
|
return changed
|
|
|
|
@store_loaded
|
|
def ignore_device(self, device: OlmDevice) -> bool:
|
|
"""Mark a device as ignored.
|
|
|
|
Ignored devices will still receive room encryption keys, despire not
|
|
being verified.
|
|
|
|
Args:
|
|
device (OlmDevice): the device to ignore
|
|
|
|
Returns true if device is ignored, or false if it is already on the
|
|
list of ignored devices.
|
|
"""
|
|
assert self.olm
|
|
|
|
changed = self.olm.ignore_device(device)
|
|
if changed:
|
|
self._invalidate_outbound_sessions(device)
|
|
|
|
return changed
|
|
|
|
@store_loaded
|
|
def unignore_device(self, device: OlmDevice) -> bool:
|
|
"""Unmark a device as ignored.
|
|
|
|
Args:
|
|
device (OlmDevice): The device which should be removed from the
|
|
list of ignored devices.
|
|
|
|
Returns true if the device was removed, false if it wasn't on the
|
|
list and no removal happened.
|
|
"""
|
|
assert self.olm
|
|
|
|
changed = self.olm.unignore_device(device)
|
|
if changed:
|
|
self._invalidate_outbound_sessions(device)
|
|
|
|
return changed
|
|
|
|
def _handle_register(self, response):
|
|
# type: (Union[RegisterResponse, ErrorResponse]) -> None
|
|
if isinstance(response, ErrorResponse):
|
|
return
|
|
|
|
self.restore_login(response.user_id, response.device_id, response.access_token)
|
|
|
|
def _handle_login(self, response: Union[LoginResponse, ErrorResponse]):
|
|
if isinstance(response, ErrorResponse):
|
|
return
|
|
|
|
self.restore_login(response.user_id, response.device_id, response.access_token)
|
|
|
|
def _handle_logout(self, response: Union[LogoutResponse, ErrorResponse]):
|
|
if not isinstance(response, ErrorResponse):
|
|
self.access_token = ""
|
|
|
|
@store_loaded
|
|
def decrypt_event(self, event: MegolmEvent) -> Union[Event, BadEventType]:
|
|
"""Try to decrypt an undecrypted megolm event.
|
|
|
|
Args:
|
|
event (MegolmEvent): Event that should be decrypted.
|
|
|
|
Returns the decrypted event, raises EncryptionError if there was an
|
|
error while decrypting.
|
|
"""
|
|
if not isinstance(event, MegolmEvent):
|
|
raise ValueError(
|
|
"Invalid event, this function can only decrypt " "MegolmEvents"
|
|
)
|
|
|
|
assert self.olm
|
|
return self.olm.decrypt_megolm_event(event)
|
|
|
|
def _handle_decrypt_to_device(
|
|
self, to_device_event: ToDeviceEvent
|
|
) -> Optional[ToDeviceEvent]:
|
|
if self.olm:
|
|
return self.olm.handle_to_device_event(to_device_event)
|
|
|
|
return None
|
|
|
|
def _replace_decrypted_to_device(
|
|
self,
|
|
decrypted_events: List[Tuple[int, ToDeviceEvent]],
|
|
response: SyncResponse,
|
|
):
|
|
# Replace the encrypted to_device events with decrypted ones
|
|
for decrypted_event in decrypted_events:
|
|
index, event = decrypted_event
|
|
response.to_device_events[index] = event
|
|
|
|
def _run_to_device_callbacks(self, event: ToDeviceEvent):
|
|
for cb in self.to_device_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
cb.func(event)
|
|
|
|
def _handle_to_device(self, response: SyncResponse):
|
|
decrypted_to_device = []
|
|
|
|
for index, to_device_event in enumerate(response.to_device_events):
|
|
decrypted_event = self._handle_decrypt_to_device(to_device_event)
|
|
|
|
if decrypted_event:
|
|
decrypted_to_device.append((index, decrypted_event))
|
|
to_device_event = decrypted_event
|
|
|
|
# Do not pass room key request events to our user here. We don't
|
|
# want to notify them about requests that get automatically handled
|
|
# or canceled right away.
|
|
if isinstance(
|
|
to_device_event, (RoomKeyRequest, RoomKeyRequestCancellation)
|
|
):
|
|
continue
|
|
|
|
self._run_to_device_callbacks(to_device_event)
|
|
|
|
self._replace_decrypted_to_device(decrypted_to_device, response)
|
|
|
|
def _get_invited_room(self, room_id: str) -> MatrixInvitedRoom:
|
|
if room_id not in self.invited_rooms:
|
|
logger.info(f"New invited room {room_id}")
|
|
self.invited_rooms[room_id] = MatrixInvitedRoom(room_id, self.user_id)
|
|
|
|
return self.invited_rooms[room_id]
|
|
|
|
def _handle_invited_rooms(self, response: SyncResponse):
|
|
for room_id, info in response.rooms.invite.items():
|
|
room = self._get_invited_room(room_id)
|
|
|
|
for event in info.invite_state:
|
|
room.handle_event(event)
|
|
|
|
for cb in self.event_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
cb.func(room, event)
|
|
|
|
def _handle_joined_state(
|
|
self, room_id: str, join_info: RoomInfo, encrypted_rooms: Set[str]
|
|
):
|
|
if room_id in self.invited_rooms:
|
|
del self.invited_rooms[room_id]
|
|
|
|
if room_id not in self.rooms:
|
|
logger.info(f"New joined room {room_id}")
|
|
self.rooms[room_id] = MatrixRoom(
|
|
room_id, self.user_id, room_id in self.encrypted_rooms
|
|
)
|
|
|
|
room = self.rooms[room_id]
|
|
|
|
for event in join_info.state:
|
|
if isinstance(event, RoomEncryptionEvent):
|
|
encrypted_rooms.add(room_id)
|
|
|
|
if isinstance(event, RoomMemberEvent):
|
|
if room.handle_membership(event):
|
|
self._invalidate_session_for_member_event(room_id)
|
|
else:
|
|
room.handle_event(event)
|
|
|
|
if join_info.summary:
|
|
room.update_summary(join_info.summary)
|
|
|
|
if join_info.unread_notifications:
|
|
room.update_unread_notifications(join_info.unread_notifications)
|
|
|
|
def _handle_timeline_event(
|
|
self,
|
|
event: Union[Event, BadEventType],
|
|
room_id: str,
|
|
room: MatrixRoom,
|
|
encrypted_rooms: Set[str],
|
|
) -> Optional[Union[Event, BadEventType]]:
|
|
decrypted_event = None
|
|
|
|
if isinstance(event, MegolmEvent) and self.olm:
|
|
event.room_id = room_id
|
|
decrypted_event = self.olm._decrypt_megolm_no_error(event)
|
|
|
|
if decrypted_event:
|
|
event = decrypted_event
|
|
|
|
elif isinstance(event, RoomEncryptionEvent):
|
|
encrypted_rooms.add(room_id)
|
|
|
|
if isinstance(event, RoomMemberEvent):
|
|
if room.handle_membership(event):
|
|
self._invalidate_session_for_member_event(room_id)
|
|
|
|
elif isinstance(event, (UnknownBadEvent, BadEvent)):
|
|
pass
|
|
|
|
else:
|
|
room.handle_event(event)
|
|
|
|
return decrypted_event
|
|
|
|
def _handle_joined_rooms(self, response: SyncResponse):
|
|
encrypted_rooms: Set[str] = set()
|
|
|
|
for room_id, join_info in response.rooms.join.items():
|
|
self._handle_joined_state(room_id, join_info, encrypted_rooms)
|
|
|
|
room = self.rooms[room_id]
|
|
decrypted_events: List[Tuple[int, Union[Event, BadEventType]]] = []
|
|
|
|
for index, event in enumerate(join_info.timeline.events):
|
|
decrypted_event = self._handle_timeline_event(
|
|
event, room_id, room, encrypted_rooms
|
|
)
|
|
|
|
if decrypted_event:
|
|
event = decrypted_event
|
|
decrypted_events.append((index, decrypted_event))
|
|
|
|
for cb in self.event_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
cb.func(room, event)
|
|
|
|
# Replace the Megolm events with decrypted ones
|
|
for index, event in decrypted_events:
|
|
join_info.timeline.events[index] = event
|
|
|
|
for event in join_info.ephemeral:
|
|
room.handle_ephemeral_event(event)
|
|
|
|
for cb in self.ephemeral_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
cb.func(room, event)
|
|
|
|
for event in join_info.account_data:
|
|
room.handle_account_data(event)
|
|
|
|
for cb in self.room_account_data_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
cb.func(room, event)
|
|
|
|
if room.encrypted and self.olm is not None:
|
|
self.olm.update_tracked_users(room)
|
|
|
|
self.encrypted_rooms.update(encrypted_rooms)
|
|
|
|
if self.store:
|
|
self.store.save_encrypted_rooms(encrypted_rooms)
|
|
|
|
def _handle_presence_events(self, response: SyncResponse):
|
|
for event in response.presence_events:
|
|
for room_id in self.rooms.keys():
|
|
if event.user_id not in self.rooms[room_id].users:
|
|
continue
|
|
|
|
self.rooms[room_id].users[event.user_id].presence = event.presence
|
|
self.rooms[room_id].users[
|
|
event.user_id
|
|
].last_active_ago = event.last_active_ago
|
|
self.rooms[room_id].users[
|
|
event.user_id
|
|
].currently_active = event.currently_active
|
|
self.rooms[room_id].users[event.user_id].status_msg = event.status_msg
|
|
|
|
for cb in self.presence_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
cb.func(event)
|
|
|
|
def _handle_global_account_data_events(
|
|
self,
|
|
response: SyncResponse,
|
|
) -> None:
|
|
for event in response.account_data_events:
|
|
for cb in self.global_account_data_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
cb.func(event)
|
|
|
|
def _handle_expired_verifications(self):
|
|
expired_verifications = self.olm.clear_verifications()
|
|
|
|
for event in expired_verifications:
|
|
for cb in self.to_device_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
cb.func(event)
|
|
|
|
def _handle_olm_events(self, response: SyncResponse):
|
|
assert self.olm
|
|
|
|
changed_users = set()
|
|
if response.device_key_count.signed_curve25519:
|
|
self.olm.uploaded_key_count = response.device_key_count.signed_curve25519
|
|
|
|
for user in response.device_list.changed:
|
|
for room in self.rooms.values():
|
|
if not room.encrypted:
|
|
continue
|
|
|
|
if user in room.users:
|
|
changed_users.add(user)
|
|
|
|
for user in response.device_list.left:
|
|
for room in self.rooms.values():
|
|
if not room.encrypted:
|
|
continue
|
|
|
|
if user in room.users:
|
|
changed_users.add(user)
|
|
|
|
self.olm.add_changed_users(changed_users)
|
|
|
|
def _handle_sync(
|
|
self, response: SyncResponse
|
|
) -> Union[None, Coroutine[Any, Any, None]]:
|
|
# We already recieved such a sync response, do nothing in that case.
|
|
if self.next_batch == response.next_batch:
|
|
return None
|
|
|
|
self.next_batch = response.next_batch
|
|
|
|
if self.config.store_sync_tokens and self.store:
|
|
self.store.save_sync_token(self.next_batch)
|
|
|
|
self._handle_to_device(response)
|
|
|
|
self._handle_invited_rooms(response)
|
|
|
|
self._handle_joined_rooms(response)
|
|
|
|
self._handle_presence_events(response)
|
|
|
|
self._handle_global_account_data_events(response)
|
|
|
|
if self.olm:
|
|
self._handle_expired_verifications()
|
|
self._handle_olm_events(response)
|
|
self._collect_key_requests()
|
|
|
|
return None
|
|
|
|
def _collect_key_requests(self):
|
|
events = self.olm.collect_key_requests()
|
|
for event in events:
|
|
self._run_to_device_callbacks(event)
|
|
|
|
def _decrypt_event_array(self, array: List[Union[Event, BadEventType]]):
|
|
if not self.olm:
|
|
return
|
|
|
|
decrypted_events = []
|
|
|
|
for index, event in enumerate(array):
|
|
if isinstance(event, MegolmEvent):
|
|
new_event = self.olm._decrypt_megolm_no_error(event)
|
|
if new_event:
|
|
decrypted_events.append((index, new_event))
|
|
|
|
for decrypted_event in decrypted_events:
|
|
index, event = decrypted_event
|
|
array[index] = event
|
|
|
|
def _handle_context_response(self, response: RoomContextResponse):
|
|
if isinstance(response.event, MegolmEvent):
|
|
if self.olm:
|
|
decrypted_event = self.olm._decrypt_megolm_no_error(response.event)
|
|
response.event = decrypted_event
|
|
|
|
self._decrypt_event_array(response.events_after)
|
|
self._decrypt_event_array(response.events_before)
|
|
|
|
def _handle_messages_response(self, response: RoomMessagesResponse):
|
|
decrypted_events = []
|
|
|
|
for index, event in enumerate(response.chunk):
|
|
if isinstance(event, MegolmEvent) and self.olm:
|
|
new_event = self.olm._decrypt_megolm_no_error(event)
|
|
if new_event:
|
|
decrypted_events.append((index, new_event))
|
|
|
|
for index, event in decrypted_events:
|
|
response.chunk[index] = event
|
|
|
|
def _handle_olm_response(
|
|
self,
|
|
response: Union[
|
|
ShareGroupSessionResponse,
|
|
KeysClaimResponse,
|
|
KeysQueryResponse,
|
|
KeysUploadResponse,
|
|
RoomKeyRequestResponse,
|
|
ToDeviceResponse,
|
|
],
|
|
):
|
|
if not self.olm:
|
|
return
|
|
|
|
self.olm.handle_response(response)
|
|
|
|
if isinstance(response, ShareGroupSessionResponse):
|
|
room_id = response.room_id
|
|
session = self.olm.outbound_group_sessions.get(room_id, None)
|
|
room = self.rooms.get(room_id, None)
|
|
|
|
if not session or not room:
|
|
return
|
|
|
|
session.users_shared_with.update(response.users_shared_with)
|
|
users = room.users
|
|
|
|
for user_id in users:
|
|
for device in self.device_store.active_user_devices(user_id):
|
|
user = (user_id, device.id)
|
|
if (
|
|
user not in session.users_shared_with
|
|
and user not in session.users_ignored
|
|
):
|
|
return
|
|
|
|
logger.info(f"Marking outbound group session for room {room_id} as shared")
|
|
session.shared = True
|
|
|
|
elif isinstance(response, KeysQueryResponse):
|
|
for user_id in response.changed:
|
|
for room in self.rooms.values():
|
|
if room.encrypted and user_id in room.users:
|
|
self.invalidate_outbound_session(room.room_id)
|
|
|
|
def _handle_joined_members(self, response: JoinedMembersResponse):
|
|
if response.room_id not in self.rooms:
|
|
return
|
|
|
|
room = self.rooms[response.room_id]
|
|
|
|
joined_user_ids = {m.user_id for m in response.members}
|
|
|
|
for user_id in tuple(room.users):
|
|
invited = room.users[user_id].invited
|
|
|
|
if not invited and user_id not in joined_user_ids:
|
|
room.remove_member(user_id)
|
|
|
|
for member in response.members:
|
|
room.add_member(member.user_id, member.display_name, member.avatar_url)
|
|
|
|
room.members_synced = True
|
|
|
|
if room.encrypted and self.olm is not None:
|
|
self.olm.update_tracked_users(room)
|
|
|
|
def _handle_room_forget_response(self, response: RoomForgetResponse):
|
|
self.encrypted_rooms.discard(response.room_id)
|
|
|
|
if response.room_id in self.rooms:
|
|
room = self.rooms.pop(response.room_id)
|
|
|
|
if room.encrypted and self.store:
|
|
self.store.delete_encrypted_room(room.room_id)
|
|
|
|
elif response.room_id in self.invited_rooms:
|
|
del self.invited_rooms[response.room_id]
|
|
|
|
def _handle_presence_response(self, response: PresenceGetResponse):
|
|
for room_id in self.rooms.keys():
|
|
if response.user_id not in self.rooms[room_id].users:
|
|
continue
|
|
|
|
self.rooms[room_id].users[response.user_id].presence = response.presence
|
|
self.rooms[room_id].users[
|
|
response.user_id
|
|
].last_active_ago = response.last_active_ago
|
|
self.rooms[room_id].users[response.user_id].currently_active = (
|
|
response.currently_active or False
|
|
)
|
|
self.rooms[room_id].users[response.user_id].status_msg = response.status_msg
|
|
|
|
def receive_response(
|
|
self, response: Response
|
|
) -> Union[None, Coroutine[Any, Any, None]]:
|
|
"""Receive a Matrix Response and change the client state accordingly.
|
|
|
|
Some responses will get edited for the callers convenience e.g. sync
|
|
responses that contain encrypted messages. The encrypted messages will
|
|
be replaced by decrypted ones if decryption is possible.
|
|
|
|
Args:
|
|
response (Response): the response that we wish the client to handle
|
|
"""
|
|
if not isinstance(response, Response):
|
|
raise ValueError("Invalid response received")
|
|
|
|
if isinstance(response, LoginResponse):
|
|
self._handle_login(response)
|
|
elif isinstance(response, LogoutResponse):
|
|
self._handle_logout(response)
|
|
elif isinstance(response, RegisterResponse):
|
|
self._handle_register(response)
|
|
elif isinstance(response, SyncResponse):
|
|
self._handle_sync(response)
|
|
elif isinstance(response, RoomMessagesResponse):
|
|
self._handle_messages_response(response)
|
|
elif isinstance(response, RoomContextResponse):
|
|
self._handle_context_response(response)
|
|
elif isinstance(response, KeysUploadResponse):
|
|
self._handle_olm_response(response)
|
|
elif isinstance(response, KeysQueryResponse):
|
|
self._handle_olm_response(response)
|
|
elif isinstance(response, KeysClaimResponse):
|
|
self._handle_olm_response(response)
|
|
elif isinstance(response, ShareGroupSessionResponse):
|
|
self._handle_olm_response(response)
|
|
elif isinstance(response, JoinedMembersResponse):
|
|
self._handle_joined_members(response)
|
|
elif isinstance(response, RoomKeyRequestResponse):
|
|
self._handle_olm_response(response)
|
|
elif isinstance(response, RoomForgetResponse):
|
|
self._handle_room_forget_response(response)
|
|
elif isinstance(response, ToDeviceResponse):
|
|
self._handle_olm_response(response)
|
|
elif isinstance(response, RoomGetEventResponse):
|
|
if isinstance(response.event, MegolmEvent) and self.olm is not None:
|
|
try:
|
|
response.event = self.decrypt_event(response.event)
|
|
except EncryptionError:
|
|
pass
|
|
elif isinstance(response, PresenceGetResponse):
|
|
self._handle_presence_response(response)
|
|
elif isinstance(response, ErrorResponse):
|
|
if response.soft_logout:
|
|
self.access_token = ""
|
|
|
|
return None
|
|
|
|
@store_loaded
|
|
def export_keys(self, outfile: str, passphrase: str, count: int = 10000):
|
|
"""Export all the Megolm decryption keys of this device.
|
|
|
|
The keys will be encrypted using the passphrase.
|
|
|
|
Note that this does not save other information such as the private
|
|
identity keys of the device.
|
|
|
|
Args:
|
|
outfile (str): The file to write the keys to.
|
|
passphrase (str): The encryption passphrase.
|
|
count (int): Optional. Round count for the underlying key
|
|
derivation. It is not recommended to specify it unless
|
|
absolutely sure of the consequences.
|
|
"""
|
|
assert self.olm
|
|
self.olm.export_keys(outfile, passphrase, count=count)
|
|
|
|
@store_loaded
|
|
def import_keys(self, infile: str, passphrase: str):
|
|
"""Import Megolm decryption keys.
|
|
|
|
The keys will be added to the current instance as well as written to
|
|
database.
|
|
|
|
Args:
|
|
infile (str): The file containing the keys.
|
|
passphrase (str): The decryption passphrase.
|
|
|
|
Raises `EncryptionError` if the file is invalid or couldn't be
|
|
decrypted.
|
|
|
|
Raises the usual file errors if the file couldn't be opened.
|
|
"""
|
|
assert self.olm
|
|
self.olm.import_keys(infile, passphrase)
|
|
|
|
@store_loaded
|
|
def get_missing_sessions(self, room_id: str) -> Dict[str, List[str]]:
|
|
"""Get users and devices for which we don't have active Olm sessions.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room for which we should get the
|
|
users with missing Olm sessions.
|
|
|
|
Raises `LocalProtocolError` if the room with the provided room id is
|
|
not found or the room is not encrypted.
|
|
"""
|
|
assert self.olm
|
|
|
|
if room_id not in self.rooms:
|
|
raise LocalProtocolError(f"No room found with room id {room_id}")
|
|
room = self.rooms[room_id]
|
|
|
|
if not room.encrypted:
|
|
raise LocalProtocolError(f"Room with id {room_id} is not encrypted")
|
|
|
|
return self.olm.get_missing_sessions(list(room.users))
|
|
|
|
@store_loaded
|
|
def get_users_for_key_claiming(self) -> Dict[str, List[str]]:
|
|
"""Get the content for a key claim request that needs to be made.
|
|
|
|
Returns a dictionary containing users as the keys and a list of devices
|
|
for which we will claim one-time keys.
|
|
|
|
Raises a LocalProtocolError if no key claim request needs to be made.
|
|
"""
|
|
assert self.olm
|
|
return self.olm.get_users_for_key_claiming()
|
|
|
|
@store_loaded
|
|
def encrypt(
|
|
self, room_id: str, message_type: str, content: Dict[Any, Any]
|
|
) -> Tuple[str, Dict[str, str]]:
|
|
"""Encrypt a message to be sent to the provided room.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room where the message will be
|
|
sent.
|
|
message_type (str): The type of the message.
|
|
content (str): The dictionary containing the content of the
|
|
message.
|
|
|
|
Raises `GroupEncryptionError` if the group session for the provided
|
|
room isn't shared yet.
|
|
|
|
Raises `MembersSyncError` if the room is encrypted but the room members
|
|
aren't fully loaded due to member lazy loading.
|
|
|
|
Returns a tuple containing the new message type and the new encrypted
|
|
content.
|
|
"""
|
|
assert self.olm
|
|
|
|
try:
|
|
room = self.rooms[room_id]
|
|
except KeyError:
|
|
raise LocalProtocolError(f"No such room with id {room_id} found.")
|
|
|
|
if not room.encrypted:
|
|
raise LocalProtocolError(f"Room {room_id} is not encrypted")
|
|
|
|
if not room.members_synced:
|
|
raise MembersSyncError(
|
|
"The room is encrypted and the members " "aren't fully synced."
|
|
)
|
|
|
|
encrypted_content = self.olm.group_encrypt(
|
|
room_id,
|
|
{"content": content, "type": message_type},
|
|
)
|
|
|
|
# The relationship needs to be sent unencrypted, so put it in the
|
|
# unencrypted content.
|
|
if "m.relates_to" in content:
|
|
encrypted_content["m.relates_to"] = content["m.relates_to"]
|
|
|
|
message_type = "m.room.encrypted"
|
|
|
|
return message_type, encrypted_content
|
|
|
|
def add_event_callback(
|
|
self,
|
|
callback: Callable[[MatrixRoom, Event], Optional[Awaitable[None]]],
|
|
filter: Union[Type[Event], Tuple[Type[Event], ...]],
|
|
) -> None:
|
|
"""Add a callback that will be executed on room events.
|
|
|
|
The callback can be used on joined rooms as well as on invited rooms.
|
|
The room parameter for the callback will have a different type
|
|
depending on if the room is joined or invited.
|
|
|
|
Args:
|
|
callback (Callable[[MatrixRoom, Event], Optional[Awaitable[None]]]): A
|
|
function that will be called if the event type in the filter
|
|
argument is found in a room timeline.
|
|
|
|
filter (Union[Type[Event], Tuple[Type[Event], ...]]):
|
|
The event type or a tuple
|
|
containing multiple types for which the function will be
|
|
called.
|
|
|
|
"""
|
|
cb = ClientCallback(callback, filter)
|
|
self.event_callbacks.append(cb)
|
|
|
|
def add_ephemeral_callback(
|
|
self,
|
|
callback: Callable[[MatrixRoom, EphemeralEvent], None],
|
|
filter: Union[Type[EphemeralEvent], Tuple[Type[EphemeralEvent], ...]],
|
|
) -> None:
|
|
"""Add a callback that will be executed on ephemeral room events.
|
|
|
|
Args:
|
|
callback (Callable[MatrixRoom, EphemeralEvent]):
|
|
A function that will be
|
|
called if the event type in the filter argument is found in the
|
|
ephemeral room event list.
|
|
|
|
filter
|
|
(Union[Type[EphemeralEvent], Tuple[Type[EphemeralEvent], ...]]):
|
|
The event type or a tuple containing
|
|
multiple types for which the function will be called.
|
|
|
|
"""
|
|
cb = ClientCallback(callback, filter)
|
|
self.ephemeral_callbacks.append(cb)
|
|
|
|
def add_global_account_data_callback(
|
|
self,
|
|
callback: Callable[[AccountDataEvent], None],
|
|
filter: Union[
|
|
Type[AccountDataEvent],
|
|
Tuple[Type[AccountDataEvent], ...],
|
|
],
|
|
) -> None:
|
|
"""Add a callback that will be executed on global account data events.
|
|
|
|
Args:
|
|
callback (Callable[[AccountDataEvent], None]):
|
|
A function that will be
|
|
called if the event type in the filter argument is found in
|
|
the account data event list.
|
|
|
|
filter
|
|
(Union[Type[AccountDataEvent], Tuple[Type[AccountDataEvent, ...]]):
|
|
The event type or a tuple
|
|
containing multiple types for which the function
|
|
will be called.
|
|
|
|
"""
|
|
cb = ClientCallback(callback, filter)
|
|
self.global_account_data_callbacks.append(cb)
|
|
|
|
def add_room_account_data_callback(
|
|
self,
|
|
callback: Callable[[MatrixRoom, AccountDataEvent], None],
|
|
filter: Union[
|
|
Type[AccountDataEvent],
|
|
Tuple[Type[AccountDataEvent], ...],
|
|
],
|
|
) -> None:
|
|
"""Add a callback that will be executed on room account data events.
|
|
|
|
Args:
|
|
callback (Callable[[MatrixRoom, AccountDataEvent], None]):
|
|
A function that will be
|
|
called if the event type in the filter argument is found in
|
|
the room account data event list.
|
|
|
|
filter
|
|
(Union[Type[AccountDataEvent], Tuple[Type[AccountDataEvent, ...]]):
|
|
The event type or a tuple
|
|
containing multiple types for which the function
|
|
will be called.
|
|
|
|
"""
|
|
cb = ClientCallback(callback, filter)
|
|
self.room_account_data_callbacks.append(cb)
|
|
|
|
def add_to_device_callback(
|
|
self,
|
|
callback: Callable[[ToDeviceEvent], None],
|
|
filter: Union[Type[ToDeviceEvent], Tuple[Type[ToDeviceEvent], ...]],
|
|
) -> None:
|
|
"""Add a callback that will be executed on to-device events.
|
|
|
|
Args:
|
|
callback (Callable[[ToDeviceEvent], None]): A function that will be
|
|
called if the event type in the filter argument is found in a
|
|
the to-device part of the sync response.
|
|
|
|
filter
|
|
(Union[Type[ToDeviceEvent], Tuple[Type[ToDeviceEvent], ...]]):
|
|
The event type or a tuple
|
|
containing multiple types for which the function
|
|
will be called.
|
|
|
|
"""
|
|
cb = ClientCallback(callback, filter)
|
|
self.to_device_callbacks.append(cb)
|
|
|
|
def add_presence_callback(
|
|
self,
|
|
callback: Callable[[PresenceEvent], None],
|
|
filter: Union[Type, Tuple[Type]],
|
|
):
|
|
"""Add a callback that will be executed on presence events.
|
|
|
|
Args:
|
|
callback (Callable[[PresenceEvent], None]): A function that will be
|
|
called if the event type in the filter argument is found in a
|
|
the presence part of the sync response.
|
|
filter (Union[Type, Tuple[Type]]): The event type or a tuple
|
|
containing multiple types for which the function
|
|
will be called.
|
|
|
|
"""
|
|
cb = ClientCallback(callback, filter)
|
|
self.presence_callbacks.append(cb)
|
|
|
|
@store_loaded
|
|
def create_key_verification(self, device: OlmDevice) -> ToDeviceMessage:
|
|
"""Start a new key verification process with the given device.
|
|
|
|
Args:
|
|
device (OlmDevice): The device which we would like to verify
|
|
|
|
Returns a ``ToDeviceMessage`` that should be sent to to the homeserver.
|
|
"""
|
|
assert self.olm
|
|
return self.olm.create_sas(device)
|
|
|
|
@store_loaded
|
|
def confirm_key_verification(self, transaction_id: str) -> ToDeviceMessage:
|
|
"""Confirm that the short auth string of a key verification matches.
|
|
|
|
Args:
|
|
transaction_id (str): The transaction id of the interactive key
|
|
verification.
|
|
|
|
Returns a ``ToDeviceMessage`` that should be sent to to the homeserver.
|
|
|
|
If the other user already confirmed the short auth string on their side
|
|
this function will also verify the device that is partaking in the
|
|
verification process.
|
|
"""
|
|
if transaction_id not in self.key_verifications:
|
|
raise LocalProtocolError(
|
|
f"Key verification with the transaction id {transaction_id} does not exist."
|
|
)
|
|
|
|
sas = self.key_verifications[transaction_id]
|
|
|
|
sas.accept_sas()
|
|
message = sas.get_mac()
|
|
|
|
if sas.verified:
|
|
self.verify_device(sas.other_olm_device)
|
|
|
|
return message
|
|
|
|
def room_devices(self, room_id: str) -> Dict[str, Dict[str, OlmDevice]]:
|
|
"""Get all Olm devices participating in a room.
|
|
|
|
Args:
|
|
room_id (str): The id of the room for which we would like to
|
|
collect all the devices.
|
|
|
|
Returns a dictionary holding the user as the key and a dictionary of
|
|
the device id as the key and OlmDevice as the value.
|
|
|
|
Raises LocalProtocolError if no room is found with the given room_id.
|
|
"""
|
|
devices: Dict[str, Dict[str, OlmDevice]] = defaultdict(dict)
|
|
|
|
if not self.olm:
|
|
return devices
|
|
|
|
try:
|
|
room = self.rooms[room_id]
|
|
except KeyError:
|
|
raise LocalProtocolError(f"No room found with room id {room_id}")
|
|
|
|
if not room.encrypted:
|
|
return devices
|
|
|
|
users = room.users.keys()
|
|
|
|
for user in users:
|
|
user_devices = self.device_store.active_user_devices(user)
|
|
devices[user] = {d.id: d for d in user_devices}
|
|
|
|
return devices
|
|
|
|
@store_loaded
|
|
def get_active_key_requests(
|
|
self, user_id: str, device_id: str
|
|
) -> List[RoomKeyRequest]:
|
|
"""Get key requests from a device that are waiting for verification.
|
|
|
|
Args:
|
|
user_id (str): The id of the user for which we would like to find
|
|
the active key requests.
|
|
device_id (str): The id of the device for which we would like to
|
|
find the active key requests.
|
|
|
|
Example:
|
|
>>> # A to-device callback that verifies devices that
|
|
>>> # request room keys and continues the room key sharing process.
|
|
>>> # Note that a single user/device can have multiple key requests
|
|
>>> # queued up.
|
|
>>> def key_share_cb(event):
|
|
... user_id = event.sender
|
|
... device_id = event.requesting_device_id
|
|
... device = client.device_store[user_id][device_id]
|
|
... client.verify_device(device)
|
|
... for request in client.get_active_key_requests(
|
|
... user_id, device_id):
|
|
... client.continue_key_share(request)
|
|
>>> client.add_to_device_callback(key_share_cb)
|
|
|
|
Returns:
|
|
list: A list of actively waiting key requests from the given user.
|
|
|
|
"""
|
|
assert self.olm
|
|
return self.olm.get_active_key_requests(user_id, device_id)
|
|
|
|
@store_loaded
|
|
def continue_key_share(self, event: RoomKeyRequest) -> bool:
|
|
"""Continue a previously interrupted key share event.
|
|
|
|
To handle room key requests properly client users need to add a
|
|
callback for RoomKeyRequest:
|
|
|
|
>>> client.add_to_device_callback(callback, RoomKeyRequest)
|
|
|
|
This callback will be run only if a room key request needs user
|
|
interaction, that is if a room key request is coming from an untrusted
|
|
device.
|
|
|
|
After a user has verified the requesting device the key sharing can be
|
|
continued using this method:
|
|
|
|
>>> client.continue_key_share(room_key_request)
|
|
|
|
Args:
|
|
event (RoomKeyRequest): The event which we would like to continue.
|
|
|
|
If the key share event is continued successfully a to-device message
|
|
will be queued up in the `client.outgoing_to_device_messages` list
|
|
waiting to be sent out
|
|
|
|
Returns:
|
|
bool: True if the request was continued, False otherwise.
|
|
|
|
"""
|
|
assert self.olm
|
|
return self.olm.continue_key_share(event)
|
|
|
|
@store_loaded
|
|
def cancel_key_share(self, event: RoomKeyRequest) -> bool:
|
|
"""Cancel a previously interrupted key share event.
|
|
|
|
This method is the counterpart to the `continue_key_share()` method. If
|
|
a user choses not to verify a device and does not want to share room
|
|
keys with such a device it should cancel the request with this method.
|
|
|
|
>>> client.cancel_key_share(room_key_request)
|
|
|
|
Args:
|
|
event (RoomKeyRequest): The event which we would like to cancel.
|
|
|
|
Returns:
|
|
bool: True if the request was cancelled, False otherwise.
|
|
|
|
"""
|
|
assert self.olm
|
|
return self.olm.cancel_key_share(event)
|