# mautrix-python/mautrix/client/api/modules/media_repository.py (353 lines, 14 KiB, Python)

# Copyright (c) 2022 Tulir Asokan
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import annotations

from typing import Any, AsyncIterable, Iterator, Literal
from contextlib import contextmanager
import asyncio
import time

from yarl import URL

from mautrix import __optional_imports__
from mautrix.api import MediaPath, Method
from mautrix.errors import MatrixResponseError, make_request_error
from mautrix.types import (
    ContentURI,
    MediaCreateResponse,
    MediaRepoConfig,
    MXOpenGraph,
    SerializerError,
    SpecVersions,
)
from mautrix.util import background_task
from mautrix.util.async_body import async_iter_bytes
from mautrix.util.opt_prometheus import Histogram

from ..base import BaseClientAPI

try:
    from mautrix.util import magic
except ImportError:
    if __optional_imports__:
        raise
    magic = None  # type: ignore
# Histogram of media upload durations, fed by MediaRepositoryMethods._observe_upload_time.
# The value observed there is monotonic nanoseconds divided by size in bytes; ns/byte is
# numerically equal to milliseconds per megabyte (10^6 bytes), which matches the description.
UPLOAD_TIME = Histogram(
    "bridge_media_upload_time",
    "Time spent uploading media (milliseconds per megabyte)",
    buckets=[10, 25, 50, 100, 250, 500, 750, 1000, 2500, 5000, 10000],
)
class MediaRepositoryMethods(BaseClientAPI):
    """
    Methods in section 13.8 Content Repository of the spec. These methods are used for uploading and
    downloading content from the media repository and for getting URL previews without leaking
    client IPs.

    See also: `API reference <https://spec.matrix.org/v1.7/client-server-api/#content-repository>`__
    """

    async def create_mxc(self) -> MediaCreateResponse:
        """
        Create a media ID for uploading media to the homeserver.

        See also: `API reference <https://spec.matrix.org/unstable/client-server-api/#post_matrixmediav1create>`__

        Returns:
            A MediaCreateResponse containing the MXC URI that file content can later be uploaded
            into.
        """
        resp = await self.api.request(Method.POST, MediaPath.v1.create)
        return MediaCreateResponse.deserialize(resp)

    @contextmanager
    def _observe_upload_time(
        self, size: int | None, mxc: ContentURI | None = None
    ) -> Iterator[None]:
        """
        Time the wrapped block and record the result in the ``UPLOAD_TIME`` histogram.

        If the wrapped block raises, the exception propagates out of ``yield`` and nothing is
        logged or recorded.

        Args:
            size: The upload size in bytes. The histogram is only updated when this is truthy.
            mxc: If given, a debug message with the URI and the duration is logged.
        """
        start = time.monotonic_ns()
        yield
        duration = time.monotonic_ns() - start
        if mxc:
            duration_sec = duration / 1000**3
            self.log.debug(f"Completed asynchronous upload of {mxc} in {duration_sec:.3f} seconds")
        if size:
            # Nanoseconds per byte is numerically equal to milliseconds per megabyte (10^6 bytes),
            # which is the unit the histogram advertises.
            UPLOAD_TIME.observe(duration / size)

    async def upload_media(
        self,
        data: bytes | bytearray | AsyncIterable[bytes],
        mime_type: str | None = None,
        filename: str | None = None,
        size: int | None = None,
        mxc: ContentURI | None = None,
        async_upload: bool = False,
    ) -> ContentURI:
        """
        Upload a file to the content repository.

        See also: `API reference <https://spec.matrix.org/v1.7/client-server-api/#post_matrixmediav3upload>`__

        Args:
            data: The data to upload.
            mime_type: The MIME type to send with the upload request. If not given and ``data``
                is ``bytes``, it is detected with ``magic`` when that module is available.
            filename: The filename to send with the upload request.
            size: The file size to send with the upload request.
            mxc: An existing MXC URI which doesn't have content yet to upload into.
            async_upload: Should the media be uploaded in the background?
                If ``True``, this will create a MXC URI using :meth:`create_mxc`, start uploading
                in the background, and then immediately return the created URI. This is mutually
                exclusive with manually passing the ``mxc`` parameter.

        Returns:
            The MXC URI to the uploaded file. When ``mxc`` or ``async_upload`` is used, this is
            the URI that the content was uploaded into.

        Raises:
            MatrixResponseError: If a new-media upload response does not contain a
                ``content_uri`` field.
            ValueError: if both ``async_upload`` and ``mxc`` are provided at the same time.
        """
        if magic and isinstance(data, bytes):
            mime_type = mime_type or magic.mimetype(data)
        headers: dict[str, str] = {}
        if mime_type:
            headers["Content-Type"] = mime_type
        if size:
            headers["Content-Length"] = str(size)
        elif isinstance(data, (bytes, bytearray)):
            # Only used for the upload time metric below.
            size = len(data)
        query: dict[str, str] = {}
        if filename:
            query["filename"] = filename

        upload_url = None
        if async_upload:
            if mxc:
                raise ValueError("async_upload and mxc can't be provided simultaneously")
            create_response = await self.create_mxc()
            mxc = create_response.content_uri
            # NOTE(review): presumably only set by servers implementing com.beeper.msc3870
            # (the unstable path used below) — confirm against MediaCreateResponse.
            upload_url = create_response.unstable_upload_url

        path = MediaPath.v3.upload
        method = Method.POST
        if mxc:
            server_name, media_id = self.api.parse_mxc_uri(mxc)
            if upload_url is None:
                # Upload straight into the pre-created media ID.
                path = MediaPath.v3.upload[server_name][media_id]
                method = Method.PUT
            else:
                # The bytes go to upload_url; this homeserver path is POSTed afterwards.
                path = (
                    MediaPath.unstable["com.beeper.msc3870"].upload[server_name][media_id].complete
                )

        if upload_url is not None:
            task = self._upload_to_url(upload_url, path, headers, data, post_upload_query=query)
        else:
            task = self.api.request(
                method, path, content=data, headers=headers, query_params=query
            )

        if async_upload:

            async def _try_upload() -> None:
                try:
                    with self._observe_upload_time(size, mxc):
                        await task
                except Exception as e:
                    # This runs after upload_media has already returned, so errors can only be
                    # logged, not raised to the caller.
                    self.log.error(f"Failed to upload {mxc}: {type(e).__name__}: {e}")

            background_task.create(_try_upload())
            return mxc

        with self._observe_upload_time(size):
            resp = await task
        if mxc:
            # PUT /upload/{serverName}/{mediaId} responds with an empty JSON object per spec, so
            # the caller-supplied URI is the result rather than a content_uri field.
            return mxc
        try:
            return resp["content_uri"]
        except KeyError:
            raise MatrixResponseError("`content_uri` not in response.") from None

    async def _download(self, url: URL, query_params: dict[str, Any], authenticated: bool) -> bytes:
        """
        GET an already-resolved media URL, logging the request, and return the response body.

        Args:
            url: The URL produced by ``self.api.get_download_url``.
            query_params: Query parameters to send. ``user_id`` may be added to this dict.
            authenticated: Whether to send the access token in an ``Authorization`` header.

        Returns:
            The raw response body.

        Raises:
            Whatever ``response.raise_for_status()`` raises for an HTTP error status.
        """
        headers: dict[str, str] = {}
        if authenticated:
            headers["Authorization"] = f"Bearer {self.api.token}"
            # The appservice user ID is only sent together with the access token.
            if self.api.as_user_id:
                query_params["user_id"] = self.api.as_user_id
        req_id = self.api.log_download_request(url, query_params)
        start = time.monotonic()
        async with self.api.session.get(url, params=query_params, headers=headers) as response:
            try:
                response.raise_for_status()
                return await response.read()
            finally:
                # Logged for both successful and failed responses.
                self.api.log_download_request_done(
                    url, req_id, time.monotonic() - start, response.status
                )

    async def download_media(self, url: ContentURI, timeout_ms: int | None = None) -> bytes:
        """
        Download a file from the content repository.

        The authenticated media endpoints are used when the server advertises spec v1.11 support.

        See also: `API reference <https://spec.matrix.org/v1.7/client-server-api/#get_matrixmediav3downloadservernamemediaid>`__

        Args:
            url: The MXC URI to download.
            timeout_ms: The maximum number of milliseconds that the client is willing to wait to
                start receiving data. Used for asynchronous uploads.

        Returns:
            The raw downloaded data.
        """
        authenticated = (await self.versions()).supports(SpecVersions.V111)
        download_url = self.api.get_download_url(url, authenticated=authenticated)
        query_params: dict[str, Any] = {"allow_redirect": "true"}
        if timeout_ms is not None:
            query_params["timeout_ms"] = timeout_ms
        return await self._download(download_url, query_params, authenticated)

    async def download_thumbnail(
        self,
        url: ContentURI,
        width: int | None = None,
        height: int | None = None,
        resize_method: Literal["crop", "scale"] | None = None,
        allow_remote: bool | None = None,
        timeout_ms: int | None = None,
    ) -> bytes:
        """
        Download a thumbnail for a file in the content repository.

        The authenticated media endpoints are used when the server advertises spec v1.11 support.

        See also: `API reference <https://spec.matrix.org/v1.7/client-server-api/#get_matrixmediav3thumbnailservernamemediaid>`__

        Args:
            url: The MXC URI to download.
            width: The _desired_ width of the thumbnail. The actual thumbnail may not match the size
                specified.
            height: The _desired_ height of the thumbnail. The actual thumbnail may not match the
                size specified.
            resize_method: The desired resizing method. Either ``crop`` or ``scale``.
            allow_remote: If ``False``, indicates to the server that it should not attempt to fetch
                the media if it is deemed remote. This is to prevent routing loops where the server
                contacts itself.
            timeout_ms: The maximum number of milliseconds that the client is willing to wait to
                start receiving data. Used for asynchronous uploads.

        Returns:
            The raw downloaded data.
        """
        authenticated = (await self.versions()).supports(SpecVersions.V111)
        download_url = self.api.get_download_url(
            url, download_type="thumbnail", authenticated=authenticated
        )
        query_params: dict[str, Any] = {"allow_redirect": "true"}
        if width is not None:
            query_params["width"] = width
        if height is not None:
            query_params["height"] = height
        if resize_method is not None:
            query_params["method"] = resize_method
        if allow_remote is not None:
            query_params["allow_remote"] = str(allow_remote).lower()
        if timeout_ms is not None:
            query_params["timeout_ms"] = timeout_ms
        return await self._download(download_url, query_params, authenticated)

    async def get_url_preview(self, url: str, timestamp: int | None = None) -> MXOpenGraph:
        """
        Get information about a URL for a client.

        See also: `API reference <https://spec.matrix.org/v1.2/client-server-api/#get_matrixmediav3preview_url>`__

        Args:
            url: The URL to get a preview of.
            timestamp: The preferred point in time to return a preview for. The server may return a
                newer version if it does not have the requested version available.

        Returns:
            The OpenGraph data for the URL.

        Raises:
            MatrixResponseError: If the response can't be deserialized as ``MXOpenGraph``.
        """
        query_params: dict[str, Any] = {"url": url}
        if timestamp is not None:
            query_params["ts"] = timestamp
        content = await self.api.request(
            Method.GET, MediaPath.v3.preview_url, query_params=query_params
        )
        try:
            return MXOpenGraph.deserialize(content)
        except SerializerError as e:
            raise MatrixResponseError("Invalid MXOpenGraph in response.") from e

    async def get_media_repo_config(self) -> MediaRepoConfig:
        """
        This endpoint allows clients to retrieve the configuration of the content repository, such
        as upload limitations. Clients SHOULD use this as a guide when using content repository
        endpoints. All values are intentionally left optional. Clients SHOULD follow the advice
        given in the field description when the field is not available.

        **NOTE:** Both clients and server administrators should be aware that proxies between the
        client and the server may affect the apparent behaviour of content repository APIs, for
        example, proxies may enforce a lower upload size limit than is advertised by the server on
        this endpoint.

        See also: `API reference <https://spec.matrix.org/v1.7/client-server-api/#get_matrixmediav3config>`__

        Returns:
            The media repository config.

        Raises:
            MatrixResponseError: If the response can't be deserialized as ``MediaRepoConfig``.
        """
        content = await self.api.request(Method.GET, MediaPath.v3.config)
        try:
            return MediaRepoConfig.deserialize(content)
        except SerializerError as e:
            raise MatrixResponseError("Invalid MediaRepoConfig in response") from e

    async def _upload_to_url(
        self,
        upload_url: str,
        post_upload_path: str,
        headers: dict[str, str],
        data: bytes | bytearray | AsyncIterable[bytes],
        post_upload_query: dict[str, str],
        min_iter_size: int = 25 * 1024 * 1024,
    ) -> None:
        """
        PUT ``data`` to an external upload URL with retries, then POST ``post_upload_path`` to the
        homeserver to complete the upload.

        Args:
            upload_url: The external URL to PUT the data to.
            post_upload_path: The homeserver path to POST once the external upload succeeded.
            headers: Headers for the external PUT request. The dict is not modified.
            data: The data to upload.
            post_upload_query: Query parameters for the completion request.
            min_iter_size: Sized payloads longer than this are streamed through
                ``async_iter_bytes`` with an explicit ``Content-Length`` header.

        Raises:
            The error built by ``make_request_error`` with errcode
            ``COM.BEEPER.EXTERNAL_UPLOAD_ERROR`` once ``self.api.default_retry_count`` retries
            have failed. The last underlying exception is chained as its cause.
        """
        retry_count = self.api.default_retry_count
        backoff = 2
        do_fake_iter = bool(data) and hasattr(data, "__len__") and len(data) > min_iter_size
        # Work on a copy so the caller's headers dict is left untouched.
        headers = dict(headers)
        if do_fake_iter:
            headers["Content-Length"] = str(len(data))
        while True:
            self.log.debug("Uploading media to external URL %s", upload_url)
            status = -1
            error_body = ""
            try:
                # Re-created on every attempt so each retry gets a fresh iterator.
                # NOTE(review): a raw AsyncIterable ``data`` is passed through as-is; if it is a
                # one-shot iterator, a retry would send only what is left of it.
                req_data = async_iter_bytes(data) if do_fake_iter else data
                async with self.api.session.put(
                    upload_url, data=req_data, headers=headers
                ) as upload_response:
                    status = upload_response.status
                    if status >= 400:
                        # Capture the error body while the response is still open.
                        error_body = await upload_response.text()
                    upload_response.raise_for_status()
            except Exception as e:
                if retry_count <= 0:
                    raise make_request_error(
                        http_status=status,
                        text=error_body,
                        errcode="COM.BEEPER.EXTERNAL_UPLOAD_ERROR",
                        message=None,
                    ) from e
                self.log.warning(
                    f"Uploading media to external URL {upload_url} failed: {e}, "
                    f"retrying in {backoff} seconds",
                )
                await asyncio.sleep(backoff)
                backoff *= 2
                retry_count -= 1
            else:
                break
        await self.api.request(Method.POST, post_upload_path, query_params=post_upload_query)