mautrix-telegram/mautrix_telegram/formatter/from_telegram.py

438 lines
16 KiB
Python

# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2021 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from html import escape
import logging
import re
from telethon.errors import RPCError
from telethon.helpers import add_surrogate, del_surrogate
from telethon.tl.custom import Message
from telethon.tl.types import (
Channel,
InputPeerChannelFromMessage,
InputPeerUserFromMessage,
MessageEntityBlockquote,
MessageEntityBold,
MessageEntityBotCommand,
MessageEntityCashtag,
MessageEntityCode,
MessageEntityCustomEmoji,
MessageEntityEmail,
MessageEntityHashtag,
MessageEntityItalic,
MessageEntityMention,
MessageEntityMentionName,
MessageEntityPhone,
MessageEntityPre,
MessageEntitySpoiler,
MessageEntityStrike,
MessageEntityTextUrl,
MessageEntityUnderline,
MessageEntityUrl,
MessageFwdHeader,
PeerChannel,
PeerChat,
PeerUser,
SponsoredMessage,
TypeMessageEntity,
User,
)
from mautrix.types import Format, MessageType, TextMessageEventContent
from .. import abstract_user as au, portal as po, puppet as pu, user as u
from ..db import Message as DBMessage, TelegramFile as DBTelegramFile
from ..tgclient import MautrixTelegramClient
from ..types import TelegramID
from ..util.file_transfer import UnicodeCustomEmoji, transfer_custom_emojis_to_matrix
log: logging.Logger = logging.getLogger("mau.fmt.tg")
async def _get_fwd_entity(client: MautrixTelegramClient, evt: Message) -> Channel | User | None:
try:
return await client.get_entity(evt.fwd_from.from_id)
except (ValueError, RPCError) as e:
try:
input_peer = await client.get_input_entity(evt.peer_id)
if isinstance(evt.fwd_from.from_id, PeerUser):
return await client.get_entity(
InputPeerUserFromMessage(
peer=input_peer, msg_id=evt.id, user_id=evt.fwd_from.from_id.user_id
)
)
elif isinstance(evt.fwd_from.from_id, PeerChannel):
return await client.get_entity(
InputPeerChannelFromMessage(
peer=input_peer, msg_id=evt.id, channel_id=evt.fwd_from.from_id.channel_id
)
)
except (ValueError, RPCError) as e:
pass
return None
async def _add_forward_header(
client: MautrixTelegramClient, content: TextMessageEventContent, evt: Message
) -> None:
fwd_from = evt.fwd_from
fwd_from_html, fwd_from_text = None, None
if isinstance(fwd_from.from_id, PeerUser):
user = await u.User.get_by_tgid(TelegramID(fwd_from.from_id.user_id))
if user:
fwd_from_text = user.displayname or user.mxid
fwd_from_html = (
f"<a href='https://matrix.to/#/{user.mxid}'>{escape(fwd_from_text)}</a>"
)
if not fwd_from_text:
puppet = await pu.Puppet.get_by_peer(fwd_from.from_id, create=False)
if puppet and puppet.displayname:
fwd_from_text = puppet.displayname or puppet.mxid
fwd_from_html = (
f"<a href='https://matrix.to/#/{puppet.mxid}'>{escape(fwd_from_text)}</a>"
)
if not fwd_from_text:
user = await _get_fwd_entity(client, evt)
if user:
fwd_from_text, _ = pu.Puppet.get_displayname(user, False)
fwd_from_html = f"<b>{escape(fwd_from_text)}</b>"
else:
fwd_from_text = fwd_from_html = "unknown user"
elif isinstance(fwd_from.from_id, (PeerChannel, PeerChat)):
from_id = (
fwd_from.from_id.chat_id
if isinstance(fwd_from.from_id, PeerChat)
else fwd_from.from_id.channel_id
)
portal = await po.Portal.get_by_tgid(TelegramID(from_id))
if portal and portal.title:
fwd_from_text = portal.title
if portal.alias:
fwd_from_html = (
f"<a href='https://matrix.to/#/{portal.alias}'>{escape(fwd_from_text)}</a>"
)
else:
fwd_from_html = f"channel <b>{escape(fwd_from_text)}</b>"
else:
channel = await _get_fwd_entity(client, evt)
if channel:
fwd_from_text = f"channel {channel.title}"
fwd_from_html = f"channel <b>{escape(channel.title)}</b>"
else:
fwd_from_text = fwd_from_html = "unknown channel"
elif fwd_from.from_name:
fwd_from_text = fwd_from.from_name
fwd_from_html = f"<b>{escape(fwd_from.from_name)}</b>"
else:
fwd_from_text = "unknown source"
fwd_from_html = f"unknown source"
content.ensure_has_html()
content.body = "\n".join([f"> {line}" for line in content.body.split("\n")])
content.body = f"Forwarded from {fwd_from_text}:\n{content.body}"
content.formatted_body = (
f"Forwarded message from {fwd_from_html}<br/>"
f"<tg-forward><blockquote>{content.formatted_body}</blockquote></tg-forward>"
)
class ReuploadedCustomEmoji(MessageEntityCustomEmoji):
file: DBTelegramFile
def __init__(self, parent: MessageEntityCustomEmoji, file: DBTelegramFile) -> None:
super().__init__(parent.offset, parent.length, parent.document_id)
self.file = file
async def _convert_custom_emoji(
source: au.AbstractUser,
entities: list[TypeMessageEntity],
client: MautrixTelegramClient | None = None,
) -> None:
emoji_ids = [
entity.document_id for entity in entities if isinstance(entity, MessageEntityCustomEmoji)
]
custom_emojis = await transfer_custom_emojis_to_matrix(source, emoji_ids, client=client)
if len(custom_emojis) > 0:
for i, entity in enumerate(entities):
if isinstance(entity, MessageEntityCustomEmoji):
entities[i] = ReuploadedCustomEmoji(entity, custom_emojis[entity.document_id])
async def telegram_text_to_matrix_html(
source: au.AbstractUser,
text: str,
entities: list[TypeMessageEntity],
client: MautrixTelegramClient | None = None,
) -> str:
if not entities:
return escape(text).replace("\n", "<br/>")
await _convert_custom_emoji(source, entities, client=client)
text = add_surrogate(text)
html = await _telegram_entities_to_matrix_catch(text, entities)
html = del_surrogate(html)
return html
async def telegram_to_matrix(
evt: Message | SponsoredMessage,
source: au.AbstractUser,
client: MautrixTelegramClient | None = None,
override_text: str = None,
override_entities: list[TypeMessageEntity] = None,
require_html: bool = False,
) -> TextMessageEventContent:
if not client:
client = source.client
content = TextMessageEventContent(
msgtype=MessageType.TEXT,
body=override_text or evt.message,
)
entities = override_entities or evt.entities
if entities:
content.format = Format.HTML
content.formatted_body = await telegram_text_to_matrix_html(
source, content.body, entities, client=client
)
if require_html:
content.ensure_has_html()
if getattr(evt, "fwd_from", None):
await _add_forward_header(client, content, evt)
if isinstance(evt, Message) and evt.post and evt.post_author:
content.ensure_has_html()
content.body += f"\n- {evt.post_author}"
content.formatted_body += f"<br/><i>- <u>{evt.post_author}</u></i>"
return content
async def _telegram_entities_to_matrix_catch(text: str, entities: list[TypeMessageEntity]) -> str:
try:
return await _telegram_entities_to_matrix(text, entities)
except Exception:
log.exception(
"Failed to convert Telegram format:\nmessage=%s\nentities=%s", text, entities
)
return "[failed conversion in _telegram_entities_to_matrix]"
def within_surrogate(text, index):
"""
`True` if ``index`` is within a surrogate (before and after it, not at!).
"""
return (
1 < index < len(text) # in bounds
and "\ud800" <= text[index - 1] <= "\udbff" # current is low surrogate
and "\udc00" <= text[index] <= "\udfff" # previous is high surrogate
)
async def _telegram_entities_to_matrix(
text: str,
entities: list[TypeMessageEntity | ReuploadedCustomEmoji],
offset: int = 0,
length: int = None,
in_codeblock: bool = False,
) -> str:
def text_to_html(
val: str, _in_codeblock: bool = in_codeblock, escape_html: bool = True
) -> str:
if escape_html:
val = escape(val)
if not _in_codeblock:
val = val.replace("\n", "<br/>")
return val
if not entities:
return text_to_html(text)
if length is None:
length = len(text)
html = []
last_offset = 0
for i, entity in enumerate(entities):
if entity.offset >= offset + length:
break
relative_offset = entity.offset - offset
if relative_offset > last_offset:
html.append(text_to_html(text[last_offset:relative_offset]))
elif relative_offset < last_offset:
continue
while within_surrogate(text, relative_offset):
relative_offset += 1
while within_surrogate(text, relative_offset + entity.length):
entity.length += 1
skip_entity = False
is_code_entity = isinstance(entity, (MessageEntityCode, MessageEntityPre))
entity_text = await _telegram_entities_to_matrix(
text=text[relative_offset : relative_offset + entity.length],
entities=entities[i + 1 :],
offset=entity.offset,
length=entity.length,
in_codeblock=is_code_entity,
)
entity_text = text_to_html(entity_text, is_code_entity, escape_html=False)
entity_type = type(entity)
if entity_type == MessageEntityBold:
html.append(f"<strong>{entity_text}</strong>")
elif entity_type == MessageEntityItalic:
html.append(f"<em>{entity_text}</em>")
elif entity_type == MessageEntityUnderline:
html.append(f"<u>{entity_text}</u>")
elif entity_type == MessageEntityStrike:
html.append(f"<del>{entity_text}</del>")
elif entity_type == MessageEntityBlockquote:
html.append(f"<blockquote>{entity_text}</blockquote>")
elif entity_type == MessageEntityCode:
html.append(
f"<pre><code>{entity_text}</code></pre>"
if "\n" in entity_text
else f"<code>{entity_text}</code>"
)
elif entity_type == MessageEntityPre:
skip_entity = _parse_pre(html, entity_text, entity.language)
elif entity_type == MessageEntityMention:
skip_entity = await _parse_mention(html, entity_text)
elif entity_type == MessageEntityMentionName:
skip_entity = await _parse_name_mention(html, entity_text, TelegramID(entity.user_id))
elif entity_type == MessageEntityEmail:
html.append(f"<a href='mailto:{entity_text}'>{entity_text}</a>")
elif entity_type in (MessageEntityTextUrl, MessageEntityUrl):
await _parse_url(
html, entity_text, entity.url if entity_type == MessageEntityTextUrl else None
)
elif entity_type == MessageEntityCustomEmoji:
html.append(entity_text)
elif entity_type == ReuploadedCustomEmoji:
if isinstance(entity.file, UnicodeCustomEmoji):
html.append(entity.file.emoji)
else:
html.append(
f"<img data-mx-emoticon data-mau-animated-emoji"
f' src="{escape(entity.file.mxc)}" height="32" width="32"'
f' alt="{entity_text}" title="{entity_text}"/>'
)
elif entity_type in (
MessageEntityBotCommand,
MessageEntityHashtag,
MessageEntityCashtag,
MessageEntityPhone,
):
html.append(f"<font color='#3771bb'>{entity_text}</font>")
elif entity_type == MessageEntitySpoiler:
html.append(f"<span data-mx-spoiler>{entity_text}</span>")
else:
skip_entity = True
last_offset = relative_offset + (0 if skip_entity else entity.length)
html.append(text_to_html(text[last_offset:]))
html_string = "".join(html)
# Remove redundant <br>'s after block tags
html_string = html_string.replace("</blockquote><br/>", "</blockquote>")
html_string = html_string.replace("</pre><br/>", "</pre>")
return html_string
def _parse_pre(html: list[str], entity_text: str, language: str) -> bool:
if language:
html.append(f"<pre><code class='language-{language}'>{entity_text}</code></pre>")
else:
html.append(f"<pre><code>{entity_text}</code></pre>")
return False
async def _parse_mention(html: list[str], entity_text: str) -> bool:
username = entity_text[1:]
mxid = None
portal = None
# This is a bit complicated because public channels have both Puppet and Portal instances.
# Basically the currently intended output is:
# User/bot mention (bridge user) -> real user mention
# User/bot mention (normal Telegram user) -> ghost user mention
# Public channel with existing portal -> room mention
# Public channel without portal -> ghost user mention
# Other chat -> room mention
user = await u.User.find_by_username(username) or await pu.Puppet.find_by_username(username)
if user:
if isinstance(user, pu.Puppet) and user.is_channel:
portal = await po.Portal.get_by_tgid(user.tgid)
mxid = user.mxid
else:
portal = await po.Portal.find_by_username(username)
if portal and (portal.mxid or not user):
mxid = portal.alias or portal.mxid
if mxid:
html.append(f"<a href='https://matrix.to/#/{mxid}'>{entity_text}</a>")
else:
return True
return False
async def _parse_name_mention(html: list[str], entity_text: str, user_id: TelegramID) -> bool:
user = await u.User.get_by_tgid(user_id)
if user:
mxid = user.mxid
else:
puppet = await pu.Puppet.get_by_tgid(user_id, create=False)
mxid = puppet.mxid if puppet else None
if mxid:
html.append(f"<a href='https://matrix.to/#/{mxid}'>{entity_text}</a>")
else:
return True
return False
message_link_regex = re.compile(
r"https?://t(?:elegram)?\.(?:me|dog)"
# /username or /c/id
r"/([A-Za-z][A-Za-z0-9_]{3,31}[A-Za-z0-9]|[Cc]/[0-9]{1,20})"
# /messageid
r"/([0-9]{1,20})"
)
async def _parse_url(html: list[str], entity_text: str, url: str) -> None:
url = escape(url) if url else entity_text
if not url.startswith(("https://", "http://", "ftp://", "magnet://")):
url = "http://" + url
message_link_match = message_link_regex.match(url)
if message_link_match:
group, msgid_str = message_link_match.groups()
msgid = int(msgid_str)
if group.lower().startswith("c/"):
portal = await po.Portal.get_by_tgid(TelegramID(int(group[2:])))
else:
portal = await po.Portal.find_by_username(group)
if portal:
message = await DBMessage.get_one_by_tgid(TelegramID(msgid), portal.tgid)
if message:
url = f"https://matrix.to/#/{portal.mxid}/{message.mxid}"
html.append(f"<a href='{url}'>{entity_text}</a>")