mirror of https://github.com/poljar/matrix-nio.git
3664 lines
130 KiB
Python
3664 lines
130 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright © 2018, 2019 Damir Jelić <poljar@termina.org.uk>
|
|
# Copyright © 2020-2021 Famedly GmbH
|
|
#
|
|
# Permission to use, copy, modify, and/or distribute this software for
|
|
# any purpose with or without fee is hereby granted, provided that the
|
|
# above copyright notice and this permission notice appear in all copies.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
|
|
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
|
|
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
|
|
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
|
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
|
import asyncio
|
|
import io
|
|
import json
|
|
import warnings
|
|
from asyncio import Event as AsyncioEvent
|
|
from dataclasses import dataclass, field
|
|
from functools import partial, wraps
|
|
from json.decoder import JSONDecodeError
|
|
from pathlib import Path
|
|
from typing import (
|
|
Any,
|
|
AsyncIterable,
|
|
BinaryIO,
|
|
Callable,
|
|
Coroutine,
|
|
Dict,
|
|
Iterable,
|
|
List,
|
|
Optional,
|
|
Sequence,
|
|
Set,
|
|
Tuple,
|
|
Type,
|
|
Union,
|
|
)
|
|
from urllib.parse import urlparse
|
|
from uuid import UUID, uuid4
|
|
|
|
from aiofiles.threadpool.binary import AsyncBufferedReader
|
|
from aiofiles.threadpool.text import AsyncTextIOWrapper
|
|
from aiohttp import (
|
|
ClientResponse,
|
|
ClientSession,
|
|
ClientTimeout,
|
|
ContentTypeError,
|
|
TraceConfig,
|
|
)
|
|
from aiohttp.client_exceptions import ClientConnectionError
|
|
from aiohttp.connector import Connection
|
|
from aiohttp_socks import ProxyConnector
|
|
|
|
from ..api import (
|
|
Api,
|
|
EventFormat,
|
|
MessageDirection,
|
|
PushRuleKind,
|
|
ResizingMethod,
|
|
RoomPreset,
|
|
RoomVisibility,
|
|
_FilterT,
|
|
)
|
|
from ..crypto import (
|
|
AsyncDataT,
|
|
OlmDevice,
|
|
async_encrypt_attachment,
|
|
async_generator_from_data,
|
|
)
|
|
from ..event_builders import ToDeviceMessage
|
|
from ..events import (
|
|
BadEventType,
|
|
Event,
|
|
MegolmEvent,
|
|
PushAction,
|
|
PushCondition,
|
|
RoomKeyRequest,
|
|
RoomKeyRequestCancellation,
|
|
ToDeviceEvent,
|
|
)
|
|
from ..exceptions import (
|
|
GroupEncryptionError,
|
|
LocalProtocolError,
|
|
MembersSyncError,
|
|
SendRetryError,
|
|
TransferCancelledError,
|
|
)
|
|
from ..monitors import TransferMonitor
|
|
from ..responses import (
|
|
ContentRepositoryConfigError,
|
|
ContentRepositoryConfigResponse,
|
|
DeleteAliasError,
|
|
DeleteAliasResponse,
|
|
DeleteDevicesAuthResponse,
|
|
DeleteDevicesError,
|
|
DeleteDevicesResponse,
|
|
DeletePushRuleError,
|
|
DeletePushRuleResponse,
|
|
DevicesError,
|
|
DevicesResponse,
|
|
DiscoveryInfoError,
|
|
DiscoveryInfoResponse,
|
|
DownloadError,
|
|
DownloadResponse,
|
|
EnablePushRuleError,
|
|
EnablePushRuleResponse,
|
|
ErrorResponse,
|
|
FileResponse,
|
|
GetOpenIDTokenError,
|
|
GetOpenIDTokenResponse,
|
|
JoinedMembersError,
|
|
JoinedMembersResponse,
|
|
JoinedRoomsError,
|
|
JoinedRoomsResponse,
|
|
JoinError,
|
|
JoinResponse,
|
|
KeysClaimError,
|
|
KeysClaimResponse,
|
|
KeysQueryError,
|
|
KeysQueryResponse,
|
|
KeysUploadError,
|
|
KeysUploadResponse,
|
|
LoginError,
|
|
LoginInfoError,
|
|
LoginInfoResponse,
|
|
LoginResponse,
|
|
LogoutError,
|
|
LogoutResponse,
|
|
PresenceGetError,
|
|
PresenceGetResponse,
|
|
PresenceSetError,
|
|
PresenceSetResponse,
|
|
ProfileGetAvatarError,
|
|
ProfileGetAvatarResponse,
|
|
ProfileGetDisplayNameError,
|
|
ProfileGetDisplayNameResponse,
|
|
ProfileGetError,
|
|
ProfileGetResponse,
|
|
ProfileSetAvatarError,
|
|
ProfileSetAvatarResponse,
|
|
ProfileSetDisplayNameError,
|
|
ProfileSetDisplayNameResponse,
|
|
PutAliasError,
|
|
PutAliasResponse,
|
|
RegisterResponse,
|
|
Response,
|
|
RoomBanError,
|
|
RoomBanResponse,
|
|
RoomContextError,
|
|
RoomContextResponse,
|
|
RoomCreateError,
|
|
RoomCreateResponse,
|
|
RoomDeleteAliasError,
|
|
RoomDeleteAliasResponse,
|
|
RoomForgetError,
|
|
RoomForgetResponse,
|
|
RoomGetEventError,
|
|
RoomGetEventResponse,
|
|
RoomGetStateError,
|
|
RoomGetStateEventError,
|
|
RoomGetStateEventResponse,
|
|
RoomGetStateResponse,
|
|
RoomGetVisibilityError,
|
|
RoomGetVisibilityResponse,
|
|
RoomInviteError,
|
|
RoomInviteResponse,
|
|
RoomKeyRequestError,
|
|
RoomKeyRequestResponse,
|
|
RoomKickError,
|
|
RoomKickResponse,
|
|
RoomLeaveError,
|
|
RoomLeaveResponse,
|
|
RoomMessagesError,
|
|
RoomMessagesResponse,
|
|
RoomPutAliasError,
|
|
RoomPutAliasResponse,
|
|
RoomPutStateError,
|
|
RoomPutStateResponse,
|
|
RoomReadMarkersError,
|
|
RoomReadMarkersResponse,
|
|
RoomRedactError,
|
|
RoomRedactResponse,
|
|
RoomResolveAliasError,
|
|
RoomResolveAliasResponse,
|
|
RoomSendResponse,
|
|
RoomTypingError,
|
|
RoomTypingResponse,
|
|
RoomUnbanError,
|
|
RoomUnbanResponse,
|
|
RoomUpdateAliasError,
|
|
RoomUpdateAliasResponse,
|
|
RoomUpgradeError,
|
|
RoomUpgradeResponse,
|
|
SetPushRuleActionsError,
|
|
SetPushRuleActionsResponse,
|
|
SetPushRuleError,
|
|
SetPushRuleResponse,
|
|
ShareGroupSessionError,
|
|
ShareGroupSessionResponse,
|
|
SyncError,
|
|
SyncResponse,
|
|
ThumbnailError,
|
|
ThumbnailResponse,
|
|
ToDeviceError,
|
|
ToDeviceResponse,
|
|
UpdateDeviceError,
|
|
UpdateDeviceResponse,
|
|
UpdateReceiptMarkerResponse,
|
|
UploadError,
|
|
UploadFilterError,
|
|
UploadFilterResponse,
|
|
UploadResponse,
|
|
WhoamiError,
|
|
WhoamiResponse,
|
|
)
|
|
from . import Client, ClientConfig
|
|
from .base_client import logged_in, store_loaded
|
|
|
|
_ShareGroupSessionT = Union[ShareGroupSessionError, ShareGroupSessionResponse]
|
|
|
|
_ProfileGetDisplayNameT = Union[
|
|
ProfileGetDisplayNameResponse, ProfileGetDisplayNameError
|
|
]
|
|
_ProfileSetDisplayNameT = Union[
|
|
ProfileSetDisplayNameResponse, ProfileSetDisplayNameError
|
|
]
|
|
|
|
DataProvider = Callable[[int, int], AsyncDataT]
|
|
SynchronousFile = (
|
|
io.TextIOBase,
|
|
io.BufferedReader,
|
|
io.BufferedRandom,
|
|
io.BytesIO,
|
|
io.FileIO,
|
|
)
|
|
AsyncFile = (AsyncBufferedReader, AsyncTextIOWrapper)
|
|
|
|
|
|
@dataclass
|
|
class ResponseCb:
|
|
"""Response callback."""
|
|
|
|
func: Callable = field()
|
|
filter: Union[Tuple[Type], Type, None] = None
|
|
|
|
|
|
async def on_request_chunk_sent(session, context, params):
|
|
"""TraceConfig callback to run when a chunk is sent for client uploads."""
|
|
|
|
context_obj = context.trace_request_ctx
|
|
|
|
if isinstance(context_obj, TransferMonitor):
|
|
context_obj.transferred += len(params.chunk)
|
|
|
|
|
|
async def connect_wrapper(self, *args, **kwargs) -> Connection:
|
|
connection = await type(self).connect(self, *args, **kwargs)
|
|
connection.transport.set_write_buffer_limits(16 * 1024)
|
|
return connection
|
|
|
|
|
|
def client_session(func):
|
|
"""Ensure that the Async client has a valid client session."""
|
|
|
|
@wraps(func)
|
|
async def wrapper(self, *args, **kwargs):
|
|
if not self.client_session:
|
|
trace = TraceConfig()
|
|
trace.on_request_chunk_sent.append(on_request_chunk_sent)
|
|
|
|
connector = ProxyConnector.from_url(self.proxy) if self.proxy else None
|
|
self.client_session = ClientSession(
|
|
timeout=ClientTimeout(total=self.config.request_timeout),
|
|
trace_configs=[trace],
|
|
connector=connector,
|
|
)
|
|
|
|
self.client_session.connector.connect = partial(
|
|
connect_wrapper,
|
|
self.client_session.connector,
|
|
)
|
|
|
|
return await func(self, *args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AsyncClientConfig(ClientConfig):
|
|
"""Async nio client configuration.
|
|
|
|
Attributes:
|
|
max_limit_exceeded (int, optional): How many 429 (Too many requests)
|
|
errors can a request encounter before giving up and returning
|
|
an ErrorResponse.
|
|
Default is None for unlimited.
|
|
|
|
max_timeouts (int, optional): How many timeout connection errors can
|
|
a request encounter before giving up and raising the error:
|
|
a ClientConnectionError, TimeoutError, or asyncio.TimeoutError.
|
|
Default is None for unlimited.
|
|
|
|
backoff_factor (float): A backoff factor to apply between retries
|
|
for timeouts, starting from the second try.
|
|
nio will sleep for `backoff_factor * (2 ** (total_retries - 1))`
|
|
seconds.
|
|
For example, with the default backoff_factor of 0.1,
|
|
nio will sleep for 0.0, 0.2, 0.4, ... seconds between retries.
|
|
|
|
max_timeout_retry_wait_time (float): The maximum time in seconds to
|
|
wait between retries for timeouts, by default 60.
|
|
|
|
request_timeout (float): How many seconds a request has to finish,
|
|
before it is retried or raise an `asycio.TimeoutError` depending
|
|
on `max_timeouts`.
|
|
Defaults to 60 seconds, and can be disabled with `0`.
|
|
`AsyncClient.sync()` overrides this option with its
|
|
`timeout` argument.
|
|
The `download()`, `thumbnail()` and `upload()` methods ignore
|
|
this option and use `0`.
|
|
"""
|
|
|
|
max_limit_exceeded: Optional[int] = None
|
|
max_timeouts: Optional[int] = None
|
|
backoff_factor: float = 0.1
|
|
max_timeout_retry_wait_time: float = 60
|
|
request_timeout: float = 60
|
|
|
|
|
|
class AsyncClient(Client):
|
|
"""An async IO matrix client.
|
|
|
|
Args:
|
|
homeserver (str): The URL of the homeserver which we want to connect
|
|
to.
|
|
user (str, optional): The user which will be used when we log in to the
|
|
homeserver.
|
|
device_id (str, optional): An unique identifier that distinguishes
|
|
this client instance. If not set the server will provide one after
|
|
log in.
|
|
store_path (str, optional): The directory that should be used for state
|
|
storage.
|
|
config (AsyncClientConfig, optional): Configuration for the client.
|
|
ssl (bool/ssl.SSLContext, optional): SSL validation mode. None for
|
|
default SSL check (ssl.create_default_context() is used), False
|
|
for skip SSL certificate validation connection.
|
|
proxy (str, optional): The proxy that should be used for the HTTP
|
|
connection. Supports SOCKS4(a), SOCKS5, HTTP (tunneling) via an
|
|
URL like e.g. 'socks5://user:password@127.0.0.1:1080'.
|
|
|
|
Attributes:
|
|
synced (Event): An asyncio event that is fired every time the client
|
|
successfully syncs with the server. Note, this event will only be
|
|
fired if the `sync_forever()` method is used.
|
|
|
|
A simple example can be found bellow.
|
|
|
|
Example:
|
|
>>> client = AsyncClient("https://example.org", "example")
|
|
>>> login_response = loop.run_until_complete(
|
|
>>> client.login("hunter1")
|
|
>>> )
|
|
>>> asyncio.run(client.sync_forever(30000))
|
|
|
|
This example assumes a full sync on every run. If a sync token is provided
|
|
for the `since` parameter of the `sync_forever` method `full_state` should
|
|
be set to `True` as well.
|
|
|
|
Example:
|
|
>>> asyncio.run(
|
|
>>> client.sync_forever(30000, since="token123",
|
|
>>> full_state=True)
|
|
>>> )
|
|
|
|
The client can also be configured to store and restore the sync token
|
|
automatically. The `full_state` argument should be set to `True` in that
|
|
case as well.
|
|
|
|
Example:
|
|
>>> config = ClientConfig(store_sync_tokens=True)
|
|
>>> client = AsyncClient("https://example.org", "example",
|
|
>>> store_path="/home/example",
|
|
>>> config=config)
|
|
>>> login_response = loop.run_until_complete(
|
|
>>> client.login("hunter1")
|
|
>>> )
|
|
>>> asyncio.run(client.sync_forever(30000, full_state=True))
|
|
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
homeserver: str,
|
|
user: str = "",
|
|
device_id: Optional[str] = "",
|
|
store_path: Optional[str] = "",
|
|
config: Optional[AsyncClientConfig] = None,
|
|
ssl: Optional[bool] = None,
|
|
proxy: Optional[str] = None,
|
|
):
|
|
self.homeserver = homeserver
|
|
self.client_session: Optional[ClientSession] = None
|
|
|
|
self.ssl = ssl
|
|
self.proxy = proxy
|
|
|
|
self._presence: Optional[str] = None
|
|
|
|
self.synced = AsyncioEvent()
|
|
self.response_callbacks: List[ResponseCb] = []
|
|
|
|
self.sharing_session: Dict[str, AsyncioEvent] = dict()
|
|
|
|
is_config = isinstance(config, ClientConfig)
|
|
is_async_config = isinstance(config, AsyncClientConfig)
|
|
|
|
if is_config and not is_async_config:
|
|
warnings.warn(
|
|
"Pass an AsyncClientConfig instead of ClientConfig.",
|
|
DeprecationWarning,
|
|
)
|
|
config = AsyncClientConfig(**config.__dict__)
|
|
|
|
self.config: AsyncClientConfig = config or AsyncClientConfig()
|
|
|
|
super().__init__(user, device_id, store_path, self.config)
|
|
|
|
def add_response_callback(
|
|
self,
|
|
func: Coroutine[Any, Any, Response],
|
|
cb_filter: Union[Tuple[Type], Type, None] = None,
|
|
):
|
|
"""Add a coroutine that will be called if a response is received.
|
|
|
|
Args:
|
|
func (Coroutine): The coroutine that will be called with the
|
|
response as the argument.
|
|
cb_filter (Type, optional): A type or a tuple of types for which
|
|
the callback should be called.
|
|
|
|
Example:
|
|
|
|
>>> # A callback that will be called every time our `sync_forever`
|
|
>>> # method succesfully syncs with the server.
|
|
>>> async def sync_cb(response):
|
|
... print(f"We synced, token: {response.next_batch}")
|
|
...
|
|
>>> client.add_response_callback(sync_cb, SyncResponse)
|
|
>>> await client.sync_forever(30000)
|
|
|
|
"""
|
|
cb = ResponseCb(func, cb_filter) # type: ignore
|
|
self.response_callbacks.append(cb)
|
|
|
|
async def parse_body(self, transport_response: ClientResponse) -> Dict[Any, Any]:
|
|
"""Parse the body of the response.
|
|
|
|
Low-level function which is normally only used by other methods of
|
|
this class.
|
|
|
|
Args:
|
|
transport_response(ClientResponse): The transport response that
|
|
contains the body of the response.
|
|
|
|
Returns a dictionary representing the response.
|
|
"""
|
|
try:
|
|
return await transport_response.json()
|
|
except (JSONDecodeError, ContentTypeError):
|
|
try:
|
|
# matrix.org return an incorrect content-type for .well-known
|
|
# API requests, which leads to .text() working but not .json()
|
|
return json.loads(await transport_response.text())
|
|
except (JSONDecodeError, ContentTypeError):
|
|
pass
|
|
|
|
return {}
|
|
|
|
async def create_matrix_response(
|
|
self,
|
|
response_class: Type,
|
|
transport_response: ClientResponse,
|
|
data: Tuple[Any, ...] = None,
|
|
) -> Response:
|
|
"""Transform a transport response into a nio matrix response.
|
|
|
|
Low-level function which is normally only used by other methods of
|
|
this class.
|
|
|
|
Args:
|
|
response_class (Type): The class that the requests belongs to.
|
|
transport_response (ClientResponse): The underlying transport
|
|
response that contains our response body.
|
|
data (Tuple, optional): Extra data that is required to instantiate
|
|
the response class.
|
|
|
|
Returns a subclass of `Response` depending on the type of the
|
|
response_class argument.
|
|
"""
|
|
data = data or ()
|
|
|
|
content_type = transport_response.content_type
|
|
is_json = content_type == "application/json"
|
|
|
|
name = None
|
|
if transport_response.content_disposition:
|
|
name = transport_response.content_disposition.filename
|
|
|
|
if issubclass(response_class, FileResponse) and is_json:
|
|
parsed_dict = await self.parse_body(transport_response)
|
|
resp = response_class.from_data(parsed_dict, content_type, name)
|
|
|
|
elif issubclass(response_class, FileResponse):
|
|
body = await transport_response.read()
|
|
resp = response_class.from_data(body, content_type, name)
|
|
elif (
|
|
issubclass(response_class, RoomGetStateEventResponse)
|
|
and transport_response.status == 404
|
|
):
|
|
parsed_dict = await self.parse_body(transport_response)
|
|
resp = response_class.create_error(parsed_dict, data[-1])
|
|
|
|
elif (
|
|
transport_response.status == 401 and response_class == DeleteDevicesResponse
|
|
):
|
|
parsed_dict = await self.parse_body(transport_response)
|
|
resp = DeleteDevicesAuthResponse.from_dict(parsed_dict)
|
|
|
|
else:
|
|
parsed_dict = await self.parse_body(transport_response)
|
|
resp = response_class.from_dict(parsed_dict, *data)
|
|
|
|
resp.transport_response = transport_response
|
|
return resp
|
|
|
|
async def _run_to_device_callbacks(self, event: Union[ToDeviceEvent]):
|
|
for cb in self.to_device_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
await asyncio.coroutine(cb.func)(event)
|
|
|
|
async def _handle_to_device(self, response: SyncResponse):
|
|
decrypted_to_device = []
|
|
|
|
for index, to_device_event in enumerate(response.to_device_events):
|
|
decrypted_event = self._handle_decrypt_to_device(to_device_event)
|
|
|
|
if decrypted_event:
|
|
decrypted_to_device.append((index, decrypted_event))
|
|
to_device_event = decrypted_event
|
|
|
|
# Do not pass room key request events to our user here. We don't
|
|
# want to notify them about requests that get automatically handled
|
|
# or canceled right away.
|
|
if isinstance(
|
|
to_device_event, (RoomKeyRequest, RoomKeyRequestCancellation)
|
|
):
|
|
continue
|
|
|
|
await self._run_to_device_callbacks(to_device_event)
|
|
|
|
self._replace_decrypted_to_device(decrypted_to_device, response)
|
|
|
|
async def _handle_invited_rooms(self, response: SyncResponse):
|
|
for room_id, info in response.rooms.invite.items():
|
|
room = self._get_invited_room(room_id)
|
|
|
|
for event in info.invite_state:
|
|
room.handle_event(event)
|
|
|
|
for cb in self.event_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
await asyncio.coroutine(cb.func)(room, event)
|
|
|
|
async def _handle_joined_rooms(self, response: SyncResponse) -> None:
|
|
encrypted_rooms: Set[str] = set()
|
|
|
|
for room_id, join_info in response.rooms.join.items():
|
|
self._handle_joined_state(room_id, join_info, encrypted_rooms)
|
|
|
|
room = self.rooms[room_id]
|
|
decrypted_events: List[Tuple[int, Union[Event, BadEventType]]] = []
|
|
|
|
for index, event in enumerate(join_info.timeline.events):
|
|
decrypted_event = self._handle_timeline_event(
|
|
event, room_id, room, encrypted_rooms
|
|
)
|
|
|
|
if decrypted_event:
|
|
event = decrypted_event
|
|
decrypted_events.append((index, decrypted_event))
|
|
|
|
for cb in self.event_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
await asyncio.coroutine(cb.func)(room, event)
|
|
|
|
# Replace the Megolm events with decrypted ones
|
|
for index, event in decrypted_events:
|
|
join_info.timeline.events[index] = event
|
|
|
|
for event in join_info.ephemeral:
|
|
room.handle_ephemeral_event(event)
|
|
|
|
for cb in self.ephemeral_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
await asyncio.coroutine(cb.func)(room, event)
|
|
|
|
for event in join_info.account_data:
|
|
room.handle_account_data(event)
|
|
|
|
for cb in self.room_account_data_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
await asyncio.coroutine(cb.func)(room, event)
|
|
|
|
if room.encrypted and self.olm is not None:
|
|
self.olm.update_tracked_users(room)
|
|
|
|
self.encrypted_rooms.update(encrypted_rooms)
|
|
|
|
if self.store:
|
|
self.store.save_encrypted_rooms(encrypted_rooms)
|
|
|
|
async def _handle_presence_events(self, response: SyncResponse):
|
|
for event in response.presence_events:
|
|
for room_id in self.rooms.keys():
|
|
if event.user_id not in self.rooms[room_id].users:
|
|
continue
|
|
|
|
self.rooms[room_id].users[event.user_id].presence = event.presence
|
|
self.rooms[room_id].users[
|
|
event.user_id
|
|
].last_active_ago = event.last_active_ago
|
|
self.rooms[room_id].users[
|
|
event.user_id
|
|
].currently_active = event.currently_active
|
|
self.rooms[room_id].users[event.user_id].status_msg = event.status_msg
|
|
|
|
for cb in self.presence_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
await asyncio.coroutine(cb.func)(event)
|
|
|
|
async def _handle_global_account_data_events( # type: ignore
|
|
self,
|
|
response: SyncResponse,
|
|
) -> None:
|
|
for event in response.account_data_events:
|
|
for cb in self.global_account_data_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
await asyncio.coroutine(cb.func)(event)
|
|
|
|
async def _handle_expired_verifications(self):
|
|
expired_verifications = self.olm.clear_verifications()
|
|
|
|
for event in expired_verifications:
|
|
for cb in self.to_device_callbacks:
|
|
if cb.filter is None or isinstance(event, cb.filter):
|
|
await asyncio.coroutine(cb.func)(event)
|
|
|
|
async def _handle_sync(self, response: SyncResponse) -> None:
|
|
# We already recieved such a sync response, do nothing in that case.
|
|
if self.next_batch == response.next_batch:
|
|
return
|
|
|
|
self.next_batch = response.next_batch
|
|
|
|
if self.config.store_sync_tokens and self.store:
|
|
self.store.save_sync_token(self.next_batch)
|
|
|
|
await self._handle_to_device(response)
|
|
|
|
await self._handle_invited_rooms(response)
|
|
|
|
await self._handle_joined_rooms(response)
|
|
|
|
await self._handle_presence_events(response)
|
|
|
|
await self._handle_global_account_data_events(response)
|
|
|
|
if self.olm:
|
|
await self._handle_expired_verifications()
|
|
self._handle_olm_events(response)
|
|
await self._collect_key_requests()
|
|
|
|
async def _collect_key_requests(self):
|
|
events = self.olm.collect_key_requests()
|
|
for event in events:
|
|
await self._run_to_device_callbacks(event)
|
|
|
|
async def receive_response(self, response: Response) -> None:
|
|
"""Receive a Matrix Response and change the client state accordingly.
|
|
|
|
Automatically called for all "high-level" methods of this API (each
|
|
function documents calling it).
|
|
|
|
Some responses will get edited for the callers convenience e.g. sync
|
|
responses that contain encrypted messages. The encrypted messages will
|
|
be replaced by decrypted ones if decryption is possible.
|
|
|
|
Args:
|
|
response (Response): the response that we wish the client to handle
|
|
"""
|
|
if not isinstance(response, Response):
|
|
raise ValueError("Invalid response received")
|
|
|
|
if isinstance(response, SyncResponse):
|
|
await self._handle_sync(response)
|
|
else:
|
|
super().receive_response(response)
|
|
|
|
async def get_timeout_retry_wait_time(self, got_timeouts: int) -> float:
|
|
if got_timeouts < 2:
|
|
return 0.0
|
|
|
|
return min(
|
|
self.config.backoff_factor * (2 ** (min(got_timeouts, 1000) - 1)),
|
|
self.config.max_timeout_retry_wait_time,
|
|
)
|
|
|
|
async def _send(
|
|
self,
|
|
response_class: Type,
|
|
method: str,
|
|
path: str,
|
|
data: Union[None, str, AsyncDataT] = None,
|
|
response_data: Optional[Tuple[Any, ...]] = None,
|
|
content_type: Optional[str] = None,
|
|
trace_context: Optional[Any] = None,
|
|
data_provider: Optional[DataProvider] = None,
|
|
timeout: Optional[float] = None,
|
|
content_length: Optional[int] = None,
|
|
):
|
|
headers = (
|
|
{"Content-Type": content_type}
|
|
if content_type
|
|
else {"Content-Type": "application/json"}
|
|
)
|
|
|
|
if content_length is not None:
|
|
headers["Content-Length"] = str(content_length)
|
|
|
|
if self.config.custom_headers is not None:
|
|
headers.update(self.config.custom_headers)
|
|
|
|
got_429 = 0
|
|
max_429 = self.config.max_limit_exceeded
|
|
|
|
got_timeouts = 0
|
|
max_timeouts = self.config.max_timeouts
|
|
|
|
while True:
|
|
if data_provider:
|
|
# mypy expects an "Awaitable[Any]" but data_provider is a
|
|
# method generated during runtime that may or may not be
|
|
# Awaitable. The actual type is a union of the types that we
|
|
# can receive from reading files.
|
|
data = await data_provider(got_429, got_timeouts) # type: ignore
|
|
|
|
try:
|
|
transport_resp = await self.send(
|
|
method,
|
|
path,
|
|
data,
|
|
headers,
|
|
trace_context,
|
|
timeout,
|
|
)
|
|
|
|
resp = await self.create_matrix_response(
|
|
response_class,
|
|
transport_resp,
|
|
response_data,
|
|
)
|
|
|
|
if transport_resp.status == 429 or (
|
|
isinstance(resp, ErrorResponse)
|
|
and resp.status_code in ("M_LIMIT_EXCEEDED", 429)
|
|
):
|
|
got_429 += 1
|
|
|
|
if max_429 is not None and got_429 > max_429:
|
|
break
|
|
|
|
await self.run_response_callbacks([resp])
|
|
|
|
retry_after_ms = getattr(resp, "retry_after_ms", 0) or 5000
|
|
await asyncio.sleep(retry_after_ms / 1000)
|
|
else:
|
|
break
|
|
|
|
except (ClientConnectionError, TimeoutError, asyncio.TimeoutError):
|
|
got_timeouts += 1
|
|
|
|
if max_timeouts is not None and got_timeouts > max_timeouts:
|
|
raise
|
|
|
|
wait = await self.get_timeout_retry_wait_time(got_timeouts)
|
|
await asyncio.sleep(wait)
|
|
|
|
await self.receive_response(resp)
|
|
return resp
|
|
|
|
@client_session
|
|
async def send(
|
|
self,
|
|
method: str,
|
|
path: str,
|
|
data: Union[None, str, AsyncDataT] = None,
|
|
headers: Optional[Dict[str, str]] = None,
|
|
trace_context: Any = None,
|
|
timeout: Optional[float] = None,
|
|
) -> ClientResponse:
|
|
"""Send a request to the homeserver.
|
|
|
|
This function does not call receive_response().
|
|
|
|
Args:
|
|
method (str): The request method that should be used. One of get,
|
|
post, put, delete.
|
|
path (str): The URL path of the request.
|
|
data (str, optional): Data that will be posted with the request.
|
|
headers (Dict[str,str] , optional): Additional request headers that
|
|
should be used with the request.
|
|
trace_context (Any, optional): An object to use for the
|
|
ClientSession TraceConfig context
|
|
timeout (int, optional): How many seconds the request has before
|
|
raising `asyncio.TimeoutError`.
|
|
Overrides `AsyncClient.config.request_timeout` if not `None`.
|
|
"""
|
|
assert self.client_session
|
|
|
|
return await self.client_session.request(
|
|
method,
|
|
self.homeserver + path,
|
|
data=data,
|
|
ssl=self.ssl,
|
|
headers=headers,
|
|
trace_request_ctx=trace_context,
|
|
timeout=self.config.request_timeout if timeout is None else timeout,
|
|
)
|
|
|
|
async def mxc_to_http(
|
|
self,
|
|
mxc: str,
|
|
homeserver: Optional[str] = None,
|
|
) -> Optional[str]:
|
|
"""Convert a matrix content URI to a HTTP URI."""
|
|
return Api.mxc_to_http(mxc, homeserver or self.homeserver)
|
|
|
|
async def login_raw(
|
|
self, auth_dict: Dict[str, Any]
|
|
) -> Union[LoginResponse, LoginError]:
|
|
"""Login to the homeserver using a raw dictionary.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Args:
|
|
auth_dict (Dict[str, Any]): The auth dictionary.
|
|
See the example below and here
|
|
https://matrix.org/docs/spec/client_server/r0.6.0#authentication-types
|
|
for detailed documentation
|
|
|
|
Example:
|
|
>>> auth_dict = {
|
|
>>> "type": "m.login.password",
|
|
>>> "identifier": {
|
|
>>> "type": "m.id.thirdparty",
|
|
>>> "medium": "email",
|
|
>>> "address": "testemail@mail.org"
|
|
>>> },
|
|
>>> "password": "PASSWORDABCD",
|
|
>>> "initial_device_display_name": "Test user"
|
|
>>> }
|
|
|
|
Returns either a `LoginResponse` if the request was successful or
|
|
a `LoginError` if there was an error with the request.
|
|
"""
|
|
if auth_dict is None or auth_dict == {}:
|
|
raise ValueError("Auth dictionary shall not be empty")
|
|
|
|
method, path, data = Api.login_raw(auth_dict)
|
|
|
|
return await self._send(LoginResponse, method, path, data)
|
|
|
|
async def register(self, username, password, device_name=""):
|
|
"""Register with homeserver.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Args:
|
|
username (str): Username to register the new user as.
|
|
password (str): New password for the user.
|
|
device_name (str): A display name to assign to a newly-created
|
|
device. Ignored if the logged in device corresponds to a
|
|
known device.
|
|
|
|
Returns a 'RegisterResponse' if successful.
|
|
"""
|
|
method, path, data = Api.register(
|
|
user=username,
|
|
password=password,
|
|
device_name=device_name,
|
|
device_id=self.device_id,
|
|
)
|
|
|
|
return await self._send(RegisterResponse, method, path, data)
|
|
|
|
async def discovery_info(
|
|
self,
|
|
) -> Union[DiscoveryInfoResponse, DiscoveryInfoError]:
|
|
"""Get discovery information about current `AsyncClient.homeserver`.
|
|
|
|
Returns either a `DiscoveryInfoResponse` if the request was successful
|
|
or a `DiscoveryInfoError` if there was an error with the request.
|
|
|
|
Some homeservers do not redirect requests to their main domain and
|
|
instead require clients to use a specific URL for communication.
|
|
|
|
If the domain specified by the `AsyncClient.homeserver` URL
|
|
implements the
|
|
[.well-known](https://matrix.org/docs/spec/client_server/latest#id178),
|
|
discovery mechanism, this method can be used to retrieve the
|
|
actual homeserver URL from it.
|
|
|
|
Example:
|
|
>>> client = AsyncClient(homeserver="https://example.org")
|
|
>>> response = await client.discovery_info()
|
|
>>> if isinstance(response, DiscoveryInfoResponse):
|
|
>>> client.homeserver = response.homeserver_url
|
|
"""
|
|
method, path = Api.discovery_info()
|
|
return await self._send(DiscoveryInfoResponse, method, path)
|
|
|
|
async def login_info(self) -> Union[LoginInfoResponse, LoginInfoError]:
|
|
"""Get the available login methods from the server
|
|
|
|
Returns either a `LoginInfoResponse` if the request was successful or
|
|
a `LoginInfoError` if there was an error with the request.
|
|
|
|
"""
|
|
method, path = Api.login_info()
|
|
|
|
return await self._send(LoginInfoResponse, method, path)
|
|
|
|
async def login(
|
|
self,
|
|
password: Optional[str] = None,
|
|
device_name: Optional[str] = "",
|
|
token: Optional[str] = None,
|
|
) -> Union[LoginResponse, LoginError]:
|
|
"""Login to the homeserver.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Args:
|
|
password (str, optional): The user's password.
|
|
device_name (str): A display name to assign to a newly-created
|
|
device. Ignored if the logged in device corresponds to a
|
|
known device.
|
|
token (str, optional): A login token, for example provided by a
|
|
single sign-on service.
|
|
|
|
Either a password or a token needs to be provided.
|
|
|
|
Returns either a `LoginResponse` if the request was successful or
|
|
a `LoginError` if there was an error with the request.
|
|
"""
|
|
|
|
if password is None and token is None:
|
|
raise ValueError("Either a password or a token needs to be provided")
|
|
|
|
method, path, data = Api.login(
|
|
self.user,
|
|
password=password,
|
|
device_name=device_name,
|
|
device_id=self.device_id,
|
|
token=token,
|
|
)
|
|
|
|
return await self._send(LoginResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def logout(
|
|
self, all_devices: bool = False
|
|
) -> Union[LogoutResponse, LogoutError]:
|
|
"""Logout from the homeserver.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either 'LogoutResponse' if the request was successful or
|
|
a `Logouterror` if there was an error with the request.
|
|
"""
|
|
method, path, data = Api.logout(self.access_token, all_devices)
|
|
|
|
return await self._send(LogoutResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def sync(
|
|
self,
|
|
timeout: Optional[int] = 0,
|
|
sync_filter: _FilterT = None,
|
|
since: Optional[str] = None,
|
|
full_state: Optional[bool] = None,
|
|
set_presence: Optional[str] = None,
|
|
) -> Union[SyncResponse, SyncError]:
|
|
"""Synchronise the client's state with the latest state on the server.
|
|
|
|
In general you should use sync_forever() which handles additional
|
|
tasks automatically (like sending encryption keys among others).
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Args:
|
|
timeout(int, optional): The maximum time that the server should
|
|
wait for new events before it should return the request
|
|
anyways, in milliseconds.
|
|
If ``0``, no timeout is applied.
|
|
If ``None``, use ``AsyncClient.config.request_timeout``.
|
|
If a timeout is applied and the server fails to return after
|
|
15 seconds of expected timeout,
|
|
the client will timeout by itself.
|
|
sync_filter (Union[None, str, Dict[Any, Any]):
|
|
A filter ID that can be obtained from
|
|
``AsyncClient.upload_filter()`` (preferred),
|
|
or filter dict that should be used for this sync request.
|
|
full_state (bool, optional): Controls whether to include the full
|
|
state for all rooms the user is a member of. If this is set to
|
|
true, then all state events will be returned, even if since is
|
|
non-empty. The timeline will still be limited by the since
|
|
parameter.
|
|
since (str, optional): A token specifying a point in time where to
|
|
continue the sync from. Defaults to the last sync token we
|
|
received from the server using this API call.
|
|
set_presence (str, optional): The presence state.
|
|
One of: ["online", "offline", "unavailable"]
|
|
|
|
Returns either a `SyncResponse` if the request was successful or
|
|
a `SyncError` if there was an error with the request.
|
|
"""
|
|
|
|
sync_token = since or self.next_batch
|
|
presence = set_presence or self._presence
|
|
method, path = Api.sync(
|
|
self.access_token,
|
|
since=sync_token or self.loaded_sync_token,
|
|
timeout=timeout or None,
|
|
filter=sync_filter,
|
|
full_state=full_state,
|
|
set_presence=presence,
|
|
)
|
|
|
|
response = await self._send(
|
|
SyncResponse,
|
|
method,
|
|
path,
|
|
# 0 if full_state: server doesn't respect timeout if full_state
|
|
# + 15: give server a chance to naturally return before we timeout
|
|
timeout=0 if full_state else timeout / 1000 + 15 if timeout else timeout,
|
|
)
|
|
|
|
return response
|
|
|
|
@logged_in
|
|
async def send_to_device_messages(
|
|
self,
|
|
) -> List[Union[ToDeviceResponse, ToDeviceError]]:
|
|
"""Send out outgoing to-device messages.
|
|
|
|
Automatically called by sync_forever().
|
|
"""
|
|
if not self.outgoing_to_device_messages:
|
|
return []
|
|
|
|
tasks = []
|
|
|
|
for message in self.outgoing_to_device_messages:
|
|
task = asyncio.ensure_future(self.to_device(message))
|
|
tasks.append(task)
|
|
|
|
return await asyncio.gather(*tasks)
|
|
|
|
async def run_response_callbacks(
|
|
self, responses: List[Union[Response, ErrorResponse]]
|
|
):
|
|
"""Run the configured response callbacks for the given responses.
|
|
|
|
Low-level function which is normally only used by other methods of
|
|
this class. Automatically called by sync_forever() and all functions
|
|
calling receive_response().
|
|
"""
|
|
for response in responses:
|
|
for cb in self.response_callbacks:
|
|
if cb.filter is None or isinstance(response, cb.filter):
|
|
await asyncio.coroutine(cb.func)(response)
|
|
|
|
@logged_in
|
|
async def sync_forever(
|
|
self,
|
|
timeout: Optional[int] = None,
|
|
sync_filter: _FilterT = None,
|
|
since: Optional[str] = None,
|
|
full_state: Optional[bool] = None,
|
|
loop_sleep_time: Optional[int] = None,
|
|
first_sync_filter: _FilterT = None,
|
|
set_presence: Optional[str] = None,
|
|
):
|
|
"""Continuously sync with the configured homeserver.
|
|
|
|
This method calls the sync method in a loop. To react to events event
|
|
callbacks should be configured.
|
|
|
|
The loop also makes sure to handle other required requests between
|
|
syncs, including to_device messages and sending encryption keys if
|
|
required. To react to the responses a response callback should be
|
|
added.
|
|
|
|
Args:
|
|
timeout (int, optional): The maximum time that the server should
|
|
wait for new events before it should return the request
|
|
anyways, in milliseconds.
|
|
If ``0``, no timeout is applied.
|
|
If ``None``, ``AsyncClient.config.request_timeout`` is used.
|
|
In any case, ``0`` is always used for the first sync.
|
|
If a timeout is applied and the server fails to return after
|
|
15 seconds of expected timeout,
|
|
the client will timeout by itself.
|
|
|
|
sync_filter (Union[None, str, Dict[Any, Any]):
|
|
A filter ID that can be obtained from
|
|
``AsyncClient.upload_filter()`` (preferred),
|
|
or filter dict that should be used for sync requests.
|
|
|
|
full_state (bool, optional): Controls whether to include the full
|
|
state for all rooms the user is a member of. If this is set to
|
|
true, then all state events will be returned, even if since is
|
|
non-empty. The timeline will still be limited by the since
|
|
parameter. This argument will be used only for the first sync
|
|
request.
|
|
|
|
since (str, optional): A token specifying a point in time where to
|
|
continue the sync from. Defaults to the last sync token we
|
|
received from the server using this API call. This argument
|
|
will be used only for the first sync request, the subsequent
|
|
sync requests will use the token from the last sync response.
|
|
|
|
loop_sleep_time (int, optional): The sleep time, if any, between
|
|
successful sync loop iterations in milliseconds.
|
|
|
|
first_sync_filter (Union[None, str, Dict[Any, Any]):
|
|
A filter ID that can be obtained from
|
|
``AsyncClient.upload_filter()`` (preferred),
|
|
or filter dict to use for the first sync request only.
|
|
If `None` (default), the `sync_filter` parameter's value
|
|
is used.
|
|
To have no filtering for the first sync regardless of
|
|
`sync_filter`'s value, pass `{}`.
|
|
|
|
set_presence (str, optional): The presence state.
|
|
One of: ["online", "offline", "unavailable"]
|
|
"""
|
|
|
|
first_sync = True
|
|
|
|
while True:
|
|
try:
|
|
use_filter = first_sync_filter if first_sync else sync_filter
|
|
use_timeout = 0 if first_sync else timeout
|
|
|
|
tasks = []
|
|
|
|
# Make sure that if this is our first sync that the sync happens
|
|
# before the other requests, this helps to ensure that after one
|
|
# fired synced event the state is indeed fully synced.
|
|
if first_sync:
|
|
presence = set_presence or self._presence
|
|
sync_response = await self.sync(
|
|
use_timeout, use_filter, since, full_state, presence
|
|
)
|
|
await self.run_response_callbacks([sync_response])
|
|
else:
|
|
presence = set_presence or self._presence
|
|
tasks = [
|
|
asyncio.ensure_future(coro)
|
|
for coro in (
|
|
self.sync(
|
|
use_timeout, use_filter, since, full_state, presence
|
|
),
|
|
self.send_to_device_messages(),
|
|
)
|
|
]
|
|
|
|
if self.should_upload_keys:
|
|
tasks.append(asyncio.ensure_future(self.keys_upload()))
|
|
|
|
if self.should_query_keys:
|
|
tasks.append(asyncio.ensure_future(self.keys_query()))
|
|
|
|
if self.should_claim_keys:
|
|
tasks.append(
|
|
asyncio.ensure_future(
|
|
self.keys_claim(self.get_users_for_key_claiming()),
|
|
)
|
|
)
|
|
|
|
for response in asyncio.as_completed(tasks):
|
|
await self.run_response_callbacks([await response])
|
|
|
|
first_sync = False
|
|
full_state = None
|
|
since = None
|
|
|
|
self.synced.set()
|
|
self.synced.clear()
|
|
|
|
if loop_sleep_time:
|
|
await asyncio.sleep(loop_sleep_time / 1000)
|
|
|
|
except asyncio.CancelledError:
|
|
for task in tasks:
|
|
task.cancel()
|
|
|
|
break
|
|
|
|
@logged_in
|
|
@store_loaded
|
|
async def start_key_verification(
|
|
self, device: OlmDevice, tx_id: Optional[str] = None
|
|
) -> Union[ToDeviceResponse, ToDeviceError]:
|
|
"""Start a interactive key verification with the given device.
|
|
|
|
Returns either a `ToDeviceResponse` if the request was successful or
|
|
a `ToDeviceError` if there was an error with the request.
|
|
|
|
Args:
|
|
device (OlmDevice): An device with which we would like to start the
|
|
interactive key verification process.
|
|
"""
|
|
message = self.create_key_verification(device)
|
|
return await self.to_device(message, tx_id)
|
|
|
|
@logged_in
|
|
@store_loaded
|
|
async def cancel_key_verification(
|
|
self,
|
|
transaction_id: str,
|
|
reject: bool = False,
|
|
tx_id: Optional[str] = None,
|
|
) -> Union[ToDeviceResponse, ToDeviceError]:
|
|
"""Cancel a interactive key verification with the given device.
|
|
|
|
Returns either a `ToDeviceResponse` if the request was successful or
|
|
a `ToDeviceError` if there was an error with the request.
|
|
|
|
Args:
|
|
transaction_id (str): An transaction id of a valid key verification
|
|
process.
|
|
reject (bool): Is the cancelation reason because we're rejecting
|
|
the short auth string and mark it as mismatching or a normal
|
|
user cancelation.
|
|
|
|
Raises a LocalProtocolError no verification process with the given
|
|
transaction ID exists or if reject is True and the short auth string
|
|
couldn't be shown yet because plublic keys weren't yet exchanged.
|
|
"""
|
|
if transaction_id not in self.key_verifications:
|
|
raise LocalProtocolError(
|
|
f"Key verification with the transaction id {transaction_id} does not exist."
|
|
)
|
|
|
|
sas = self.key_verifications[transaction_id]
|
|
|
|
if reject:
|
|
sas.reject_sas()
|
|
else:
|
|
sas.cancel()
|
|
|
|
message = sas.get_cancellation()
|
|
|
|
return await self.to_device(message, tx_id)
|
|
|
|
@logged_in
|
|
@store_loaded
|
|
async def accept_key_verification(
|
|
self, transaction_id: str, tx_id: Optional[str] = None
|
|
) -> Union[ToDeviceResponse, ToDeviceError]:
|
|
"""Accept a key verification start event.
|
|
|
|
Returns either a `ToDeviceResponse` if the request was successful or
|
|
a `ToDeviceError` if there was an error with the request.
|
|
|
|
Args:
|
|
transaction_id (str): An transaction id of a valid key verification
|
|
process.
|
|
"""
|
|
if transaction_id not in self.key_verifications:
|
|
raise LocalProtocolError(
|
|
f"Key verification with the transaction id {transaction_id} does not exist."
|
|
)
|
|
|
|
sas = self.key_verifications[transaction_id]
|
|
|
|
message = sas.accept_verification()
|
|
|
|
return await self.to_device(message, tx_id)
|
|
|
|
@logged_in
|
|
@store_loaded
|
|
async def confirm_short_auth_string(
|
|
self, transaction_id: str, tx_id: Optional[str] = None
|
|
) -> Union[ToDeviceResponse, ToDeviceError]:
|
|
"""Confirm a short auth string and mark it as matching.
|
|
|
|
Returns either a `ToDeviceResponse` if the request was successful or
|
|
a `ToDeviceError` if there was an error with the request.
|
|
|
|
Args:
|
|
transaction_id (str): An transaction id of a valid key verification
|
|
process.
|
|
"""
|
|
message = self.confirm_key_verification(transaction_id)
|
|
return await self.to_device(message, tx_id)
|
|
|
|
@logged_in
|
|
async def to_device(
|
|
self,
|
|
message: ToDeviceMessage,
|
|
tx_id: Optional[str] = None,
|
|
) -> Union[ToDeviceResponse, ToDeviceError]:
|
|
"""Send a to-device message.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `ToDeviceResponse` if the request was successful or
|
|
a `ToDeviceError` if there was an error with the request.
|
|
|
|
Args:
|
|
message (ToDeviceMessage): The message that should be sent out.
|
|
tx_id (str, optional): The transaction ID for this message. Should
|
|
be unique.
|
|
"""
|
|
uuid = tx_id or uuid4()
|
|
|
|
method, path, data = Api.to_device(
|
|
self.access_token, message.type, message.as_dict(), uuid
|
|
)
|
|
|
|
return await self._send(
|
|
ToDeviceResponse, method, path, data, response_data=(message,)
|
|
)
|
|
|
|
@logged_in
|
|
@store_loaded
|
|
async def keys_upload(self) -> Union[KeysUploadResponse, KeysUploadError]:
|
|
"""Upload the E2E encryption keys.
|
|
|
|
This uploads the long lived session keys as well as the required amount
|
|
of one-time keys.
|
|
|
|
Automatically called by sync_forever().
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Raises LocalProtocolError if the client isn't logged in, if the session
|
|
store isn't loaded or if no encryption keys need to be uploaded.
|
|
"""
|
|
if not self.should_upload_keys:
|
|
raise LocalProtocolError("No key upload needed.")
|
|
|
|
assert self.olm
|
|
keys_dict = self.olm.share_keys()
|
|
|
|
method, path, data = Api.keys_upload(self.access_token, keys_dict)
|
|
|
|
return await self._send(KeysUploadResponse, method, path, data)
|
|
|
|
@logged_in
|
|
@store_loaded
|
|
async def keys_query(self) -> Union[KeysQueryResponse, KeysQueryError]:
|
|
"""Query the server for user keys.
|
|
|
|
This queries the server for device keys of users with which we share an
|
|
encrypted room.
|
|
|
|
Automatically called by sync_forever() and room_send().
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Raises LocalProtocolError if the client isn't logged in, if the session
|
|
store isn't loaded or if no key query needs to be performed.
|
|
"""
|
|
user_list = self.users_for_key_query
|
|
|
|
if not user_list:
|
|
raise LocalProtocolError("No key query required.")
|
|
|
|
# TODO pass the sync token here if it's a device update that triggered
|
|
# our need for a key query.
|
|
method, path, data = Api.keys_query(self.access_token, user_list)
|
|
|
|
return await self._send(KeysQueryResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def devices(self) -> Union[DevicesResponse, DevicesError]:
|
|
"""Get the list of devices for the current user.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `DevicesResponse` if the request was successful
|
|
or a `DevicesError` if there was an error with the request.
|
|
"""
|
|
method, path = Api.devices(self.access_token)
|
|
|
|
return await self._send(DevicesResponse, method, path)
|
|
|
|
@logged_in
|
|
async def update_device(
|
|
self, device_id: str, content: Dict[str, str]
|
|
) -> Union[UpdateDeviceResponse, UpdateDeviceError]:
|
|
"""Update the metadata of the given device.
|
|
|
|
Returns either a `UpdateDeviceResponse` if the request was successful or
|
|
a `UpdateDeviceError` if there was an error with the request.
|
|
|
|
Args:
|
|
device_id (str): The device for which the metadata will be updated.
|
|
content (Dict[str, str]): A dictionary of metadata values that will be
|
|
updated for the device.
|
|
|
|
Example:
|
|
>>> device_id = "QBUAZIFURK"
|
|
>>> content = {"display_name": "My new device"}
|
|
>>> await client.update_device(device_id, content)
|
|
|
|
"""
|
|
method, path, data = Api.update_device(self.access_token, device_id, content)
|
|
|
|
return await self._send(UpdateDeviceResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def delete_devices(
|
|
self, devices: List[str], auth: Optional[Dict[str, str]] = None
|
|
) -> Union[DeleteDevicesResponse, DeleteDevicesError]:
|
|
"""Delete a list of devices.
|
|
|
|
This tells the server to delete the given devices and invalidate their
|
|
associated access tokens.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `DeleteDevicesResponse` if the request was successful
|
|
or a `DeleteDevicesError` if there was an error with the request.
|
|
|
|
This endpoint supports user-interactive auth, calling this method
|
|
without an auth dictionary will return a `DeleteDevicesAuthResponse`
|
|
which can be used to introspect the valid authentication methods that
|
|
the server supports.
|
|
|
|
Args:
|
|
devices (List[str]): A list of devices which will be deleted.
|
|
auth (Dict): Additional authentication information for
|
|
the user-interactive authentication API.
|
|
|
|
Example:
|
|
>>> devices = ["QBUAZIFURK", "AUIECTSRND"]
|
|
>>> auth = {"type": "m.login.password",
|
|
... "user": "example",
|
|
... "password": "hunter1"}
|
|
>>> await client.delete_devices(devices, auth)
|
|
|
|
|
|
"""
|
|
method, path, data = Api.delete_devices(
|
|
self.access_token, devices, auth_dict=auth
|
|
)
|
|
|
|
return await self._send(DeleteDevicesResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def joined_members(
|
|
self, room_id: str
|
|
) -> Union[JoinedMembersResponse, JoinedMembersError]:
|
|
"""Get the list of joined members for a room.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `JoinedMembersResponse` if the request was successful
|
|
or a `JoinedMembersError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id(str): The room id of the room for which we wan't to request
|
|
the joined member list.
|
|
"""
|
|
method, path = Api.joined_members(self.access_token, room_id)
|
|
|
|
return await self._send(
|
|
JoinedMembersResponse, method, path, response_data=(room_id,)
|
|
)
|
|
|
|
@logged_in
|
|
async def joined_rooms(
|
|
self,
|
|
) -> Union[JoinedRoomsResponse, JoinedRoomsError]:
|
|
"""Get the list of joined rooms.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `JoinedRoomsResponse` if the request was successful
|
|
or a `JoinedRoomsError` if there was an error with the request.
|
|
"""
|
|
method, path = Api.joined_rooms(self.access_token)
|
|
|
|
return await self._send(JoinedRoomsResponse, method, path)
|
|
|
|
@logged_in
|
|
async def room_send(
|
|
self,
|
|
room_id: str,
|
|
message_type: str,
|
|
content: Dict[Any, Any],
|
|
tx_id: Optional[str] = None,
|
|
ignore_unverified_devices: bool = False,
|
|
):
|
|
"""Send a message to a room.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Args:
|
|
room_id(str): The room id of the room where the message should be
|
|
sent to.
|
|
message_type(str): A string identifying the type of the message.
|
|
content(Dict[Any, Any]): A dictionary containing the content of the
|
|
message.
|
|
tx_id(str, optional): The transaction ID of this event used to
|
|
uniquely identify this message.
|
|
ignore_unverified_devices(bool): If the room is encrypted and
|
|
contains unverified devices, the devices can be marked as
|
|
ignored here. Ignored devices will still receive encryption
|
|
keys for messages but they won't be marked as verified.
|
|
|
|
If the room where the message should be sent is encrypted the message
|
|
will be encrypted before sending.
|
|
|
|
This method also makes sure that the room members are fully synced and
|
|
that keys are queried before sending messages to an encrypted room.
|
|
|
|
If the method can't sync the state fully to send out an encrypted
|
|
message after a couple of retries it raises `SendRetryError`.
|
|
|
|
Raises `LocalProtocolError` if the client isn't logged in.
|
|
"""
|
|
uuid: Union[str, UUID] = tx_id or uuid4()
|
|
|
|
if self.olm:
|
|
try:
|
|
room = self.rooms[room_id]
|
|
except KeyError:
|
|
raise LocalProtocolError(f"No such room with id {room_id} found.")
|
|
|
|
if room.encrypted:
|
|
# Check if the members are synced, otherwise users might not get
|
|
# the megolm seession.
|
|
if not room.members_synced:
|
|
responses = []
|
|
responses.append(await self.joined_members(room_id))
|
|
|
|
if self.should_query_keys:
|
|
responses.append(await self.keys_query())
|
|
|
|
# Check if we need to share a group session, it might have been
|
|
# invalidated or expired.
|
|
if self.olm.should_share_group_session(room_id):
|
|
try:
|
|
event = self.sharing_session[room_id]
|
|
await event.wait()
|
|
except KeyError:
|
|
await self.share_group_session(
|
|
room_id,
|
|
ignore_unverified_devices=ignore_unverified_devices,
|
|
)
|
|
|
|
# Reactions as of yet don't support encryption.
|
|
# Relevant spec proposal https://github.com/matrix-org/matrix-doc/pull/1849
|
|
if message_type != "m.reaction":
|
|
# Encrypt our content and change the message type.
|
|
message_type, content = self.encrypt(room_id, message_type, content)
|
|
|
|
method, path, data = Api.room_send(
|
|
self.access_token, room_id, message_type, content, uuid
|
|
)
|
|
|
|
return await self._send(RoomSendResponse, method, path, data, (room_id,))
|
|
|
|
@logged_in
|
|
async def room_get_event(
|
|
self, room_id: str, event_id: str
|
|
) -> Union[RoomGetEventResponse, RoomGetEventError]:
|
|
"""Get a single event based on roomId/eventId.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomGetEventResponse` if the request was successful
|
|
or a `RoomGetEventError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room where the event is in.
|
|
event_id (str): The event id to get.
|
|
"""
|
|
method, path = Api.room_get_event(self.access_token, room_id, event_id)
|
|
|
|
return await self._send(RoomGetEventResponse, method, path)
|
|
|
|
@logged_in
|
|
async def room_put_state(
|
|
self,
|
|
room_id: str,
|
|
event_type: str,
|
|
content: Dict[Any, Any],
|
|
state_key: str = "",
|
|
) -> Union[RoomPutStateResponse, RoomPutStateError]:
|
|
"""Send a state event to a room.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomPutStateResponse` if the request was successful
|
|
or a `RoomPutStateError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room to send the event to.
|
|
event_type (str): The type of the state to send.
|
|
content (Dict[Any, Any]): The content of the event to be sent.
|
|
state_key (str): The key of the state event to send.
|
|
"""
|
|
|
|
method, path, data = Api.room_put_state(
|
|
self.access_token,
|
|
room_id,
|
|
event_type,
|
|
content,
|
|
state_key=state_key,
|
|
)
|
|
|
|
return await self._send(
|
|
RoomPutStateResponse,
|
|
method,
|
|
path,
|
|
data,
|
|
response_data=(room_id,),
|
|
)
|
|
|
|
@logged_in
|
|
async def room_get_state(
|
|
self,
|
|
room_id: str,
|
|
) -> Union[RoomGetStateResponse, RoomGetStateError]:
|
|
"""Fetch state for a room.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomGetStateResponse` if the request was successful
|
|
or a `RoomGetStateError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room to fetch state from.
|
|
"""
|
|
|
|
method, path = Api.room_get_state(
|
|
self.access_token,
|
|
room_id,
|
|
)
|
|
|
|
return await self._send(
|
|
RoomGetStateResponse,
|
|
method,
|
|
path,
|
|
response_data=(room_id,),
|
|
)
|
|
|
|
@logged_in
|
|
async def room_get_state_event(
|
|
self, room_id: str, event_type: str, state_key: str = ""
|
|
) -> Union[RoomGetStateEventResponse, RoomGetStateEventError]:
|
|
"""Fetch a state event from a room.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomGetStateEventResponse` if the request was
|
|
successful or a `RoomGetStateEventError` if there was an error with
|
|
the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room to fetch the event from.
|
|
event_type (str): The type of the state to fetch.
|
|
state_key (str): The key of the state event to fetch.
|
|
"""
|
|
|
|
method, path = Api.room_get_state_event(
|
|
self.access_token, room_id, event_type, state_key=state_key
|
|
)
|
|
|
|
return await self._send(
|
|
RoomGetStateEventResponse,
|
|
method,
|
|
path,
|
|
response_data=(
|
|
event_type,
|
|
state_key,
|
|
room_id,
|
|
),
|
|
)
|
|
|
|
@logged_in
|
|
async def room_redact(
|
|
self,
|
|
room_id: str,
|
|
event_id: str,
|
|
reason: Optional[str] = None,
|
|
tx_id: Union[None, str, UUID] = None,
|
|
) -> Union[RoomRedactResponse, RoomRedactError]:
|
|
"""Strip information out of an event.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomRedactResponse` if the request was successful or
|
|
a `RoomRedactError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room that contains the event that
|
|
will be redacted.
|
|
event_id (str): The ID of the event that will be redacted.
|
|
tx_id (str/UUID, optional): A transaction ID for this event.
|
|
reason(str, optional): A description explaining why the
|
|
event was redacted.
|
|
"""
|
|
method, path, data = Api.room_redact(
|
|
self.access_token,
|
|
room_id,
|
|
event_id,
|
|
tx_id=tx_id or uuid4(),
|
|
reason=reason,
|
|
)
|
|
|
|
return await self._send(
|
|
RoomRedactResponse,
|
|
method,
|
|
path,
|
|
data,
|
|
response_data=(room_id,),
|
|
)
|
|
|
|
async def room_resolve_alias(
|
|
self,
|
|
room_alias: str,
|
|
) -> Union[RoomResolveAliasResponse, RoomResolveAliasError]:
|
|
"""Resolve a room alias to a room ID.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomResolveAliasResponse` if the request was
|
|
successful or a `RoomResolveAliasError if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
room_alias (str): The alias to resolve
|
|
"""
|
|
method, path = Api.room_resolve_alias(room_alias)
|
|
|
|
return await self._send(
|
|
RoomResolveAliasResponse,
|
|
method,
|
|
path,
|
|
response_data=(room_alias,),
|
|
)
|
|
|
|
@logged_in
|
|
async def room_delete_alias(
|
|
self,
|
|
room_alias: str,
|
|
) -> Union[RoomDeleteAliasResponse, RoomDeleteAliasError]:
|
|
"""Delete a room alias.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomDeleteAliasResponse` if the request was
|
|
successful or a `RoomDeleteAliasError if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
room_alias (str): The alias to delete
|
|
"""
|
|
method, path = Api.room_delete_alias(
|
|
self.access_token,
|
|
room_alias,
|
|
)
|
|
|
|
return await self._send(
|
|
RoomDeleteAliasResponse,
|
|
method,
|
|
path,
|
|
response_data=(room_alias,),
|
|
)
|
|
|
|
@logged_in
|
|
async def room_put_alias(
|
|
self,
|
|
room_alias: str,
|
|
room_id: str,
|
|
) -> Union[RoomPutAliasResponse, RoomPutAliasError]:
|
|
"""Add a room alias.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomPutAliasResponse` if the request was
|
|
successful or a `RoomPutAliasError if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
room_alias (str): The alias to add
|
|
room_id (str): The room ID to map to
|
|
"""
|
|
method, path, data = Api.room_put_alias(
|
|
self.access_token,
|
|
room_alias,
|
|
room_id,
|
|
)
|
|
|
|
return await self._send(
|
|
RoomPutAliasResponse,
|
|
method,
|
|
path,
|
|
data=data,
|
|
response_data=(room_alias, room_id),
|
|
)
|
|
|
|
async def room_get_visibility(
|
|
self,
|
|
room_id: str,
|
|
) -> Union[RoomGetVisibilityResponse, RoomGetVisibilityError]:
|
|
"""Get visibility for a room.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomGetVisibilityResponse` if the request was
|
|
successful or a `RoomGetVisibilityError if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
room_id (str): The room ID to get visibility for
|
|
"""
|
|
method, path = Api.room_get_visibility(room_id)
|
|
|
|
return await self._send(
|
|
RoomGetVisibilityResponse,
|
|
method,
|
|
path,
|
|
response_data=(room_id,),
|
|
)
|
|
|
|
@logged_in
|
|
@store_loaded
|
|
async def keys_claim(
|
|
self, user_set: Dict[str, Iterable[str]]
|
|
) -> Union[KeysClaimResponse, KeysClaimError]:
|
|
"""Claim one-time keys for a set of user and device pairs.
|
|
|
|
Automatically called by sync_forever() and room_send().
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Args:
|
|
user_set(Dict[str, Iterator[str]]): A dictionary maping from a user
|
|
id to a iterator of device ids. If a user set for a specific
|
|
room is required it can be obtained using the
|
|
`get_missing_sessions()` method.
|
|
|
|
Raises LocalProtocolError if the client isn't logged in, if the session
|
|
store isn't loaded, no room with the given room id exists or the room
|
|
isn't an encrypted room.
|
|
"""
|
|
method, path, data = Api.keys_claim(self.access_token, user_set)
|
|
|
|
return await self._send(KeysClaimResponse, method, path, data)
|
|
|
|
@logged_in
|
|
@store_loaded
|
|
async def share_group_session(
|
|
self,
|
|
room_id: str,
|
|
ignore_unverified_devices: bool = False,
|
|
) -> Union[ShareGroupSessionResponse, ShareGroupSessionError]:
|
|
"""Share a group session with a room.
|
|
|
|
This method sends a group session to members of a room.
|
|
|
|
Automatically called by room_send().
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Args:
|
|
room_id(str): The room id of the room where the message should be
|
|
sent to.
|
|
ignore_unverified_devices(bool): Mark unverified devices as
|
|
ignored. Ignored devices will still receive encryption
|
|
keys for messages but they won't be marked as verified.
|
|
|
|
|
|
Raises LocalProtocolError if the client isn't logged in, if the session
|
|
store isn't loaded, no room with the given room id exists, the room
|
|
isn't an encrypted room or a key sharing request is already in flight
|
|
for this room.
|
|
"""
|
|
assert self.olm
|
|
|
|
try:
|
|
room = self.rooms[room_id]
|
|
except KeyError:
|
|
raise LocalProtocolError(f"No such room with id {room_id}")
|
|
|
|
if not room.encrypted:
|
|
raise LocalProtocolError(f"Room with id {room_id} is not encrypted")
|
|
|
|
if room_id in self.sharing_session:
|
|
raise LocalProtocolError(f"Already sharing a group session for {room_id}")
|
|
|
|
self.sharing_session[room_id] = AsyncioEvent()
|
|
|
|
missing_sessions = self.get_missing_sessions(room_id)
|
|
|
|
if missing_sessions:
|
|
await self.keys_claim(missing_sessions)
|
|
|
|
shared_with = set()
|
|
|
|
try:
|
|
requests = []
|
|
|
|
for sharing_with, to_device_dict in self.olm.share_group_session_parallel(
|
|
room_id,
|
|
list(room.users.keys()),
|
|
ignore_unverified_devices=ignore_unverified_devices,
|
|
):
|
|
method, path, data = Api.to_device(
|
|
self.access_token, "m.room.encrypted", to_device_dict, uuid4()
|
|
)
|
|
|
|
requests.append(
|
|
self._send(
|
|
ShareGroupSessionResponse,
|
|
method,
|
|
path,
|
|
data,
|
|
response_data=(room_id, sharing_with),
|
|
)
|
|
)
|
|
|
|
for response in await asyncio.gather(*requests, return_exceptions=True):
|
|
if isinstance(response, ShareGroupSessionResponse):
|
|
shared_with.update(response.users_shared_with)
|
|
|
|
# Mark the session as shared, usually the olm machine will do this
|
|
# for us, but if there was no-one to share the session with it we
|
|
# need to do it ourselves.
|
|
self.olm.outbound_group_sessions[room_id].shared = True
|
|
|
|
except ClientConnectionError:
|
|
raise
|
|
finally:
|
|
event = self.sharing_session.pop(room_id)
|
|
event.set()
|
|
|
|
return ShareGroupSessionResponse(room_id, shared_with)
|
|
|
|
@logged_in
|
|
@store_loaded
|
|
async def request_room_key(
|
|
self,
|
|
event: MegolmEvent,
|
|
tx_id: Optional[str] = None,
|
|
) -> Union[RoomKeyRequestResponse, RoomKeyRequestError]:
|
|
"""Request a missing room key.
|
|
|
|
This sends out a message to other devices requesting a room key from
|
|
them.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomKeyRequestResponse` if the request was successful
|
|
or a `RoomKeyRequestError` if there was an error with the request.
|
|
|
|
Raises a LocalProtocolError if the room key was already requested.
|
|
|
|
Args:
|
|
event (MegolmEvent): An undecrypted MegolmEvent for which we would
|
|
like to request the decryption key.
|
|
"""
|
|
uuid = tx_id or uuid4()
|
|
|
|
if event.session_id in self.outgoing_key_requests:
|
|
raise LocalProtocolError(
|
|
"A key sharing request is already sent" " out for this session id."
|
|
)
|
|
|
|
assert self.user_id
|
|
assert self.device_id
|
|
|
|
message = event.as_key_request(self.user_id, self.device_id)
|
|
|
|
method, path, data = Api.to_device(
|
|
self.access_token, message.type, message.as_dict(), uuid
|
|
)
|
|
|
|
return await self._send(
|
|
RoomKeyRequestResponse,
|
|
method,
|
|
path,
|
|
data,
|
|
(
|
|
event.session_id,
|
|
event.session_id,
|
|
event.room_id,
|
|
event.algorithm,
|
|
),
|
|
)
|
|
|
|
async def close(self):
|
|
"""Close the underlying http session."""
|
|
if self.client_session:
|
|
await self.client_session.close()
|
|
self.client_session = None
|
|
|
|
@store_loaded
|
|
async def export_keys(self, outfile: str, passphrase: str, count: int = 10000):
|
|
"""Export all the Megolm decryption keys of this device.
|
|
|
|
The keys will be encrypted using the passphrase.
|
|
|
|
Note that this does not save other information such as the private
|
|
identity keys of the device.
|
|
|
|
Args:
|
|
outfile (str): The file to write the keys to.
|
|
passphrase (str): The encryption passphrase.
|
|
count (int): Optional. Round count for the underlying key
|
|
derivation. It is not recommended to specify it unless
|
|
absolutely sure of the consequences.
|
|
"""
|
|
assert self.store
|
|
assert self.olm
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
inbound_group_store = self.store.load_inbound_group_sessions()
|
|
export_keys = partial(
|
|
self.olm.export_keys_static,
|
|
inbound_group_store,
|
|
outfile,
|
|
passphrase,
|
|
count,
|
|
)
|
|
|
|
await loop.run_in_executor(None, export_keys)
|
|
|
|
@store_loaded
|
|
async def import_keys(self, infile: str, passphrase: str):
|
|
"""Import Megolm decryption keys.
|
|
|
|
The keys will be added to the current instance as well as written to
|
|
database.
|
|
|
|
Args:
|
|
infile (str): The file containing the keys.
|
|
passphrase (str): The decryption passphrase.
|
|
|
|
Raises `EncryptionError` if the file is invalid or couldn't be
|
|
decrypted.
|
|
|
|
Raises the usual file errors if the file couldn't be opened.
|
|
"""
|
|
assert self.store
|
|
assert self.olm
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
import_keys = partial(self.olm.import_keys_static, infile, passphrase)
|
|
sessions = await loop.run_in_executor(None, import_keys)
|
|
|
|
for session in sessions:
|
|
# This could be improved by writing everything to db at once at
|
|
# the end
|
|
if self.olm.inbound_group_store.add(session):
|
|
self.store.save_inbound_group_session(session)
|
|
|
|
@logged_in
|
|
async def room_create(
|
|
self,
|
|
visibility: RoomVisibility = RoomVisibility.private,
|
|
alias: Optional[str] = None,
|
|
name: Optional[str] = None,
|
|
topic: Optional[str] = None,
|
|
room_version: Optional[str] = None,
|
|
federate: bool = True,
|
|
is_direct: bool = False,
|
|
preset: Optional[RoomPreset] = None,
|
|
invite: Sequence[str] = (),
|
|
initial_state: Sequence[Dict[str, Any]] = (),
|
|
power_level_override: Optional[Dict[str, Any]] = None,
|
|
predecessor: Optional[Dict[str, Any]] = None,
|
|
space: bool = False,
|
|
) -> Union[RoomCreateResponse, RoomCreateError]:
|
|
"""Create a new room.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomCreateResponse` if the request was successful or
|
|
a `RoomCreateError` if there was an error with the request.
|
|
|
|
Args:
|
|
visibility (RoomVisibility): whether to have the room published in
|
|
the server's room directory or not.
|
|
Defaults to ``RoomVisibility.private``.
|
|
|
|
alias (str, optional): The desired canonical alias local part.
|
|
For example, if set to "foo" and the room is created on the
|
|
"example.com" server, the room alias will be
|
|
"#foo:example.com".
|
|
|
|
name (str, optional): A name to set for the room.
|
|
|
|
topic (str, optional): A topic to set for the room.
|
|
|
|
room_version (str, optional): The room version to set.
|
|
If not specified, the homeserver will use its default setting.
|
|
If a version not supported by the homeserver is specified,
|
|
a 400 ``M_UNSUPPORTED_ROOM_VERSION`` error will be returned.
|
|
|
|
federate (bool): Whether to allow users from other homeservers from
|
|
joining the room. Defaults to ``True``.
|
|
Cannot be changed later.
|
|
|
|
is_direct (bool): If this should be considered a
|
|
direct messaging room.
|
|
If ``True``, the server will set the ``is_direct`` flag on
|
|
``m.room.member events`` sent to the users in ``invite``.
|
|
Defaults to ``False``.
|
|
|
|
preset (RoomPreset, optional): The selected preset will set various
|
|
rules for the room.
|
|
If unspecified, the server will choose a preset from the
|
|
``visibility``: ``RoomVisibility.public`` equates to
|
|
``RoomPreset.public_chat``, and
|
|
``RoomVisibility.private`` equates to a
|
|
``RoomPreset.private_chat``.
|
|
|
|
invite (list): A list of user id to invite to the room.
|
|
|
|
initial_state (list): A list of state event dicts to send when
|
|
the room is created.
|
|
For example, a room could be made encrypted immediatly by
|
|
having a ``m.room.encryption`` event dict.
|
|
|
|
power_level_override (dict): A ``m.room.power_levels content`` dict
|
|
to override the default.
|
|
The dict will be applied on top of the generated
|
|
``m.room.power_levels`` event before it is sent to the room.
|
|
|
|
predecessor (dict): A reference to the room this room replaces, if the previous room was upgraded.
|
|
Containing the event ID of the last known event in the old room.
|
|
And the ID of the old room.
|
|
``event_id``: ``$something:example.org``,
|
|
``room_id``: ``!oldroom:example.org``
|
|
|
|
space (bool): Create as a Space (defaults to False).
|
|
"""
|
|
|
|
method, path, data = Api.room_create(
|
|
self.access_token,
|
|
visibility=visibility,
|
|
alias=alias,
|
|
name=name,
|
|
topic=topic,
|
|
room_version=room_version,
|
|
federate=federate,
|
|
is_direct=is_direct,
|
|
preset=preset,
|
|
invite=invite,
|
|
initial_state=initial_state,
|
|
power_level_override=power_level_override,
|
|
predecessor=predecessor,
|
|
space=space,
|
|
)
|
|
|
|
return await self._send(RoomCreateResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def join(self, room_id: str) -> Union[JoinResponse, JoinError]:
|
|
"""Join a room.
|
|
|
|
This tells the server to join the given room.
|
|
If the room is not public, the user must be invited.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `JoinResponse` if the request was successful or
|
|
a `JoinError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id: The room id or alias of the room to join.
|
|
"""
|
|
method, path, data = Api.join(self.access_token, room_id)
|
|
return await self._send(JoinResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def room_invite(
|
|
self,
|
|
room_id: str,
|
|
user_id: str,
|
|
) -> Union[RoomInviteResponse, RoomInviteError]:
|
|
"""Invite a user to a room.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomInviteResponse` if the request was successful or
|
|
a `RoomInviteError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room that the user will be
|
|
invited to.
|
|
user_id (str): The user id of the user that should be invited.
|
|
"""
|
|
method, path, data = Api.room_invite(
|
|
self.access_token,
|
|
room_id,
|
|
user_id,
|
|
)
|
|
return await self._send(RoomInviteResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def room_leave(
|
|
self, room_id: str
|
|
) -> Union[RoomLeaveResponse, RoomLeaveError]:
|
|
"""Leave a room or reject an invite.
|
|
|
|
This tells the server to leave the given room.
|
|
If the user was only invited, the invite is rejected.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomLeaveResponse` if the request was successful or
|
|
a `RoomLeaveError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id: The room id of the room to leave.
|
|
"""
|
|
method, path, data = Api.room_leave(self.access_token, room_id)
|
|
return await self._send(RoomLeaveResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def room_forget(
|
|
self, room_id: str
|
|
) -> Union[RoomForgetResponse, RoomForgetError]:
|
|
"""Forget a room.
|
|
|
|
This tells the server to forget the given room's history for our user.
|
|
If all users on a homeserver forget the room, the room will be
|
|
eligible for deletion from that homeserver.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomForgetResponse` if the request was successful or
|
|
a `RoomForgetError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room to forget.
|
|
"""
|
|
method, path, data = Api.room_forget(self.access_token, room_id)
|
|
return await self._send(
|
|
RoomForgetResponse, method, path, data, response_data=(room_id,)
|
|
)
|
|
|
|
@logged_in
|
|
async def room_kick(
|
|
self,
|
|
room_id: str,
|
|
user_id: str,
|
|
reason: Optional[str] = None,
|
|
) -> Union[RoomKickResponse, RoomKickError]:
|
|
"""Kick a user from a room, or withdraw their invitation.
|
|
|
|
Kicking a user adjusts their membership to "leave" with an optional
|
|
reason.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomKickResponse` if the request was successful or
|
|
a `RoomKickError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room that the user will be
|
|
kicked from.
|
|
user_id (str): The user_id of the user that should be kicked.
|
|
reason (str, optional): A reason for which the user is kicked.
|
|
"""
|
|
|
|
method, path, data = Api.room_kick(
|
|
self.access_token,
|
|
room_id,
|
|
user_id,
|
|
reason,
|
|
)
|
|
return await self._send(RoomKickResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def room_ban(
|
|
self,
|
|
room_id: str,
|
|
user_id: str,
|
|
reason: Optional[str] = None,
|
|
) -> Union[RoomBanResponse, RoomBanError]:
|
|
"""Ban a user from a room.
|
|
|
|
When a user is banned from a room, they may not join it or be
|
|
invited to it until they are unbanned.
|
|
If they are currently in the room, they will be kicked or have their
|
|
invitation withdrawn first.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomBanResponse` if the request was successful or
|
|
a `RoomBanError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room that the user will be
|
|
banned from.
|
|
user_id (str): The user_id of the user that should be banned.
|
|
reason (str, optional): A reason for which the user is banned.
|
|
"""
|
|
|
|
method, path, data = Api.room_ban(
|
|
self.access_token,
|
|
room_id,
|
|
user_id,
|
|
reason,
|
|
)
|
|
return await self._send(RoomBanResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def room_unban(
|
|
self,
|
|
room_id: str,
|
|
user_id: str,
|
|
) -> Union[RoomBanResponse, RoomBanError]:
|
|
"""Unban a user from a room.
|
|
|
|
This allows them to be invited and join the room again.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomUnbanResponse` if the request was successful or
|
|
a `RoomUnbanError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room that the user will be
|
|
unbanned from.
|
|
user_id (str): The user_id of the user that should be unbanned.
|
|
"""
|
|
|
|
method, path, data = Api.room_unban(
|
|
self.access_token,
|
|
room_id,
|
|
user_id,
|
|
)
|
|
return await self._send(RoomUnbanResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def room_context(
|
|
self,
|
|
room_id: str,
|
|
event_id: str,
|
|
limit: Optional[int] = None,
|
|
) -> Union[RoomContextResponse, RoomContextError]:
|
|
"""Fetch a number of events that happened before and after an event.
|
|
|
|
This allows clients to get the context surrounding an event.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomContextResponse` if the request was successful or
|
|
a `RoomContextError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room that contains the event and
|
|
its context.
|
|
event_id (str): The event_id of the event that we wish to get the
|
|
context for.
|
|
limit(int, optional): The maximum number of events to request.
|
|
"""
|
|
|
|
method, path = Api.room_context(self.access_token, room_id, event_id, limit)
|
|
|
|
return await self._send(
|
|
RoomContextResponse, method, path, response_data=(room_id,)
|
|
)
|
|
|
|
@logged_in
|
|
async def room_messages(
|
|
self,
|
|
room_id: str,
|
|
start: str,
|
|
end: Optional[str] = None,
|
|
direction: MessageDirection = MessageDirection.back,
|
|
limit: int = 10,
|
|
message_filter: Optional[Dict[Any, Any]] = None,
|
|
) -> Union[RoomMessagesResponse, RoomMessagesError]:
|
|
"""Fetch a list of message and state events for a room.
|
|
|
|
It uses pagination query parameters to paginate history in the room.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomMessagesResponse` if the request was successful or
|
|
a `RoomMessagesResponse` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room for which we would like to
|
|
fetch the messages.
|
|
start (str): The token to start returning events from. This token
|
|
can be obtained from a prev_batch token returned for each room
|
|
by the sync API, or from a start or end token returned by a
|
|
previous request to this endpoint.
|
|
end (str, optional): The token to stop returning events at. This
|
|
token can be obtained from a prev_batch token returned for
|
|
each room by the sync endpoint, or from a start or end token
|
|
returned by a previous request to this endpoint.
|
|
direction (MessageDirection, optional): The direction to return
|
|
events from. Defaults to MessageDirection.back.
|
|
limit (int, optional): The maximum number of events to return.
|
|
Defaults to 10.
|
|
message_filter (Optional[Dict[Any, Any]]):
|
|
A filter dict that should be used for this room messages
|
|
request.
|
|
|
|
Example:
|
|
>>> response = await client.room_messages(room_id, previous_batch)
|
|
>>> next_response = await client.room_messages(room_id,
|
|
... response.end)
|
|
|
|
|
|
"""
|
|
method, path = Api.room_messages(
|
|
self.access_token,
|
|
room_id,
|
|
start,
|
|
end=end,
|
|
direction=direction,
|
|
limit=limit,
|
|
message_filter=message_filter,
|
|
)
|
|
|
|
return await self._send(
|
|
RoomMessagesResponse, method, path, response_data=(room_id,)
|
|
)
|
|
|
|
@logged_in
|
|
async def room_typing(
|
|
self,
|
|
room_id: str,
|
|
typing_state: bool = True,
|
|
timeout: int = 30000,
|
|
) -> Union[RoomTypingResponse, RoomTypingError]:
|
|
"""Send a typing notice to the server.
|
|
|
|
This tells the server that the user is typing for the next N
|
|
milliseconds or that the user has stopped typing.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomTypingResponse` if the request was successful or
|
|
a `RoomTypingError` if there was an error with the request.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room where the user is typing.
|
|
typing_state (bool): A flag representing whether the user started
|
|
or stopped typing.
|
|
timeout (int): For how long should the new typing notice be
|
|
valid for in milliseconds.
|
|
"""
|
|
method, path, data = Api.room_typing(
|
|
self.access_token, room_id, self.user_id, typing_state, timeout
|
|
)
|
|
|
|
return await self._send(
|
|
RoomTypingResponse, method, path, data, response_data=(room_id,)
|
|
)
|
|
|
|
@logged_in
|
|
async def update_receipt_marker(
|
|
self,
|
|
room_id: str,
|
|
event_id: str,
|
|
receipt_type: str = "m.read",
|
|
) -> None:
|
|
"""Update the marker of given the `receipt_type` to specified `event_id`.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `UpdateReceiptMarkerResponse` if the request was
|
|
successful or a `UpdateReceiptMarkerError` if there was an error with
|
|
the request.
|
|
|
|
Args:
|
|
room_id (str): Room id of the room where the marker should
|
|
be updated
|
|
event_id (str): The event ID the read marker should be located at
|
|
receipt_type (str): The type of receipt to send. Currently, only
|
|
`m.read` is supported by the Matrix specification.
|
|
"""
|
|
method, path = Api.update_receipt_marker(
|
|
self.access_token,
|
|
room_id,
|
|
event_id,
|
|
receipt_type,
|
|
)
|
|
|
|
return await self._send(
|
|
UpdateReceiptMarkerResponse,
|
|
method,
|
|
path,
|
|
"{}",
|
|
)
|
|
|
|
@logged_in
|
|
async def room_read_markers(
|
|
self, room_id: str, fully_read_event: str, read_event: Optional[str] = None
|
|
):
|
|
"""Update the fully read marker (and optionally the read receipt) for
|
|
a room.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `RoomReadMarkersResponse` if the request was
|
|
successful or a `RoomReadMarkersError` if there was an error with
|
|
the request.
|
|
|
|
This sets the position of the read markers.
|
|
|
|
- `fully_read_event` is the latest event in the set of events that the
|
|
user has either fully read or indicated they aren't interested in. It
|
|
permits the implementation of a "jump to first unread message" kind
|
|
of feature. It is _private_ (not exposed to other room participants).
|
|
|
|
- `read_event` is the most recent message the user has read and is also
|
|
known as a _read receipt_. A read receipt being set on an event does
|
|
not imply that all previous events have been seen. This happens in
|
|
cases such as when a user comes back to a room after hundreds of
|
|
messages have been sent and _only_ reads the most recent message. The
|
|
read receipt is _public_ (exposed to other room participants).
|
|
|
|
If you want to set the read receipt, you _must_ set `read_event`.
|
|
|
|
Args:
|
|
room_id (str): The room ID of the room where the read markers should
|
|
be updated.
|
|
fully_read_event (str): The event ID that the user has fully read up
|
|
to.
|
|
read_event (Optional[str]): The event ID to set the read receipt
|
|
location at.
|
|
"""
|
|
method, path, data = Api.room_read_markers(
|
|
self.access_token, room_id, fully_read_event, read_event
|
|
)
|
|
|
|
return await self._send(
|
|
RoomReadMarkersResponse, method, path, data, response_data=(room_id,)
|
|
)
|
|
|
|
@logged_in
|
|
async def content_repository_config(
|
|
self,
|
|
) -> Union[ContentRepositoryConfigResponse, ContentRepositoryConfigError]:
|
|
"""Get the content repository configuration, such as upload limits.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `ContentRepositoryConfigResponse` if the request
|
|
was successful or a `ContentRepositoryConfigError` if there was an
|
|
error with the request.
|
|
"""
|
|
method, path = Api.content_repository_config(self.access_token)
|
|
|
|
return await self._send(ContentRepositoryConfigResponse, method, path)
|
|
|
|
@staticmethod
|
|
async def _process_data_chunk(chunk, monitor=None):
|
|
if monitor and monitor.cancel:
|
|
raise TransferCancelledError()
|
|
|
|
while monitor and monitor.pause:
|
|
await asyncio.sleep(0.1)
|
|
|
|
return chunk
|
|
|
|
async def _plain_data_generator(self, data, monitor=None):
|
|
"""Yield chunks of bytes from data.
|
|
|
|
If a monitor is passed, update its ``transferred`` property and
|
|
suspend yielding chunks while its ``pause`` attribute is ``True``.
|
|
|
|
Raise ``TransferCancelledError`` if ``monitor.cancel`` is ``True``.
|
|
"""
|
|
|
|
async for value in async_generator_from_data(data):
|
|
yield await self._process_data_chunk(value, monitor)
|
|
|
|
async def _encrypted_data_generator(
|
|
self,
|
|
data,
|
|
decryption_dict,
|
|
monitor=None,
|
|
):
|
|
"""Yield encrypted chunks of bytes from data.
|
|
|
|
If a monitor is passed, update its ``transferred`` property and
|
|
suspend yielding chunks while its ``pause`` attribute is ``True``.
|
|
|
|
The last yielded value will be the decryption dict.
|
|
|
|
Raise ``TransferCancelledError`` if ``monitor.cancel`` is ``True``.
|
|
"""
|
|
|
|
async for value in async_encrypt_attachment(data):
|
|
if isinstance(value, dict): # last yielded value
|
|
decryption_dict.update(value)
|
|
else:
|
|
yield await self._process_data_chunk(value, monitor)
|
|
|
|
@logged_in
|
|
async def upload(
|
|
self,
|
|
data_provider: DataProvider,
|
|
content_type: str = "application/octet-stream",
|
|
filename: Optional[str] = None,
|
|
encrypt: bool = False,
|
|
monitor: Optional[TransferMonitor] = None,
|
|
filesize: Optional[int] = None,
|
|
) -> Tuple[Union[UploadResponse, UploadError], Optional[Dict[str, Any]]]:
|
|
"""Upload a file to the content repository.
|
|
|
|
This method ignores `AsyncClient.config.request_timeout` and uses `0`.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns a tuple containing:
|
|
|
|
- Either a `UploadResponse` if the request was successful, or a
|
|
`UploadError` if there was an error with the request
|
|
|
|
- A dict with file decryption info if encrypt is ``True``,
|
|
else ``None``.
|
|
|
|
Raises a ``TransferCancelledError`` if a monitor is passed and its
|
|
``cancelled`` property becomes set to ``True``.
|
|
|
|
Args:
|
|
data_provider (Callable, SynchronousFile, AsyncFile): A function
|
|
returning the data to upload or a file object. File objects
|
|
must be opened in binary mode (``mode="r+b"``). Callables
|
|
returning a path string, Path, async iterable or aiofiles
|
|
open binary file object allow the file data to be read in an
|
|
asynchronous and lazy way (without reading the entire file
|
|
into memory). Returning a synchronous iterable or standard
|
|
open binary file object will still allow the data to be read
|
|
lazily, but not asynchronously.
|
|
|
|
The function will be called again if the upload fails
|
|
due to a server timeout, in which case it must restart
|
|
from the beginning.
|
|
Callables receive two arguments: the total number of
|
|
429 "Too many request" errors that occured, and the total
|
|
number of server timeout exceptions that occured, thus
|
|
cleanup operations can be performed for retries if necessary.
|
|
|
|
content_type (str): The content MIME type of the file,
|
|
e.g. "image/png".
|
|
Defaults to "application/octet-stream", corresponding to a
|
|
generic binary file.
|
|
Custom values are ignored if encrypt is ``True``.
|
|
|
|
filename (str, optional): The file's original name.
|
|
|
|
encrypt (bool): If the file's content should be encrypted,
|
|
necessary for files that will be sent to encrypted rooms.
|
|
Defaults to ``False``.
|
|
|
|
monitor (TransferMonitor, optional): If a ``TransferMonitor``
|
|
object is passed, it will be updated by this function while
|
|
uploading.
|
|
From this object, statistics such as currently
|
|
transferred bytes or estimated remaining time can be gathered
|
|
while the upload is running as a task; it also allows
|
|
for pausing and cancelling.
|
|
|
|
filesize (int, optional): Size in bytes for the file to transfer.
|
|
If left as ``None``, some servers might refuse the upload.
|
|
|
|
It's common to use this alongside :py:meth:`room_send`. An example of
|
|
uploading a plain text file follows, but the principle is the same for
|
|
media, you just need to add an additional "info" key to the content.
|
|
See `the Matrix client-server spec <https://matrix.org/docs/spec/client_server/r0.6.0#m-room-message-msgtypes>`_
|
|
for more details.
|
|
|
|
Example:
|
|
>>> file_stat = await aiofiles.os.stat("sample.py")
|
|
>>> async with aiofiles.open("sample.py", "r+b") as f:
|
|
>>> resp, maybe_keys = await client.upload(
|
|
... f,
|
|
... content_type="text/plain",
|
|
... filename="hello.py",
|
|
... filesize=file_stat.st_size()
|
|
... )
|
|
|
|
>>> await client.room_send(
|
|
... room_id="!myfaveroom:example.org",
|
|
... message_type="m.room.message",
|
|
... content = {
|
|
... "msgtype": "m.file",
|
|
... "url": resp.content_uri,
|
|
... "body": "descriptive title (like the filename)"
|
|
... }
|
|
... )
|
|
"""
|
|
|
|
http_method, path, _ = Api.upload(self.access_token, filename)
|
|
|
|
decryption_dict: Dict[str, Any] = {}
|
|
|
|
initial_file_pos = 0
|
|
|
|
async def provider(got_429, got_timeouts):
|
|
nonlocal initial_file_pos
|
|
if monitor and (got_429 or got_timeouts):
|
|
# We have to restart from scratch
|
|
monitor.transferred = 0
|
|
|
|
if isinstance(data_provider, Callable):
|
|
data = data_provider(got_429, got_timeouts)
|
|
|
|
elif isinstance(data_provider, SynchronousFile):
|
|
if got_429 or got_timeouts:
|
|
data_provider.seek(initial_file_pos)
|
|
else:
|
|
initial_file_pos = data_provider.tell()
|
|
|
|
data = data_provider
|
|
|
|
elif isinstance(data_provider, AsyncFile):
|
|
if got_429 or got_timeouts:
|
|
await data_provider.seek(initial_file_pos)
|
|
else:
|
|
initial_file_pos = await data_provider.tell()
|
|
|
|
data = data_provider
|
|
|
|
else:
|
|
raise TypeError(
|
|
f"data_provider type {type(data_provider)} "
|
|
"is not of a usable type "
|
|
f"(Callable, {SynchronousFile}, {AsyncFile})"
|
|
)
|
|
|
|
if encrypt:
|
|
return self._encrypted_data_generator(
|
|
data,
|
|
decryption_dict,
|
|
monitor,
|
|
)
|
|
|
|
return self._plain_data_generator(data, monitor)
|
|
|
|
response = await self._send(
|
|
UploadResponse,
|
|
http_method,
|
|
path,
|
|
data_provider=provider,
|
|
content_type="application/octet-stream" if encrypt else content_type,
|
|
trace_context=monitor,
|
|
timeout=0,
|
|
content_length=filesize,
|
|
)
|
|
|
|
# After the upload finished and we get the response above, if encrypt
|
|
# is True, decryption_dict will have been updated from inside the
|
|
# self._encrypted_data_generator().
|
|
return (response, decryption_dict if encrypt else None)
|
|
|
|
@client_session
|
|
async def download(
|
|
self,
|
|
mxc: Optional[str] = None,
|
|
filename: Optional[str] = None,
|
|
allow_remote: bool = True,
|
|
server_name: Optional[str] = None,
|
|
media_id: Optional[str] = None,
|
|
) -> Union[DownloadResponse, DownloadError]:
|
|
"""Get the content of a file from the content repository.
|
|
|
|
This method ignores `AsyncClient.config.request_timeout` and uses `0`.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `DownloadResponse` if the request was successful or
|
|
a `DownloadError` if there was an error with the request.
|
|
|
|
The parameters `server_name` and `media_id` are deprecated and will be removed in a future release.
|
|
Use `mxc` instead.
|
|
|
|
Args:
|
|
mxc (str, optional): The mxc:// URI.
|
|
filename (str, optional): A filename to be returned in the response
|
|
by the server. If None (default), the original name of the
|
|
file will be returned instead, if there is one.
|
|
allow_remote (bool): Indicates to the server that it should not
|
|
attempt to fetch the media if it is deemed remote.
|
|
This is to prevent routing loops where the server contacts
|
|
itself.
|
|
server_name (str, optional): [deprecated] The server name from the mxc:// URI.
|
|
media_id (str, optional): [deprecated] The media ID from the mxc:// URI.
|
|
"""
|
|
# TODO: support TransferMonitor
|
|
|
|
if mxc is None:
|
|
if server_name is None or media_id is None:
|
|
# Too few parameters are passed.
|
|
raise TypeError(
|
|
"Either `mxc` or both the `server_name` and `media_id` are required"
|
|
)
|
|
if server_name is not None or media_id is not None:
|
|
# Deprecated parameters are passed.
|
|
warnings.warn(
|
|
"The parameters `server_name` and `media_id` are deprecated "
|
|
"and will be removed in a future release. Use `mxc` instead",
|
|
DeprecationWarning,
|
|
)
|
|
else:
|
|
if server_name is not None or media_id is not None:
|
|
# Potentially clashing parameters are passed.
|
|
raise TypeError(
|
|
"The parameters `server_name` and `media_id` are deprecated "
|
|
"and will be removed in a future release. Use `mxc` instead"
|
|
)
|
|
else:
|
|
# `mxc` is passed; expected behavior
|
|
url = urlparse(mxc)
|
|
server_name = url.netloc
|
|
media_id = url.path.replace("/", "")
|
|
|
|
http_method, path = Api.download(
|
|
server_name,
|
|
media_id,
|
|
filename,
|
|
allow_remote,
|
|
)
|
|
|
|
return await self._send(
|
|
DownloadResponse,
|
|
http_method,
|
|
path,
|
|
timeout=0,
|
|
)
|
|
|
|
@client_session
|
|
async def thumbnail(
|
|
self,
|
|
server_name: str,
|
|
media_id: str,
|
|
width: int,
|
|
height: int,
|
|
method: ResizingMethod = ResizingMethod.scale,
|
|
allow_remote: bool = True,
|
|
) -> Union[ThumbnailResponse, ThumbnailError]:
|
|
"""Get the thumbnail of a file from the content repository.
|
|
|
|
The actual thumbnail may be larger than the size specified.
|
|
This method ignores `AsyncClient.config.request_timeout` and uses `0`.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `ThumbnailResponse` if the request was successful or
|
|
a `ThumbnailError` if there was an error with the request.
|
|
|
|
Args:
|
|
server_name (str): The server name from the mxc:// URI.
|
|
media_id (str): The media ID from the mxc:// URI.
|
|
width (int): The desired width of the thumbnail.
|
|
height (int): The desired height of the thumbnail.
|
|
method (ResizingMethod): The desired resizing method.
|
|
allow_remote (bool): Indicates to the server that it should not
|
|
attempt to fetch the media if it is deemed remote.
|
|
This is to prevent routing loops where the server contacts
|
|
itself.
|
|
"""
|
|
http_method, path = Api.thumbnail(
|
|
server_name, media_id, width, height, method, allow_remote
|
|
)
|
|
|
|
return await self._send(
|
|
ThumbnailResponse,
|
|
http_method,
|
|
path,
|
|
timeout=0,
|
|
)
|
|
|
|
@client_session
|
|
async def get_profile(
|
|
self, user_id: Optional[str] = None
|
|
) -> Union[ProfileGetResponse, ProfileGetError]:
|
|
"""Get a user's combined profile information.
|
|
|
|
This queries the display name and avatar matrix content URI of a user
|
|
from the server. Additional profile information may be present.
|
|
The currently logged in user is queried if no user is specified.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `ProfileGetResponse` if the request was
|
|
successful or a `ProfileGetError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
user_id (str): User id of the user to get the profile for.
|
|
"""
|
|
method, path = Api.profile_get(
|
|
user_id or self.user_id, access_token=self.access_token or None
|
|
)
|
|
|
|
return await self._send(
|
|
ProfileGetResponse,
|
|
method,
|
|
path,
|
|
)
|
|
|
|
@client_session
|
|
async def get_presence(
|
|
self, user_id: str
|
|
) -> Union[PresenceGetResponse, PresenceGetError]:
|
|
"""Get a user's presence state.
|
|
|
|
This queries the presence state of a user from the server.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `PresenceGetResponse` if the request was
|
|
successful or a `PresenceGetError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
user_id (str): User id of the user to get the presence state for.
|
|
"""
|
|
|
|
method, path = Api.get_presence(self.access_token, user_id)
|
|
|
|
return await self._send(
|
|
PresenceGetResponse, method, path, response_data=(user_id,)
|
|
)
|
|
|
|
@client_session
|
|
async def set_presence(
|
|
self, presence: str, status_msg: str = None
|
|
) -> Union[PresenceSetResponse, PresenceSetError]:
|
|
"""Set our user's presence state.
|
|
|
|
This tells the server to set presence state of the currently logged
|
|
in user to the supplied string.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `PresenceSetResponse` if the request was
|
|
successful or a `PresenceSetError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
presence (str): The new presence state. One of: ["online", "offline", "unavailable"]
|
|
status_msg (str, optional): The status message to attach to this state.
|
|
"""
|
|
|
|
method, path, data = Api.set_presence(
|
|
self.access_token, self.user_id, presence, status_msg
|
|
)
|
|
|
|
resp = await self._send(PresenceSetResponse, method, path, data)
|
|
if isinstance(resp, PresenceSetResponse):
|
|
self._presence = presence
|
|
|
|
return resp
|
|
|
|
@client_session
|
|
async def get_displayname(
|
|
self, user_id: Optional[str] = None
|
|
) -> _ProfileGetDisplayNameT:
|
|
"""Get a user's display name.
|
|
|
|
This queries the display name of a user from the server.
|
|
The currently logged in user is queried if no user is specified.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `ProfileGetDisplayNameResponse` if the request was
|
|
successful or a `ProfileGetDisplayNameError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
user_id (str): User id of the user to get the display name for.
|
|
"""
|
|
method, path = Api.profile_get_displayname(
|
|
user_id or self.user_id, access_token=self.access_token or None
|
|
)
|
|
|
|
return await self._send(
|
|
ProfileGetDisplayNameResponse,
|
|
method,
|
|
path,
|
|
)
|
|
|
|
@logged_in
|
|
async def set_displayname(self, displayname: str) -> _ProfileSetDisplayNameT:
|
|
"""Set user's display name.
|
|
|
|
This tells the server to set display name of the currently logged
|
|
in user to the supplied string.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `ProfileSetDisplayNameResponse` if the request was
|
|
successful or a `ProfileSetDisplayNameError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
displayname (str): Display name to set.
|
|
"""
|
|
method, path, data = Api.profile_set_displayname(
|
|
self.access_token, self.user_id, displayname
|
|
)
|
|
|
|
return await self._send(
|
|
ProfileSetDisplayNameResponse,
|
|
method,
|
|
path,
|
|
data,
|
|
)
|
|
|
|
@client_session
|
|
async def get_avatar(
|
|
self, user_id: Optional[str] = None
|
|
) -> Union[ProfileGetAvatarResponse, ProfileGetAvatarError]:
|
|
"""Get a user's avatar URL.
|
|
|
|
This queries the avatar matrix content URI of a user from the server.
|
|
The currently logged in user is queried if no user is specified.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `ProfileGetAvatarResponse` if the request was
|
|
successful or a `ProfileGetAvatarError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
user_id (str): User id of the user to get the avatar for.
|
|
"""
|
|
method, path = Api.profile_get_avatar(
|
|
user_id or self.user_id, access_token=self.access_token or None
|
|
)
|
|
|
|
return await self._send(
|
|
ProfileGetAvatarResponse,
|
|
method,
|
|
path,
|
|
)
|
|
|
|
@logged_in
|
|
async def set_avatar(
|
|
self, avatar_url: str
|
|
) -> Union[ProfileSetAvatarResponse, ProfileSetAvatarError]:
|
|
"""Set the user's avatar URL.
|
|
|
|
This tells the server to set the avatar of the currently logged
|
|
in user to supplied matrix content URI.
|
|
|
|
Calls receive_response() to update the client state if necessary.
|
|
|
|
Returns either a `ProfileSetAvatarResponse` if the request was
|
|
successful or a `ProfileSetAvatarError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
avatar_url (str): matrix content URI of the avatar to set.
|
|
"""
|
|
method, path, data = Api.profile_set_avatar(
|
|
self.access_token, self.user_id, avatar_url
|
|
)
|
|
|
|
return await self._send(
|
|
ProfileSetAvatarResponse,
|
|
method,
|
|
path,
|
|
data,
|
|
)
|
|
|
|
@logged_in
|
|
async def get_openid_token(
|
|
self, user_id: str
|
|
) -> Union[GetOpenIDTokenResponse, GetOpenIDTokenError]:
|
|
"""Gets an OpenID token object that the requester may supply to another service
|
|
to verify their identity in matrix.
|
|
|
|
Returns either a `GetOpenIDTokenResponse` if the request was
|
|
successful or a `GetOpenIDTokenError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
user_id (str): The user who requested the OpenID token
|
|
"""
|
|
|
|
method, path, data = Api.get_openid_token(self.access_token, user_id)
|
|
|
|
return await self._send(GetOpenIDTokenResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def upload_filter(
|
|
self,
|
|
user_id: Optional[str] = None,
|
|
event_fields: Optional[List[str]] = None,
|
|
event_format: EventFormat = EventFormat.client,
|
|
presence: Optional[Dict[str, Any]] = None,
|
|
account_data: Optional[Dict[str, Any]] = None,
|
|
room: Optional[Dict[str, Any]] = None,
|
|
) -> Union[UploadFilterResponse, UploadFilterError]:
|
|
"""Upload a new filter definition to the homeserver.
|
|
|
|
Returns either a `UploadFilterResponse` if the request was
|
|
successful or a `UploadFilterError` if there was an error
|
|
with the request.
|
|
|
|
The filter ID from the successful responses can be used for
|
|
the ``AsyncClient.sync()``, ``AsyncClient.sync_forever()`` and
|
|
``AsyncClient.room_messages()`` methods.
|
|
|
|
Args:
|
|
user_id (Optional[str]): ID of the user uploading the filter.
|
|
If not provider, the current logged in user's ID is used.
|
|
|
|
event_fields (Optional[List[str]]): List of event fields to
|
|
include. If this list is absent then all fields are included.
|
|
The entries may include '.' characters to indicate sub-fields.
|
|
A literal '.' character in a field name may be escaped
|
|
using a '\'.
|
|
|
|
event_format (EventFormat): The format to use for events.
|
|
|
|
presence (Dict[str, Any]): The presence updates to include.
|
|
The dict corresponds to the `EventFilter` type described
|
|
in https://matrix.org/docs/spec/client_server/latest#id240
|
|
|
|
account_data (Dict[str, Any]): The user account data that isn't
|
|
associated with rooms to include.
|
|
The dict corresponds to the `EventFilter` type described
|
|
in https://matrix.org/docs/spec/client_server/latest#id240
|
|
|
|
room (Dict[str, Any]): Filters to be applied to room data.
|
|
The dict corresponds to the `RoomFilter` type described
|
|
in https://matrix.org/docs/spec/client_server/latest#id240
|
|
"""
|
|
method, path, data = Api.upload_filter(
|
|
self.access_token,
|
|
user_id or self.user_id,
|
|
event_fields,
|
|
event_format,
|
|
presence,
|
|
account_data,
|
|
room,
|
|
)
|
|
|
|
return await self._send(UploadFilterResponse, method, path, data)
|
|
|
|
async def whoami(self):
|
|
if self.access_token is None:
|
|
raise ValueError("No access_token is set.")
|
|
|
|
method, path = Api.whoami(self.access_token)
|
|
return await self._send(WhoamiResponse, method, path)
|
|
|
|
@logged_in
|
|
async def set_pushrule(
|
|
self,
|
|
scope: str,
|
|
kind: PushRuleKind,
|
|
rule_id: str,
|
|
before: Optional[str] = None,
|
|
after: Optional[str] = None,
|
|
actions: Sequence[PushAction] = (),
|
|
conditions: Optional[Sequence[PushCondition]] = None,
|
|
pattern: Optional[str] = None,
|
|
) -> Union[SetPushRuleResponse, SetPushRuleError]:
|
|
"""Create or modify an existing push rule.
|
|
|
|
Returns either a `SetPushRuleResponse` if the request was
|
|
successful or a `SetPushRuleError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
scope (str): The scope of this rule, e.g. ``"global"``.
|
|
Homeservers currently only process ``global`` rules for
|
|
event matching, while ``device`` rules are a planned feature.
|
|
It is up to clients to interpret any other scope name.
|
|
|
|
kind (PushRuleKind): The kind of rule.
|
|
|
|
rule_id (str): The identifier of the rule. Must be unique
|
|
within its scope and kind.
|
|
For rules of ``room`` kind, this is the room ID to match for.
|
|
For rules of ``sender`` kind, this is the user ID to match.
|
|
|
|
before (Optional[str]): Position this rule before the one matching
|
|
the given rule ID.
|
|
The rule ID cannot belong to a predefined server rule.
|
|
``before`` and ``after`` cannot be both specified.
|
|
|
|
after (Optional[str]): Position this rule after the one matching
|
|
the given rule ID.
|
|
The rule ID cannot belong to a predefined server rule.
|
|
``before`` and ``after`` cannot be both specified.
|
|
|
|
actions (Sequence[PushAction]): Actions to perform when the
|
|
conditions for this rule are met. The given actions replace
|
|
the existing ones.
|
|
|
|
conditions (Sequence[PushCondition]): Event conditions that must
|
|
hold true for the rule to apply to that event.
|
|
A rule with no conditions always hold true.
|
|
Only applicable to ``underride`` and ``override`` rules.
|
|
|
|
pattern (Optional[str]): Glob-style pattern to match against
|
|
for the event's content.
|
|
Only applicable to ``content`` rules.
|
|
|
|
Example:
|
|
>>> client.set_pushrule(
|
|
... scope = "global",
|
|
... kind = PushRuleKind.room,
|
|
... rule_id = "!foo123:example.org",
|
|
... actions = [PushNotify(), PushSetTweak("sound", "default")],
|
|
... )
|
|
...
|
|
... client.set_pushrule(
|
|
... scope = "global",
|
|
... kind = PushRuleKind.override,
|
|
... rule_id = "silence_large_rooms",
|
|
... actions = [],
|
|
... conditions = [PushRoomMemberCount(10, ">")],
|
|
... )
|
|
...
|
|
... client.set_pushrule(
|
|
... scope = "global",
|
|
... kind = PushRuleKind.content,
|
|
... rule_id = "highlight_messages_containing_nio_word",
|
|
... actions = [PushNotify(), PushSetTweak("highlight", True)],
|
|
... pattern = "nio"
|
|
... )
|
|
|
|
"""
|
|
|
|
method, path, data = Api.set_pushrule(
|
|
self.access_token,
|
|
scope,
|
|
kind,
|
|
rule_id,
|
|
before,
|
|
after,
|
|
actions,
|
|
conditions,
|
|
pattern,
|
|
)
|
|
|
|
return await self._send(SetPushRuleResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def delete_pushrule(
|
|
self,
|
|
scope: str,
|
|
kind: PushRuleKind,
|
|
rule_id: str,
|
|
) -> Union[DeletePushRuleResponse, DeletePushRuleError]:
|
|
"""Delete an existing push rule.
|
|
|
|
Returns either a `DeletePushRuleResponse` if the request was
|
|
successful or a `DeletePushRuleError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
scope (str): The scope of this rule, e.g. ``"global"``.
|
|
Homeservers currently only process ``global`` rules for
|
|
event matching, while ``device`` rules are a planned feature.
|
|
It is up to clients to interpret any other scope name.
|
|
|
|
kind (PushRuleKind): The kind of rule.
|
|
|
|
rule_id (str): The identifier of the rule. Must be unique
|
|
within its scope and kind.
|
|
"""
|
|
|
|
method, path = Api.delete_pushrule(
|
|
self.access_token,
|
|
scope,
|
|
kind,
|
|
rule_id,
|
|
)
|
|
|
|
return await self._send(DeletePushRuleResponse, method, path)
|
|
|
|
@logged_in
|
|
async def enable_pushrule(
|
|
self,
|
|
scope: str,
|
|
kind: PushRuleKind,
|
|
rule_id: str,
|
|
enable: bool,
|
|
) -> Union[EnablePushRuleResponse, EnablePushRuleError]:
|
|
"""Enable or disable an existing push rule.
|
|
|
|
Returns either a `EnablePushRuleResponse` if the request was
|
|
successful or a `EnablePushRuleError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
scope (str): The scope of this rule, e.g. ``"global"``.
|
|
Homeservers currently only process ``global`` rules for
|
|
event matching, while ``device`` rules are a planned feature.
|
|
It is up to clients to interpret any other scope name.
|
|
|
|
kind (PushRuleKind): The kind of rule.
|
|
|
|
rule_id (str): The identifier of the rule. Must be unique
|
|
within its scope and kind.
|
|
|
|
enable (bool): Whether to enable or disable this rule.
|
|
"""
|
|
|
|
method, path, data = Api.enable_pushrule(
|
|
self.access_token,
|
|
scope,
|
|
kind,
|
|
rule_id,
|
|
enable,
|
|
)
|
|
|
|
return await self._send(EnablePushRuleResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def set_pushrule_actions(
|
|
self,
|
|
scope: str,
|
|
kind: PushRuleKind,
|
|
rule_id: str,
|
|
actions: Sequence[PushAction],
|
|
) -> Union[SetPushRuleActionsResponse, SetPushRuleActionsError]:
|
|
"""Set the actions for an existing built-in or user-created push rule.
|
|
|
|
Unlike ``set_pushrule``, this method can edit built-in server rules.
|
|
|
|
Returns the HTTP method, HTTP path and data for the request.
|
|
Returns either a `SetPushRuleActionsResponse` if the request was
|
|
successful or a `SetPushRuleActionsError` if there was an error
|
|
with the request.
|
|
|
|
Args:
|
|
scope (str): The scope of this rule, e.g. ``"global"``.
|
|
Homeservers currently only process ``global`` rules for
|
|
event matching, while ``device`` rules are a planned feature.
|
|
It is up to clients to interpret any other scope name.
|
|
|
|
kind (PushRuleKind): The kind of rule.
|
|
|
|
rule_id (str): The identifier of the rule. Must be unique
|
|
within its scope and kind.
|
|
|
|
actions (Sequence[PushAction]): Actions to perform when the
|
|
conditions for this rule are met. The given actions replace
|
|
the existing ones.
|
|
"""
|
|
|
|
method, path, data = Api.set_pushrule_actions(
|
|
self.access_token,
|
|
scope,
|
|
kind,
|
|
rule_id,
|
|
actions,
|
|
)
|
|
|
|
return await self._send(SetPushRuleActionsResponse, method, path, data)
|
|
|
|
@logged_in
|
|
async def room_update_aliases(
|
|
self,
|
|
room_id: str,
|
|
canonical_alias: Union[str, None] = None,
|
|
alt_aliases: List[str] = [],
|
|
):
|
|
"""Update the aliases of an existing room.
|
|
This method will not transfer aliases from one room to another!
|
|
Remove the old alias before trying to assign it again
|
|
|
|
Args:
|
|
room_id (str): Room-ID of the room to assign / remove aliases from
|
|
|
|
canonical_alias (str, None): The main alias of the room
|
|
|
|
alt_aliases (list[str], None): List of alternative aliases for the room
|
|
|
|
If None is passed as canonical_alias or alt_aliases the existing aliases
|
|
will be removed without assigning new aliases.
|
|
"""
|
|
# Concentrate new aliases
|
|
if canonical_alias is None:
|
|
new_aliases = list()
|
|
else:
|
|
new_aliases = alt_aliases + [canonical_alias]
|
|
|
|
# Get current aliases
|
|
current_aliases = list()
|
|
current_alias_event = await self.room_get_state_event(
|
|
room_id, "m.room.canonical_alias"
|
|
)
|
|
if isinstance(current_alias_event, RoomGetStateEventResponse):
|
|
current_aliases.append(current_alias_event.content["alias"])
|
|
if "alt_aliases" in current_alias_event.content:
|
|
alt_aliases = current_alias_event.content["alt_aliases"]
|
|
for alias in alt_aliases:
|
|
current_aliases.append(alias)
|
|
|
|
# Unregister old aliases
|
|
for alias in current_aliases:
|
|
if alias not in new_aliases:
|
|
if isinstance(
|
|
await self.room_delete_alias(alias), RoomDeleteAliasError
|
|
):
|
|
return RoomUpdateAliasError(f"Could not delete alias {alias}")
|
|
|
|
# Register new aliases
|
|
for alias in new_aliases:
|
|
if isinstance(
|
|
await self.room_put_alias(alias, room_id), RoomDeleteAliasError
|
|
):
|
|
return RoomUpdateAliasError(f"Could not put alias {alias}")
|
|
|
|
# Send m.room.canonical_alias event
|
|
put_alias_event = await self.room_put_state(
|
|
room_id,
|
|
"m.room.canonical_alias",
|
|
{"alias": canonical_alias, "alt_aliases": alt_aliases},
|
|
)
|
|
if isinstance(put_alias_event, RoomPutStateError):
|
|
return RoomUpdateAliasError("Failed to put m.room.canonical_alias")
|
|
return RoomUpdateAliasResponse()
|
|
|
|
@logged_in
|
|
async def room_upgrade(
|
|
self,
|
|
old_room_id: str,
|
|
new_room_version: str,
|
|
copy_events: list = [
|
|
"m.room.server_acl",
|
|
"m.room.encryption",
|
|
"m.room.name",
|
|
"m.room.avatar",
|
|
"m.room.topic",
|
|
"m.room.guest_access",
|
|
"m.room.history_visibility",
|
|
"m.room.join_rules",
|
|
"m.room.power_levels",
|
|
],
|
|
room_upgrade_message: str = "This room has been replaced",
|
|
room_power_level_overwrite: Optional[Dict[str, Any]] = None,
|
|
) -> Union[RoomUpgradeResponse, RoomUpgradeError]:
|
|
"""Upgrade an existing room.
|
|
|
|
Args:
|
|
old_room_id (str): Room-ID of the old room
|
|
|
|
new_room_version (str): The new room version
|
|
|
|
copy_events (list): List of state-events to copy from the old room
|
|
Defaults m.room.server_acl, m.room.encryption, m.room.name,
|
|
m.room.avatar, m.room.topic, m.room.guest_access,
|
|
m.room.history_visibility, m.room.join_rules, m.room.power_levels
|
|
|
|
room_upgrade_message (str): Message inside the tombstone-event
|
|
|
|
room_power_level_overwrite (dict): A ``m.room.power_levels content`` dict
|
|
to override the default.
|
|
The dict will be applied on top of the generated
|
|
``m.room.power_levels`` event before it is sent to the room.
|
|
"""
|
|
# Check if we are allowed to tombstone a room
|
|
if not await self.has_event_permission(old_room_id, "m.room.tombstone"):
|
|
return RoomUpgradeError("Not allowed to upgrade room")
|
|
|
|
# Get state events for the old room
|
|
old_room_state_events = await self.room_get_state(old_room_id)
|
|
if isinstance(old_room_state_events, RoomGetStateError):
|
|
return RoomUpgradeError("Failed to get room events")
|
|
|
|
# Get initial_state and power_level
|
|
old_room_power_levels = None
|
|
new_room_initial_state = list()
|
|
for event in old_room_state_events.events:
|
|
if (
|
|
event["type"] in copy_events
|
|
and not event["type"] == "m.room.power_levels"
|
|
):
|
|
new_room_initial_state.append(event)
|
|
if event["type"] == "m.room.power_levels":
|
|
old_room_power_levels = event["content"]
|
|
|
|
# Get last known event from the old room
|
|
old_room_event = await self.room_messages(
|
|
start="", room_id=old_room_id, limit=1
|
|
)
|
|
if isinstance(old_room_event, RoomMessagesError):
|
|
return RoomUpgradeError("Failed to get last known event")
|
|
|
|
old_room_last_event = old_room_event.chunk[0]
|
|
|
|
# Overwrite power level if a new power level was passed
|
|
if room_power_level_overwrite is not None:
|
|
old_room_power_levels = room_power_level_overwrite
|
|
|
|
# Create new room
|
|
new_room = await self.room_create(
|
|
room_version=new_room_version,
|
|
power_level_override=old_room_power_levels,
|
|
initial_state=new_room_initial_state,
|
|
predecessor={
|
|
"event_id": old_room_last_event.event_id,
|
|
"room_id": old_room_id,
|
|
},
|
|
)
|
|
|
|
if isinstance(new_room, RoomCreateError):
|
|
return RoomUpgradeError("Room creation failed")
|
|
|
|
# Send tombstone event to the old room
|
|
old_room_tombstone = await self.room_put_state(
|
|
old_room_id,
|
|
"m.room.tombstone",
|
|
{"body": room_upgrade_message, "replacement_room": new_room.room_id},
|
|
)
|
|
if isinstance(old_room_tombstone, RoomPutStateError):
|
|
return RoomUpgradeError("Failed to put m.room.tombstone")
|
|
|
|
# Get the old rooms aliases
|
|
old_room_alias = await self.room_get_state_event(
|
|
old_room_id, "m.room.canonical_alias"
|
|
)
|
|
if isinstance(old_room_alias, RoomGetStateEventResponse):
|
|
aliases = list()
|
|
aliases.append(old_room_alias.content["alias"])
|
|
if "alt_aliases" in old_room_alias.content:
|
|
alt_aliases = old_room_alias.content["alt_aliases"]
|
|
for alias in alt_aliases:
|
|
aliases.append(alias)
|
|
else:
|
|
alt_aliases = []
|
|
|
|
# Remove the old aliases
|
|
if isinstance(
|
|
await self.room_update_aliases(old_room_id), RoomDeleteAliasError
|
|
):
|
|
return RoomUpgradeError("Could update the old rooms aliases")
|
|
|
|
# Assign new aliases
|
|
if isinstance(
|
|
await self.room_update_aliases(
|
|
new_room.room_id,
|
|
canonical_alias=old_room_alias.content["alias"],
|
|
alt_aliases=alt_aliases,
|
|
),
|
|
RoomDeleteAliasError,
|
|
):
|
|
return RoomUpgradeError("Could update the new rooms aliases")
|
|
|
|
return RoomUpgradeResponse(new_room.room_id)
|
|
|
|
@logged_in
|
|
async def update_room_topic(
|
|
self,
|
|
room_id: str,
|
|
topic: str,
|
|
) -> Union[RoomPutStateResponse, RoomPutStateError]:
|
|
"""Update the room topic
|
|
|
|
Returns either a `RoomPutStateResponse` if the request was successful
|
|
or a `RoomPutStateError` if there was an error with the request.
|
|
|
|
If you wish to send a `state_key` along with the request, use the `room_put_state` method instead.
|
|
|
|
Args:
|
|
room_id (str): The room id of the room to be updated.
|
|
topic (str): The new room topic.
|
|
"""
|
|
|
|
return await self.room_put_state(
|
|
room_id,
|
|
event_type="m.room.topic",
|
|
content={"topic": topic},
|
|
)
|
|
|
|
@logged_in
|
|
async def has_event_permission(
|
|
self, room_id: str, event_name: str, event_type: str = "event"
|
|
) -> Union[bool, ErrorResponse]:
|
|
who_am_i = await self.whoami()
|
|
power_levels = await self.room_get_state_event(room_id, "m.room.power_levels")
|
|
|
|
try:
|
|
user_power_level = power_levels.content["users"][who_am_i.user_id]
|
|
except KeyError:
|
|
user_power_level = power_levels.content["users_default"]
|
|
else:
|
|
return ErrorResponse("Couldn't get user power levels")
|
|
|
|
try:
|
|
event_power_level = power_levels.content["events"][event_name]
|
|
except KeyError:
|
|
if event_type == "event":
|
|
event_power_level = power_levels.content["events_default"]
|
|
elif event_type == "state":
|
|
event_power_level = power_levels.content["state_default"]
|
|
else:
|
|
return ErrorResponse(f"event_type {event_type} unknown")
|
|
else:
|
|
return ErrorResponse("Couldn't get event power levels")
|
|
|
|
return user_power_level >= event_power_level
|
|
|
|
async def has_permission(
|
|
self, room_id: str, permission_type: str
|
|
) -> Union[bool, ErrorResponse]:
|
|
who_am_i = await self.whoami()
|
|
power_levels = await self.room_get_state_event(room_id, "m.room.power_levels")
|
|
|
|
try:
|
|
user_power_level = power_levels.content["users"][who_am_i.user_id]
|
|
except KeyError:
|
|
user_power_level = power_levels.content["users_default"]
|
|
else:
|
|
return ErrorResponse("Couldn't get user power levels")
|
|
|
|
try:
|
|
permission_power_level = power_levels.content[permission_type]
|
|
except KeyError:
|
|
return ErrorResponse(f"permission_type {permission_type} unknown")
|
|
|
|
return user_power_level >= permission_power_level
|