1041 lines
38 KiB
Python
1041 lines
38 KiB
Python
# mautrix-telegram - A Matrix-Telegram puppeting bridge
|
||
# Copyright (C) 2022 Tulir Asokan
|
||
#
|
||
# This program is free software: you can redistribute it and/or modify
|
||
# it under the terms of the GNU Affero General Public License as published by
|
||
# the Free Software Foundation, either version 3 of the License, or
|
||
# (at your option) any later version.
|
||
#
|
||
# This program is distributed in the hope that it will be useful,
|
||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
# GNU Affero General Public License for more details.
|
||
#
|
||
# You should have received a copy of the GNU Affero General Public License
|
||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
from __future__ import annotations
|
||
|
||
from typing import Any, NamedTuple
|
||
import base64
|
||
import codecs
|
||
import hashlib
|
||
import html
|
||
import mimetypes
|
||
import unicodedata
|
||
|
||
from attr import dataclass
|
||
from telethon.tl.types import (
|
||
Document,
|
||
DocumentAttributeAnimated,
|
||
DocumentAttributeAudio,
|
||
DocumentAttributeFilename,
|
||
DocumentAttributeImageSize,
|
||
DocumentAttributeSticker,
|
||
DocumentAttributeVideo,
|
||
Game,
|
||
InputPhotoFileLocation,
|
||
InputStickerSetID,
|
||
InputStickerSetShortName,
|
||
Message,
|
||
MessageEntityPre,
|
||
MessageMediaContact,
|
||
MessageMediaDice,
|
||
MessageMediaDocument,
|
||
MessageMediaGame,
|
||
MessageMediaGeo,
|
||
MessageMediaGeoLive,
|
||
MessageMediaInvoice,
|
||
MessageMediaPhoto,
|
||
MessageMediaPoll,
|
||
MessageMediaStory,
|
||
MessageMediaUnsupported,
|
||
MessageMediaVenue,
|
||
MessageMediaWebPage,
|
||
MessageReplyStoryHeader,
|
||
PeerChannel,
|
||
PeerChat,
|
||
PeerUser,
|
||
Photo,
|
||
PhotoCachedSize,
|
||
PhotoEmpty,
|
||
PhotoSize,
|
||
PhotoSizeEmpty,
|
||
PhotoSizeProgressive,
|
||
Poll,
|
||
TypeDocumentAttribute,
|
||
TypePhotoSize,
|
||
UpdateShortChatMessage,
|
||
UpdateShortMessage,
|
||
WebPage,
|
||
)
|
||
from telethon.utils import decode_waveform
|
||
|
||
from mautrix.appservice import IntentAPI
|
||
from mautrix.types import (
|
||
EventID,
|
||
EventType,
|
||
Format,
|
||
ImageInfo,
|
||
LocationMessageEventContent,
|
||
MediaMessageEventContent,
|
||
MessageEventContent,
|
||
MessageType,
|
||
TextMessageEventContent,
|
||
ThumbnailInfo,
|
||
)
|
||
from mautrix.util.logging import TraceLogger
|
||
|
||
from .. import abstract_user as au, formatter, matrix as m, portal as po, puppet as pu, util
|
||
from ..config import Config
|
||
from ..db import Message as DBMessage, TelegramFile as DBTelegramFile
|
||
from ..tgclient import MautrixTelegramClient
|
||
from ..types import TelegramID
|
||
from ..util import sane_mimetypes
|
||
|
||
try:
|
||
import phonenumbers
|
||
except ImportError:
|
||
phonenumbers = None
|
||
|
||
|
||
@dataclass
|
||
class ConvertedMessage:
|
||
content: MessageEventContent
|
||
caption: MessageEventContent | None = None
|
||
type: EventType = EventType.ROOM_MESSAGE
|
||
disappear_seconds: int | None = None
|
||
disappear_start_immediately: bool = False
|
||
|
||
|
||
class DocAttrs(NamedTuple):
|
||
name: str | None
|
||
mime_type: str | None
|
||
is_sticker: bool
|
||
sticker_alt: str | None
|
||
sticker_pack_ref: dict | None
|
||
width: int
|
||
height: int
|
||
is_gif: bool
|
||
is_audio: bool
|
||
is_voice: bool
|
||
duration: int
|
||
waveform: bytes
|
||
|
||
|
||
BEEPER_LINK_PREVIEWS_KEY = "com.beeper.linkpreviews"
|
||
BEEPER_IMAGE_ENCRYPTION_KEY = "beeper:image:encryption"
|
||
|
||
|
||
class TelegramMessageConverter:
|
||
portal: po.Portal
|
||
matrix: m.MatrixHandler
|
||
config: Config
|
||
command_prefix: str
|
||
log: TraceLogger
|
||
|
||
def __init__(self, portal: po.Portal) -> None:
|
||
self.portal = portal
|
||
self.matrix = portal.matrix
|
||
self.config = portal.config
|
||
self.command_prefix = self.config["bridge.command_prefix"]
|
||
self.log = portal.log.getChild("msg_conv")
|
||
|
||
self._media_converters = {
|
||
MessageMediaPhoto: self._convert_photo,
|
||
MessageMediaDocument: self._convert_document,
|
||
MessageMediaGeo: self._convert_location,
|
||
MessageMediaGeoLive: self._convert_location,
|
||
MessageMediaVenue: self._convert_location,
|
||
MessageMediaPoll: self._convert_poll,
|
||
MessageMediaDice: self._convert_dice,
|
||
MessageMediaUnsupported: self._convert_unsupported,
|
||
MessageMediaGame: self._convert_game,
|
||
MessageMediaContact: self._convert_contact,
|
||
MessageMediaStory: self._convert_story,
|
||
MessageMediaInvoice: self._convert_invoice,
|
||
}
|
||
self._allowed_media = tuple(self._media_converters.keys())
|
||
|
||
async def convert(
|
||
self,
|
||
source: au.AbstractUser,
|
||
intent: IntentAPI,
|
||
is_bot: bool,
|
||
is_channel: bool,
|
||
evt: Message,
|
||
no_reply_fallback: bool = False,
|
||
deterministic_reply_id: bool = False,
|
||
client: MautrixTelegramClient | None = None,
|
||
) -> ConvertedMessage | None:
|
||
if not client:
|
||
client = source.client
|
||
if hasattr(evt, "media") and isinstance(evt.media, self._allowed_media):
|
||
if self._should_convert_full_document(evt.media, is_bot, is_channel):
|
||
convert_media = self._media_converters[type(evt.media)]
|
||
converted = await convert_media(
|
||
source=source, intent=intent, evt=evt, client=client
|
||
)
|
||
else:
|
||
converted = await self._convert_document_thumb_only(source, intent, evt, client)
|
||
elif evt.message:
|
||
converted = await self._convert_text(source, intent, is_bot, evt, client)
|
||
else:
|
||
self.log.debug("Unhandled Telegram message %d", evt.id)
|
||
return
|
||
if converted:
|
||
if evt.ttl_period and not converted.disappear_seconds:
|
||
converted.disappear_seconds = evt.ttl_period
|
||
converted.disappear_start_immediately = True
|
||
converted.content.external_url = self._get_external_url(evt)
|
||
converted.content["fi.mau.telegram.source"] = {
|
||
"space": self.portal.tgid if self.portal.peer_type == "channel" else source.tgid,
|
||
"chat_id": self.portal.tgid,
|
||
"peer_type": self.portal.peer_type,
|
||
"id": evt.id,
|
||
}
|
||
if converted.caption:
|
||
converted.caption["fi.mau.telegram.source"] = converted.content[
|
||
"fi.mau.telegram.source"
|
||
]
|
||
converted.caption.external_url = converted.content.external_url
|
||
if self.portal.get_config("caption_in_message"):
|
||
self._caption_to_message(converted)
|
||
await self._set_reply(
|
||
source,
|
||
evt,
|
||
converted.content,
|
||
no_fallback=no_reply_fallback,
|
||
deterministic_id=deterministic_reply_id,
|
||
)
|
||
return converted
|
||
|
||
def _should_convert_full_document(self, media, is_bot: bool, is_channel: bool) -> bool:
|
||
if not isinstance(media, MessageMediaDocument):
|
||
return True
|
||
size = media.document.size
|
||
if is_bot and self.config["bridge.document_as_link_size.bot"]:
|
||
return size < self.config["bridge.document_as_link_size.bot"] * 1000**2
|
||
if is_channel and self.config["bridge.document_as_link_size.channel"]:
|
||
return size < self.config["bridge.document_as_link_size.channel"] * 1000**2
|
||
return True
|
||
|
||
@staticmethod
|
||
def _caption_to_message(converted: ConvertedMessage) -> None:
|
||
content, caption = converted.content, converted.caption
|
||
converted.caption = None
|
||
|
||
content["filename"] = content.body
|
||
content["org.matrix.msc1767.caption"] = {
|
||
"org.matrix.msc1767.text": caption.body,
|
||
}
|
||
content.body = caption.body
|
||
if caption.format == Format.HTML:
|
||
content["org.matrix.msc1767.caption"][
|
||
"org.matrix.msc1767.html"
|
||
] = caption.formatted_body
|
||
content["formatted_body"] = caption.formatted_body
|
||
content["format"] = Format.HTML.value
|
||
|
||
def _get_external_url(self, evt: Message) -> str | None:
|
||
if self.portal.peer_type == "channel" and self.portal.username is not None:
|
||
return f"https://t.me/{self.portal.username}/{evt.id}"
|
||
elif self.portal.peer_type != "user":
|
||
return f"https://t.me/c/{self.portal.tgid}/{evt.id}"
|
||
return None
|
||
|
||
@staticmethod
|
||
def _int_to_bytes(i: int) -> bytes:
|
||
return codecs.decode(f"{i:010x}", "hex")
|
||
|
||
def _encode_msgid(self, source: au.AbstractUser, evt: Message) -> str:
|
||
if self.portal.peer_type == "channel":
|
||
play_id = b"c" + self._int_to_bytes(self.portal.tgid) + self._int_to_bytes(evt.id)
|
||
elif self.portal.peer_type == "chat":
|
||
play_id = (
|
||
b"g"
|
||
+ self._int_to_bytes(self.portal.tgid)
|
||
+ self._int_to_bytes(evt.id)
|
||
+ self._int_to_bytes(source.tgid)
|
||
)
|
||
elif self.portal.peer_type == "user":
|
||
play_id = b"u" + self._int_to_bytes(self.portal.tgid) + self._int_to_bytes(evt.id)
|
||
else:
|
||
raise ValueError("Portal has invalid peer type")
|
||
return base64.b64encode(play_id).decode("utf-8").rstrip("=")
|
||
|
||
def deterministic_event_id(self, space: TelegramID, msg_id: TelegramID) -> EventID:
|
||
hash_content = f"{self.portal.mxid}/telegram/{space}/{msg_id}"
|
||
hashed = hashlib.sha256(hash_content.encode("utf-8")).digest()
|
||
b64hash = base64.urlsafe_b64encode(hashed).decode("utf-8").rstrip("=")
|
||
return EventID(f"${b64hash}:telegram.org")
|
||
|
||
async def _set_reply(
|
||
self,
|
||
source: au.AbstractUser,
|
||
evt: Message,
|
||
content: MessageEventContent,
|
||
no_fallback: bool = False,
|
||
deterministic_id: bool = False,
|
||
) -> None:
|
||
if not evt.reply_to:
|
||
return
|
||
elif isinstance(evt.reply_to, MessageReplyStoryHeader):
|
||
return
|
||
|
||
if evt.reply_to.quote and content.msgtype.is_text:
|
||
content.ensure_has_html()
|
||
quote_html = await formatter.telegram_text_to_matrix_html(
|
||
source, evt.reply_to.quote_text, evt.reply_to.quote_entities
|
||
)
|
||
content.formatted_body = (
|
||
f"<blockquote data-telegram-partial-reply>{quote_html}</blockquote>"
|
||
f"{content.formatted_body}"
|
||
)
|
||
|
||
space = (
|
||
evt.peer_id.channel_id
|
||
if isinstance(evt, Message) and isinstance(evt.peer_id, PeerChannel)
|
||
else source.tgid
|
||
)
|
||
if isinstance(evt, Message):
|
||
evt_peer_id = evt.peer_id
|
||
elif isinstance(evt, UpdateShortMessage):
|
||
evt_peer_id = PeerUser(evt.user_id)
|
||
elif isinstance(evt, UpdateShortChatMessage):
|
||
evt_peer_id = PeerChat(evt.chat_id)
|
||
else:
|
||
evt_peer_id = None
|
||
if evt.reply_to.reply_to_peer_id and evt.reply_to.reply_to_peer_id != evt_peer_id:
|
||
if not self.config["bridge.cross_room_replies"]:
|
||
return
|
||
space = (
|
||
evt.reply_to.reply_to_peer_id.channel_id
|
||
if isinstance(evt.reply_to.reply_to_peer_id, PeerChannel)
|
||
else source.tgid
|
||
)
|
||
|
||
reply_to_id = TelegramID(evt.reply_to.reply_to_msg_id)
|
||
msg = await DBMessage.get_one_by_tgid(reply_to_id, space)
|
||
no_fallback = no_fallback or self.config["bridge.disable_reply_fallbacks"]
|
||
if not msg:
|
||
# TODO try to find room ID when generating deterministic ID for cross-room reply
|
||
if deterministic_id:
|
||
content.set_reply(self.deterministic_event_id(space, reply_to_id))
|
||
return
|
||
elif msg.mx_room != self.portal.mxid and not self.config["bridge.cross_room_replies"]:
|
||
return
|
||
elif not isinstance(content, TextMessageEventContent) or no_fallback:
|
||
# Not a text message, just set the reply metadata and return
|
||
content.set_reply(msg.mxid)
|
||
if msg.mx_room != self.portal.mxid:
|
||
content.relates_to.in_reply_to["room_id"] = msg.mx_room
|
||
return
|
||
|
||
# Text message, try to fetch original message to generate reply fallback.
|
||
try:
|
||
event = await self.portal.main_intent.get_event(msg.mx_room, msg.mxid)
|
||
if event.type == EventType.ROOM_ENCRYPTED and source.bridge.matrix.e2ee:
|
||
event = await source.bridge.matrix.e2ee.decrypt(event)
|
||
if isinstance(event.content, TextMessageEventContent):
|
||
event.content.trim_reply_fallback()
|
||
puppet = await pu.Puppet.get_by_mxid(event.sender, create=False)
|
||
content.set_reply(event, displayname=puppet.displayname if puppet else event.sender)
|
||
except Exception:
|
||
self.log.exception("Failed to get event to add reply fallback")
|
||
content.set_reply(msg.mxid)
|
||
if msg.mx_room != self.portal.mxid:
|
||
content.relates_to.in_reply_to["room_id"] = msg.mx_room
|
||
|
||
@staticmethod
|
||
def _photo_size_key(photo: TypePhotoSize) -> int:
|
||
if isinstance(photo, PhotoSize):
|
||
return photo.size
|
||
elif isinstance(photo, PhotoSizeProgressive):
|
||
return max(photo.sizes)
|
||
elif isinstance(photo, PhotoSizeEmpty):
|
||
return 0
|
||
else:
|
||
return len(photo.bytes)
|
||
|
||
@classmethod
|
||
def get_largest_photo_size(
|
||
cls, photo: Photo | Document
|
||
) -> tuple[InputPhotoFileLocation | None, TypePhotoSize | None]:
|
||
if (
|
||
not photo
|
||
or isinstance(photo, PhotoEmpty)
|
||
or (isinstance(photo, Document) and not photo.thumbs)
|
||
):
|
||
return None, None
|
||
|
||
largest = max(
|
||
photo.thumbs if isinstance(photo, Document) else photo.sizes, key=cls._photo_size_key
|
||
)
|
||
return (
|
||
InputPhotoFileLocation(
|
||
id=photo.id,
|
||
access_hash=photo.access_hash,
|
||
file_reference=photo.file_reference,
|
||
thumb_size=largest.type,
|
||
),
|
||
largest,
|
||
)
|
||
|
||
async def _webpage_to_beeper_link_preview(
|
||
self, source: au.AbstractUser, intent: IntentAPI, webpage: WebPage
|
||
) -> dict[str, Any]:
|
||
beeper_link_preview: dict[str, Any] = {
|
||
"matched_url": webpage.url,
|
||
"og:title": webpage.title,
|
||
"og:url": webpage.url,
|
||
"og:description": webpage.description,
|
||
}
|
||
|
||
# Upload an image corresponding to the link preview if it exists.
|
||
if webpage.photo:
|
||
loc, largest_size = self.get_largest_photo_size(webpage.photo)
|
||
if loc is None:
|
||
return beeper_link_preview
|
||
beeper_link_preview["og:image:height"] = largest_size.h
|
||
beeper_link_preview["og:image:width"] = largest_size.w
|
||
file = await util.transfer_file_to_matrix(
|
||
source.client,
|
||
intent,
|
||
loc,
|
||
encrypt=self.portal.encrypted,
|
||
async_upload=self.config["homeserver.async_media"],
|
||
)
|
||
|
||
if file.decryption_info:
|
||
beeper_link_preview[BEEPER_IMAGE_ENCRYPTION_KEY] = file.decryption_info.serialize()
|
||
else:
|
||
beeper_link_preview["og:image"] = file.mxc
|
||
|
||
return beeper_link_preview
|
||
|
||
async def _convert_text(
|
||
self,
|
||
source: au.AbstractUser,
|
||
intent: IntentAPI,
|
||
is_bot: bool,
|
||
evt: Message,
|
||
client: MautrixTelegramClient,
|
||
) -> ConvertedMessage:
|
||
content = await formatter.telegram_to_matrix(evt, source, client)
|
||
if is_bot and self.portal.get_config("bot_messages_as_notices"):
|
||
content.msgtype = MessageType.NOTICE
|
||
|
||
if (
|
||
hasattr(evt, "media")
|
||
and isinstance(evt.media, MessageMediaWebPage)
|
||
and isinstance(evt.media.webpage, WebPage)
|
||
):
|
||
content[BEEPER_LINK_PREVIEWS_KEY] = [
|
||
await self._webpage_to_beeper_link_preview(source, intent, evt.media.webpage)
|
||
]
|
||
|
||
return ConvertedMessage(content=content)
|
||
|
||
async def _convert_photo(
|
||
self,
|
||
source: au.AbstractUser,
|
||
intent: IntentAPI,
|
||
evt: Message,
|
||
client: MautrixTelegramClient,
|
||
) -> ConvertedMessage | None:
|
||
media: MessageMediaPhoto = evt.media
|
||
if media.photo is None and media.ttl_seconds:
|
||
return ConvertedMessage(
|
||
content=TextMessageEventContent(
|
||
msgtype=MessageType.NOTICE, body="Photo has expired"
|
||
)
|
||
)
|
||
loc, largest_size = self.get_largest_photo_size(media.photo)
|
||
if loc is None:
|
||
return ConvertedMessage(
|
||
content=TextMessageEventContent(
|
||
msgtype=MessageType.TEXT,
|
||
body="Failed to bridge image",
|
||
)
|
||
)
|
||
file = await util.transfer_file_to_matrix(
|
||
client,
|
||
intent,
|
||
loc,
|
||
encrypt=self.portal.encrypted,
|
||
async_upload=self.config["homeserver.async_media"],
|
||
)
|
||
if not file:
|
||
return None
|
||
info = ImageInfo(
|
||
height=largest_size.h,
|
||
width=largest_size.w,
|
||
orientation=0,
|
||
mimetype=file.mime_type,
|
||
size=self._photo_size_key(largest_size),
|
||
)
|
||
if media.spoiler:
|
||
info["fi.mau.telegram.spoiler"] = True
|
||
ext = sane_mimetypes.guess_extension(file.mime_type)
|
||
name = f"disappearing_image{ext}" if media.ttl_seconds else f"image{ext}"
|
||
content = MediaMessageEventContent(
|
||
msgtype=MessageType.IMAGE,
|
||
info=info,
|
||
body=name,
|
||
)
|
||
if file.decryption_info:
|
||
content.file = file.decryption_info
|
||
else:
|
||
content.url = file.mxc
|
||
caption_content = (
|
||
await formatter.telegram_to_matrix(evt, source, client) if evt.message else None
|
||
)
|
||
return ConvertedMessage(
|
||
content=content,
|
||
caption=caption_content,
|
||
disappear_seconds=self._adjust_ttl(media.ttl_seconds),
|
||
)
|
||
|
||
@staticmethod
|
||
def _adjust_ttl(ttl: int | None) -> int | None:
|
||
if not ttl:
|
||
return None
|
||
elif ttl == 2147483647:
|
||
# View-once media, set low TTL
|
||
return 15
|
||
else:
|
||
# Increase media TTL because it's supposed to be counted from opening the media,
|
||
# but we can only count it from read receipt.
|
||
return ttl * 5
|
||
|
||
async def _convert_document_thumb_only(
|
||
self,
|
||
source: au.AbstractUser,
|
||
intent: IntentAPI,
|
||
evt: Message,
|
||
client: MautrixTelegramClient,
|
||
) -> ConvertedMessage | None:
|
||
document = evt.media.document
|
||
|
||
if not document:
|
||
return None
|
||
|
||
external_link_content = "Unsupported file, please access directly on Telegram"
|
||
|
||
external_url = self._get_external_url(evt)
|
||
# We don't generate external URLs for bot users so only set if known
|
||
if external_url is not None:
|
||
external_link_content = (
|
||
f"Unsupported file, please access directly on Telegram here: {external_url}"
|
||
)
|
||
|
||
attrs = _parse_document_attributes(document.attributes)
|
||
file = None
|
||
|
||
thumb_loc, thumb_size = self.get_largest_photo_size(document)
|
||
if thumb_size and not isinstance(thumb_size, (PhotoSize, PhotoCachedSize)):
|
||
self.log.debug(f"Unsupported thumbnail type {type(thumb_size)}")
|
||
thumb_loc = None
|
||
thumb_size = None
|
||
if thumb_loc:
|
||
try:
|
||
file = await util.transfer_thumbnail_to_matrix(
|
||
client,
|
||
intent,
|
||
thumb_loc,
|
||
video=None,
|
||
mime_type=document.mime_type,
|
||
encrypt=self.portal.encrypted,
|
||
async_upload=self.config["homeserver.async_media"],
|
||
)
|
||
except Exception:
|
||
self.log.exception("Failed to transfer thumbnail")
|
||
if not file:
|
||
name = attrs.name or ""
|
||
caption = f"\n{evt.message}" if evt.message else ""
|
||
return ConvertedMessage(
|
||
content=TextMessageEventContent(
|
||
msgtype=MessageType.NOTICE,
|
||
body=f"{name}{caption}\n{external_link_content}",
|
||
)
|
||
)
|
||
|
||
info, name = _parse_document_meta(evt, file, attrs, thumb_size)
|
||
|
||
event_type = EventType.ROOM_MESSAGE
|
||
if not name:
|
||
ext = sane_mimetypes.guess_extension(file.mime_type) or ""
|
||
name = "unnamed_file" + ext
|
||
|
||
content = MediaMessageEventContent(
|
||
body=name,
|
||
info=info,
|
||
msgtype={
|
||
"video/": MessageType.VIDEO,
|
||
"audio/": MessageType.AUDIO,
|
||
"image/": MessageType.IMAGE,
|
||
}.get(info.mimetype[:6], MessageType.FILE),
|
||
)
|
||
if file.decryption_info:
|
||
content.file = file.decryption_info
|
||
else:
|
||
content.url = file.mxc
|
||
|
||
caption_content = (
|
||
await formatter.telegram_to_matrix(evt, source, client) if evt.message else None
|
||
)
|
||
caption_content = f"{caption_content}\n{external_link_content}"
|
||
|
||
return ConvertedMessage(
|
||
type=event_type,
|
||
content=content,
|
||
caption=caption_content,
|
||
disappear_seconds=self._adjust_ttl(evt.media.ttl_seconds),
|
||
)
|
||
|
||
async def _convert_document(
|
||
self,
|
||
source: au.AbstractUser,
|
||
intent: IntentAPI,
|
||
evt: Message,
|
||
client: MautrixTelegramClient,
|
||
) -> ConvertedMessage | None:
|
||
document = evt.media.document
|
||
|
||
if not document:
|
||
return None
|
||
|
||
attrs = _parse_document_attributes(document.attributes)
|
||
|
||
if document.size > self.matrix.media_config.upload_size:
|
||
name = attrs.name or ""
|
||
caption = f"\n{evt.message}" if evt.message else ""
|
||
return ConvertedMessage(
|
||
content=TextMessageEventContent(
|
||
msgtype=MessageType.NOTICE, body=f"Too large file {name}{caption}"
|
||
)
|
||
)
|
||
|
||
thumb_loc, thumb_size = self.get_largest_photo_size(document)
|
||
if thumb_size and not isinstance(thumb_size, (PhotoSize, PhotoCachedSize)):
|
||
self.log.debug(f"Unsupported thumbnail type {type(thumb_size)}")
|
||
thumb_loc = None
|
||
thumb_size = None
|
||
parallel_id = source.tgid if self.config["bridge.parallel_file_transfer"] else None
|
||
tgs_convert = self.config["bridge.animated_sticker"]
|
||
file = await util.transfer_file_to_matrix(
|
||
client,
|
||
intent,
|
||
document,
|
||
thumb_loc,
|
||
is_sticker=attrs.is_sticker,
|
||
tgs_convert=tgs_convert,
|
||
webm_convert=tgs_convert["target"] if tgs_convert["convert_from_webm"] else None,
|
||
filename=attrs.name,
|
||
parallel_id=parallel_id,
|
||
encrypt=self.portal.encrypted,
|
||
async_upload=self.config["homeserver.async_media"],
|
||
)
|
||
if not file:
|
||
return None
|
||
|
||
info, name = _parse_document_meta(evt, file, attrs, thumb_size)
|
||
|
||
event_type = EventType.ROOM_MESSAGE
|
||
# Elements only support images as stickers, so send animated webm stickers as m.video
|
||
if attrs.is_sticker and file.mime_type.startswith("image/"):
|
||
event_type = EventType.STICKER
|
||
# Tell clients to render the stickers as 256x256 if they're bigger
|
||
if info.width > 256 or info.height > 256:
|
||
if info.width > info.height:
|
||
info.height = int(info.height / (info.width / 256))
|
||
info.width = 256
|
||
else:
|
||
info.width = int(info.width / (info.height / 256))
|
||
info.height = 256
|
||
if info.thumbnail_info:
|
||
info.thumbnail_info.width = info.width
|
||
info.thumbnail_info.height = info.height
|
||
if attrs.is_gif or (attrs.is_sticker and info.mimetype == "video/webm"):
|
||
if attrs.is_gif:
|
||
info["fi.mau.telegram.gif"] = True
|
||
else:
|
||
info["fi.mau.telegram.animated_sticker"] = True
|
||
info["fi.mau.gif"] = True
|
||
info["fi.mau.loop"] = True
|
||
info["fi.mau.autoplay"] = True
|
||
info["fi.mau.hide_controls"] = True
|
||
info["fi.mau.no_audio"] = True
|
||
if evt.media.spoiler:
|
||
info["fi.mau.telegram.spoiler"] = True
|
||
if not name:
|
||
ext = sane_mimetypes.guess_extension(file.mime_type) or ""
|
||
name = "unnamed_file" + ext
|
||
|
||
content = MediaMessageEventContent(
|
||
body=name,
|
||
info=info,
|
||
msgtype={
|
||
"video/": MessageType.VIDEO,
|
||
"audio/": MessageType.AUDIO,
|
||
"image/": MessageType.IMAGE,
|
||
}.get(info.mimetype[:6], MessageType.FILE),
|
||
)
|
||
if event_type == EventType.STICKER:
|
||
content.msgtype = None
|
||
if attrs.is_audio:
|
||
content["org.matrix.msc1767.audio"] = {"duration": attrs.duration * 1000}
|
||
if attrs.waveform:
|
||
content["org.matrix.msc1767.audio"]["waveform"] = [x << 5 for x in attrs.waveform]
|
||
if attrs.is_voice:
|
||
content["org.matrix.msc3245.voice"] = {}
|
||
if file.decryption_info:
|
||
content.file = file.decryption_info
|
||
else:
|
||
content.url = file.mxc
|
||
|
||
caption_content = (
|
||
await formatter.telegram_to_matrix(evt, source, client) if evt.message else None
|
||
)
|
||
|
||
return ConvertedMessage(
|
||
type=event_type,
|
||
content=content,
|
||
caption=caption_content,
|
||
disappear_seconds=self._adjust_ttl(evt.media.ttl_seconds),
|
||
)
|
||
|
||
@staticmethod
|
||
async def _convert_location(evt: Message, **_) -> ConvertedMessage:
|
||
long = evt.media.geo.long
|
||
lat = evt.media.geo.lat
|
||
long_char = "E" if long > 0 else "W"
|
||
lat_char = "N" if lat > 0 else "S"
|
||
geo = f"{round(lat, 6)},{round(long, 6)}"
|
||
|
||
body = f"{round(abs(lat), 4)}° {lat_char}, {round(abs(long), 4)}° {long_char}"
|
||
url = f"https://maps.google.com/?q={geo}"
|
||
|
||
if isinstance(evt.media, MessageMediaGeoLive):
|
||
note = "Live Location (see your Telegram client for live updates)"
|
||
elif isinstance(evt.media, MessageMediaVenue):
|
||
note = evt.media.title
|
||
else:
|
||
note = "Location"
|
||
|
||
content = LocationMessageEventContent(
|
||
msgtype=MessageType.LOCATION,
|
||
geo_uri=f"geo:{geo}",
|
||
body=f"{note}: {body}\n{url}",
|
||
)
|
||
content["format"] = str(Format.HTML)
|
||
content["formatted_body"] = f"{note}: <a href='{url}'>{body}</a>"
|
||
content["org.matrix.msc3488.location"] = {
|
||
"uri": content.geo_uri,
|
||
"description": note,
|
||
}
|
||
return ConvertedMessage(content=content)
|
||
|
||
@staticmethod
|
||
async def _convert_unsupported(
|
||
source: au.AbstractUser, evt: Message, client: MautrixTelegramClient, **_
|
||
) -> ConvertedMessage:
|
||
override_text = (
|
||
"This message is not supported on your version of Mautrix-Telegram. "
|
||
"Please check https://github.com/mautrix/telegram or ask your "
|
||
"bridge administrator about possible updates."
|
||
)
|
||
content = await formatter.telegram_to_matrix(
|
||
evt, source, client, override_text=override_text
|
||
)
|
||
content.msgtype = MessageType.NOTICE
|
||
content["fi.mau.telegram.unsupported"] = True
|
||
return ConvertedMessage(content=content)
|
||
|
||
async def _convert_poll(self, source: au.AbstractUser, evt: Message, **_) -> ConvertedMessage:
|
||
poll: Poll = evt.media.poll
|
||
poll_id = self._encode_msgid(source, evt)
|
||
|
||
_n = 0
|
||
|
||
def n() -> int:
|
||
nonlocal _n
|
||
_n += 1
|
||
return _n
|
||
|
||
text_answers = "\n".join(f"{n()}. {answer.text}" for answer in poll.answers)
|
||
html_answers = "\n".join(f"<li>{answer.text}</li>" for answer in poll.answers)
|
||
vote_command = f"{self.command_prefix} vote {poll_id}"
|
||
content = TextMessageEventContent(
|
||
msgtype=MessageType.TEXT,
|
||
format=Format.HTML,
|
||
body=(
|
||
f"Poll: {poll.question}\n{text_answers}\n"
|
||
f"Vote with {vote_command} <choice number>"
|
||
),
|
||
formatted_body=(
|
||
f"<strong>Poll</strong>: {poll.question}<br/>\n"
|
||
f"<ol>{html_answers}</ol>\n"
|
||
f"Vote with <code>{vote_command} <choice number></code>"
|
||
),
|
||
)
|
||
|
||
return ConvertedMessage(content=content)
|
||
|
||
@staticmethod
|
||
async def _convert_dice(evt: Message, **_) -> ConvertedMessage:
|
||
roll: MessageMediaDice = evt.media
|
||
emoji_text = {
|
||
"\U0001F3AF": " Dart throw",
|
||
"\U0001F3B2": " Dice roll",
|
||
"\U0001F3C0": " Basketball throw",
|
||
"\U0001F3B0": " Slot machine",
|
||
"\U0001F3B3": " Bowling",
|
||
"\u26BD": " Football kick",
|
||
}
|
||
text = f"{roll.emoticon}{emoji_text.get(roll.emoticon, '')} result: {_format_dice(roll)}"
|
||
content = TextMessageEventContent(
|
||
msgtype=MessageType.TEXT,
|
||
format=Format.HTML,
|
||
body=text,
|
||
formatted_body=f"<h4>{text}</h4>",
|
||
)
|
||
content["fi.mau.telegram.dice"] = {"emoticon": roll.emoticon, "value": roll.value}
|
||
return ConvertedMessage(content=content)
|
||
|
||
async def _convert_game(
|
||
self, source: au.AbstractUser, evt: Message, client: MautrixTelegramClient, **_
|
||
) -> ConvertedMessage:
|
||
game: Game = evt.media.game
|
||
play_id = self._encode_msgid(source, evt)
|
||
command = f"{self.command_prefix} play {play_id}"
|
||
override_text = f"Run {command} in your bridge management room to play {game.title}"
|
||
override_entities = [
|
||
MessageEntityPre(offset=len("Run "), length=len(command), language="")
|
||
]
|
||
|
||
content = await formatter.telegram_to_matrix(
|
||
evt, source, client, override_text=override_text, override_entities=override_entities
|
||
)
|
||
content.msgtype = MessageType.NOTICE
|
||
content["fi.mau.telegram.game"] = play_id
|
||
|
||
return ConvertedMessage(content=content)
|
||
|
||
@staticmethod
|
||
async def _convert_contact(
|
||
source: au.AbstractUser, evt: Message, client: MautrixTelegramClient, **_
|
||
) -> ConvertedMessage:
|
||
contact: MessageMediaContact = evt.media
|
||
name = " ".join(x for x in [contact.first_name, contact.last_name] if x)
|
||
formatted_phone = f"+{contact.phone_number}"
|
||
if phonenumbers is not None:
|
||
try:
|
||
parsed = phonenumbers.parse(formatted_phone)
|
||
fmt = phonenumbers.PhoneNumberFormat.INTERNATIONAL
|
||
formatted_phone = phonenumbers.format_number(parsed, fmt)
|
||
except phonenumbers.NumberParseException:
|
||
pass
|
||
content = TextMessageEventContent(
|
||
msgtype=MessageType.TEXT,
|
||
body=f"Shared contact info for {name}: {formatted_phone}",
|
||
)
|
||
content["fi.mau.telegram.contact"] = {
|
||
"user_id": contact.user_id,
|
||
"first_name": contact.first_name,
|
||
"last_name": contact.last_name,
|
||
"phone_number": contact.phone_number,
|
||
"vcard": contact.vcard,
|
||
}
|
||
|
||
puppet = await pu.Puppet.get_by_tgid(TelegramID(contact.user_id))
|
||
if not puppet.displayname:
|
||
try:
|
||
entity = await client.get_entity(PeerUser(contact.user_id))
|
||
await puppet.update_info(source, entity, client_override=client)
|
||
except Exception as e:
|
||
source.log.warning(f"Failed to sync puppet info of received contact: {e}")
|
||
else:
|
||
content.format = Format.HTML
|
||
content.formatted_body = (
|
||
f"Shared contact info for "
|
||
f"<a href='https://matrix.to/#/{puppet.mxid}'>{html.escape(name)}</a>: "
|
||
f"{html.escape(formatted_phone)}"
|
||
)
|
||
return ConvertedMessage(content=content)
|
||
|
||
@staticmethod
|
||
async def _convert_story(
|
||
source: au.AbstractUser, evt: Message, client: MautrixTelegramClient, **_
|
||
) -> ConvertedMessage:
|
||
content = await formatter.telegram_to_matrix(
|
||
evt, source, client, override_text="Stories are not yet supported"
|
||
)
|
||
content.msgtype = MessageType.NOTICE
|
||
content["fi.mau.telegram.unsupported"] = True
|
||
return ConvertedMessage(content=content)
|
||
|
||
@staticmethod
|
||
async def _convert_invoice(
|
||
source: au.AbstractUser, evt: Message, client: MautrixTelegramClient, **_
|
||
) -> ConvertedMessage:
|
||
content = await formatter.telegram_to_matrix(
|
||
evt, source, client, override_text="Invoices are not yet supported"
|
||
)
|
||
content.msgtype = MessageType.NOTICE
|
||
content["fi.mau.telegram.unsupported"] = True
|
||
return ConvertedMessage(content=content)
|
||
|
||
|
||
def _parse_document_attributes(attributes: list[TypeDocumentAttribute]) -> DocAttrs:
|
||
name, mime_type, is_sticker, sticker_alt, width, height = None, None, False, None, 0, 0
|
||
is_gif, is_audio, is_voice, duration, waveform = False, False, False, 0, bytes()
|
||
sticker_pack_ref = None
|
||
for attr in attributes:
|
||
if isinstance(attr, DocumentAttributeFilename):
|
||
name = name or attr.file_name
|
||
mime_type, _ = mimetypes.guess_type(attr.file_name)
|
||
elif isinstance(attr, DocumentAttributeSticker):
|
||
is_sticker = True
|
||
sticker_alt = attr.alt
|
||
if isinstance(attr.stickerset, InputStickerSetID):
|
||
sticker_pack_ref = {
|
||
"id": str(attr.stickerset.id),
|
||
"access_hash": str(attr.stickerset.access_hash),
|
||
}
|
||
elif isinstance(attr.stickerset, InputStickerSetShortName):
|
||
sticker_pack_ref = {"short_name": attr.stickerset.short_name}
|
||
elif isinstance(attr, DocumentAttributeAnimated):
|
||
is_gif = True
|
||
elif isinstance(attr, DocumentAttributeVideo):
|
||
width, height = attr.w, attr.h
|
||
elif isinstance(attr, DocumentAttributeImageSize):
|
||
width, height = attr.w, attr.h
|
||
elif isinstance(attr, DocumentAttributeAudio):
|
||
is_audio = True
|
||
is_voice = attr.voice or False
|
||
duration = attr.duration
|
||
waveform = decode_waveform(attr.waveform) if attr.waveform else b""
|
||
|
||
return DocAttrs(
|
||
name=name,
|
||
mime_type=mime_type,
|
||
is_sticker=is_sticker,
|
||
sticker_alt=sticker_alt,
|
||
sticker_pack_ref=sticker_pack_ref,
|
||
width=width,
|
||
height=height,
|
||
is_gif=is_gif,
|
||
is_audio=is_audio,
|
||
is_voice=is_voice,
|
||
duration=duration,
|
||
waveform=waveform,
|
||
)
|
||
|
||
|
||
def _parse_document_meta(
|
||
evt: Message, file: DBTelegramFile, attrs: DocAttrs, thumb_size: TypePhotoSize
|
||
) -> tuple[ImageInfo, str]:
|
||
document = evt.media.document
|
||
name = attrs.name
|
||
if attrs.is_sticker:
|
||
alt = attrs.sticker_alt
|
||
if len(alt) > 0:
|
||
try:
|
||
name = f"{alt} ({unicodedata.name(alt[0]).lower()})"
|
||
except ValueError:
|
||
name = alt
|
||
|
||
generic_types = ("text/plain", "application/octet-stream")
|
||
if file.mime_type in generic_types and document.mime_type not in generic_types:
|
||
mime_type = document.mime_type or file.mime_type
|
||
elif file.mime_type == "application/ogg":
|
||
mime_type = "audio/ogg"
|
||
else:
|
||
mime_type = file.mime_type or document.mime_type
|
||
info = ImageInfo(size=file.size, mimetype=mime_type)
|
||
|
||
if attrs.is_sticker:
|
||
info["fi.mau.telegram.sticker"] = {
|
||
"alt": attrs.sticker_alt,
|
||
"id": str(document.id),
|
||
"pack": attrs.sticker_pack_ref,
|
||
}
|
||
|
||
if attrs.mime_type and not file.was_converted:
|
||
file.mime_type = attrs.mime_type or file.mime_type
|
||
if file.width and file.height:
|
||
info.width, info.height = file.width, file.height
|
||
elif attrs.width and attrs.height:
|
||
info.width, info.height = attrs.width, attrs.height
|
||
|
||
if file.thumbnail:
|
||
if file.thumbnail.decryption_info:
|
||
info.thumbnail_file = file.thumbnail.decryption_info
|
||
else:
|
||
info.thumbnail_url = file.thumbnail.mxc
|
||
info.thumbnail_info = ThumbnailInfo(
|
||
mimetype=file.thumbnail.mime_type,
|
||
height=file.thumbnail.height or thumb_size.h,
|
||
width=file.thumbnail.width or thumb_size.w,
|
||
size=file.thumbnail.size,
|
||
)
|
||
elif attrs.is_sticker:
|
||
if not info.width or not info.height:
|
||
info.width = 256
|
||
info.height = 256
|
||
|
||
# This is a hack for bad clients like Element iOS that require a thumbnail
|
||
info.thumbnail_info = ImageInfo.deserialize(info.serialize())
|
||
if file.decryption_info:
|
||
info.thumbnail_file = file.decryption_info
|
||
else:
|
||
info.thumbnail_url = file.mxc
|
||
|
||
return info, name
|
||
|
||
|
||
def _format_dice(roll: MessageMediaDice) -> str:
|
||
if roll.emoticon == "\U0001F3B0":
|
||
emojis = {
|
||
0: "\U0001F36B", # "🍫",
|
||
1: "\U0001F352", # "🍒",
|
||
2: "\U0001F34B", # "🍋",
|
||
3: "7\ufe0f\u20e3", # "7️⃣",
|
||
}
|
||
res = roll.value - 1
|
||
slot1, slot2, slot3 = emojis[res % 4], emojis[res // 4 % 4], emojis[res // 16]
|
||
return f"{slot1} {slot2} {slot3} ({roll.value})"
|
||
elif roll.emoticon == "\u26BD":
|
||
results = {
|
||
1: "miss",
|
||
2: "hit the woodwork",
|
||
3: "goal", # seems to go in through the center
|
||
4: "goal",
|
||
5: "goal 🎉", # seems to go in through the top right corner, includes confetti
|
||
}
|
||
elif roll.emoticon == "\U0001F3B3":
|
||
results = {
|
||
1: "miss",
|
||
2: "1 pin down",
|
||
3: "3 pins down, split",
|
||
4: "4 pins down, split",
|
||
5: "5 pins down",
|
||
6: "strike 🎉",
|
||
}
|
||
# elif roll.emoticon == "\U0001F3C0":
|
||
# results = {
|
||
# 2: "rolled off",
|
||
# 3: "stuck",
|
||
# }
|
||
# elif roll.emoticon == "\U0001F3AF":
|
||
# results = {
|
||
# 1: "bounced off",
|
||
# 2: "outer rim",
|
||
#
|
||
# 6: "bullseye",
|
||
# }
|
||
else:
|
||
return str(roll.value)
|
||
return f"{results[roll.value]} ({roll.value})"
|