# Source listing banner: 4080 lines, 159 KiB, Python
# mautrix-telegram - A Matrix-Telegram puppeting bridge
|
|
# Copyright (C) 2023 Tulir Asokan
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
from __future__ import annotations
|
|
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Any,
|
|
AsyncGenerator,
|
|
Awaitable,
|
|
Callable,
|
|
List,
|
|
Literal,
|
|
NamedTuple,
|
|
Union,
|
|
cast,
|
|
)
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from html import escape as escape_html
|
|
from sqlite3 import IntegrityError
|
|
from string import Template
|
|
import asyncio
|
|
import base64
|
|
import itertools
|
|
import random
|
|
import time
|
|
|
|
from asyncpg import UniqueViolationError
|
|
from telethon.errors import (
|
|
ChatAdminRequiredError,
|
|
ChatNotModifiedError,
|
|
ChatRestrictedError,
|
|
ChatWriteForbiddenError,
|
|
EntitiesTooLongError,
|
|
EntityBoundsInvalidError,
|
|
EntityMentionUserInvalidError,
|
|
InputUserDeactivatedError,
|
|
MessageEmptyError,
|
|
MessageIdInvalidError,
|
|
MessageNotModifiedError,
|
|
MessageTooLongError,
|
|
PhotoExtInvalidError,
|
|
PhotoInvalidDimensionsError,
|
|
PhotoSaveFileInvalidError,
|
|
ReactionInvalidError,
|
|
RPCError,
|
|
SlowModeWaitError,
|
|
UserBannedInChannelError,
|
|
UserIsBlockedError,
|
|
YouBlockedUserError,
|
|
)
|
|
from telethon.tl.custom import Dialog
|
|
from telethon.tl.functions.channels import (
|
|
CreateChannelRequest,
|
|
EditPhotoRequest,
|
|
EditTitleRequest,
|
|
InviteToChannelRequest,
|
|
JoinChannelRequest,
|
|
UpdateUsernameRequest,
|
|
ViewSponsoredMessageRequest,
|
|
)
|
|
from telethon.tl.functions.messages import (
|
|
AddChatUserRequest,
|
|
CreateChatRequest,
|
|
EditChatAboutRequest,
|
|
EditChatPhotoRequest,
|
|
EditChatTitleRequest,
|
|
ExportChatInviteRequest,
|
|
GetMessageReactionsListRequest,
|
|
GetMessagesReactionsRequest,
|
|
GetPeerDialogsRequest,
|
|
MigrateChatRequest,
|
|
SendReactionRequest,
|
|
SetTypingRequest,
|
|
UnpinAllMessagesRequest,
|
|
UpdatePinnedMessageRequest,
|
|
)
|
|
from telethon.tl.patched import Message, MessageService
|
|
from telethon.tl.types import (
|
|
Channel,
|
|
ChannelFull,
|
|
Chat,
|
|
ChatBannedRights,
|
|
ChatEmpty,
|
|
ChatFull,
|
|
ChatPhoto,
|
|
ChatPhotoEmpty,
|
|
DocumentAttributeAudio,
|
|
DocumentAttributeFilename,
|
|
DocumentAttributeImageSize,
|
|
DocumentAttributeSticker,
|
|
DocumentAttributeVideo,
|
|
GeoPoint,
|
|
InputChannel,
|
|
InputChatUploadedPhoto,
|
|
InputDialogPeer,
|
|
InputMediaUploadedDocument,
|
|
InputMediaUploadedPhoto,
|
|
InputPeerChannel,
|
|
InputPeerChat,
|
|
InputPeerPhotoFileLocation,
|
|
InputPeerUser,
|
|
InputStickerSetEmpty,
|
|
InputUser,
|
|
MessageActionBoostApply,
|
|
MessageActionChannelCreate,
|
|
MessageActionChatAddUser,
|
|
MessageActionChatCreate,
|
|
MessageActionChatDeletePhoto,
|
|
MessageActionChatDeleteUser,
|
|
MessageActionChatEditPhoto,
|
|
MessageActionChatEditTitle,
|
|
MessageActionChatJoinedByLink,
|
|
MessageActionChatJoinedByRequest,
|
|
MessageActionChatMigrateTo,
|
|
MessageActionContactSignUp,
|
|
MessageActionGameScore,
|
|
MessageActionGiftPremium,
|
|
MessageActionGroupCall,
|
|
MessageActionPhoneCall,
|
|
MessageMediaGame,
|
|
MessageMediaGeo,
|
|
MessagePeerReaction,
|
|
MessageReactions,
|
|
PeerChannel,
|
|
PeerChat,
|
|
PeerUser,
|
|
PhoneCallDiscardReasonBusy,
|
|
PhoneCallDiscardReasonDisconnect,
|
|
PhoneCallDiscardReasonMissed,
|
|
PhoneCallRequested,
|
|
Photo,
|
|
PhotoEmpty,
|
|
ReactionCount,
|
|
ReactionCustomEmoji,
|
|
ReactionEmoji,
|
|
SendMessageCancelAction,
|
|
SendMessageTypingAction,
|
|
SponsoredMessage,
|
|
TypeChannelParticipant,
|
|
TypeChat,
|
|
TypeChatParticipant,
|
|
TypeInputChannel,
|
|
TypeInputPeer,
|
|
TypeMessage,
|
|
TypeMessageAction,
|
|
TypePeer,
|
|
TypeReaction,
|
|
TypeUser,
|
|
TypeUserFull,
|
|
TypeUserProfilePhoto,
|
|
UpdateBotMessageReaction,
|
|
UpdateChannelUserTyping,
|
|
UpdateChatUserTyping,
|
|
UpdateMessageReactions,
|
|
UpdateNewMessage,
|
|
UpdatePhoneCall,
|
|
UpdateUserTyping,
|
|
User,
|
|
UserEmpty,
|
|
UserFull,
|
|
UserProfilePhoto,
|
|
UserProfilePhotoEmpty,
|
|
)
|
|
from telethon.tl.types.messages import PeerDialogs
|
|
from telethon.utils import encode_waveform, get_peer_id
|
|
import attr
|
|
|
|
from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, IntentAPI
|
|
from mautrix.bridge import BasePortal, NotificationDisabler, RejectMatrixInvite, async_getter_lock
|
|
from mautrix.errors import IntentError, MatrixRequestError, MForbidden
|
|
from mautrix.types import (
|
|
BatchID,
|
|
BatchSendEvent,
|
|
BatchSendStateEvent,
|
|
BeeperMessageStatusEventContent,
|
|
ContentURI,
|
|
EventID,
|
|
EventType,
|
|
Format,
|
|
ImageInfo,
|
|
JoinRule,
|
|
LocationMessageEventContent,
|
|
MediaMessageEventContent,
|
|
Membership,
|
|
MemberStateEventContent,
|
|
MessageEventContent,
|
|
MessageStatus,
|
|
MessageStatusReason,
|
|
MessageType,
|
|
PowerLevelStateEventContent,
|
|
RelatesTo,
|
|
RelationType,
|
|
RoomAlias,
|
|
RoomAvatarStateEventContent,
|
|
RoomCreatePreset,
|
|
RoomID,
|
|
RoomNameStateEventContent,
|
|
RoomTopicStateEventContent,
|
|
StateEventContent,
|
|
TextMessageEventContent,
|
|
UserID,
|
|
VideoInfo,
|
|
)
|
|
from mautrix.util import background_task, magic, markdown, variation_selector
|
|
from mautrix.util.format_duration import format_duration
|
|
from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
|
|
from mautrix.util.simple_lock import SimpleLock
|
|
from mautrix.util.simple_template import SimpleTemplate
|
|
|
|
from . import (
|
|
abstract_user as au,
|
|
formatter,
|
|
matrix as m,
|
|
portal_util as putil,
|
|
puppet as p,
|
|
user as u,
|
|
util,
|
|
)
|
|
from .config import Config
|
|
from .db import (
|
|
Backfill,
|
|
BackfillType,
|
|
DisappearingMessage,
|
|
Message as DBMessage,
|
|
Portal as DBPortal,
|
|
Reaction as DBReaction,
|
|
TelegramFile as DBTelegramFile,
|
|
)
|
|
from .tgclient import MautrixTelegramClient
|
|
from .types import TelegramID
|
|
from .util import sane_mimetypes
|
|
|
|
try:
|
|
from mautrix.crypto.attachments import decrypt_attachment
|
|
except ImportError:
|
|
decrypt_attachment = None
|
|
|
|
if TYPE_CHECKING:
|
|
from .__main__ import TelegramBridge
|
|
from .bot import Bot
|
|
|
|
# Custom Matrix event types used by this module: the bridge-info state event, its legacy
# "half-shot" counterpart, and the dummy message event sent right after a portal room is created.
StateBridge = EventType.find("m.bridge", EventType.Class.STATE)
StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE)
DummyPortalCreated = EventType.find("fi.mau.dummy.portal_created", EventType.Class.MESSAGE)

# Type aliases.
# Either a single Matrix user ID or a list of them.
InviteList = Union[UserID, List[UserID]]
# Any of the three Telegram typing-notification update types.
UpdateTyping = Union[UpdateUserTyping, UpdateChatUserTyping, UpdateChannelUserTyping]
# Any photo type that may be attached to a chat (including the "empty" variants).
TypeChatPhoto = Union[ChatPhoto, ChatPhotoEmpty, Photo, PhotoEmpty]
# Signature of an async media handler: (source user, intent, Telegram message, relation) -> event ID.
MediaHandler = Callable[["au.AbstractUser", IntentAPI, Message, RelatesTo], Awaitable[EventID]]

# Minimum interval between reaction polls.
# NOTE(review): presumably seconds (compared against time-based floats) - confirm at the use site.
REACTION_POLL_MIN_INTERVAL = 20
|
|
|
|
|
|
class BridgingError(Exception):
    """Exception type for bridging failures (raise sites are outside this section)."""
|
|
|
|
|
|
class IgnoredMessageError(Exception):
    """Exception type for messages that are deliberately not bridged (raise sites not shown here)."""
|
|
|
|
|
|
class WrappedReaction(NamedTuple):
    """A Telegram reaction paired with an optional timestamp."""

    # The reaction itself: a standard emoji or a custom emoji.
    reaction: ReactionEmoji | ReactionCustomEmoji
    # When the reaction was made, or None when no date is available.
    date: datetime | None
|
|
|
|
|
|
class Portal(DBPortal, BasePortal):
    """A bridge portal linking one Telegram peer (user, chat or channel) to a Matrix room."""

    # Bridge-wide singletons, assigned in init_cls().
    bot: "Bot"
    config: Config
    matrix: m.MatrixHandler
    disappearing_msg_class = DisappearingMessage

    # Instance cache
    # Portals indexed by Matrix room ID and by the (tgid, tg_receiver) pair.
    by_mxid: dict[RoomID, Portal] = {}
    by_tgid: dict[tuple[TelegramID, TelegramID], Portal] = {}

    # Config cache
    # Values copied from the bridge config in init_cls(); the filter_* fields drive allow_bridging.
    filter_mode: str
    filter_list: list[int]
    filter_users: bool | None

    max_initial_member_sync: int
    sync_channel_members: bool
    sync_matrix_state: bool
    public_portals: bool
    private_chat_portal_meta: Literal["default", "always", "never"]

    alias_template: SimpleTemplate[str]
    hs_domain: str

    # Instance variables
    # Set to False in __init__; save() re-inserts the row instead of updating when it is True.
    deleted: bool

    backfill_lock: SimpleLock
    backfill_method_lock: asyncio.Lock
    backfill_enable: bool

    # NOTE: a read-only `alias` property with the same name is defined further down the class.
    alias: RoomAlias | None

    dedup: putil.PortalDedup
    send_lock: putil.PortalSendLock
    reaction_lock: putil.PortalReactionLock
    _pin_lock: asyncio.Lock

    # None until the portal has been post-initialised; see the main_intent property.
    _main_intent: IntentAPI | None
    # Held by create_matrix_room() while the Matrix room is being created.
    _room_create_lock: asyncio.Lock

    # Sponsored message state.
    _sponsored_msg: SponsoredMessage | None
    _sponsored_entity: User | Channel | None
    _sponsored_msg_ts: float
    _sponsored_msg_lock: asyncio.Lock
    _sponsored_evt_id: EventID | None
    _sponsored_seen: dict[UserID, bool]
    _new_messages_after_sponsored: bool

    # Per-user timestamp of the previous reaction poll (defaults to 0.0, see __init__).
    _prev_reaction_poll: dict[UserID, float]

    _msg_conv: putil.TelegramMessageConverter
|
|
|
|
def __init__(
    self,
    tgid: TelegramID,
    tg_receiver: TelegramID,
    peer_type: str,
    megagroup: bool = False,
    mxid: RoomID | None = None,
    avatar_url: ContentURI | None = None,
    encrypted: bool = False,
    first_event_id: EventID | None = None,
    next_batch_id: BatchID | None = None,
    base_insertion_id: EventID | None = None,
    sponsored_event_id: EventID | None = None,
    sponsored_event_ts: int | None = None,
    sponsored_msg_random_id: bytes | None = None,
    username: str | None = None,
    title: str | None = None,
    about: str | None = None,
    photo_id: str | None = None,
    name_set: bool = False,
    avatar_set: bool = False,
    local_config: dict[str, Any] | None = None,
) -> None:
    """Create a portal object from its database fields and set up runtime-only state.

    All parameters are forwarded unchanged to the database model constructor, except
    that a missing ``local_config`` is replaced with an empty dict. The remaining
    attributes (locks, dedup helper, sponsored-message state, ...) exist only in memory.
    """
    super().__init__(
        tgid=tgid,
        tg_receiver=tg_receiver,
        peer_type=peer_type,
        megagroup=megagroup,
        mxid=mxid,
        avatar_url=avatar_url,
        encrypted=encrypted,
        first_event_id=first_event_id,
        next_batch_id=next_batch_id,
        base_insertion_id=base_insertion_id,
        sponsored_event_id=sponsored_event_id,
        sponsored_event_ts=sponsored_event_ts,
        sponsored_msg_random_id=sponsored_msg_random_id,
        username=username,
        title=title,
        about=about,
        photo_id=photo_id,
        name_set=name_set,
        avatar_set=avatar_set,
        local_config=local_config or {},
    )
    BasePortal.__init__(self)
    # Per-portal child logger, named after the Telegram ID when there is one, else the room ID.
    self.log = self.log.getChild(self.tgid_log if self.tgid else self.mxid)
    # The main intent is assigned later; the main_intent property raises until then.
    self._main_intent = None
    self.deleted = False

    self.backfill_lock = SimpleLock(
        "Waiting for backfilling to finish before handling %s", log=self.log
    )
    self.backfill_method_lock = asyncio.Lock()

    # These helpers receive the portal itself, so they are created after the logger is set.
    self.dedup = putil.PortalDedup(self)
    self.send_lock = putil.PortalSendLock()
    self.reaction_lock = putil.PortalReactionLock()
    self._pin_lock = asyncio.Lock()
    self._room_create_lock = asyncio.Lock()

    self._sponsored_msg = None
    self._sponsored_msg_ts = 0
    self._sponsored_msg_lock = asyncio.Lock()
    self._sponsored_seen = {}
    self._new_messages_after_sponsored = True
    # Set to True by _create_matrix_room() when the chat exceeds the member limit;
    # allow_bridging then returns False for the rest of the process lifetime.
    self._bridging_blocked_at_runtime = False

    # Users that have never polled reactions default to timestamp 0.0.
    self._prev_reaction_poll = defaultdict(lambda: 0.0)

    self._msg_conv = putil.TelegramMessageConverter(self)
|
|
|
|
# region Properties
|
|
|
|
@property
def tgid_full(self) -> tuple[TelegramID, TelegramID]:
    """Return the ``(tgid, tg_receiver)`` pair used as the key in ``by_tgid``."""
    return (self.tgid, self.tg_receiver)
|
|
|
|
@property
def tgid_log(self) -> str:
    """Return the Telegram ID formatted for log output.

    When the receiver differs from the chat ID, the result is ``receiver<->tgid``;
    otherwise it is just the chat ID.
    """
    chat_id, receiver = self.tgid, self.tg_receiver
    return str(chat_id) if chat_id == receiver else f"{receiver}<->{chat_id}"
|
|
|
|
@property
def name(self) -> str:
    """Return the portal name, which is simply the stored title."""
    title = self.title
    return title
|
|
|
|
@property
def alias(self) -> RoomAlias | None:
    """Return the full Matrix room alias for this portal, or None without a username."""
    if self.username:
        return RoomAlias(f"#{self.alias_localpart}:{self.hs_domain}")
    return None
|
|
|
|
@property
def alias_localpart(self) -> str | None:
    """Return the alias localpart built from the username, or None without a username."""
    return self.alias_template.format(self.username) if self.username else None
|
|
|
|
@property
def peer(self) -> TypePeer | TypeInputPeer:
    """Return the Telegram peer object matching ``peer_type``.

    Returns None when ``peer_type`` is not one of "user", "chat" or "channel".
    """
    kind = self.peer_type
    if kind == "user":
        return PeerUser(user_id=self.tgid)
    if kind == "chat":
        return PeerChat(chat_id=self.tgid)
    if kind == "channel":
        return PeerChannel(channel_id=self.tgid)
    return None
|
|
|
|
@property
def is_direct(self) -> bool:
    """Whether this portal is a private chat (peer type "user")."""
    kind = self.peer_type
    return kind == "user"
|
|
|
|
@property
def is_channel(self) -> bool:
    """Whether this portal has peer type "channel"."""
    kind = self.peer_type
    return kind == "channel"
|
|
|
|
@property
def has_bot(self) -> bool:
    """Whether the relay bot takes part in this portal.

    True when a bot is configured and it is either in the chat or is the receiver
    of this private chat.
    """
    bot = self.bot
    if not bot:
        return False
    return bot.is_in_chat(self.tgid) or (
        self.peer_type == "user" and self.tg_receiver == bot.tgid
    )
|
|
|
|
@property
def main_intent(self) -> IntentAPI:
    """Return the intent used for portal-level Matrix actions.

    Raises:
        RuntimeError: if the intent has not been assigned yet.
    """
    intent = self._main_intent
    if intent is not None:
        return intent
    raise RuntimeError("Portal must be postinit()ed before main_intent can be used")
|
|
|
|
@property
def allow_bridging(self) -> bool:
    """Whether this chat may be bridged according to runtime state and the filter config.

    Checks are applied in order: the runtime block flag, the private-chat filter
    (when configured), then the whitelist/blacklist. Anything else is allowed.
    """
    if self._bridging_blocked_at_runtime:
        return False
    if self.peer_type == "user" and self.filter_users is not None:
        return self.filter_users
    mode = self.filter_mode
    if mode == "whitelist":
        return self.tgid in self.filter_list
    if mode == "blacklist":
        return self.tgid not in self.filter_list
    return True
|
|
|
|
@property
def set_dm_room_metadata(self) -> bool:
    """Whether room name/avatar should be set explicitly on the Matrix room.

    Always true for non-private chats. For private chats it depends on the
    ``private_chat_portal_meta`` setting: "always" enables it, "never" disables it,
    and any other value enables it only for encrypted rooms.
    """
    if not self.is_direct:
        return True
    mode = self.private_chat_portal_meta
    if mode == "always":
        return True
    return self.encrypted and mode != "never"
|
|
|
|
@classmethod
def init_cls(cls, bridge: "TelegramBridge") -> None:
    """Bind the class to a running bridge and cache frequently used config values.

    Args:
        bridge: The bridge instance whose appservice, config, loop, Matrix handler
            and relay bot are copied onto the class.
    """
    BasePortal.bridge = bridge
    cls.az = bridge.az
    cls.config = bridge.config
    cls.loop = bridge.loop
    cls.matrix = bridge.matrix
    cls.bot = bridge.bot

    # Class attribute -> config key, in the order the values are read.
    cached_options = {
        "max_initial_member_sync": "bridge.max_initial_member_sync",
        "sync_channel_members": "bridge.sync_channel_members",
        "sync_matrix_state": "bridge.sync_matrix_state",
        "public_portals": "bridge.public_portals",
        "private_chat_portal_meta": "bridge.private_chat_portal_meta",
        "filter_mode": "bridge.filter.mode",
        "filter_list": "bridge.filter.list",
        "filter_users": "bridge.filter.users",
        "hs_domain": "homeserver.domain",
        "backfill_enable": "bridge.backfill.enable",
    }
    for attr_name, config_key in cached_options.items():
        setattr(cls, attr_name, cls.config[config_key])

    # hs_domain must already be cached at this point: the alias suffix depends on it.
    alias_suffix = f":{cls.hs_domain}"
    cls.alias_template = SimpleTemplate(
        cls.config["bridge.alias_template"],
        "groupname",
        prefix="#",
        suffix=alias_suffix,
    )

    NotificationDisabler.puppet_cls = p.Puppet
    NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"]
|
|
|
|
# endregion
|
|
# region Matrix -> Telegram metadata
|
|
|
|
async def save(self) -> None:
    """Persist the portal to the database.

    A portal that was previously deleted is inserted again, re-initialised and
    un-flagged; otherwise the existing row is updated.
    """
    if not self.deleted:
        await super().save()
        return
    await super().insert()
    await self.postinit()
    self.deleted = False
|
|
|
|
async def get_telegram_users_in_matrix_room(
    self, source: u.User, pre_create: bool = False
) -> tuple[list[InputUser], list[UserID], list[u.User]]:
    """Resolve the members of the Matrix room to Telegram input entities.

    Joined and invited members (except the bridge bot) are mapped to Telegram IDs,
    either through their logged-in bridge user or through their ghost user ID.

    Args:
        source: The user whose Telegram client is used to resolve input entities.
        pre_create: When true, the appservice bot intent is used instead of the
            portal's main intent.

    Returns:
        A tuple of (resolved input entities, Matrix IDs whose entity could not be
        found, logged-in bridge users found in the room).
    """
    intent = self.az.intent if pre_create else self.main_intent
    member_mxids = await intent.get_room_members(self.mxid, (Membership.JOIN, Membership.INVITE))

    tgid_to_mxid = {}
    logged_in_users = []
    for member in member_mxids:
        if member == self.az.bot_mxid:
            continue
        matrix_user = await u.User.get_by_mxid(member, create=False)
        if matrix_user and matrix_user.tgid:
            logged_in_users.append(matrix_user)
            tgid_to_mxid[matrix_user.tgid] = member
        ghost_tgid = p.Puppet.get_id_from_mxid(member)
        if ghost_tgid:
            tgid_to_mxid[ghost_tgid] = member

    resolved = []
    unresolved = []
    for tgid, mxid in tgid_to_mxid.items():
        try:
            input_entity = await source.client.get_input_entity(tgid)
        except ValueError as e:
            source.log.debug(
                f"Failed to find the input entity for {tgid} ({mxid}) for "
                f"creating a group: {e}"
            )
            unresolved.append(mxid)
        else:
            resolved.append(input_entity)
    return resolved, unresolved, logged_in_users
|
|
|
|
async def upgrade_telegram_chat(self, source: u.User) -> None:
    """Migrate a normal Telegram group chat to a supergroup and update the portal.

    Args:
        source: The user whose Telegram client performs the migration.

    Raises:
        ValueError: if the portal is not a normal group chat, or if the migration
            response contains no channel.
    """
    if self.peer_type != "chat":
        raise ValueError("Only normal group chats are upgradable to supergroups.")

    response = await source.client(MigrateChatRequest(chat_id=self.tgid))
    # The first channel in the response is taken as the upgraded chat.
    entity = next((chat for chat in response.chats if isinstance(chat, Channel)), None)
    if not entity:
        raise ValueError("Upgrade may have failed: output channel not found.")
    await self._migrate_and_save_telegram(TelegramID(entity.id))
    await self.update_info(source, entity)
|
|
|
|
async def _migrate_and_save_telegram(self, new_id: TelegramID) -> None:
    """Run the ID migration while holding the getter lock for ``new_id``."""
    getter_lock = self._async_get_locks[(new_id,)]
    async with getter_lock:
        await self._migrate_and_save_telegram_locked(new_id)
|
|
|
|
async def _migrate_and_save_telegram_locked(self, new_id: TelegramID) -> None:
    """Move this portal to ``new_id`` as a channel, replacing any portal already there.

    The portal is re-keyed in the ``by_tgid`` cache. A pre-existing portal for the
    new ID is cleaned up (if it has a Matrix room) or just deleted from the database.
    """
    self.log.info(f"Starting migration to {new_id}")
    self.by_tgid.pop(self.tgid_full, None)

    new_key = (new_id, new_id)
    existing = self.by_tgid.get(new_key)
    self.by_tgid[new_key] = self

    if existing:
        if existing.mxid:
            self.log.warning(f"Deleting existing portal room {existing.mxid} for {new_id}")
            await existing.cleanup_and_delete()
        else:
            self.log.debug(f"Deleting old database entry for {new_id}")
            await existing.delete()

    old_id = self.tgid
    await self.update_id(new_id, "channel")
    # The logger name embeds the Telegram ID, so it is rebuilt after the ID change.
    self.log = self.__class__.log.getChild(self.tgid_log)
    self.log.info(f"Telegram chat upgraded from {old_id}")
|
|
|
|
async def set_telegram_username(self, source: u.User, username: str) -> None:
    """Change the public username of the channel/supergroup on Telegram.

    The portal is saved only when the local username update reports a change.

    Raises:
        ValueError: if the portal is not a channel or supergroup.
    """
    if self.peer_type != "channel":
        raise ValueError("Only channels and supergroups have usernames.")
    channel = await self.get_input_entity(source)
    await source.client(UpdateUsernameRequest(channel, username))
    username_changed = await self._update_username(username)
    if username_changed:
        await self.save()
|
|
|
|
async def create_telegram_chat(self, source: u.User, supergroup: bool = False) -> None:
    """Create a Telegram chat for an existing Matrix room and link it to this portal.

    Args:
        source: The user whose Telegram client creates the chat.
        supergroup: For channel-type portals, whether to create a supergroup
            (megagroup) rather than a broadcast channel.

    Raises:
        ValueError: if the portal has no Matrix room, already has a Telegram chat,
            has fewer than two resolvable Telegram users, or has an unsupported
            peer type.
    """
    if not self.mxid:
        raise ValueError("Can't create Telegram chat for portal without Matrix room.")
    # This guard used to be an ``elif`` of the error-notice branch below, so it was
    # skipped whenever any member failed to resolve. It must hold unconditionally.
    if self.tgid:
        raise ValueError("Can't create Telegram chat for portal with existing Telegram chat.")

    invites, errors, users = await self.get_telegram_users_in_matrix_room(
        source, pre_create=True
    )
    if errors:
        # Best effort: tell the room which users could not be resolved, then continue.
        error_list = "\n".join(f"* [{mxid}](https://matrix.to/#/{mxid})" for mxid in errors)
        command_prefix = self.config["bridge.command_prefix"]
        message = (
            f"Failed to add the following users to the chat:\n\n{error_list}\n\n"
            f"You can try `{command_prefix} search -r <username>` to help the bridge find "
            "those users."
        )
        await self.az.intent.send_notice(
            self.mxid, text=message, html=markdown.render(message)
        )

    if len(invites) < 2:
        if self.bot is not None:
            info, mxid = await self.bot.get_me()
            raise ValueError(
                "Not enough Telegram users to create a chat. "
                "Invite more Telegram ghost users to the room, such as the "
                f"relaybot ([{info.first_name}](https://matrix.to/#/{mxid}))."
            )
        raise ValueError(
            "Not enough Telegram users to create a chat. "
            "Invite more Telegram ghost users to the room."
        )

    if self.peer_type == "chat":
        response = await source.client(CreateChatRequest(title=self.title, users=invites))
        entity = response.chats[0]
    elif self.peer_type == "channel":
        response = await source.client(
            CreateChannelRequest(
                title=self.title, about=self.about or "", megagroup=supergroup
            )
        )
        entity = response.chats[0]
        await source.client(
            InviteToChannelRequest(
                channel=await source.client.get_input_entity(entity), users=invites
            )
        )
    else:
        raise ValueError("Invalid peer type for Telegram chat creation")

    self.tgid = entity.id
    self.tg_receiver = self.tgid
    await self.postinit()
    await self.insert()
    await self.update_info(source, entity)
    self.log = self.__class__.log.getChild(self.tgid_log)

    # ``invites`` holds input entity objects, not plain IDs, so the relay bot has to be
    # matched on the entity's ``user_id`` (a bare ``tgid in invites`` can never match).
    bot_invited = self.bot and any(
        getattr(invite, "user_id", None) == self.bot.tgid for invite in invites
    )
    if bot_invited:
        await self.bot.add_chat(self.tgid, self.peer_type)

    levels = await self.main_intent.get_power_levels(self.mxid)
    # Only rewrite the power levels when the portal's main intent is a full room admin.
    if levels.get_user_level(self.main_intent.mxid) == 100:
        levels = putil.get_base_power_levels(self, levels, entity)
        await self.main_intent.set_power_levels(self.mxid, levels)
    await self.handle_matrix_power_levels(source, levels.users, {}, None)
    await self.update_bridge_info()
    for user in users:
        await user.register_portal(self)
    await self.main_intent.send_notice(self.mxid, f"Telegram chat created. ID: {self.tgid}")
|
|
|
|
async def handle_matrix_invite(
    self, invited_by: u.User, puppet: p.Puppet | au.AbstractUser
) -> None:
    """Bridge a Matrix invite to Telegram by adding the target to the chat.

    Args:
        invited_by: The user whose Telegram client performs the invite.
        puppet: The invited ghost or bridge user.

    Raises:
        ValueError: if the invited ghost represents a channel.
        RejectMatrixInvite: if Telegram rejects the invite, or the portal is a
            private chat that is not with the relay bot.
    """
    if isinstance(puppet, p.Puppet) and puppet.is_channel:
        raise ValueError("Can't invite channels to chats")

    portal_kind = self.peer_type
    try:
        if portal_kind == "chat":
            request = AddChatUserRequest(chat_id=self.tgid, user_id=puppet.tgid, fwd_limit=0)
            await invited_by.client(request)
        elif portal_kind == "channel":
            request = InviteToChannelRequest(channel=self.peer, users=[puppet.tgid])
            await invited_by.client(request)
        # We don't care if there are invites for private chat portals with the relaybot.
        elif not self.bot or self.tg_receiver != self.bot.tgid:
            raise RejectMatrixInvite("You can't invite additional users to private chats.")
    except RPCError as e:
        raise RejectMatrixInvite(e.message) from e
|
|
|
|
# endregion
|
|
# region Telegram -> Matrix metadata
|
|
|
|
def _get_invite_content(self, double_puppet: p.Puppet | None) -> dict[str, Any]:
|
|
invite_content = {}
|
|
if double_puppet:
|
|
invite_content["fi.mau.will_auto_accept"] = True
|
|
if self.is_direct:
|
|
invite_content["is_direct"] = True
|
|
return invite_content
|
|
|
|
async def invite_to_matrix(self, users: InviteList) -> None:
    """Invite one Matrix user, or each user of a list, to the portal room.

    When the invitee has a double puppet, the puppet is also made to join the room;
    a failure to join is logged and not raised.
    """
    if isinstance(users, list):
        for single_user in users:
            await self.invite_to_matrix(single_user)
        return

    double_puppet = await p.Puppet.get_by_custom_mxid(users)
    await self.main_intent.invite_user(
        self.mxid,
        users,
        check_cache=True,
        extra_content=self._get_invite_content(double_puppet),
    )
    if not double_puppet:
        return
    try:
        await double_puppet.intent.ensure_joined(self.mxid)
    except Exception:
        self.log.exception("Failed to ensure %s is joined to portal", users)
|
|
|
|
async def update_matrix_room(
    self,
    user: au.AbstractUser,
    entity: TypeChat | User,
    puppet: p.Puppet = None,
    levels: PowerLevelStateEventContent = None,
    users: list[User] = None,
    client: MautrixTelegramClient | None = None,
) -> None:
    """Update the Matrix room from Telegram data, logging instead of raising on failure.

    This is a never-raising wrapper around ``_update_matrix_room``, which makes it
    usable as a background task.
    """
    try:
        await self._update_matrix_room(
            user, entity, puppet=puppet, levels=levels, users=users, client=client
        )
    except Exception:
        self.log.exception("Fatal error updating Matrix room")
|
|
|
|
async def _update_matrix_room(
    self,
    user: au.AbstractUser,
    entity: TypeChat | User,
    puppet: p.Puppet = None,
    levels: PowerLevelStateEventContent = None,
    users: list[User] = None,
    client: MautrixTelegramClient | None = None,
) -> None:
    """Synchronise room metadata, members and power levels from Telegram.

    Group chats get their info, member list and power levels updated; private chats
    get the other party's ghost updated and joined, and the room info copied from it.
    Afterwards the calling user's double puppet (if any) is joined to the room.

    Args:
        user: The user on whose behalf the sync happens.
        entity: The Telegram chat or user entity for this portal.
        puppet: The private chat ghost, fetched when omitted (private chats only).
        levels: Known power levels, fetched when omitted (group chats only).
        users: Known participants, fetched when omitted or empty (group chats only).
        client: Telegram client override; defaults to ``user.client``.
    """
    if not client:
        client = user.client

    if self.is_direct:
        if not puppet:
            puppet = await self.get_dm_puppet()
        await puppet.update_info(user, entity)
        await puppet.intent_for(self).join_room(self.mxid)
        await self.update_info_from_puppet(puppet, user, entity.photo)
    else:
        await self.update_info(user, entity, client=client)
        if not users:
            users = await self._get_users(client, entity)
        await self._sync_telegram_users(user, users, client=client)
        await self.update_power_levels(users, levels)

    # From here on only the caller's double puppet matters, not the DM ghost.
    double_puppet = await p.Puppet.get_by_custom_mxid(user.mxid)
    if double_puppet:
        try:
            did_join = await double_puppet.intent.ensure_joined(self.mxid)
            if isinstance(user, u.User) and did_join and self.peer_type == "user":
                await user.update_direct_chats({self.main_intent.mxid: [self.mxid]})
        except Exception:
            self.log.exception("Failed to ensure %s is joined to portal", user.mxid)

    if self.sync_matrix_state:
        await self.main_intent.get_joined_members(self.mxid)
|
|
|
|
async def update_info_from_puppet(
    self,
    puppet: p.Puppet | None = None,
    source: au.AbstractUser | None = None,
    photo: UserProfilePhoto | None = None,
) -> None:
    """Copy avatar and title from the private chat ghost onto the portal.

    The portal is saved and its bridge info re-sent only when something changed.

    Args:
        puppet: The ghost to copy from; the portal's DM ghost is fetched when omitted.
        source: Passed through to the avatar update.
        photo: Passed through to the avatar update.
    """
    if puppet is None:
        puppet = await self.get_dm_puppet()
    avatar_changed = await self._update_avatar_from_puppet(puppet, source, photo)
    title_changed = await self._update_title(puppet.displayname)
    if title_changed or avatar_changed:
        await self.save()
        await self.update_bridge_info()
|
|
|
|
async def create_matrix_room(
    self,
    user: au.AbstractUser,
    entity: TypeChat | User = None,
    invites: InviteList = None,
    update_if_exists: bool = True,
    from_dialog_sync: bool = False,
    client: MautrixTelegramClient | None = None,
) -> RoomID | None:
    """Create the Matrix room for this portal, or refresh it if it already exists.

    Args:
        user: The user triggering the creation.
        entity: The Telegram entity, fetched on demand when omitted.
        invites: Matrix users to invite.
        update_if_exists: When the room exists, whether to update it in the
            background and send the invites.
        from_dialog_sync: Forwarded to the room creation routine.
        client: Telegram client override.

    Returns:
        The room ID, or None when the room was not created (including when
        creation failed; the error is logged).

    Raises:
        Exception: when called by the relay bot for an unbridged chat while the
            config tells the relay bot to ignore those.
    """
    if self.mxid:
        if not update_if_exists:
            return self.mxid
        if not entity:
            try:
                entity = await self.get_entity(user, client)
            except Exception:
                self.log.exception(f"Failed to get entity through {user.tgid} for update")
                return self.mxid
        # The room update runs in the background; only the invites are awaited.
        background_task.create(self.update_matrix_room(user, entity))
        await self.invite_to_matrix(invites or [])
        return self.mxid

    if user.is_relaybot and self.config["bridge.relaybot.ignore_unbridged_group_chat"]:
        raise Exception("create_matrix_room called as relaybot")

    async with self._room_create_lock:
        try:
            room_id = await self._create_matrix_room(
                user, entity, invites, client=client, from_dialog_sync=from_dialog_sync
            )
        except Exception:
            self.log.exception("Fatal error creating Matrix room")
            room_id = None
    return room_id
|
|
|
|
@property
def bridge_info_state_key(self) -> str:
    """State key under which the bridge info events for this portal are stored."""
    chat_id = self.tgid
    return f"net.maunium.telegram://telegram/{chat_id}"
|
|
|
|
@property
def bridge_info(self) -> dict[str, Any]:
    """Content of the bridge info state event describing this portal.

    The channel section gets an ``external_url`` when a username is known: the
    portal's own, or for private chats the cached ghost's.
    """
    bridgebot = self.az.bot_mxid
    creator = self.main_intent.mxid
    protocol = {
        "id": "telegram",
        "displayname": "Telegram",
        "avatar_url": self.config["appservice.bot_avatar"],
        "external_url": "https://telegram.org",
    }
    channel = {
        "id": str(self.tgid),
        "displayname": self.title,
        "avatar_url": self.avatar_url,
    }
    if self.username:
        channel["external_url"] = f"https://t.me/{self.username}"
    elif self.peer_type == "user":
        # TODO this doesn't feel very reliable
        puppet = p.Puppet.by_tgid.get(self.tgid, None)
        if puppet and puppet.username:
            channel["external_url"] = f"https://t.me/{puppet.username}"
    return {
        "bridgebot": bridgebot,
        "creator": creator,
        "protocol": protocol,
        "channel": channel,
    }
|
|
|
|
async def update_bridge_info(self) -> None:
    """Send the bridge info state events to the portal room.

    Does nothing when no Matrix room exists. Failures are logged as warnings and
    not raised.
    """
    if not self.mxid:
        self.log.debug("Not updating bridge info: no Matrix room created")
        return
    try:
        self.log.debug("Updating bridge info...")
        # TODO remove StateHalfShotBridge once
        #      https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
        for event_type in (StateBridge, StateHalfShotBridge):
            await self.main_intent.send_state_event(
                self.mxid, event_type, self.bridge_info, self.bridge_info_state_key
            )
    except Exception:
        self.log.warning("Failed to update bridge info", exc_info=True)
|
|
|
|
async def _create_matrix_room(
    self,
    user: au.AbstractUser,
    entity: TypeChat | User,
    invites: InviteList,
    from_dialog_sync: bool,
    client: MautrixTelegramClient | None = None,
) -> RoomID | None:
    """Create the Matrix room for this portal and run the initial sync.

    Args:
        user: The user triggering the creation.
        entity: The Telegram entity; fetched when falsy.
        invites: Matrix user(s) to invite. A single user ID, a list, or None.
            The caller's list is never modified.
        from_dialog_sync: When true, dialog info is not fetched again here.
        client: Telegram client override; defaults to ``user.client``.

    Returns:
        The room ID, or None when bridging is not allowed or the chat has more
        participants than the configured maximum.

    Raises:
        RuntimeError: if the chat is deactivated or has been migrated.
        Exception: if the room creation call returns no room ID.
    """
    if self.mxid:
        return self.mxid
    elif not self.allow_bridging:
        return None
    if not client:
        client = user.client

    # Work on a private list: the relaybot invitees are appended below, which must not
    # leak into the caller's list, and a bare user ID (allowed by InviteList) must be
    # wrapped rather than concatenated with or iterated character by character.
    if not invites:
        invites = []
    elif isinstance(invites, str):
        invites = [invites]
    else:
        invites = list(invites)

    dialog = None
    if not from_dialog_sync and not user.is_bot:
        self.log.debug("Fetching dialog info for new portal")
        try:
            dialogs: PeerDialogs | None = await user.client(
                GetPeerDialogsRequest(
                    peers=[InputDialogPeer(await self.get_input_entity(user))]
                )
            )
        except Exception:
            self.log.warning("Failed to fetch dialog info", exc_info=True)
            dialogs = None
        # Prefer the entity from the dialog response over the one passed in.
        if dialogs and dialogs.chats and dialogs.chats[0].id == self.tgid:
            entity = dialogs.chats[0]
            self.log.debug("Got entity info from get dialogs request")
        elif dialogs and self.is_direct and dialogs.users:
            for dialog_user in dialogs.users:
                if dialog_user.id == self.tgid:
                    entity = dialog_user
                    self.log.debug("Got user entity info from get dialogs request")
                    break
        if dialogs and dialogs.dialogs:
            entities = {
                get_peer_id(x): x
                for x in itertools.chain(dialogs.users, dialogs.chats)
                if not isinstance(x, (UserEmpty, ChatEmpty))
            }
            msg = dialogs.messages[0] if len(dialogs.messages) == 1 else None
            dialog = Dialog(user.client, dialogs.dialogs[0], entities, msg)
            self.log.debug("Got dialog info for new portal: %s", dialog)

    if not entity:
        entity = await self.get_entity(user, client)
        self.log.trace("Fetched data: %s", entity)

    # Enforce the configured member limit before creating anything.
    participants_count = 2
    if isinstance(entity, Chat):
        participants_count = entity.participants_count
        if entity.deactivated or entity.migrated_to:
            self.log.error(
                "Throwing error for attempted portal creation "
                f"({entity.deactivated=}, {entity.migrated_to=})"
            )
            raise RuntimeError("Tried to create portal for deactivated chat")
    elif isinstance(entity, Channel) and not entity.broadcast:
        participants_count = entity.participants_count
        if participants_count is None and self.config["bridge.max_member_count"] > 0:
            self.log.warning("Participant count not found in entity, fetching manually")
            participants_count = (await client.get_participants(entity, limit=0)).total
    if participants_count and 0 < self.config["bridge.max_member_count"] < participants_count:
        self.log.warning("Not bridging chat, too many participants (%d)", participants_count)
        # Remember the decision so allow_bridging rejects later attempts as well.
        self._bridging_blocked_at_runtime = True
        return None

    self.log.debug("Preparing to create room")

    if self.is_direct:
        puppet = await self.get_dm_puppet()
        await puppet.update_info(user, entity, client_override=client)
        self._main_intent = puppet.intent_for(self)
        if self.tgid == user.tgid:
            self.title = "Telegram Saved Messages"
            self.about = "Your Telegram cloud storage chat"
    else:
        puppet = None
        self._main_intent = self.az.intent
        await self.update_info(user, entity, client=client)

    preset = RoomCreatePreset.PRIVATE
    if self.peer_type == "channel" and entity.username:
        if self.public_portals:
            preset = RoomCreatePreset.PUBLIC
        self.username = entity.username
        alias = self.alias_localpart
    else:
        # TODO invite link alias?
        alias = None

    if alias:
        # TODO? properly handle existing room aliases
        await self.main_intent.remove_room_alias(alias)

    power_levels = putil.get_base_power_levels(self, entity=entity)
    users = None
    if not self.is_direct:
        users = await self._get_users(client, entity)
        if self.has_bot:
            extra_invites = self.config["bridge.relaybot.group_chat_invite"]
            invites += extra_invites
            for invite in extra_invites:
                power_levels.users.setdefault(invite, 100)
        await putil.participants_to_power_levels(self, users, power_levels)
    elif self.bot and self.tg_receiver == self.bot.tgid:
        assert puppet is not None
        invites += self.config["bridge.relaybot.private_chat.invite"]
        for invite in invites:
            power_levels.users.setdefault(invite, 100)
        self.title = puppet.displayname

    initial_state = [
        {
            "type": EventType.ROOM_POWER_LEVELS.serialize(),
            "content": power_levels.serialize(),
        },
        {
            "type": str(StateBridge),
            "state_key": self.bridge_info_state_key,
            "content": self.bridge_info,
        },
        # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
        {
            "type": str(StateHalfShotBridge),
            "state_key": self.bridge_info_state_key,
            "content": self.bridge_info,
        },
    ]
    autojoin_invites = self.bridge.homeserver_software.is_hungry
    create_invites = set()
    if autojoin_invites:
        # With auto-join, invitees are passed to the create call instead of being
        # invited one by one afterwards.
        create_invites |= set(invites)
        invites = []
        if not self.is_direct:
            create_invites |= await self._sync_telegram_users(user, users, client=client)
    if self.config["bridge.encryption.default"] and self.matrix.e2ee:
        self.encrypted = True
        initial_state.append(
            {
                "type": str(EventType.ROOM_ENCRYPTION),
                "content": self.get_encryption_state_event_json(),
            }
        )
        if self.is_direct:
            create_invites.add(self.az.bot_mxid)
    if self.is_direct:
        assert puppet is not None
        self.title = puppet.displayname
        self.avatar_url = puppet.avatar_url
        self.photo_id = puppet.photo_id
    creation_content = {}
    if not self.config["bridge.federate_rooms"]:
        creation_content["m.federate"] = False
    if self.avatar_url and self.set_dm_room_metadata:
        initial_state.append(
            {
                "type": str(EventType.ROOM_AVATAR),
                "content": {"url": self.avatar_url},
            }
        )

    with self.backfill_lock:
        self.log.debug(
            f"Creating room with parameters invite={create_invites}, {autojoin_invites=}, "
            f"{preset=}, {alias=!r}, name={self.title!r}, topic={self.about!r}, "
            f"{creation_content=}, is_direct={self.is_direct}, {self.set_dm_room_metadata=}"
        )
        room_id = await self.main_intent.create_room(
            alias_localpart=alias,
            preset=preset,
            is_direct=self.is_direct,
            invitees=list(create_invites),
            name=self.title if self.set_dm_room_metadata else None,
            topic=self.about,
            initial_state=initial_state,
            creation_content=creation_content,
            beeper_auto_join_invites=autojoin_invites,
        )
        if not room_id:
            raise Exception("Failed to create room")
        self.name_set = bool(self.title) and self.set_dm_room_metadata
        self.avatar_set = bool(self.avatar_url) and self.set_dm_room_metadata

        if not autojoin_invites and self.encrypted and self.matrix.e2ee and self.is_direct:
            try:
                await self.az.intent.ensure_joined(room_id)
            except Exception:
                self.log.warning(f"Failed to add bridge bot to new private chat {room_id}")

        self.mxid = room_id
        self.by_mxid[self.mxid] = self
        await self.save()
        self.log.debug(f"Matrix room created: {self.mxid}")
        await self.az.state_store.set_power_levels(self.mxid, power_levels)
        await user.register_portal(self)
        if dialog and isinstance(user, u.User):
            await user.post_sync_dialog(
                self, puppet=None, was_created=True, **user.dialog_to_sync_args(dialog)
            )

        if not autojoin_invites or not self.is_direct:
            await self.invite_to_matrix(invites)
            await self.update_matrix_room(
                user, entity, puppet, levels=power_levels, users=users, client=client
            )
        else:
            # When using autojoining, all metadata is already set, so just update state caches
            await self.main_intent.get_joined_members(self.mxid)

        # The dummy event's ID is stored as the first event of the room.
        self.first_event_id = await self.main_intent.send_message_event(
            self.mxid, DummyPortalCreated, {}
        )
        await self.save()

        if self.backfill_enable:
            try:
                await self.forward_backfill(user, initial=True, client=client)
            except Exception:
                self.log.exception("Error in initial backfill")
            if self._enable_batch_sending:
                await self.enqueue_backfill(user, priority=50)

        return self.mxid
|
|
|
|
async def _get_users(
    self,
    client: MautrixTelegramClient,
    entity: TypeInputPeer | InputUser | TypeChat | TypeUser | InputChannel,
) -> list[TypeUser]:
    """
    Fetch the members of this chat for the initial member sync.

    Returns an empty list without asking Telegram when this is a non-megagroup
    channel and channel member syncing is disabled, or when the configured
    member sync limit is zero.

    Args:
        client: The Telegram client to fetch the members with.
        entity: The Telegram entity of this chat.

    Returns:
        Whatever ``putil.get_users`` returns for this chat, or ``[]``.
    """
    plain_channel = self.peer_type == "channel" and not self.megagroup
    if plain_channel and not self.sync_channel_members:
        return []
    max_members = self.max_initial_member_sync
    if max_members == 0:
        return []
    return await putil.get_users(client, self.tgid, entity, max_members, self.peer_type)
|
|
|
|
async def update_power_levels(
    self,
    users: list[TypeUser | TypeChatParticipant | TypeChannelParticipant],
    levels: PowerLevelStateEventContent = None,
) -> None:
    """
    Apply Telegram participant data to the room's power levels.

    Args:
        users: The Telegram users/participants to derive power levels from.
        levels: Power level content to update. When falsy, the current power
            levels are fetched from the room first.
    """
    target_levels = levels or await self.main_intent.get_power_levels(self.mxid)
    needs_update = await putil.participants_to_power_levels(self, users, target_levels)
    if not needs_update:
        return
    await self.main_intent.set_power_levels(self.mxid, target_levels)
|
|
|
|
async def update_default_banned_rights(self, dbr: ChatBannedRights) -> None:
    """
    Recompute and store the room's power levels after the chat's default
    banned rights changed.

    Args:
        dbr: The new default banned rights of the chat.
    """
    self.log.debug("Default rights in chat changed: %s", dbr)
    current_levels = await self.main_intent.get_power_levels(self.mxid)
    updated_levels = putil.get_base_power_levels(self, current_levels, dbr=dbr)
    await self.main_intent.set_power_levels(self.mxid, updated_levels)
|
|
|
|
async def _add_bot_chat(self, bot: User) -> None:
    """
    Register this chat for a Telegram bot that is a member of it.

    If ``bot`` is the bridge's own bot (``self.bot``), the chat is added to it.
    Otherwise, if a bridge user with that Telegram ID exists and is a bot,
    this portal is registered for that user.

    Args:
        bot: The Telegram user entity of the bot.
    """
    is_own_bot = bool(self.bot) and bot.id == self.bot.tgid
    if is_own_bot:
        await self.bot.add_chat(self.tgid, self.peer_type)
    else:
        bot_user = await u.User.get_by_tgid(TelegramID(bot.id))
        if bot_user and bot_user.is_bot:
            await bot_user.register_portal(self)
|
|
|
|
async def _sync_telegram_users(
    self,
    source: au.AbstractUser,
    users: list[User],
    client: MautrixTelegramClient | None = None,
) -> set[UserID] | None:
    """
    Sync a list of Telegram chat members to the Matrix side.

    Each entity's puppet info is updated. When the portal already has a Matrix
    room, puppets are joined and matching bridge users are invited; afterwards,
    if the member list is considered trustworthy, room members that are not in
    ``users`` are kicked. When there is no room yet, nothing is joined and the
    Matrix IDs that should join are collected instead.

    Args:
        source: The user whose Telegram account the member list came from.
        users: The Telegram user entities that are in the chat.
        client: Optional client passed on to the puppet info update.

    Returns:
        The set of Matrix user IDs to join if the portal has no room yet,
        otherwise ``None``.
    """
    present_tgids = set()
    pending_mxids = set()
    skip_deleted = self.config["bridge.skip_deleted_members"]
    for member in users:
        ghost = await p.Puppet.get_by_tgid(TelegramID(member.id))
        if member.bot:
            await self._add_bot_chat(member)
        present_tgids.add(member.id)

        await ghost.update_info(source, member, client_override=client)
        if skip_deleted and member.deleted:
            continue

        if self.mxid:
            await ghost.intent_for(self).ensure_joined(self.mxid)
        else:
            pending_mxids.add(ghost.intent_for(self).mxid)

        bridge_user = await u.User.get_by_tgid(TelegramID(member.id))
        if not bridge_user:
            continue
        if self.mxid:
            await self.invite_to_matrix(bridge_user.mxid)
        else:
            pending_mxids.add(bridge_user.mxid)

    if not self.mxid:
        return pending_mxids

    # We can't trust the member list if any of the following cases is true:
    # * There are close to 10 000 users, because Telegram might not be sending all members.
    # * The member sync count is limited, because then we might ignore some members.
    # * It's a channel, because non-admins don't have access to the member list
    #   and even admins can only see 200 members.
    # * The source user is not in the chat, because that likely means it's a group
    #   with the member list hidden (so only admins are visible).
    if self.max_initial_member_sync < 0:
        count_below_cap = len(present_tgids) < 9900
    else:
        count_below_cap = len(present_tgids) < self.max_initial_member_sync - 10
    list_is_trusted = (
        count_below_cap
        and (self.megagroup or self.peer_type != "channel")
        and source.tgid in present_tgids
    )
    if not list_is_trusted:
        return None

    for member_mxid in await self.main_intent.get_room_members(self.mxid):
        if member_mxid == self.az.bot_mxid:
            continue

        ghost = await p.Puppet.get_by_mxid(member_mxid)
        if ghost:
            # TODO figure out when/how to clean up channels from the member list
            if ghost.id in present_tgids or ghost.is_channel:
                continue
            if self.bot and ghost.id == self.bot.tgid:
                await self.bot.remove_chat(self.tgid)
            try:
                await self.main_intent.kick_user(
                    self.mxid, member_mxid, "User had left this Telegram chat."
                )
            except MForbidden:
                pass
            continue

        bridge_user = await u.User.get_by_mxid(member_mxid, create=False)
        if not bridge_user or bridge_user.tgid in present_tgids:
            continue
        if bridge_user.is_bot:
            await bridge_user.unregister_portal(*self.tgid_full)
        if self.has_bot:
            # Matrix users are left in the room when has_bot is set.
            continue
        try:
            await self.main_intent.kick_user(
                self.mxid, bridge_user.mxid, "You had left this Telegram chat."
            )
        except MForbidden:
            pass

    return None
|
|
|
|
async def _add_telegram_user(
    self, user_id: TelegramID, source: au.AbstractUser | None = None
) -> None:
    """
    Add a single Telegram user to the Matrix room.

    The user's puppet is joined to the room, and if a bridge user with the same
    Telegram ID exists, the portal is registered for them and they are invited.

    Args:
        user_id: The Telegram ID of the user to add.
        source: Optional user through whose client the entity is fetched in
            order to update the puppet's info first. If the entity lookup
            raises ``ValueError``, a warning is logged and nothing is added.
    """
    ghost = await p.Puppet.get_by_tgid(user_id)
    if source:
        try:
            tg_user: User = await source.client.get_entity(PeerUser(user_id))
        except ValueError:
            self.log.warning(
                f"Couldn't get info of {user_id} through {source.tgid} to add them to the room"
            )
            return
        else:
            await ghost.update_info(source, tg_user)
    await ghost.intent_for(self).ensure_joined(self.mxid)

    bridge_user = await u.User.get_by_tgid(user_id)
    if not bridge_user:
        return
    await bridge_user.register_portal(self)
    await self.invite_to_matrix(bridge_user.mxid)
|
|
|
|
async def delete_telegram_user(self, user_id: TelegramID, sender: p.Puppet | None) -> None:
    """
    Remove a Telegram user from the Matrix room.

    The user's puppet is kicked by the sender (falling back to the main intent
    when that is forbidden), or simply leaves when the user removed themselves.
    If a bridge user with the same Telegram ID exists, the portal is
    unregistered for them and they are kicked as well.

    Args:
        user_id: The Telegram ID of the user who left or was removed.
        sender: The puppet of whoever removed the user. ``None`` is treated as
            the user having removed themselves.
    """
    ghost = await p.Puppet.get_by_tgid(user_id)
    if sender is None:
        sender = ghost
    bridge_user = await u.User.get_by_tgid(user_id)

    removed_by_other = sender.tgid != ghost.tgid
    if sender and removed_by_other:
        kick_message = f"Kicked by {sender.displayname}"
    else:
        kick_message = "Left Telegram chat"
    extra = {DOUBLE_PUPPET_SOURCE_KEY: self.bridge.name} if sender.is_real_user else None

    if removed_by_other:
        try:
            await sender.intent_for(self).kick_user(self.mxid, ghost.mxid, extra_content=extra)
        except MForbidden:
            # The sender isn't allowed to kick, retry with the main intent.
            try:
                await self.main_intent.kick_user(self.mxid, ghost.mxid, kick_message)
            except MForbidden as e:
                self.log.warning(f"Failed to kick {ghost.mxid}: {e}")
    elif await self.az.state_store.is_joined(self.mxid, ghost.intent_for(self).mxid):
        await ghost.intent_for(self).leave_room(self.mxid, extra_content=extra)

    if not bridge_user:
        return
    await bridge_user.unregister_portal(*self.tgid_full)
    if removed_by_other:
        try:
            await sender.intent_for(self).kick_user(
                self.mxid, bridge_user.mxid, extra_content=extra
            )
        except MForbidden:
            pass
        else:
            return
    try:
        await self.main_intent.kick_user(self.mxid, bridge_user.mxid, kick_message)
    except MForbidden as e:
        self.log.warning(f"Failed to kick {bridge_user.mxid}: {e}")
|
|
|
|
async def update_info(
    self,
    user: au.AbstractUser,
    entity: TypeChat = None,
    client: MautrixTelegramClient | None = None,
) -> None:
    """
    Update the portal's metadata (megagroup flag, username, about, title and
    avatar) from a Telegram chat entity.

    Direct chat portals are not handled here: a warning is logged and the call
    returns. Any exception raised while updating is logged and swallowed. If
    anything changed, the portal is saved and the bridge info is updated.

    Args:
        user: The user to fetch data through and to report in error logs.
        entity: The Telegram chat entity. Fetched with ``get_entity`` if falsy.
        client: Optional Telegram client override passed to the fetch/avatar calls.
    """
    if self.peer_type == "user":
        self.log.warning("Called update_info() for direct chat portal")
        return

    changed = False
    self.log.debug("Updating info")
    try:
        if not entity:
            entity = await self.get_entity(user, client)
            self.log.trace("Fetched data: %s", entity)

        if self.peer_type == "channel":
            changed = self.megagroup != entity.megagroup or changed
            self.megagroup = entity.megagroup
            changed = await self._update_username(entity.username) or changed

        if hasattr(entity, "about"):
            # _update_about() is a coroutine function, so it has to be awaited.
            # Without the await the topic was never updated and the (always
            # truthy) coroutine object made `changed` unconditionally true.
            changed = await self._update_about(entity.about) or changed

        changed = await self._update_title(entity.title) or changed

        if isinstance(entity.photo, ChatPhoto):
            changed = await self._update_avatar(user, entity.photo, client=client) or changed
    except Exception:
        self.log.exception(f"Failed to update info from source {user.tgid}")

    if changed:
        await self.save()
        await self.update_bridge_info()
|
|
|
|
async def _update_username(self, username: str, save: bool = False) -> bool:
    """
    Store a new Telegram username for the chat and sync the room alias.

    The alias for the old username is removed. If the portal has a Matrix room,
    the new alias is added (and the room made public when ``public_portals`` is
    set), or the join rule is set to invite-only when the username was removed.

    Args:
        username: The new username. Falsy values are stored as ``None``.
        save: Whether to save the portal after updating.

    Returns:
        ``False`` if the username is unchanged, ``True`` otherwise.
    """
    if self.username == username:
        return False

    had_username = bool(self.username)
    if had_username:
        await self.main_intent.remove_room_alias(self.alias_localpart)
    self.username = username or None

    if self.mxid and self.username:
        await self.main_intent.add_room_alias(self.mxid, self.alias_localpart, override=True)
        if self.public_portals:
            await self.main_intent.set_join_rule(self.mxid, JoinRule.PUBLIC)
    elif self.mxid:
        await self.main_intent.set_join_rule(self.mxid, JoinRule.INVITE)

    if save:
        await self.save()
    return True
|
|
|
|
async def _try_set_state(
    self, sender: p.Puppet | None, evt_type: EventType, content: StateEventContent
) -> None:
    """
    Send a state event as the given sender, falling back to the main intent.

    Args:
        sender: The puppet to send the event as. When falsy, or when sending as
            the puppet raises ``MForbidden``, the main intent is used instead.
        evt_type: The state event type.
        content: The state event content. The double puppet source key is set
            on it when the sender is a real user.
    """
    if not sender:
        await self.main_intent.send_state_event(self.mxid, evt_type, content)
        return

    try:
        sender_intent = sender.intent_for(self)
        if sender.is_real_user:
            content[DOUBLE_PUPPET_SOURCE_KEY] = self.bridge.name
        await sender_intent.send_state_event(self.mxid, evt_type, content)
    except MForbidden:
        await self.main_intent.send_state_event(self.mxid, evt_type, content)
|
|
|
|
async def _update_about(
    self, about: str, sender: p.Puppet | None = None, save: bool = False
) -> bool:
    """
    Store a new chat description and set it as the Matrix room topic.

    Args:
        about: The new description.
        sender: Optional puppet to send the topic event as.
        save: Whether to save the portal after updating.

    Returns:
        ``False`` if the description is unchanged, ``True`` otherwise.
    """
    if about == self.about:
        return False

    self.about = about
    if self.mxid:
        topic_content = RoomTopicStateEventContent(topic=self.about)
        await self._try_set_state(sender, EventType.ROOM_TOPIC, topic_content)
    if save:
        await self.save()
    return True
|
|
|
|
async def _update_title(
    self, title: str, sender: p.Puppet | None = None, save: bool = False
) -> bool:
    """
    Store a new chat title and set it as the Matrix room name.

    The room name is only set when the portal has a room and
    ``set_dm_room_metadata`` is true; ``name_set`` records whether that
    succeeded. Failures to set the name are logged, not raised.

    Args:
        title: The new title.
        sender: Optional puppet to send the name event as.
        save: Whether to save the portal after updating.

    Returns:
        ``False`` if the title is unchanged and the name is already set (or
        doesn't need to be), ``True`` otherwise.
    """
    name_is_settled = self.name_set or not self.set_dm_room_metadata
    if self.title == title and name_is_settled:
        return False

    self.title = title
    self.name_set = False
    if self.mxid and self.set_dm_room_metadata:
        try:
            await self._try_set_state(
                sender, EventType.ROOM_NAME, RoomNameStateEventContent(name=self.title)
            )
        except Exception as e:
            self.log.warning(f"Failed to set room name: {e}")
        else:
            self.name_set = True
    if save:
        await self.save()
    return True
|
|
|
|
async def _update_avatar_from_puppet(
    self, puppet: p.Puppet, user: au.AbstractUser | None, photo: UserProfilePhoto | None
) -> bool:
    """
    Copy the avatar of a puppet to this portal.

    If the puppet already has an uploaded avatar, its photo ID and URL are
    reused. Otherwise, when a photo and user are provided and
    ``set_dm_room_metadata`` is true, the avatar is updated via ``_update_avatar``.

    Args:
        puppet: The puppet whose avatar should be used.
        user: The user to fetch the photo through if it has to be transferred.
        photo: The Telegram profile photo, if available.

    Returns:
        Whether the portal's avatar data changed.
    """
    avatar_is_settled = self.avatar_set or not self.set_dm_room_metadata
    if self.photo_id == puppet.photo_id and avatar_is_settled:
        return False

    if not puppet.avatar_url:
        if photo is not None and user is not None and self.set_dm_room_metadata:
            return await self._update_avatar(user, photo=photo)
        return False

    self.photo_id = puppet.photo_id
    self.avatar_url = puppet.avatar_url
    self.avatar_set = False
    if self.mxid and self.set_dm_room_metadata:
        try:
            await self._try_set_state(
                None,
                EventType.ROOM_AVATAR,
                RoomAvatarStateEventContent(url=self.avatar_url),
            )
        except Exception as e:
            self.log.warning(f"Failed to set room avatar: {e}")
        else:
            self.avatar_set = True
    return True
|
|
|
|
async def _update_avatar(
    self,
    user: au.AbstractUser,
    photo: TypeChatPhoto | TypeUserProfilePhoto,
    sender: p.Puppet | None = None,
    save: bool = False,
    client: MautrixTelegramClient | None = None,
) -> bool:
    """
    Update the portal avatar from a Telegram photo object.

    Args:
        user: The user whose client is used for transferring the photo.
        photo: The Telegram photo. Empty photo types and ``None`` mean the
            avatar was removed.
        sender: Optional puppet to send the avatar event as.
        save: Whether to save the portal after updating.
        client: Optional Telegram client to use instead of ``user.client``.

    Returns:
        ``True`` if the avatar data was updated, ``False`` if nothing changed,
        removal is not allowed for this direct chat, or the transfer failed.

    Raises:
        ValueError: If ``photo`` is of an unknown type.
    """
    if isinstance(photo, (ChatPhoto, UserProfilePhoto)):
        loc = InputPeerPhotoFileLocation(
            peer=await self.get_input_entity(user), photo_id=photo.photo_id, big=True
        )
        new_photo_id = str(photo.photo_id)
    elif isinstance(photo, Photo):
        loc, _ = self._msg_conv.get_largest_photo_size(photo)
        new_photo_id = str(loc.id)
    elif isinstance(photo, (UserProfilePhotoEmpty, ChatPhotoEmpty, PhotoEmpty, type(None))):
        new_photo_id = ""
        loc = None
    else:
        raise ValueError(f"Unknown photo type {type(photo)}")

    removal_blocked = (
        self.peer_type == "user"
        and not new_photo_id
        and not self.config["bridge.allow_avatar_remove"]
    )
    if removal_blocked:
        return False
    if self.photo_id == new_photo_id and self.avatar_set:
        return False

    if not new_photo_id:
        self.photo_id = ""
        self.avatar_url = None
    elif self.photo_id != new_photo_id or not self.avatar_url:
        uploaded = await util.transfer_file_to_matrix(
            client or user.client,
            self.main_intent,
            loc,
            async_upload=self.config["homeserver.async_media"],
        )
        if not uploaded:
            return False
        self.photo_id = new_photo_id
        self.avatar_url = uploaded.mxc

    if self.mxid:
        try:
            await self._try_set_state(
                sender,
                EventType.ROOM_AVATAR,
                RoomAvatarStateEventContent(url=self.avatar_url),
            )
        except Exception as e:
            self.log.warning(f"Failed to set room avatar: {e}")
            self.avatar_set = False
        else:
            self.avatar_set = True
    if save:
        await self.save()
    return True
|
|
|
|
# endregion
|
|
# region Matrix -> Telegram bridging
|
|
|
|
async def _send_delivery_receipt(
    self, event_id: EventID, room_id: RoomID | None = None
) -> None:
    """
    Mark an event as read with the appservice intent as a delivery receipt.

    Does nothing when ``event_id`` is falsy or delivery receipts are disabled
    in the config. Failures are logged, not raised.

    Args:
        event_id: The Matrix event to mark as read.
        room_id: The room the event is in. Defaults to the portal room.
    """
    if not event_id or not self.config["bridge.delivery_receipts"]:
        return
    try:
        await self.az.intent.mark_read(room_id or self.mxid, event_id)
    except Exception:
        self.log.exception("Failed to send delivery receipt for %s", event_id)
|
|
|
|
async def _get_state_change_message(
    self, event: str, user: u.User, **kwargs: Any
) -> str | None:
    """
    Render the configured message for a Matrix state change.

    The template is read from the ``state_event_formats.<event>`` config key
    and filled in with ``string.Template.safe_substitute``, so unknown
    placeholders are left as-is.

    Args:
        event: The state change name used in the config key.
        user: The Matrix user the state change is about.
        **kwargs: Extra template variables. These override the built-in
            ``mxid``, ``username``, ``displayname`` and ``distinguisher``.

    Returns:
        The rendered message, or ``None`` if no format is configured.
    """
    tpl = self.get_config(f"state_event_formats.{event}")
    if not tpl:
        # Empty format means they don't want the message. A missing (None)
        # format is treated the same way instead of crashing in len().
        return None
    displayname = await self.get_displayname(user)

    tpl_args = {
        "mxid": user.mxid,
        "username": user.mxid_localpart,
        "displayname": escape_html(displayname),
        "distinguisher": self._get_distinguisher(user.mxid),
        **kwargs,
    }
    return Template(tpl).safe_substitute(tpl_args)
|
|
|
|
async def _send_state_change_message(
    self, event: str, user: u.User, event_id: EventID, **kwargs: Any
) -> None:
    """
    Send a message about a Matrix state change to Telegram through the bot.

    Nothing is sent when the portal has no bot, when this is a direct chat and
    state changes are disabled for private chats in the config, or when the
    rendered message is empty.

    Args:
        event: The state change name, passed to ``_get_state_change_message``.
        user: The Matrix user the state change is about.
        event_id: The Matrix event ID, registered with the deduplicator.
        **kwargs: Extra template variables for the message.
    """
    if not self.has_bot:
        return
    private_chat_muted = (
        self.peer_type == "user"
        and not self.config["bridge.relaybot.private_chat.state_changes"]
    )
    if private_chat_muted:
        return

    async with self.send_lock(self.bot.tgid):
        html = await self._get_state_change_message(event, user, **kwargs)
        if not html:
            return
        text, entities = await formatter.matrix_to_telegram(self.bot.client, html=html)
        response = await self.bot.client.send_message(
            self.peer, text, formatting_entities=entities
        )
        if self.peer_type == "channel":
            space = self.tgid
        else:
            space = self.bot.tgid
        self.dedup.check(response, (event_id, space))
|
|
|
|
async def name_change_matrix(
    self, user: u.User, displayname: str, prev_displayname: str, event_id: EventID
) -> None:
    """
    Relay a Matrix displayname change as a ``name_change`` state change message.

    Args:
        user: The Matrix user whose name changed.
        displayname: The new displayname.
        prev_displayname: The previous displayname.
        event_id: The Matrix event ID of the change.
    """
    names = {"displayname": displayname, "prev_displayname": prev_displayname}
    await self._send_state_change_message("name_change", user, event_id, **names)
|
|
|
|
async def get_displayname(self, user: u.User) -> str:
    """
    Get the displayname of a Matrix user in this room.

    Returns:
        The room displayname, or the user's Matrix ID if that is falsy.
    """
    room_name = await self.main_intent.get_room_displayname(self.mxid, user.mxid)
    if room_name:
        return room_name
    return user.mxid
|
|
|
|
def set_typing(
    self, user: u.User, typing: bool = True, action: type = SendMessageTypingAction
) -> Awaitable[bool]:
    """
    Send a typing notification for this chat through the user's Telegram client.

    Args:
        user: The user who is (or stopped) typing.
        typing: ``True`` to send ``action``, ``False`` to send a cancel action.
        action: The action class to instantiate while typing.

    Returns:
        The awaitable returned by calling the user's client with the request.
    """
    if typing:
        tg_action = action()
    else:
        tg_action = SendMessageCancelAction()
    return user.client(SetTypingRequest(self.peer, tg_action))
|
|
|
|
async def _get_sponsored_message(
    self, user: u.User
) -> tuple[SponsoredMessage | None, Channel | User | None]:
    """
    Get the sponsored message for this chat, fetching a new one if needed.

    The previously fetched message is reused for five minutes (measured with
    ``time.monotonic``). Bots never get a sponsored message.

    Args:
        user: The user to fetch the sponsored message through.

    Returns:
        A ``(message, entity)`` tuple; both are ``None`` for bots.
    """
    if user.is_bot:
        return None, None
    cache_is_fresh = self._sponsored_msg_ts + 5 * 60 > time.monotonic()
    if cache_is_fresh:
        return self._sponsored_msg, self._sponsored_entity

    self.log.trace(f"Fetching a new sponsored message through {user.mxid}")
    input_entity = await self.get_input_entity(user)
    fetched = await putil.get_sponsored_message(user, input_entity)
    self._sponsored_msg, t_id, self._sponsored_entity = fetched
    self._sponsored_msg_ts = time.monotonic()
    if self._sponsored_msg is not None and self._sponsored_entity is None:
        self.log.warning(f"GetSponsoredMessages didn't return entity for {t_id}")
    return self._sponsored_msg, self._sponsored_entity
|
|
|
|
async def _send_sponsored_msg(self, user: u.User) -> None:
    """
    Send the current sponsored message to the Matrix room.

    Any previously sent sponsored event is redacted first. Afterwards the new
    event's ID, timestamp and random ID are stored, the seen-tracking state is
    reset and the portal is saved.

    Args:
        user: The user to fetch the sponsored message through.
    """
    msg, entity = await self._get_sponsored_message(user)
    if msg is None:
        self.log.trace("Didn't get a sponsored message")
        return

    previous_event_id = self.sponsored_event_id
    if previous_event_id is not None:
        self.log.debug(
            f"Redacting old sponsored {self.sponsored_event_id}"
            " in preparation for sending new one"
        )
        await self.main_intent.redact(self.mxid, self.sponsored_event_id)

    content = await putil.make_sponsored_message_content(user, msg, entity)
    self.log.trace("Sending sponsored message")
    self.sponsored_event_id = await self._send_message(self.main_intent, content)
    self.sponsored_event_ts = int(time.time())
    self.sponsored_msg_random_id = msg.random_id
    # Start tracking views and new messages from scratch for the new event.
    self._new_messages_after_sponsored = False
    self._sponsored_seen = {}
    await self.save()
    self.log.debug(
        f"Sent sponsored message {base64.b64encode(self.sponsored_msg_random_id)} "
        f"to Matrix {self.sponsored_event_id} / {self.sponsored_event_ts}"
    )
|
|
|
|
@property
def _sponsored_is_expired(self) -> bool:
    """
    Whether a new sponsored message should be sent.

    That is the case when there have been new messages after the last
    sponsored message, and either no sponsored message has been sent or the
    last one is more than 24 hours old.
    """
    never_sent = self.sponsored_event_id is None
    outdated = never_sent or self.sponsored_event_ts + 24 * 60 * 60 < int(time.time())
    return outdated and self._new_messages_after_sponsored
|
|
|
|
async def _try_handle_read_for_sponsored_msg(
    self, user: u.User, event_id: EventID, timestamp: int
) -> None:
    """
    Run ``_handle_read_for_sponsored_msg`` and log any error instead of raising.

    Args:
        user: The user who sent the read receipt.
        event_id: The Matrix event the read receipt points at.
        timestamp: The timestamp of the read receipt.
    """
    handler = self._handle_read_for_sponsored_msg
    try:
        await handler(user, event_id, timestamp)
    except Exception:
        self.log.warning(
            "Error handling read receipt for sponsored message processing", exc_info=True
        )
|
|
|
|
# Handle a Matrix read receipt for the purposes of Telegram sponsored messages.
#
# Does nothing for bot users or when the portal has no username. If the current
# sponsored message is expired (see _sponsored_is_expired), a new one is sent with
# _send_sponsored_msg() while holding _sponsored_msg_lock. Otherwise, if the receipt
# points at the sponsored event or has a timestamp at/after the sponsored event's
# timestamp, and this user hasn't been recorded in _sponsored_seen yet, the user is
# recorded and the view is reported to Telegram with ViewSponsoredMessageRequest.
#
# NOTE(review): indentation was lost in this copy of the file, so the exact nesting
# of the `return` that follows the locked section cannot be confirmed from here.
async def _handle_read_for_sponsored_msg(
|
|
self, user: u.User, event_id: EventID, timestamp: int
|
|
) -> None:
|
|
if user.is_bot or not self.username:
|
|
return
|
|
if self._sponsored_is_expired:
|
|
self.log.trace("Sponsored message is expired, sending new one")
|
|
async with self._sponsored_msg_lock:
|
|
# Expiry is checked again after the lock has been acquired.
if self._sponsored_is_expired:
|
|
await self._send_sponsored_msg(user)
|
|
return
|
|
|
|
if (
|
|
self.sponsored_event_id == event_id or self.sponsored_event_ts <= timestamp
|
|
) and not self._sponsored_seen.get(user.mxid, False):
|
|
self._sponsored_seen[user.mxid] = True
|
|
self.log.debug(
|
|
f"Marking sponsored message {self.sponsored_event_id} as seen by {user.mxid}"
|
|
)
|
|
await user.client(
|
|
ViewSponsoredMessageRequest(
|
|
channel=await self.get_input_entity(user),
|
|
random_id=self.sponsored_msg_random_id,
|
|
)
|
|
)
|
|
|
|
# Bridge a Matrix read receipt to Telegram.
#
# Receipts from bot users are ignored. The receipt target is looked up in the message
# database; if it isn't known, the last known message of the room is used instead, and
# if there is none either, the receipt is dropped. The chat is then marked as read up
# to that message's Telegram ID (also clearing mentions and reactions). For channel
# peers, background tasks are started for sponsored message handling and/or reaction
# polling.
#
# NOTE(review): indentation was lost in this copy of the file, so it can't be confirmed
# from here which `if` the final `else` (reaction polling) belongs to.
async def mark_read(self, user: u.User, event_id: EventID, timestamp: int) -> None:
|
|
if user.is_bot:
|
|
return
|
|
# Same message "space" selection as elsewhere in this file: the chat ID for
# channel peers, the user's own Telegram ID otherwise.
space = self.tgid if self.peer_type == "channel" else user.tgid
|
|
message = await DBMessage.get_by_mxid(event_id, self.mxid, space)
|
|
if not message:
|
|
message = await DBMessage.find_last(self.mxid, space)
|
|
if not message:
|
|
self.log.debug(
|
|
f"Dropping Matrix read receipt from {user.mxid}: "
|
|
f"target message {event_id} not known and last message in chat not found"
|
|
)
|
|
return
|
|
else:
|
|
self.log.debug(
|
|
f"Matrix read receipt target {event_id} not known, marking "
|
|
f"messages up to most recent ({message.mxid}/{message.tgid}) "
|
|
f"as read by {user.mxid}/{user.tgid}"
|
|
)
|
|
else:
|
|
self.log.debug(
|
|
"Handling Matrix read receipt: marking messages up to "
|
|
f"{message.mxid}/{message.tgid} as read by {user.mxid}/{user.tgid}"
|
|
)
|
|
await user.client.send_read_acknowledge(
|
|
self.peer, max_id=message.tgid, clear_mentions=True, clear_reactions=True
|
|
)
|
|
if self.peer_type == "channel":
|
|
if not self.megagroup:
|
|
# Run in the background so the receipt handling doesn't wait for it.
background_task.create(
|
|
self._try_handle_read_for_sponsored_msg(user, event_id, timestamp)
|
|
)
|
|
else:
|
|
background_task.create(self._poll_telegram_reactions(user))
|
|
|
|
async def _preproc_kick_ban(
    self, user: u.User | p.Puppet, source: u.User
) -> au.AbstractUser | None:
    """
    Decide whether and through whom a Matrix kick/ban should be bridged.

    Args:
        user: The user or puppet being kicked or banned.
        source: The Matrix user who performed the kick or ban.

    Returns:
        The user whose Telegram client should perform the action (``source``
        itself, or the bot when the source needs the relaybot), or ``None`` if
        nothing should be done on Telegram.
    """
    if user.tgid == source.tgid:
        return None
    if self.peer_type == "user" and user.tgid == self.tgid:
        # The other side of a direct chat was removed: delete the portal.
        await self.delete()
        return None
    if isinstance(user, u.User) and await user.needs_relaybot(self):
        # TODO kick message
        return None
    if await source.needs_relaybot(self):
        return self.bot if self.has_bot else None
    return source
|
|
|
|
async def kick_matrix(self, user: u.User | p.Puppet, source: u.User) -> None:
    """
    Bridge a Matrix kick to Telegram.

    Args:
        user: The user or puppet that was kicked.
        source: The Matrix user who kicked them.
    """
    actor = await self._preproc_kick_ban(user, source)
    if actor is None:
        return
    await actor.client.kick_participant(self.peer, user.peer)
|
|
|
|
async def ban_matrix(self, user: u.User | p.Puppet, source: u.User):
    """
    Bridge a Matrix ban to Telegram by revoking the user's permission to view
    messages in the chat.

    Args:
        user: The user or puppet that was banned.
        source: The Matrix user who banned them.
    """
    actor = await self._preproc_kick_ban(user, source)
    if actor is None:
        return
    await actor.client.edit_permissions(self.peer, user.peer, view_messages=False)
|
|
|
|
async def leave_matrix(self, user: u.User, event_id: EventID) -> None:
    """
    Handle a Matrix user leaving the portal room.

    Users who need the relaybot only get a ``leave`` state change message. For
    direct chats, the main intent leaves the room and the portal is deleted and
    dropped from the lookup caches. For other chats, the Telegram dialog is
    deleted if ``bridge.bridge_matrix_leave`` is enabled.

    Args:
        user: The Matrix user who left.
        event_id: The Matrix event ID of the leave event.
    """
    if await user.needs_relaybot(self):
        await self._send_state_change_message("leave", user, event_id)
        return

    if self.peer_type == "user":
        await self.main_intent.leave_room(self.mxid)
        await self.delete()
        # Drop each cache entry independently: with both deletions in a single
        # try block, a missing by_tgid entry would skip the by_mxid cleanup.
        self.by_tgid.pop(self.tgid_full, None)
        self.by_mxid.pop(self.mxid, None)
    elif self.config["bridge.bridge_matrix_leave"]:
        await user.client.delete_dialog(self.peer)
|
|
|
|
async def join_matrix(self, user: u.User, event_id: EventID) -> None:
    """
    Handle a Matrix user joining the portal room.

    Users who need the relaybot only get a ``join`` state change message.
    Non-bot users joining a channel-type portal also join it on Telegram.

    Args:
        user: The Matrix user who joined.
        event_id: The Matrix event ID of the join event.
    """
    if await user.needs_relaybot(self):
        await self._send_state_change_message("join", user, event_id)
        return

    if self.peer_type != "channel" or user.is_bot:
        # We'll just assume the user is already in the chat.
        return
    await user.client(JoinChannelRequest(channel=await self.get_input_entity(user)))
|
|
|
|
@staticmethod
|
|
def hash_user_id(val: UserID) -> int:
|
|
"""
|
|
A simple Matrix user ID hashing algorithm that matches what Element does.
|
|
|
|
Args:
|
|
val: the Matrix user ID.
|
|
|
|
Returns:
|
|
A 32-bit hash of the user ID.
|
|
"""
|
|
out = 0
|
|
for char in val:
|
|
out = (out << 5) - out + ord(char)
|
|
# Emulate JS's 32-bit signed bitwise OR `hash |= 0`
|
|
out = (out & 2**31 - 1) - (out & 2**31)
|
|
return abs(out)
|
|
|
|
def _get_distinguisher(self, user_id: UserID) -> str:
    """
    Pick the relay user distinguisher for a Matrix user.

    The distinguisher is selected from the ``relay_user_distinguishers``
    config list using the hash of the user ID.

    Returns:
        The selected distinguisher, or an empty string if none are configured.
    """
    choices = self.get_config("relay_user_distinguishers") or []
    if not choices:
        return ""
    return choices[self.hash_user_id(user_id) % len(choices)]
|
|
|
|
async def _apply_msg_format(self, sender: u.User, content: MessageEventContent) -> None:
    """
    Wrap a Matrix message in the configured relay message format.

    The content is given an HTML body if it doesn't have one, and the
    formatted body is then replaced with the rendered
    ``message_formats.[<msgtype>]`` template (or a default one).

    Args:
        sender: The Matrix user who sent the message.
        content: The message content, modified in place.
    """
    if not isinstance(content, TextMessageEventContent):
        content.format = Format.HTML
        content.formatted_body = escape_html(content.body).replace("\n", "<br/>")
    else:
        content.ensure_has_html()

    configured_tpl = self.get_config(f"message_formats.[{content.msgtype.value}]")
    tpl = configured_tpl or "<b>$sender_displayname</b>: $message"
    displayname = await self.get_displayname(sender)
    tpl_args = {
        "sender_mxid": sender.mxid,
        "sender_username": sender.mxid_localpart,
        "sender_displayname": escape_html(displayname),
        "message": content.formatted_body,
        "body": content.body,
        "formatted_body": content.formatted_body,
        "distinguisher": self._get_distinguisher(sender.mxid),
    }
    content.formatted_body = Template(tpl).safe_substitute(tpl_args)
|
|
|
|
async def _apply_emote_format(self, sender: u.User, content: TextMessageEventContent) -> None:
    """
    Convert a Matrix emote into a text message using the ``emote_format``
    config template.

    Args:
        sender: The Matrix user who sent the emote.
        content: The message content, modified in place. Its formatted body is
            replaced with the rendered template and its type is set to text.
    """
    content.ensure_has_html()

    tpl = self.get_config("emote_format")
    puppet = await p.Puppet.get_by_tgid(sender.tgid)
    room_displayname = await self.get_displayname(sender)
    mention = f"<a href='https://matrix.to/#/{puppet.mxid}'>{puppet.displayname}</a>"
    tpl_args = {
        "sender_mxid": sender.mxid,
        "sender_username": sender.mxid_localpart,
        "sender_displayname": escape_html(room_displayname),
        "mention": mention,
        "username": sender.tg_username,
        "displayname": puppet.displayname,
        "body": content.body,
        "formatted_body": content.formatted_body,
    }
    content.formatted_body = Template(tpl).safe_substitute(tpl_args)
    content.msgtype = MessageType.TEXT
|
|
|
|
async def _pre_process_matrix_message(
    self, sender: u.User, use_relaybot: bool, content: MessageEventContent
) -> None:
    """
    Apply relay or emote formatting to a Matrix message before bridging it.

    Args:
        sender: The Matrix user who sent the message.
        use_relaybot: Whether the message will be sent through the relaybot,
            in which case the relay message format is applied.
        content: The message content, modified in place.
    """
    if use_relaybot:
        await self._apply_msg_format(sender, content)
        return
    if content.msgtype == MessageType.EMOTE:
        await self._apply_emote_format(sender, content)
|
|
|
|
async def _handle_matrix_text(
    self,
    sender: u.User,
    logged_in: bool,
    event_id: EventID,
    space: TelegramID,
    client: MautrixTelegramClient,
    content: TextMessageEventContent,
    reply_to: TelegramID | None,
) -> None:
    """
    Bridge a Matrix text message (or an edit of one) to Telegram.

    If the content is an edit of a message that is found in the database, the
    Telegram message is edited; otherwise a new message is sent. Either way the
    event is marked as handled afterwards.

    Args:
        sender: The Matrix user who sent the message.
        logged_in: Whether the sender's own Telegram account is used; if not,
            the bot's ID is used as the sender ID.
        event_id: The Matrix event ID.
        space: The Telegram message space to look up and store messages in.
        client: The Telegram client to send with.
        content: The Matrix message content.
        reply_to: The Telegram message ID to reply to, if any.
    """
    text, entities = await formatter.matrix_to_telegram(
        client, text=content.body, html=content.formatted(Format.HTML)
    )
    sender_id = sender.tgid if logged_in else self.bot.tgid
    async with self.send_lock(sender_id):
        link_preview = self.get_config("telegram_link_preview")

        edit_target = None
        if content.get_edit():
            edit_target = await DBMessage.get_by_mxid(content.get_edit(), self.mxid, space)

        if edit_target:
            response = await client.edit_message(
                self.peer,
                edit_target.tgid,
                text,
                formatting_entities=entities,
                link_preview=link_preview,
            )
            # A negative index is resolved to the next edit index when the
            # event is marked as handled.
            edit_index = -1
        else:
            response = await client.send_message(
                self.peer,
                text,
                reply_to=reply_to,
                formatting_entities=entities,
                link_preview=link_preview,
            )
            edit_index = 0

        await self._mark_matrix_handled(
            sender=sender,
            sender_tgid=sender_id,
            event_type=EventType.ROOM_MESSAGE,
            event_id=event_id,
            space=space,
            edit_index=edit_index,
            response=response,
            msgtype=content.msgtype,
        )
|
|
|
|
async def _handle_matrix_file(
    self,
    sender: u.User,
    logged_in: bool,
    event_id: EventID,
    space: TelegramID,
    client: MautrixTelegramClient,
    content: MediaMessageEventContent,
    reply_to: TelegramID,
    file_name: str,
    caption: TextMessageEventContent = None,
) -> None:
    """
    Bridge a Matrix media message (or an edit of one) to Telegram.

    The file is transferred to Telegram (in parallel if enabled and the file
    is unencrypted, otherwise by downloading, decrypting and re-uploading it),
    Telegram document attributes are built based on the message type, and the
    media is sent as a photo or a document.

    Args:
        sender: The Matrix user who sent the message.
        logged_in: Whether the sender's own Telegram account is used; if not,
            the bot's ID is used as the sender ID.
        event_id: The Matrix event ID.
        space: The Telegram message space to look up and store messages in.
        client: The Telegram client to send with.
        content: The Matrix media message content.
        reply_to: The Telegram message ID to reply to, if any.
        file_name: The file name to use on Telegram.
        caption: Optional caption content to send with the media.

    Raises:
        BridgingError: If the media is encrypted but attachment decryption
            isn't available.
    """
    sender_id = sender.tgid if logged_in else self.bot.tgid
    info = content.info
    mime = info.mimetype
    if isinstance(content.info, (ImageInfo, VideoInfo)):
        w, h = content.info.width, content.info.height
    else:
        w = h = None
    max_image_size = self.config["bridge.image_as_file_size"] * 1000**2
    max_image_pixels = self.config["bridge.image_as_file_pixels"]

    attributes = []
    if self.config["bridge.parallel_file_transfer"] and content.url:
        file_handle, file_size = await util.parallel_transfer_to_telegram(
            client, self.main_intent, content.url, sender_id
        )
    else:
        if content.file:
            if not decrypt_attachment:
                raise BridgingError(
                    f"Can't bridge encrypted media event {event_id}: "
                    "encryption dependencies not installed"
                )
            file = await self.main_intent.download_media(content.file.url)
            file = decrypt_attachment(
                file, content.file.key.key, content.file.hashes.get("sha256"), content.file.iv
            )
        else:
            file = await self.main_intent.download_media(content.url)

        is_sticker = content.msgtype == MessageType.STICKER
        if is_sticker and mime == "image/gif":
            # Remove sticker description
            file_name = "sticker.gif"
        elif is_sticker:
            if mime not in ("video/webm", "application/x-tgsticker"):
                mime, file, w, h = util.convert_image(
                    file, source_mime=mime, target_type="webp"
                )
            attributes.append(
                DocumentAttributeSticker(alt=content.body, stickerset=InputStickerSetEmpty())
            )

        file_handle = await client.upload_file(file)
        file_size = len(file)

    file_handle.name = file_name
    force_document = file_size >= max_image_size
    attributes.append(DocumentAttributeFilename(file_name=file_name))

    if content.msgtype == MessageType.VIDEO:
        attributes.append(
            DocumentAttributeVideo(
                duration=int(content.info.duration // 1000 if content.info.duration else 0),
                w=w or 0,
                h=h or 0,
            )
        )
    elif content.msgtype == MessageType.AUDIO:
        waveform = content.get("org.matrix.msc1767.audio", {}).get("waveform", [])
        if waveform:
            # Scale the samples down so that the peak ends up at 32 at most.
            scale = max(max(waveform) / 32, 1)
            waveform = [round(part / scale) for part in waveform]
        attributes.append(
            DocumentAttributeAudio(
                duration=int(content.info.duration // 1000 if content.info.duration else 0),
                voice="org.matrix.msc3245.voice" in content,
                waveform=encode_waveform(waveform) if waveform else None,
            )
        )
    elif w and h:
        attributes.append(DocumentAttributeImageSize(w, h))
        force_document = force_document or w * h >= max_image_pixels

    if "fi.mau.telegram.force_document" in content:
        force_document = bool(content["fi.mau.telegram.force_document"])

    if mime in ("image/png", "image/jpeg") and not force_document:
        media = InputMediaUploadedPhoto(file_handle)
    else:
        media = InputMediaUploadedDocument(
            file=file_handle,
            attributes=attributes,
            mime_type=mime or "application/octet-stream",
        )

    if caption:
        capt, entities = await formatter.matrix_to_telegram(
            client, text=caption.body, html=caption.formatted(Format.HTML)
        )
    else:
        capt, entities = None, None

    async with self.send_lock(sender_id):
        if await self._matrix_document_edit(
            sender, sender_id, client, content, space, capt, entities, media, event_id
        ):
            return
        try:
            response = await client.send_media(
                self.peer, media, reply_to=reply_to, caption=capt, entities=entities
            )
        except (
            PhotoInvalidDimensionsError,
            PhotoSaveFileInvalidError,
            PhotoExtInvalidError,
        ):
            # Telegram rejected the upload as a photo, retry it as a document.
            media = InputMediaUploadedDocument(
                file=media.file, mime_type=mime, attributes=attributes
            )
            response = await client.send_media(
                self.peer, media, reply_to=reply_to, caption=capt, entities=entities
            )
        await self._mark_matrix_handled(
            sender=sender,
            sender_tgid=sender_id,
            event_type=EventType.ROOM_MESSAGE,
            event_id=event_id,
            space=space,
            edit_index=0,
            response=response,
            msgtype=content.msgtype,
        )
|
|
|
|
async def _matrix_document_edit(
    self,
    sender: u.User,
    sender_tgid: TelegramID,
    client: MautrixTelegramClient,
    content: MessageEventContent,
    space: TelegramID,
    caption: str,
    caption_entities,
    media: Any,
    event_id: EventID,
) -> bool:
    """
    Bridge a Matrix media message as an edit if it is one.

    Args:
        sender: The Matrix user who sent the message.
        sender_tgid: The Telegram ID the message is sent as.
        client: The Telegram client to edit with.
        content: The Matrix message content.
        space: The Telegram message space to look up and store messages in.
        caption: The new caption text.
        caption_entities: The formatting entities of the caption.
        media: The new media of the message.
        event_id: The Matrix event ID.

    Returns:
        ``True`` if the content was an edit of a known message and the Telegram
        message was edited, ``False`` if the caller should send a new message.
    """
    if not content.get_edit():
        return False
    orig_msg = await DBMessage.get_by_mxid(content.get_edit(), self.mxid, space)
    if not orig_msg:
        return False

    response = await client.edit_message(
        self.peer,
        orig_msg.tgid,
        caption,
        formatting_entities=caption_entities,
        file=media,
    )
    await self._mark_matrix_handled(
        sender=sender,
        sender_tgid=sender_tgid,
        event_type=EventType.ROOM_MESSAGE,
        event_id=event_id,
        space=space,
        edit_index=-1,
        response=response,
        msgtype=content.msgtype,
    )
    return True
|
|
|
|
async def _handle_matrix_location(
    self,
    sender: u.User,
    logged_in: bool,
    event_id: EventID,
    space: TelegramID,
    client: MautrixTelegramClient,
    content: LocationMessageEventContent,
    reply_to: TelegramID,
) -> None:
    """
    Bridge a Matrix location message (or an edit of one) to Telegram.

    The latitude and longitude are parsed from the ``geo:`` URI of the event.
    URI parameters (after ``;``) and any further coordinate components such as
    the altitude are ignored. If the URI can't be parsed, the error is logged
    and nothing is sent.

    Args:
        sender: The Matrix user who sent the message.
        logged_in: Whether the sender's own Telegram account is used; if not,
            the bot's ID is used as the sender ID.
        event_id: The Matrix event ID.
        space: The Telegram message space to look up and store messages in.
        client: The Telegram client to send with.
        content: The Matrix location message content.
        reply_to: The Telegram message ID to reply to, if any.
    """
    sender_id = sender.tgid if logged_in else self.bot.tgid
    try:
        # geo URIs look like geo:<lat>,<lon>[,<alt>][;<params>] (RFC 5870), so
        # only the first two comma-separated components are used.
        lat, long, *_ = content.geo_uri[len("geo:") :].split(";")[0].split(",")
        lat, long = float(lat), float(long)
    except (KeyError, ValueError, TypeError):
        # TypeError covers a geo_uri that is missing entirely (None).
        self.log.exception("Failed to parse location")
        return None
    try:
        caption = content["org.matrix.msc3488.location"]["description"]
        entities = []
    except KeyError:
        caption, entities = await formatter.matrix_to_telegram(client, text=content.body)
    media = MessageMediaGeo(geo=GeoPoint(lat=lat, long=long, access_hash=0))

    async with self.send_lock(sender_id):
        if await self._matrix_document_edit(
            sender, sender_id, client, content, space, caption, entities, media, event_id
        ):
            return
        response = await client.send_media(
            self.peer, media, reply_to=reply_to, caption=caption, entities=entities
        )
        await self._mark_matrix_handled(
            sender=sender,
            sender_tgid=sender_id,
            event_type=EventType.ROOM_MESSAGE,
            event_id=event_id,
            space=space,
            edit_index=0,
            response=response,
            msgtype=content.msgtype,
        )
|
|
|
|
async def _mark_matrix_handled(
    self,
    sender: u.User,
    sender_tgid: TelegramID,
    event_type: EventType,
    event_id: EventID,
    space: TelegramID,
    edit_index: int,
    response: TypeMessage,
    msgtype: MessageType | None = None,
) -> None:
    """Record a Matrix event that was successfully sent to Telegram.

    Registers the response with the deduplicator, stores the message mapping
    in the database, reports success (checkpoint, delivery receipt, message
    status) and schedules disappearing if the response has a TTL.

    A negative ``edit_index`` means "next edit": the index is derived from the
    most recent stored edit of the same Telegram message.
    """
    self.log.trace("Raw event handling response for %s: %s", event_id, response)
    dedup_key = (event_id, space)
    content_hash, _ = self.dedup.check(response, dedup_key, force_hash=edit_index != 0)
    tg_msg_id = TelegramID(response.id)
    if edit_index < 0:
        latest_edit = await DBMessage.get_one_by_tgid(tg_msg_id, space, -1)
        edit_index = latest_edit.edit_index + 1

    db_message = DBMessage(
        tgid=TelegramID(response.id),
        tg_space=space,
        mx_room=self.mxid,
        mxid=event_id,
        edit_index=edit_index,
        content_hash=content_hash,
        sender_mxid=sender.mxid,
        sender=sender_tgid,
    )
    await db_message.insert()

    sender.send_remote_checkpoint(
        MessageSendCheckpointStatus.SUCCESS,
        event_id,
        self.mxid,
        event_type,
        message_type=msgtype,
    )
    await self._send_delivery_receipt(event_id)
    background_task.create(self._send_message_status(event_id, err=None))

    ttl = response.ttl_period
    if ttl:
        await self._mark_disappearing(
            event_id=event_id,
            seconds=response.ttl_period,
            expires_at=int(response.date.timestamp()) + response.ttl_period,
        )
    self.log.debug(
        f"Handled Matrix message {event_id} -> {response.id} (edit index {edit_index})"
    )
|
|
|
|
@staticmethod
def _error_to_human_message(err: Exception) -> str | None:
    """Return a short human-readable explanation for a known Telegram error.

    Returns ``None`` when ``err`` is not one of the recognised error types.
    """
    # Checked in order; a ``None`` text marks the one entry whose message is
    # built from data carried by the error itself.
    known_errors = (
        (YouBlockedUserError, "You blocked this user"),
        (UserIsBlockedError, "You were blocked by this user"),
        (
            UserBannedInChannelError,
            "You're banned from sending messages in supergroups/channels",
        ),
        (InputUserDeactivatedError, "This user was deleted"),
        (ChatAdminRequiredError, "Only admins can do that"),
        ((ChatRestrictedError, ChatWriteForbiddenError), "You can't send messages in this chat"),
        (SlowModeWaitError, None),
        (MessageEmptyError, "Message is empty"),
        (MessageTooLongError, "Message is too long"),
        (EntitiesTooLongError, "Message has too many formatting entities"),
        (EntityBoundsInvalidError, "Message formatting entities are malformed"),
        (EntityMentionUserInvalidError, "You mentioned an invalid user"),
    )
    for error_types, text in known_errors:
        if not isinstance(err, error_types):
            continue
        if text is None:
            return f"Slow mode enabled, wait {format_duration(err.seconds)} before sending"
        return text
    return None
|
|
|
|
async def _send_message_status(self, event_id: EventID, err: Exception | None) -> None:
|
|
if not self.config["bridge.message_status_events"]:
|
|
return
|
|
intent = self.az.intent if self.encrypted else self.main_intent
|
|
status = BeeperMessageStatusEventContent(
|
|
network=self.bridge_info_state_key,
|
|
relates_to=RelatesTo(
|
|
rel_type=RelationType.REFERENCE,
|
|
event_id=event_id,
|
|
),
|
|
)
|
|
if isinstance(err, IgnoredMessageError):
|
|
status.reason = MessageStatusReason.UNSUPPORTED
|
|
status.error = str(err)
|
|
status.status = MessageStatus.FAIL
|
|
elif err:
|
|
status.reason = MessageStatusReason.GENERIC_ERROR
|
|
status.error = f"{type(err).__name__}: {err}"
|
|
status.status = MessageStatus.RETRIABLE
|
|
status.message = self._error_to_human_message(err)
|
|
else:
|
|
status.status = MessageStatus.SUCCESS
|
|
|
|
await intent.send_message_event(
|
|
room_id=self.mxid,
|
|
event_type=EventType.BEEPER_MESSAGE_STATUS,
|
|
content=status,
|
|
)
|
|
|
|
async def _send_bridge_error(
    self,
    sender: u.User,
    err: Exception,
    event_id: EventID,
    event_type: EventType,
    message_type: MessageType | None = None,
    msg: str | None = None,
) -> None:
    """Report a failure to bridge a Matrix event.

    Sends a permanent-failure checkpoint, optionally posts ``msg`` as a notice
    in the room (when delivery error reports are enabled and the error is not
    a ``MessageNotModifiedError``), and finally sends a message status event.
    """
    sender.send_remote_checkpoint(
        MessageSendCheckpointStatus.PERM_FAILURE,
        event_id,
        self.mxid,
        event_type,
        message_type=message_type,
        error=err,
    )

    should_notify = (
        msg
        and self.config["bridge.delivery_error_reports"]
        and not isinstance(err, MessageNotModifiedError)
    )
    if should_notify:
        notice = TextMessageEventContent(msgtype=MessageType.NOTICE, body=msg)
        await self._send_message(self.main_intent, notice)
    await self._send_message_status(event_id, err)
|
|
|
|
async def handle_matrix_message(
    self, sender: u.User, content: MessageEventContent, event_id: EventID
) -> None:
    """Bridge a Matrix message, reporting any failure back to Matrix.

    Delegates to ``_handle_matrix_message``. Telegram RPC errors are logged
    and reported with a user-visible warning text; any other exception is
    reported without one (ignored messages are only logged at debug level).
    """
    try:
        await self._handle_matrix_message(sender, content, event_id)
    except RPCError as rpc_err:
        self.log.exception(f"RPCError while bridging {event_id}: {rpc_err}")
        await self._send_bridge_error(
            sender,
            rpc_err,
            event_id,
            EventType.ROOM_MESSAGE,
            message_type=content.msgtype,
            msg=f"\u26a0 Your message may not have been bridged: {rpc_err}",
        )
    except Exception as err:
        if not isinstance(err, IgnoredMessageError):
            self.log.exception(f"Failed to bridge {event_id}")
        else:
            self.log.debug(f"Ignored {event_id}: {err}")
        await self._send_bridge_error(
            sender,
            err,
            event_id,
            EventType.ROOM_MESSAGE,
            message_type=content.msgtype,
        )
|
|
|
|
async def _find_source_msg(
|
|
self, sender: u.User, content: MessageEventContent
|
|
) -> DBMessage | None:
|
|
try:
|
|
source = content["fi.mau.telegram.source"]
|
|
except KeyError:
|
|
return None
|
|
if not isinstance(source, dict):
|
|
return None
|
|
try:
|
|
msg_id = source["id"]
|
|
space = source["space"]
|
|
chat_id = source["chat_id"]
|
|
peer_type = source["peer_type"]
|
|
except KeyError:
|
|
return None
|
|
if (
|
|
not isinstance(msg_id, int)
|
|
or not isinstance(chat_id, int)
|
|
or not isinstance(space, int)
|
|
or not isinstance(peer_type, str)
|
|
):
|
|
return None
|
|
elif await sender.needs_relaybot(self):
|
|
return None
|
|
if peer_type == "user" and space != sender.tgid:
|
|
return
|
|
dbm = await DBMessage.get_one_by_tgid(TelegramID(msg_id), TelegramID(space))
|
|
if dbm and peer_type == "chat" and space != sender.tgid:
|
|
dbm = DBMessage.get_by_mxid(dbm.mxid, dbm.mx_room, sender.tgid)
|
|
return dbm
|
|
|
|
async def _handle_matrix_forward(
    self,
    sender: u.User,
    msg: DBMessage,
    event_id: EventID,
    space: TelegramID,
    msgtype: MessageType,
) -> bool:
    """Try to bridge a Matrix event as a Telegram forward of ``msg``.

    Returns ``True`` when the forward was sent and recorded. Returns ``False``
    when the source portal is unknown or forwarding failed, so the caller can
    fall back to normal message handling.
    """
    source_portal = await Portal.get_by_mxid(msg.mx_room)
    if not source_portal:
        return False

    async with self.send_lock(sender.tgid):
        try:
            forwarded = await sender.client.forward_messages(
                self.peer,
                messages=[msg.tgid],
                from_peer=source_portal.peer,
            )
        except Exception as e:
            self.log.warning(
                f"Failed to send {event_id} from {sender.mxid} as forward of {msg.tgid} "
                f"from {source_portal.tgid}: {e}, falling back to normal message handling"
            )
            return False

        # Only reached when forwarding succeeded; errors here are not swallowed.
        await self._mark_matrix_handled(
            sender=sender,
            sender_tgid=sender.tgid,
            event_type=EventType.ROOM_MESSAGE,
            event_id=event_id,
            space=space,
            edit_index=0,
            response=forwarded[0],
            msgtype=msgtype,
        )
        return True
|
|
|
|
async def _handle_matrix_message(
    self, sender: u.User, content: MessageEventContent, event_id: EventID
) -> None:
    """Dispatch a Matrix message to the handler matching its msgtype.

    Raises ``IgnoredMessageError`` for messages without msgtype or body, for
    notices that are not configured to be bridged, and for unknown msgtypes.
    Messages carrying a resolvable Telegram source are forwarded instead of
    being re-sent when forwarding succeeds.
    """
    if not content.msgtype:
        raise IgnoredMessageError("Message doesn't have a msgtype")
    if not content.body:
        raise IgnoredMessageError("Message doesn't have a body")

    logged_in = not await sender.needs_relaybot(self)
    client = sender.client if logged_in else self.bot.client
    if self.peer_type == "channel":
        # Channels have their own ID space
        space = self.tgid
    else:
        space = sender.tgid if logged_in else self.bot.tgid

    source_msg = await self._find_source_msg(sender, content)
    if source_msg and await self._handle_matrix_forward(
        sender, source_msg, event_id, space, content.msgtype
    ):
        return
    reply_to = await formatter.matrix_reply_to_telegram(content, space, room_id=self.mxid)

    text_types = (MessageType.TEXT, MessageType.EMOTE, MessageType.NOTICE)
    media_types = (
        MessageType.STICKER,
        MessageType.IMAGE,
        MessageType.FILE,
        MessageType.AUDIO,
        MessageType.VIDEO,
    )

    if content.msgtype == MessageType.NOTICE:
        bridge_notices = self.get_config("bridge_notices.default")
        excepted = sender.mxid in self.get_config("bridge_notices.exceptions")
        if not (bridge_notices or excepted):
            raise IgnoredMessageError("Notices are not configured to be bridged.")

    if content.msgtype in text_types:
        await self._pre_process_matrix_message(sender, not logged_in, content)
        await self._handle_matrix_text(
            sender, logged_in, event_id, space, client, content, reply_to
        )
        return

    if content.msgtype == MessageType.LOCATION:
        await self._pre_process_matrix_message(sender, not logged_in, content)
        await self._handle_matrix_location(
            sender, logged_in, event_id, space, client, content, reply_to
        )
        return

    if content.msgtype not in media_types:
        self.log.debug(
            f"Didn't handle Matrix event {event_id} due to unknown msgtype {content.msgtype}"
        )
        self.log.trace("Unhandled Matrix event content: %s", content)
        raise IgnoredMessageError(f"Unhandled msgtype {content.msgtype}")

    # Media message: work out the file name and an optional caption.
    file_name = content.body
    try:
        # A caption stored in the sender's command status takes precedence.
        caption_content: TextMessageEventContent | None = sender.command_status["caption"]
        reply_to = reply_to or await formatter.matrix_reply_to_telegram(
            caption_content, space, room_id=self.mxid
        )
        sender.command_status = None
    except (KeyError, TypeError):
        has_distinct_filename = "filename" in content and content["filename"] != content.body
        if not logged_in or has_distinct_filename:
            if "filename" in content:
                file_name = content["filename"]
            caption_content = TextMessageEventContent(
                msgtype=MessageType.TEXT,
                body=content.body,
            )
            if (
                "formatted_body" in content
                and str(content.get("format")) == Format.HTML.value
            ):
                caption_content["formatted_body"] = content["formatted_body"]
                caption_content["format"] = Format.HTML
        else:
            caption_content = None

    if caption_content:
        caption_content.msgtype = content.msgtype
        await self._pre_process_matrix_message(sender, not logged_in, caption_content)
    await self._handle_matrix_file(
        sender,
        logged_in,
        event_id,
        space,
        client,
        content,
        reply_to,
        file_name,
        caption_content,
    )
|
|
|
|
async def handle_matrix_unpin_all(self, sender: u.User, pin_event_id: EventID) -> None:
    """Unpin every message in the Telegram chat and acknowledge the Matrix event."""
    unpin_request = UnpinAllMessagesRequest(peer=self.peer)
    await sender.client(unpin_request)
    await self._send_delivery_receipt(pin_event_id)
|
|
|
|
async def handle_matrix_pin(
    self, sender: u.User, changes: dict[EventID, bool], pin_event_id: EventID
) -> None:
    """Apply Matrix pin changes to Telegram.

    ``changes`` maps Matrix event IDs to whether they are now pinned. Events
    without a known Telegram message, and Telegram's "not modified" / "invalid
    message ID" errors, are skipped on a best-effort basis.
    """
    tg_space = self.tgid if self.peer_type == "channel" else sender.tgid
    db_messages = await DBMessage.get_by_mxids(
        list(changes.keys()), mx_room=self.mxid, tg_space=tg_space
    )
    tgid_by_mxid = {db_msg.mxid: db_msg.tgid for db_msg in db_messages}

    for target_event_id, is_pinned in changes.items():
        try:
            await sender.client(
                UpdatePinnedMessageRequest(
                    peer=self.peer,
                    id=tgid_by_mxid[target_event_id],
                    unpin=not is_pinned,
                )
            )
        except (ChatNotModifiedError, MessageIdInvalidError, KeyError):
            # Deliberately ignored: unknown event or nothing to change.
            pass
    await self._send_delivery_receipt(pin_event_id)
|
|
|
|
async def handle_matrix_deletion(
    self, deleter: u.User, event_id: EventID, redaction_event_id: EventID
) -> None:
    """Bridge a Matrix redaction and report the outcome for the redaction event.

    Failures (including ignored redactions) are reported via
    ``_send_bridge_error``; on success a checkpoint, delivery receipt and
    message status are sent.
    """
    try:
        await self._handle_matrix_deletion(deleter, event_id)
    except Exception as err:
        if isinstance(err, IgnoredMessageError):
            self.log.debug(str(err))
        else:
            self.log.exception(f"Failed to bridge redaction by {deleter.mxid}")
        await self._send_bridge_error(
            deleter, err, redaction_event_id, EventType.ROOM_REDACTION
        )
        return

    deleter.send_remote_checkpoint(
        MessageSendCheckpointStatus.SUCCESS,
        redaction_event_id,
        self.mxid,
        EventType.ROOM_REDACTION,
    )
    await self._send_delivery_receipt(redaction_event_id)
    background_task.create(self._send_message_status(redaction_event_id, err=None))
|
|
|
|
async def _handle_matrix_reaction_deletion(
    self, deleter: u.User, event_id: EventID, tg_space: TelegramID
) -> None:
    """Bridge the redaction of a Matrix reaction to Telegram.

    Raises ``IgnoredMessageError`` when the reaction is unknown, belongs to
    another user, or targets an unknown or redacted message. Otherwise the
    stored reaction is deleted and the user's remaining reactions (if the
    user may have more than one) are re-sent to Telegram.
    """
    reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
    if not reaction:
        raise IgnoredMessageError(f"Ignoring Matrix redaction of unknown event {event_id}")
    if reaction.tg_sender != deleter.tgid:
        raise IgnoredMessageError("Ignoring Matrix redaction of reaction by another user")

    msg = await DBMessage.get_by_mxid(reaction.msg_mxid, reaction.mx_room, tg_space)
    if not msg or msg.redacted:
        raise IgnoredMessageError(
            f"Ignoring Matrix redaction of reaction to unknown event {reaction.msg_mxid}"
        )

    async with self.reaction_lock(msg.mxid):
        await reaction.delete()
        remaining = None
        if await deleter.get_max_reactions() > 1:
            still_stored = await DBReaction.get_by_sender(msg.mxid, msg.mx_room, deleter.tgid)
            # An empty list is sent as None.
            remaining = [stored.telegram for stored in still_stored] or None
        await deleter.client(
            SendReactionRequest(peer=self.peer, msg_id=msg.tgid, reaction=remaining)
        )
        self.log.debug(
            f"Handled Matrix deletion of reaction {event_id} to {msg.tgid} "
            f"(new reaction count: {len(remaining) if remaining else 0})"
        )
|
|
|
|
async def _handle_matrix_deletion(self, deleter: u.User, event_id: EventID) -> None:
    """Delete the Telegram message (or reaction) behind a redacted Matrix event.

    Unknown message IDs are treated as reaction redactions. Raises
    ``IgnoredMessageError`` for already redacted events and for edit events
    (which are only marked as redacted locally).
    """
    needs_relaybot = await deleter.needs_relaybot(self)
    real_deleter = self.bot if needs_relaybot else deleter
    tg_space = self.tgid if self.peer_type == "channel" else real_deleter.tgid

    message = await DBMessage.get_by_mxid(event_id, self.mxid, tg_space)
    if not message:
        await self._handle_matrix_reaction_deletion(real_deleter, event_id, tg_space)
        return
    if message.redacted:
        raise IgnoredMessageError(
            "Ignoring Matrix redaction of already redacted event "
            f"{message.mxid} in {message.mx_room}"
        )

    is_edit_event = message.edit_index != 0
    await message.mark_redacted()
    if is_edit_event:
        raise IgnoredMessageError(
            f"Ignoring Matrix redaction of edit event {message.mxid} in {message.mx_room}"
        )
    await real_deleter.client.delete_messages(self.peer, [message.tgid])
    self.log.debug(f"Handled Matrix redaction of {event_id} / {message.tgid}")
|
|
|
|
async def handle_matrix_reaction(
    self, user: u.User, target_event_id: EventID, emoji: str, reaction_event_id: EventID
) -> None:
    """Bridge a Matrix reaction to Telegram and report the outcome.

    ``mxc://`` reactions are resolved to Telegram custom emoji documents;
    unicode reactions may be swapped for a custom emoji equivalent when
    configured or when the unicode emoji is not available to the user.
    Unknown custom emojis and reactions Telegram rejects are redacted on
    Matrix unless the portal has a relaybot.
    """
    emoji_id = emoji
    reaction = ReactionEmoji(emoticon=variation_selector.remove(emoji))
    if emoji.startswith("mxc://"):
        db_file = await DBTelegramFile.find_by_mxc(ContentURI(emoji))
        if not db_file or not db_file.id.isdecimal():
            self.log.debug(f"Dropping unknown reaction {emoji} by {user.mxid}")
            if not self.has_bot:
                await self.main_intent.redact(
                    self.mxid, reaction_event_id, reason="Unrecognized custom emoji"
                )
            await self._send_bridge_error(
                user,
                Exception("Unrecognized custom emoji"),
                reaction_event_id,
                EventType.REACTION,
            )
            return
        reaction = ReactionCustomEmoji(document_id=int(db_file.id))
        emoji_id = db_file.id
    elif (
        self.config["bridge.always_custom_emoji_reaction"]
        or reaction.emoticon not in await user.get_available_reactions()
    ):
        try:
            doc_id = util.unicode_custom_emoji_map[reaction.emoticon]
        except KeyError:
            # No custom emoji equivalent: keep the unicode reaction.
            pass
        else:
            self.log.trace(
                f"Using custom reaction {doc_id} instead of unicode {reaction.emoticon} "
                f"for {user.mxid}'s reaction"
            )
            reaction = ReactionCustomEmoji(document_id=doc_id)
            emoji_id = str(doc_id)

    try:
        async with self.reaction_lock(target_event_id):
            await self._handle_matrix_reaction(
                user, target_event_id, emoji_id, reaction, reaction_event_id
            )
    except IgnoredMessageError as ignored:
        self.log.debug(str(ignored))
        await self._send_bridge_error(user, ignored, reaction_event_id, EventType.REACTION)
        return
    except ReactionInvalidError as invalid:
        # Don't redact reactions in relaybot chats, there are usually other Matrix users too.
        if not self.has_bot:
            await self.main_intent.redact(
                self.mxid, reaction_event_id, reason="Emoji not allowed"
            )
        self.log.debug(f"Failed to bridge reaction by {user.mxid}: emoji not allowed")
        await self._send_bridge_error(user, invalid, reaction_event_id, EventType.REACTION)
        return
    except Exception as err:
        self.log.exception(f"Failed to bridge reaction by {user.mxid}")
        await self._send_bridge_error(user, err, reaction_event_id, EventType.REACTION)
        return

    user.send_remote_checkpoint(
        MessageSendCheckpointStatus.SUCCESS,
        reaction_event_id,
        self.mxid,
        EventType.REACTION,
    )
    await self._send_delivery_receipt(reaction_event_id)
    background_task.create(self._send_message_status(reaction_event_id, err=None))
|
|
|
|
async def _handle_matrix_reaction(
    self,
    user: u.User,
    target_event_id: EventID,
    emoji_id: str,
    reaction: TypeReaction,
    reaction_event_id: EventID,
) -> None:
    """Send a reaction to Telegram and store it.

    Existing reactions by the user are kept up to the user's reaction limit
    minus one (room for the new reaction); the rest are deleted from the
    database and redacted on Matrix. Raises ``IgnoredMessageError`` for
    unknown, redacted or edit targets and for duplicate reactions.
    """
    tg_space = self.tgid if self.peer_type == "channel" else user.tgid
    msg = await DBMessage.get_by_mxid(target_event_id, self.mxid, tg_space)
    if not msg:
        raise IgnoredMessageError(
            f"Ignoring Matrix reaction to unknown event {target_event_id}"
        )
    if msg.redacted:
        raise IgnoredMessageError(
            f"Ignoring Matrix reaction to redacted event {target_event_id}"
        )
    if msg.edit_index != 0:
        raise IgnoredMessageError(f"Ignoring Matrix reaction to edit event {target_event_id}")

    existing_reacts = await DBReaction.get_by_sender(msg.mxid, msg.mx_room, user.tgid)
    # Leave one reaction of space for the new reaction
    keep_limit = await user.get_max_reactions() - 1
    kept: list[TypeReaction] = []
    dropped: list[DBReaction] = []
    for existing in existing_reacts:
        if existing.reaction == emoji_id:
            raise IgnoredMessageError("Ignoring duplicate Matrix reaction")
        if len(kept) >= keep_limit:
            dropped.append(existing)
        else:
            kept.append(existing.telegram)
    kept.append(reaction)

    await user.client(SendReactionRequest(peer=self.peer, msg_id=msg.tgid, reaction=kept))

    puppet = await user.get_puppet()
    removed = 0
    for stale in dropped:
        removed += 1
        await stale.delete()
        await puppet.intent_for(self).redact(stale.mx_room, stale.mxid)
    self.log.debug(
        f"Handled Matrix reaction {reaction_event_id} to {msg.tgid} "
        f"(new reaction count: {len(kept)}, removed {removed} old reactions)"
    )

    new_db_reaction = DBReaction(
        mxid=reaction_event_id,
        mx_room=self.mxid,
        msg_mxid=msg.mxid,
        tg_sender=user.tgid,
        reaction=emoji_id,
    )
    await new_db_reaction.save()
|
|
|
|
async def _update_telegram_power_level(
|
|
self, sender: u.User, user_id: TelegramID, level: int
|
|
) -> None:
|
|
moderator = level >= 50
|
|
admin = level >= 75
|
|
await sender.client.edit_admin(
|
|
self.peer,
|
|
user_id,
|
|
change_info=moderator,
|
|
post_messages=moderator,
|
|
edit_messages=moderator,
|
|
delete_messages=moderator,
|
|
ban_users=moderator,
|
|
invite_users=moderator,
|
|
pin_messages=moderator,
|
|
add_admins=admin,
|
|
)
|
|
|
|
async def handle_matrix_power_levels(
    self,
    sender: u.User,
    new_users: dict[UserID, int],
    old_users: dict[UserID, int],
    event_id: EventID | None,
) -> None:
    """Bridge changed Matrix power levels to Telegram admin rights.

    The bridge bot, the sender themselves and users without a Telegram ID are
    skipped; only users whose level is new or changed are updated.
    """
    # TODO handle all power level changes and bridge exact admin rights to supergroups/channels
    for mxid, level in new_users.items():
        if not mxid or mxid == self.main_intent.mxid or mxid == sender.mxid:
            continue

        tg_user_id = p.Puppet.get_id_from_mxid(mxid)
        if not tg_user_id:
            # Not a puppet MXID: check whether it's a logged-in Matrix user.
            mx_user = await u.User.get_by_mxid(mxid, create=False)
            if not mx_user or not mx_user.tgid:
                continue
            tg_user_id = mx_user.tgid
        if not tg_user_id or tg_user_id == sender.tgid:
            continue

        level_unchanged = mxid in old_users and not (level != old_users[mxid])
        if level_unchanged:
            continue
        await self._update_telegram_power_level(sender, tg_user_id, level)
|
|
|
|
async def handle_matrix_about(self, sender: u.User, about: str, event_id: EventID) -> None:
    """Set the Telegram chat description from a Matrix topic change.

    Only applies to ``chat`` and ``channel`` peers; other peer types are ignored.
    """
    if self.peer_type != "chat" and self.peer_type != "channel":
        return
    input_peer = await self.get_input_entity(sender)
    request = EditChatAboutRequest(peer=input_peer, about=about)
    await sender.client(request)
    self.about = about
    await self.save()
    await self._send_delivery_receipt(event_id)
|
|
|
|
async def handle_matrix_title(self, sender: u.User, title: str, event_id: EventID) -> None:
    """Set the Telegram chat title from a Matrix room name change.

    Only applies to ``chat`` and ``channel`` peers. The resulting Telegram
    actions are registered with the deduplicator as outgoing.
    """
    if self.peer_type not in ("chat", "channel"):
        return

    if self.peer_type == "channel":
        channel = await self.get_input_entity(sender)
        request = EditTitleRequest(channel=channel, title=title)
    else:
        request = EditChatTitleRequest(chat_id=self.tgid, title=title)
    response = await sender.client(request)

    self.dedup.register_outgoing_actions(response)
    self.title = title
    await self.save()
    await self._send_delivery_receipt(event_id)
    await self.update_bridge_info()
|
|
|
|
async def handle_matrix_avatar(
    self, sender: u.User, url: ContentURI, event_id: EventID
) -> None:
    """Set the Telegram chat photo from a Matrix room avatar change.

    Only applies to ``chat`` and ``channel`` peers, and is skipped when the
    avatar URL is unchanged. The avatar is downloaded from Matrix, uploaded
    to Telegram, and the new photo ID is stored when Telegram's response
    contains the corresponding "chat photo edited" service message.
    """
    if self.peer_type not in ("chat", "channel"):
        # Invalid peer type
        return
    elif self.avatar_url == url:
        return

    self.avatar_url = url
    file = await self.main_intent.download_media(url)
    mime = magic.mimetype(file)
    # NOTE(review): presumably guess_extension returns None for unknown mime
    # types (as the stdlib one does); fall back to no extension instead of
    # uploading a file literally named "avatarNone".
    ext = sane_mimetypes.guess_extension(mime) or ""
    uploaded = await sender.client.upload_file(file, file_name=f"avatar{ext}")
    photo = InputChatUploadedPhoto(file=uploaded)

    if self.peer_type == "chat":
        response = await sender.client(EditChatPhotoRequest(chat_id=self.tgid, photo=photo))
    else:
        channel = await self.get_input_entity(sender)
        response = await sender.client(EditPhotoRequest(channel=channel, photo=photo))
    self.dedup.register_outgoing_actions(response)
    for update in response.updates:
        is_photo_update = (
            isinstance(update, UpdateNewMessage)
            and isinstance(update.message, MessageService)
            and isinstance(update.message.action, MessageActionChatEditPhoto)
        )
        if is_photo_update:
            loc, _ = self._msg_conv.get_largest_photo_size(update.message.action.photo)
            self.photo_id = str(loc.id)
            await self.save()
            break
    await self._send_delivery_receipt(event_id)
    await self.update_bridge_info()
|
|
|
|
async def handle_matrix_upgrade(
    self, sender: UserID, new_room: RoomID, event_id: EventID
) -> None:
    """Move the portal to an upgraded Matrix room.

    The portal is re-pointed at ``new_room`` and the bridge bot joins it via
    the upgrading user's server. The room is then updated using the relaybot
    if available, otherwise the first room member with a Telegram account
    that can resolve the chat entity.
    """
    _, server = self.main_intent.parse_user_id(sender)
    old_room = self.mxid
    await self.migrate_and_save_matrix(new_room)
    await self.main_intent.join_room(new_room, servers=[server])

    entity: TypeChat | User | None = None
    user: au.AbstractUser | None = None
    if self.bot and self.has_bot:
        user = self.bot
        entity = await self.get_entity(self.bot)

    if not entity:
        member_mxids = await self.main_intent.get_room_members(self.mxid)
        for member in member_mxids:
            member_id = UserID(member)
            if member_id == self.az.bot_mxid:
                continue
            user = await u.User.get_by_mxid(member_id, create=False)
            if not (user and user.tgid):
                continue
            entity = await self.get_entity(user)
            if entity:
                break

    if not entity:
        self.log.error(
            "Failed to fully migrate to upgraded Matrix room: no Telegram user found."
        )
        return

    await self.update_matrix_room(user, entity)
    self.log.info(f"{sender} upgraded room from {old_room} to {self.mxid}")
    await self._send_delivery_receipt(event_id, room_id=old_room)
|
|
|
|
async def migrate_and_save_matrix(self, new_id: RoomID) -> None:
    """Point the portal at a new Matrix room ID and persist the change.

    Updates the ``by_mxid`` cache and resets the stored batch and first
    event IDs.
    """
    # Drop the cache entry for the old room, if there is one.
    self.by_mxid.pop(self.mxid, None)
    self.mxid = new_id
    self.next_batch_id = None
    self.first_event_id = None
    self.by_mxid[new_id] = self
    await self.save()
|
|
|
|
# endregion
|
|
# region Telegram -> Matrix bridging
|
|
|
|
async def handle_telegram_typing(self, user: p.Puppet, update: UpdateTyping) -> None:
    """Mirror a Telegram typing notification into the Matrix room."""
    if user.is_real_user:
        # Ignore typing notifications from double puppeted users to avoid echoing
        return
    if isinstance(update.action, SendMessageTypingAction):
        timeout = 5000
    else:
        # Any other action clears the typing indicator.
        timeout = 0
    await user.default_mxid_intent.set_typing(self.mxid, timeout=timeout)
|
|
|
|
async def handle_telegram_edit(
    self, source: au.AbstractUser, sender: p.Puppet | None, evt: Message
) -> None:
    """Bridge an edit of a Telegram message to Matrix.

    Edits are ignored when the portal has no Matrix room, for game messages,
    when the original message is unknown, or when the content hash did not
    change. Edits already seen through another user's ID space are only
    recorded in the database, not re-sent. Outside channels, reactions
    carried on the edited message are handled in a background task.
    """
    if not self.mxid:
        self.log.debug("Ignoring edit to %d as chat has no Matrix room", evt.id)
        return
    if hasattr(evt, "media") and isinstance(evt.media, MessageMediaGame):
        self.log.debug("Ignoring game message edit event")
        return

    has_reactions = (
        self.peer_type != "channel" and isinstance(evt, Message) and evt.reactions is not None
    )
    if has_reactions:
        background_task.create(
            self.try_handle_telegram_reactions(source, TelegramID(evt.id), evt.reactions)
        )
    sender_id = sender.tgid if sender else self.tgid

    async with self.send_lock(sender_id, required=False):
        tg_space = self.tgid if self.peer_type == "channel" else source.tgid

        # Placeholder event ID registered with the deduplicator until the
        # real Matrix event ID is known.
        temp_mxid = EventID(f"${random.randint(1000000000000, 9999999999999)}TGBRIDGEDITEMP")
        event_hash, duplicate = self.dedup.check(evt, (temp_mxid, tg_space), force_hash=True)
        if duplicate:
            mxid, other_tg_space = duplicate
            if tg_space == other_tg_space:
                return
            # Same edit seen via another ID space: store a mapping for this
            # space pointing at the existing Matrix event.
            latest_edit = await DBMessage.get_one_by_tgid(
                TelegramID(evt.id), tg_space, edit_index=-1
            )
            if (
                not latest_edit
                or latest_edit.mxid == mxid
                or latest_edit.content_hash == event_hash
            ):
                return
            await DBMessage(
                mxid=mxid,
                mx_room=self.mxid,
                tg_space=tg_space,
                tgid=TelegramID(evt.id),
                edit_index=latest_edit.edit_index + 1,
                content_hash=event_hash,
                sender=sender_id,
            ).insert()
            return

        original_msg = await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
        if not original_msg:
            self.log.info(
                f"Didn't find edited message {evt.id}@{tg_space} (src {source.tgid}) "
                "in database."
            )
            return
        latest_edit = (
            await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space, -1) or original_msg
        )
        if latest_edit.content_hash == event_hash:
            self.log.debug(
                f"Ignoring edit of message {evt.id}@{tg_space} (src {source.tgid}):"
                " content hash didn't change"
            )
            await DBMessage.delete_temp_mxid(temp_mxid, self.mxid)
            return

        if sender:
            intent = sender.intent_for(self)
            is_bot = sender.is_bot
        else:
            intent = self.main_intent
            is_bot = False
        converted = await self._msg_conv.convert(
            source, intent, is_bot, self.is_channel, evt, no_reply_fallback=True
        )
        converted.content.set_edit(original_msg.mxid)
        await intent.set_typing(self.mxid, timeout=0)
        timestamp = None if evt.edit_date == evt.date else evt.edit_date
        event_id = await self._send_message(
            intent, converted.content, timestamp=timestamp, event_type=converted.type
        )

        await DBMessage(
            mxid=event_id,
            mx_room=self.mxid,
            tg_space=tg_space,
            tgid=TelegramID(evt.id),
            edit_index=latest_edit.edit_index + 1,
            content_hash=event_hash,
            sender=sender_id,
        ).insert()
        await DBMessage.replace_temp_mxid(temp_mxid, self.mxid, event_id)
|
|
|
|
@property
|
|
def _backfill_config_type(self) -> str:
|
|
if self.peer_type == "user":
|
|
return "user"
|
|
elif self.peer_type == "chat":
|
|
return "normal_group"
|
|
elif self.megagroup:
|
|
return "supergroup"
|
|
else:
|
|
return "channel"
|
|
|
|
@property
|
|
def _default_max_batches(self) -> int:
|
|
return self.config[f"bridge.backfill.incremental.max_batches.{self._backfill_config_type}"]
|
|
|
|
@property
|
|
def _enable_batch_sending(self) -> bool:
|
|
return self.bridge.matrix.versions.supports("com.beeper.batch_sending")
|
|
|
|
async def enqueue_backfill(
    self,
    source: u.User,
    priority: int,
    max_batches: int | None = None,
    messages_per_batch: int | None = None,
    anchor_msg_id: int | None = None,
    extra_data: dict[str, Any] | None = None,
    type: BackfillType = BackfillType.HISTORICAL,
) -> None:
    """Add a backfill request for this portal to the user's backfill queue.

    Falsy ``messages_per_batch`` and ``max_batches`` fall back to the bridge
    config defaults. The user's backfill task is woken up after inserting.
    """
    batch_size = (
        messages_per_batch or self.config["bridge.backfill.incremental.messages_per_batch"]
    )
    batch_delay = self.config["bridge.backfill.incremental.post_batch_delay"]
    batch_limit = max_batches or self._default_max_batches

    queue_entry = Backfill.new(
        user_mxid=source.mxid,
        priority=priority,
        type=type,
        portal_tgid=self.tgid,
        portal_tg_receiver=self.tg_receiver,
        anchor_msg_id=anchor_msg_id,
        extra_data=extra_data,
        messages_per_batch=batch_size,
        post_batch_delay=batch_delay,
        max_batches=batch_limit,
    )
    deleted_entries = await queue_entry.insert()
    if deleted_entries:
        self.log.debug(
            "Deleted backfill queue entries while inserting new item: %s", deleted_entries
        )
    source.wakeup_backfill_task.set()
|
|
|
|
async def forward_backfill(
    self,
    source: u.User,
    initial: bool,
    last_tgid: int | None = None,
    override_limit: int | None = None,
    client: MautrixTelegramClient | None = None,
) -> str:
    """Run a forward backfill and return a human-readable status string.

    The message limit comes from ``override_limit`` or, if that is falsy, the
    configured forward limit for this chat kind ("initial" or "sync"). A
    positive ``bridge.backfill.forward_timeout`` bounds the backfill with
    ``asyncio.wait_for``.
    """
    client = client or source.client
    limit_kind = "initial" if initial else "sync"
    limit = (
        override_limit
        or self.config[
            f"bridge.backfill.forward_limits.{limit_kind}.{self._backfill_config_type}"
        ]
    )
    if limit == 0:
        return "Limit is zero, not backfilling"

    timeout = self.config["bridge.backfill.forward_timeout"]
    with self.backfill_lock:
        pending = self.backfill(
            source, client, forward=True, forward_limit=limit, last_tgid=last_tgid
        )
        if timeout > 0:
            pending = asyncio.wait_for(pending, timeout=timeout)
        output = await pending
        self.log.debug(f"Forward backfill complete, status: {output}")
        return output
|
|
|
|
async def backfill(
    self,
    source: u.User,
    client: MautrixTelegramClient,
    req: Backfill | None = None,
    forward: bool = False,
    forward_limit: int | None = None,
    last_tgid: int | None = None,
) -> str:
    """Run a backfill under the portal's backfill method lock.

    Returns a status string; when backfilling is disabled for this portal the
    lock is not taken and an explanatory message is returned instead.
    """
    if self.backfill_enable:
        async with self.backfill_method_lock:
            status = await self._locked_backfill(
                source, client, req, forward, forward_limit, last_tgid
            )
            return status
    return "Backfilling is disabled in the bridge config"
|
|
|
|
async def _locked_backfill(
    self,
    source: u.User,
    client: MautrixTelegramClient,
    req: Backfill | None = None,
    forward: bool = False,
    forward_limit: int | None = None,
    last_tgid: int | None = None,
) -> str:
    """Backfill messages in one direction and return a status string.

    Exactly one mode must be selected: ``forward=True`` (messages newer than
    the last bridged one, limited by ``forward_limit``) or a historical
    ``req`` from the backfill queue (messages older than the anchor). After
    a historical batch, another queue entry is enqueued unless the request's
    batch budget is used up or there is nothing older left.
    """
    assert forward != bool(req)
    if not self.config["bridge.backfill.normal_groups"] and self.peer_type == "chat":
        return "Backfilling normal groups is disabled in the bridge config"
    tg_space = source.tgid if self.peer_type != "channel" else self.tgid
    if forward:
        last_in_room = await DBMessage.find_last(self.mxid, tg_space)
        min_id = last_in_room.tgid if last_in_room else 0
        if last_tgid is None:
            # NOTE(review): this uses source.client rather than the `client`
            # argument used for the backfill itself - confirm that is intended.
            messages = await source.client.get_messages(self.peer, limit=1)
            if not messages:
                return "Chat is empty, nothing to backfill"
            last_tgid = messages[0].id
        if last_tgid <= min_id or (last_tgid == 1 and self.peer_type == "channel"):
            return (
                f"Last bridged message {min_id} is equal to or greater than last message "
                f"in Telegram chat {last_tgid}, nothing to backfill"
            )
        limit = last_tgid - min_id
        if (forward_limit or 0) > 0:
            limit = min(limit, forward_limit)
        self.log.debug(
            f"Backfilling up to {limit} messages after ID {min_id} through {source.mxid} "
            f"(last message: {last_tgid})"
        )
        anchor_id = min_id
    else:
        limit = req.messages_per_batch
        first_in_room = await DBMessage.find_first(self.mxid, tg_space)
        anchor_id = first_in_room.tgid if first_in_room else None
        anchor_source = "lowest in chat"
        # With no bridged messages yet anchor_id is None and cannot be
        # compared to an int; in that case the queue anchor is the only one.
        if req.anchor_msg_id and (anchor_id is None or req.anchor_msg_id < anchor_id):
            anchor_source = "backfill queue anchor"
            anchor_id = req.anchor_msg_id
        self.log.debug(
            f"Backfilling up to {req.messages_per_batch} historical messages "
            f"before {anchor_id} ({anchor_source}) through {source.mxid}"
        )
    event_count, message_count, lowest_id = await self._backfill_messages(
        source, client, forward, anchor_id, limit
    )
    await self.save()
    if forward:
        self.log.debug(f"Forward backfill finished with {event_count}/{message_count} events")
    elif message_count > 0 and lowest_id and lowest_id > 1:
        if req.max_batches in (0, 1):
            self.log.debug(f"Backfilled enough through {source.mxid}, not enqueuing more")
            return "Already backfilled enough batches, not enqueuing more"
        self.log.debug(f"Enqueuing more backfill through {source.mxid}")
        await self.enqueue_backfill(
            source,
            priority=max(100, req.priority + 1),
            messages_per_batch=req.messages_per_batch,
            # A negative budget means unlimited and is passed on unchanged.
            max_batches=-1 if req.max_batches < 0 else (req.max_batches - 1),
            anchor_msg_id=lowest_id,
        )
    else:
        self.log.debug("No more messages to backfill")
    return f"Backfilled {event_count} messages"
|
|
|
|
async def _convert_batch_msg(
    self,
    source: u.User,
    client: MautrixTelegramClient,
    msg: Message,
) -> tuple[putil.ConvertedMessage, IntentAPI]:
    """Convert one Telegram message for backfilling and choose the intent to send it with.

    Args:
        source: The user through whose account the message was fetched.
        client: The Telegram client used for entity lookups and conversion.
        msg: The Telegram message to convert.

    Returns:
        A ``(converted, intent)`` tuple: the result of the message converter and the
        intent the resulting event(s) should be sent with.
    """
    # Work out which puppet authored the message.
    author = None
    if msg.from_id and isinstance(msg.from_id, (PeerUser, PeerChannel)):
        author = await p.Puppet.get_by_peer(msg.from_id)
    elif isinstance(msg.peer_id, PeerUser):
        # No usable from_id, but the peer is a user: the ``out`` flag decides the side.
        if msg.out:
            author = await p.Puppet.get_by_tgid(source.tgid)
        else:
            author = await p.Puppet.get_by_peer(msg.peer_id)

    if not author:
        intent = self.main_intent
    else:
        intent = author.intent_for(self)
        if not author.displayname:
            # The puppet has no displayname yet, so refresh its profile first.
            entity = await client.get_entity(author.peer)
            await author.update_info(source, entity, client_override=client)

    api = intent.api
    if api.is_real_user and not api.is_real_user_as_token and not self._enable_batch_sending:
        # Swap the real-user intent for the puppet's default one in this configuration.
        intent = author.default_mxid_intent

    is_bot = author.is_bot if author else False
    converted = await self._msg_conv.convert(
        source,
        intent,
        is_bot,
        self.is_channel,
        msg,
        client=client,
        deterministic_reply_id=self.bridge.homeserver_software.is_hungry,
    )
    return converted, intent
|
|
|
|
async def _wrap_batch_msg(
    self,
    intent: IntentAPI,
    msg: Message,
    converted: putil.ConvertedMessage,
    caption: bool = False,
    event_id: EventID | None = None,
) -> BatchSendEvent:
    """Build the ``BatchSendEvent`` for a converted Telegram message.

    Args:
        intent: The intent whose MXID is used as the event sender.
        msg: The Telegram message; its ``date`` becomes the event timestamp.
        converted: The converted message to take the content from.
        caption: When true, wrap ``converted.caption`` as a room message instead of
            the main content.
        event_id: Optional event ID to put in the batch event.

    Returns:
        The event to include in a batch send (encrypted if the room is encrypted
        and end-to-bridge encryption is available).
    """
    content, event_type = (
        (converted.caption, EventType.ROOM_MESSAGE)
        if caption
        else (converted.content, converted.type)
    )
    if self.encrypted and self.matrix.e2ee:
        event_type, content = await self.matrix.e2ee.encrypt(self.mxid, event_type, content)
    if intent.api.is_real_user:
        # Tag events sent through a real user's account with the bridge name.
        content[DOUBLE_PUPPET_SOURCE_KEY] = self.bridge.name
    sender_mxid = intent.mxid
    timestamp_ms = int(msg.date.timestamp() * 1000)
    return BatchSendEvent(
        sender=sender_mxid,
        timestamp=timestamp_ms,
        content=content,
        type=event_type,
        event_id=event_id,
    )
|
|
|
|
async def _backfill_messages(
    self,
    source: u.User,
    client: MautrixTelegramClient,
    forward: bool,
    anchor_id: int,
    limit: int,
) -> tuple[int, int, TelegramID]:
    """Fetch one batch of messages from Telegram and send them to the Matrix room.

    Args:
        source: The user whose account the messages are fetched through.
        client: The Telegram client used for fetching.
        forward: ``True`` to fetch messages after ``anchor_id``, ``False`` to fetch
            messages before it.
        anchor_id: The message ID to backfill relative to. A falsy value together
            with ``forward=False`` means no upper bound is passed to Telegram.
        limit: Maximum number of messages to request.

    Returns:
        A tuple of (number of Matrix events sent, number of Telegram messages
        fetched, lowest ID among the fetched non-service messages past the anchor,
        or ``0`` if there were none).
    """
    entity = await self.get_input_entity(source)
    events = []
    intents = []
    metas = []
    tg_space = self.tgid if self.peer_type == "channel" else source.tgid

    lowest_id = 0
    first_id_found = False
    first_id = anchor_id
    message_count = 0
    minmax = {"min_id": anchor_id} if forward else {"max_id": anchor_id}
    if not forward and not anchor_id:
        anchor_id = 2**31 - 1
        minmax = {}
    self.log.debug(f"Iterating messages through {source.tgid} with {limit=}, {minmax}")
    delay_warn_handle = self.loop.call_later(
        5 * 60, lambda: self.log.warning("Iterating messages is taking long")
    )
    try:
        # Iterate messages newest to oldest and collect the results
        async for msg in client.iter_messages(entity, limit=limit, **minmax):
            message_count += 1
            if message_count == 1:
                self.log.debug(f"Backfill iter: got first message {msg.id}")
            elif message_count % 50 == 0:
                self.log.debug(f"Backfill iter: got {message_count} messages so far (at {msg.id})")
            if (forward and msg.id <= anchor_id) or (not forward and msg.id >= anchor_id):
                continue
            elif isinstance(msg, MessageService):
                # TODO some service messages can be backfilled
                continue
            if not lowest_id or msg.id < lowest_id:
                lowest_id = msg.id
            if not first_id_found:
                first_id = msg.id
                first_id_found = True

            converted, intent = await self._convert_batch_msg(source, client, msg)
            if converted is None:
                continue
            d_event_id = None
            if self.bridge.homeserver_software.is_hungry:
                d_event_id = self._msg_conv.deterministic_event_id(tg_space, msg.id)
            main_event = await self._wrap_batch_msg(intent, msg, converted, event_id=d_event_id)
            if converted.caption:
                # The lists are reversed before sending, so the caption has to be
                # appended *before* its message to end up after it in the room.
                events.append(await self._wrap_batch_msg(intent, msg, converted, caption=True))
                intents.append(intent)
                metas.append(None)
            events.append(main_event)
            intents.append(intent)
            metas.append(msg)
    finally:
        # Always cancel the timer, otherwise a failed iteration would still log
        # the "taking long" warning minutes later.
        delay_warn_handle.cancel()
    if not events:
        self.log.debug(
            f"Didn't get any events to send out of {message_count} messages fetched "
            f"(first received ID: {first_id}, lowest: {lowest_id})"
        )
        return 0, message_count, lowest_id
    self.log.debug(
        f"Got {len(events)} events to send out of {message_count} messages fetched "
        f"(first received ID: {first_id}, lowest: {lowest_id})"
    )
    if self._enable_batch_sending:
        resp = await self.main_intent.beeper_batch_send(
            self.mxid,
            # We iterated the events in reverse chronological order,
            # so reverse them before sending
            events=list(reversed(events)),
            forward=forward,
        )
        event_ids = resp.event_ids
    else:
        event_ids = [
            await intent.send_message_event(
                self.mxid, evt.type, evt.content, timestamp=evt.timestamp
            )
            for evt, intent in zip(reversed(events), reversed(intents))
        ]
    await DBMessage.bulk_insert(
        [
            DBMessage(
                mxid=event_id,
                mx_room=self.mxid,
                tgid=msg.id,
                tg_space=tg_space,
                edit_index=0,
                content_hash=self.dedup.hash_event(msg),
                # TODO sender
            )
            # Original arrays are in reverse chronological order, but event IDs are
            # chronological (because we reversed the original messages list before sending).
            # Caption events have ``None`` as their meta and get no database row.
            for event_id, msg in zip(event_ids, reversed(metas))
            if msg is not None
        ]
    )
    return len(events), message_count, lowest_id
|
|
|
|
def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]:
    """Expand aggregated reaction counts into one ``MessagePeerReaction`` per reactor.

    A count of 2 produces one reaction for ``self.tgid`` and one for
    ``self.tg_receiver``. A count of 1 is attributed to ``self.tg_receiver`` when
    ``chosen_order`` is truthy and to ``self.tgid`` otherwise. Other counts are
    ignored. All produced reactions have ``date=None``.
    """

    def reaction_by(user_id, reaction):
        return MessagePeerReaction(reaction=reaction, peer_id=PeerUser(user_id), date=None)

    result = []
    for entry in counts:
        if entry.count == 2:
            result.extend(
                (
                    reaction_by(self.tgid, entry.reaction),
                    reaction_by(self.tg_receiver, entry.reaction),
                )
            )
        elif entry.count == 1:
            reactor = self.tg_receiver if entry.chosen_order else self.tgid
            result.append(reaction_by(reactor, entry.reaction))
    return result
|
|
|
|
async def _poll_telegram_reactions(self, source: au.AbstractUser) -> None:
    """Fetch reactions for recent messages through ``source`` and bridge them.

    Polls are rate limited per source MXID: if the previous poll was less than
    ``REACTION_POLL_MIN_INTERVAL`` seconds ago, nothing is done.
    """
    now = time.monotonic()
    next_allowed = self._prev_reaction_poll[source.mxid] + REACTION_POLL_MIN_INTERVAL
    if next_allowed > now:
        self.log.trace(
            f"Not polling reactions through {source.mxid}, "
            f"last poll was less than {REACTION_POLL_MIN_INTERVAL} seconds ago"
        )
        return
    self._prev_reaction_poll[source.mxid] = now

    self.log.debug(f"Polling reactions for recent messages through {source.mxid}")
    recent = await DBMessage.find_recent(self.mxid, source.tgid)
    request = GetMessagesReactionsRequest(peer=self.peer, id=[dbm.tgid for dbm in recent])
    updates = await source.client(request)

    # Refresh the profiles of all users included in the response.
    for tg_user in updates.users:
        puppet = await p.Puppet.get_by_tgid(TelegramID(tg_user.id))
        await puppet.update_info(source, tg_user)

    for update in updates.updates:
        if not isinstance(update, UpdateMessageReactions):
            self.log.warning(f"Unexpected update type {type(update)} in get reactions response")
            continue
        await self.handle_telegram_reactions(
            source, TelegramID(update.msg_id), update.reactions
        )
|
|
|
|
async def try_handle_telegram_reactions(
    self,
    source: au.AbstractUser,
    msg_id: TelegramID,
    data: MessageReactions,
    dbm: DBMessage | None = None,
    timestamp: datetime | None = None,
) -> None:
    """Call :meth:`handle_telegram_reactions`, logging any exception instead of raising.

    Takes the same arguments as :meth:`handle_telegram_reactions`.
    """
    try:
        await self.handle_telegram_reactions(
            source, msg_id, data, dbm=dbm, timestamp=timestamp
        )
    except Exception:
        self.log.exception(f"Error handling reactions in message {msg_id}")
|
|
|
|
async def handle_telegram_reactions(
    self,
    source: au.AbstractUser,
    msg_id: TelegramID,
    data: MessageReactions,
    dbm: DBMessage | None = None,
    timestamp: datetime | None = None,
) -> None:
    """Bridge the reaction state of a Telegram message to Matrix.

    Args:
        source: The user through whom the reaction data was received.
        msg_id: Telegram ID of the message the reactions belong to.
        data: The reaction data from Telegram.
        dbm: The database entry of the message, if the caller already has it.
            Looked up by ``msg_id`` otherwise; nothing happens if it isn't found.
        timestamp: Fallback timestamp passed on for the bridged reactions.
    """
    total_count = sum(result.count for result in data.results)
    reaction_list = data.recent_reactions or []
    if total_count > 0 and not reaction_list and not data.can_see_list:
        # We don't know who reacted in a channel, so we can't bridge it properly either
        return
    if self.peer_type == "channel" and not self.megagroup:
        # This should never happen with the previous if; it is only logged and
        # handling continues regardless.
        self.log.warning(f"Can see reaction list in channel ({data!s})")

    if dbm is None:
        tg_space = self.tgid if self.peer_type == "channel" else source.tgid
        dbm = await DBMessage.get_one_by_tgid(msg_id, tg_space)
        if dbm is None:
            return

    list_is_incomplete = not reaction_list or len(reaction_list) < total_count
    if list_is_incomplete:
        if self.peer_type == "user":
            reaction_list = self._split_dm_reaction_counts(data.results)
        elif source.is_bot:
            # Can't fetch exact reaction senders as a bot
            return
        else:
            # TODO should calls to this be limited?
            request = GetMessageReactionsListRequest(peer=self.peer, id=dbm.tgid, limit=100)
            response = await source.client(request)
            reaction_list = response.reactions

    async with self.reaction_lock(dbm.mxid):
        await self._handle_telegram_user_reactions_locked(
            source, dbm, reaction_list, total_count, timestamp=timestamp
        )
|
|
|
|
async def handle_telegram_bot_reactions(
    self, source: au.AbstractUser, update: UpdateBotMessageReaction
) -> None:
    """Bridge a reaction update that was received through a bot account.

    The update carries the complete new reaction list of a single actor, so the
    actor's reactions are synced as a full list while everybody else's existing
    reactions are left untouched.

    Args:
        source: The (bot) user the update was received through.
        update: The reaction update. Updates for unknown messages or with an
            actor that is neither a user nor a channel are ignored.
    """
    tg_space = self.tgid if self.peer_type == "channel" else source.tgid
    dbm = await DBMessage.get_one_by_tgid(TelegramID(update.msg_id), tg_space)
    if dbm is None:
        return
    if isinstance(update.actor, PeerUser):
        user_id = TelegramID(update.actor.user_id)
    elif isinstance(update.actor, PeerChannel):
        user_id = TelegramID(update.actor.channel_id)
    else:
        return
    reactions: dict[TelegramID, list[WrappedReaction]] = {}
    custom_emoji_ids: list[int] = []
    for reaction in update.new_reactions:
        reactions.setdefault(user_id, []).append(WrappedReaction(reaction=reaction, date=None))
        if isinstance(reaction, ReactionCustomEmoji):
            # The locked handler looks custom emojis up by document ID, so the IDs
            # must be collected here or the lookup fails for custom emoji reactions.
            custom_emoji_ids.append(reaction.document_id)
    async with self.reaction_lock(dbm.mxid):
        await self._handle_telegram_parsed_reactions_locked(
            source,
            dbm,
            reactions,
            custom_emoji_ids,
            is_full=True,
            only_user_id=user_id,
            timestamp=update.date,
        )
|
|
|
|
@staticmethod
def _reactions_filter(lst: list[WrappedReaction], existing: DBReaction) -> bool:
    """Check whether ``existing`` is present in ``lst`` and consume the match.

    Args:
        lst: New reactions of one sender (may be ``None`` or empty). The first entry
            matching ``existing`` is removed from this list in place.
        existing: An already bridged reaction from the database.

    Returns:
        ``True`` if a matching entry was found (and removed), ``False`` otherwise.
    """
    if not lst:
        return False
    for candidate in lst:
        reaction = candidate.reaction
        # Custom emojis are stored by document ID, normal ones by the emoji itself.
        if isinstance(reaction, ReactionCustomEmoji):
            key = str(reaction.document_id)
        elif isinstance(reaction, ReactionEmoji):
            key = reaction.emoticon
        else:
            continue
        if existing.reaction == key:
            lst.remove(candidate)
            return True
    return False
|
|
|
|
@staticmethod
async def _get_reaction_limit(source: au.AbstractUser, sender: TelegramID) -> int:
    """Get how many reactions ``sender`` may have on a single message.

    For a non-bot ``u.User`` source the limit comes from
    ``source.get_max_reactions``; otherwise it is 3 for premium senders and 1 for
    everyone else. The sender's puppet is looked up without creating it.
    """
    puppet = await p.Puppet.get_by_tgid(sender, create=False)
    is_premium = puppet and puppet.is_premium
    if not isinstance(source, u.User) or source.is_bot:
        return 3 if is_premium else 1
    return await source.get_max_reactions(is_premium)
|
|
|
|
async def _handle_telegram_user_reactions_locked(
    self,
    source: au.AbstractUser,
    msg: DBMessage,
    reaction_list: list[MessagePeerReaction],
    total_count: int,
    timestamp: datetime | None = None,
) -> None:
    """Group a flat reaction list by sender and hand it to the parsed-reaction handler.

    Entries whose peer is not a user/channel or whose reaction is not a normal or
    custom emoji are skipped. The list is treated as complete when its length
    equals ``total_count``.
    """
    by_sender: dict[TelegramID, list[WrappedReaction]] = {}
    custom_emoji_ids: list[int] = []
    for entry in reaction_list:
        if not isinstance(entry.peer_id, (PeerUser, PeerChannel)):
            continue
        if not isinstance(entry.reaction, (ReactionEmoji, ReactionCustomEmoji)):
            continue
        sender_id = p.Puppet.get_id_from_peer(entry.peer_id)
        by_sender.setdefault(sender_id, []).append(WrappedReaction(entry.reaction, entry.date))
        if isinstance(entry.reaction, ReactionCustomEmoji):
            custom_emoji_ids.append(entry.reaction.document_id)

    await self._handle_telegram_parsed_reactions_locked(
        source,
        msg,
        by_sender,
        custom_emoji_ids,
        is_full=len(reaction_list) == total_count,
        timestamp=timestamp,
    )
|
|
|
|
async def _handle_telegram_parsed_reactions_locked(
    self,
    source: au.AbstractUser,
    msg: DBMessage,
    reactions: dict[TelegramID, list[WrappedReaction]],
    custom_emoji_ids: list[int],
    is_full: bool,
    only_user_id: TelegramID | None = None,
    timestamp: datetime | None = None,
) -> None:
    """Sync the Matrix reactions of ``msg`` with the given Telegram reactions.

    Args:
        source: The user through whom the reactions were received.
        msg: Database entry of the message the reactions belong to.
        reactions: New reactions grouped by sender. Entries that are already
            bridged are removed from this mapping in place.
        custom_emoji_ids: Document IDs of the custom emojis used in ``reactions``.
        is_full: Whether ``reactions`` is the complete list, i.e. whether existing
            reactions that are missing from it should be removed.
        only_user_id: If set, existing reactions of other senders are left alone.
        timestamp: Timestamp for reactions that don't carry their own date.
    """
    custom_emojis = await util.transfer_custom_emojis_to_matrix(source, custom_emoji_ids)

    already_bridged = await DBReaction.get_all_by_message(msg.mxid, msg.mx_room)

    # Pass 1: match existing reactions against the new state.
    to_redact: list[DBReaction] = []
    for old in already_bridged:
        sender_id = old.tg_sender
        if only_user_id is not None and sender_id != only_user_id:
            continue
        pending = reactions.get(sender_id)
        if self._reactions_filter(pending, old):
            # Still present on Telegram; drop the sender once nothing is left to send.
            if pending is not None and not pending:
                del reactions[sender_id]
            continue
        if is_full:
            to_redact.append(old)
        elif pending is not None and len(pending) == await self._get_reaction_limit(
            source, sender_id
        ):
            # The sender is at their reaction limit, so the old one must be gone.
            to_redact.append(old)
        # else: assume the reaction is still there, too much effort to fetch it

    # Pass 2: send everything that wasn't matched above.
    for sender, wrapped_list in reactions.items():
        for wrapped in wrapped_list:
            tg_reaction = wrapped.reaction
            if isinstance(tg_reaction, ReactionEmoji):
                emoji_id = tg_reaction.emoticon
                matrix_reaction = variation_selector.add(tg_reaction.emoticon)
            elif isinstance(tg_reaction, ReactionCustomEmoji):
                emoji_id = str(tg_reaction.document_id)
                custom_emoji = custom_emojis[tg_reaction.document_id]
                matrix_reaction = (
                    custom_emoji.emoji
                    if isinstance(custom_emoji, util.UnicodeCustomEmoji)
                    else custom_emoji.mxc
                )
            else:
                self.log.warning("Unknown reaction type %s", type(tg_reaction))
                continue
            self.log.debug(f"Bridging reaction {emoji_id} by {sender} to {msg.tgid}")
            puppet: p.Puppet = await p.Puppet.get_by_tgid(sender)
            reaction_mxid = await puppet.intent_for(self).react(
                msg.mx_room,
                msg.mxid,
                matrix_reaction,
                timestamp=wrapped.date or timestamp,
            )
            await DBReaction(
                mxid=reaction_mxid,
                mx_room=msg.mx_room,
                msg_mxid=msg.mxid,
                tg_sender=sender,
                reaction=emoji_id,
            ).save()

    # Pass 3: redact what disappeared on Telegram.
    for stale in to_redact:
        self.log.debug(
            f"Removing reaction {stale.reaction} by {stale.tg_sender} "
            f"to {msg.tgid}"
        )
        puppet = await p.Puppet.get_by_tgid(stale.tg_sender)
        await puppet.intent_for(self).redact(stale.mx_room, stale.mxid)
        await stale.delete()
|
|
|
|
async def handle_telegram_message(
    self, source: au.AbstractUser, sender: p.Puppet | None, evt: Message
) -> None:
    """Bridge a Telegram message to Matrix, catching and logging any failure.

    If handling fails and ``bridge.incoming_bridge_error_reports`` is enabled, an
    error notice is sent to the room as the sender (or the main intent when there
    is no sender).
    """
    try:
        await self._handle_telegram_message(source, sender, evt)
    except Exception:
        failed_sender = sender.tgid if sender else None
        self.log.exception(
            f"Failed to handle Telegram message {evt.id} from {failed_sender} via {source.tgid}"
        )
        if not self.config["bridge.incoming_bridge_error_reports"]:
            return
        intent = sender.intent_for(self) if sender else self.main_intent
        notice = TextMessageEventContent(
            msgtype=MessageType.NOTICE,
            body="Error processing message from Telegram",
        )
        await self._send_message(intent, notice)
|
|
|
|
async def _handle_telegram_message(
    self, source: au.AbstractUser, sender: p.Puppet | None, evt: Message
) -> None:
    """Bridge a single incoming Telegram message to the Matrix room.

    Creates the room if needed, drops messages that were already handled (via the
    in-memory deduplicator or the database), converts the message, sends it (plus
    an optional caption event) and stores the message mapping. Reactions and
    disappearing timers attached to the message are handled afterwards.

    Args:
        source: The user through whom the message was received.
        sender: Puppet of the message author, or ``None`` if there is none; the
            main intent is used for sending in that case.
        evt: The Telegram message.
    """
    if not self.mxid:
        if source.is_relaybot and self.config["bridge.relaybot.ignore_unbridged_group_chat"]:
            return
        self.log.debug("Got telegram message %d, but no room exists, creating...", evt.id)
        await self.create_matrix_room(source, invites=[source.mxid], update_if_exists=False)
        if not self.mxid:
            self.log.warning("Room doesn't exist even after creating, dropping %d", evt.id)
            return

    if (
        self.peer_type == "user"
        and sender
        and sender.tgid == self.tg_receiver
        and not sender.is_real_user
        and not await self.az.state_store.is_joined(self.mxid, sender.mxid)
    ):
        self.log.debug(
            f"Ignoring private chat message {evt.id}@{source.tgid} as receiver does"
            " not have matrix puppeting and their default puppet isn't in the room"
        )
        return

    sender_id = sender.tgid if sender else self.tgid
    async with self.send_lock(sender_id, required=False):
        tg_space = self.tgid if self.peer_type == "channel" else source.tgid

        # Register a placeholder in the deduplicator; it is swapped for the real
        # event ID once the message has been sent.
        temporary_identifier = EventID(
            f"${random.randint(1000000000000, 9999999999999)}TGBRIDGETEMP"
        )
        event_hash, duplicate_found = self.dedup.check(evt, (temporary_identifier, tg_space))
        if duplicate_found:
            mxid, other_tg_space = duplicate_found
            self.log.debug(
                f"Ignoring message {evt.id}@{tg_space} (src {source.tgid}) "
                f"as it was already handled (in space {other_tg_space})"
            )
            if tg_space != other_tg_space:
                await DBMessage(
                    tgid=TelegramID(evt.id),
                    mx_room=self.mxid,
                    mxid=mxid,
                    tg_space=tg_space,
                    edit_index=0,
                    content_hash=event_hash,
                    sender=sender_id,
                ).insert()
            return

    msg = await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
    if msg:
        self.log.debug(
            f"Ignoring message {evt.id} (src {source.tgid}) as it was already "
            f"handled into {msg.mxid}."
        )
        return

    self.log.debug(
        "Handling Telegram message %d@%d from %s (ts: %s)",
        evt.id,
        tg_space,
        sender_id,
        evt.date,
    )
    self.log.trace("Message content: %s", evt)

    if sender and not sender.displayname:
        self.log.debug(
            f"Telegram user {sender.tgid} sent a message, but doesn't have a displayname,"
            " updating info..."
        )
        try:
            entity = await source.client.get_entity(sender.peer)
            await sender.update_info(source, entity)
            if not sender.displayname:
                self.log.debug(
                    f"Telegram user {sender.tgid} doesn't have a displayname even after"
                    f" updating with data {entity!s}"
                )
        except ValueError:
            self.log.warning(
                f"Couldn't find entity to update profile of {sender.tgid}", exc_info=True
            )

    if sender:
        # TODO don't use double puppet when backfilling
        intent = sender.intent_for(self)
    else:
        intent = self.main_intent
    is_bot = sender.is_bot if sender else False
    converted = await self._msg_conv.convert(source, intent, is_bot, self.is_channel, evt)
    if not converted:
        return
    await intent.set_typing(self.mxid, timeout=0)
    event_id = await self._send_message(
        intent, converted.content, timestamp=evt.date, event_type=converted.type
    )
    caption_id = None
    if converted.caption:
        caption_id = await self._send_message(intent, converted.caption, timestamp=evt.date)

    self._new_messages_after_sponsored = True

    another_event_hash, prev_id = self.dedup.update(
        evt, (event_id, tg_space), (temporary_identifier, tg_space)
    )
    assert another_event_hash == event_hash
    if prev_id:
        self.log.debug(
            f"Sent message {evt.id}@{tg_space} to Matrix as {event_id}. "
            f"Temporary dedup identifier was {temporary_identifier}, "
            f"but dedup map contained {prev_id[1]} instead! -- "
            "This was probably a race condition caused by Telegram sending updates "
            "to other clients before responding to the sender. I'll just redact "
            "the likely duplicate message now."
        )
        await intent.redact(self.mxid, event_id)
        if caption_id:
            # The caption is a separate event of the same duplicate message.
            await intent.redact(self.mxid, caption_id)
        return

    self.log.debug("Handled Telegram message %d@%d -> %s", evt.id, tg_space, event_id)
    try:
        dbm = DBMessage(
            tgid=TelegramID(evt.id),
            mx_room=self.mxid,
            mxid=event_id,
            tg_space=tg_space,
            edit_index=0,
            content_hash=event_hash,
            sender=sender_id,
        )
        await dbm.insert()
        await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id)
    except (IntegrityError, UniqueViolationError) as e:
        self.log.error(
            f"{type(e).__name__} while saving message mapping {evt.id}@{tg_space} "
            f"-> {event_id}: {e}"
        )
        await intent.redact(self.mxid, event_id)
        if caption_id:
            # Don't leave an orphaned caption behind when the message is removed.
            await intent.redact(self.mxid, caption_id)
        return
    if isinstance(evt, Message) and evt.reactions:
        # Reactions are bridged in the background so they don't delay the receipt.
        background_task.create(
            self.try_handle_telegram_reactions(
                source, dbm.tgid, evt.reactions, dbm=dbm, timestamp=evt.date
            )
        )
    await self._send_delivery_receipt(event_id)
    if converted.disappear_seconds:
        if converted.disappear_start_immediately:
            expires_at = int(evt.date.timestamp()) + converted.disappear_seconds
        else:
            expires_at = None
        await self._mark_disappearing(event_id, converted.disappear_seconds, expires_at)
        if caption_id:
            await self._mark_disappearing(caption_id, converted.disappear_seconds, expires_at)
|
|
|
|
async def _mark_disappearing(
    self, event_id: EventID, seconds: int, expires_at: int | None
) -> None:
    """Store a disappearing-message entry for ``event_id``.

    Args:
        event_id: The Matrix event that should disappear.
        seconds: The disappearing timer length passed to the database entry.
        expires_at: Expiry time in seconds, stored in milliseconds. When it is
            falsy, no expiry is stored and no disappear task is started.
    """
    expiration_ms = expires_at * 1000 if expires_at else None
    entry = DisappearingMessage(self.mxid, event_id, seconds, expiration_ts=expiration_ms)
    await entry.insert()
    if not expires_at:
        return
    background_task.create(self._disappear_event(entry))
|
|
|
|
async def _create_room_on_action(
    self, source: au.AbstractUser, action: TypeMessageAction
) -> bool:
    """Create the Matrix room in response to a service action, if appropriate.

    Chat/channel creation actions and "user added / joined" actions trigger room
    creation, but only the latter should still be handled afterwards.

    Returns:
        ``True`` if the caller should continue handling the action, ``False`` if
        it should be ignored.
    """
    if source.is_relaybot and self.config["bridge.relaybot.ignore_unbridged_group_chat"]:
        return False
    create_and_exit = (MessageActionChatCreate, MessageActionChannelCreate)
    create_and_continue = (
        MessageActionChatAddUser,
        MessageActionChatJoinedByLink,
        MessageActionChatJoinedByRequest,
    )
    is_exit_action = isinstance(action, create_and_exit)
    is_continue_action = isinstance(action, create_and_continue)
    if is_exit_action or is_continue_action:
        self.log.debug(
            f"Got telegram action of type {type(action).__name__},"
            " but no room exists, creating..."
        )
        await self.create_matrix_room(
            source, invites=[source.mxid], update_if_exists=is_exit_action
        )
    return is_continue_action
|
|
|
|
async def handle_telegram_direct_call(
    self, source: au.AbstractUser, sender: p.Puppet, update: UpdatePhoneCall
) -> None:
    """Send an emote to the room when ``sender`` starts a (video) call.

    Only ``PhoneCallRequested`` updates produce a message; all other phone call
    states are ignored.
    """
    call = update.phone_call
    if not isinstance(call, PhoneCallRequested):
        return
    call_type = "video call" if call.video else "call"
    await self._send_message(
        sender.intent_for(self),
        TextMessageEventContent(msgtype=MessageType.EMOTE, body=f"started a {call_type}"),
    )
|
|
|
|
async def handle_telegram_action(
    self, source: au.AbstractUser, sender: p.Puppet | None, update: MessageService
) -> None:
    """Bridge a Telegram service message (chat action) to Matrix.

    The room is created first when the action warrants it. Actions that the
    deduplicator has already seen, and actions for portals that still have no
    room, are ignored.

    Args:
        source: The user through whom the update was received.
        sender: Puppet of the user who performed the action, or ``None`` when
            there is no sender. Notices are then sent with the main intent, the
            same fallback ``handle_telegram_message`` uses.
        update: The service message containing the action.
    """
    action = update.action
    should_ignore = (
        not self.mxid and not await self._create_room_on_action(source, action)
    ) or self.dedup.check_action(update)
    if should_ignore or not self.mxid:
        return

    def notice_intent():
        # ``sender`` is optional, so never dereference it unconditionally.
        return sender.intent_for(self) if sender else self.main_intent

    if isinstance(action, MessageActionChatEditTitle):
        await self._update_title(action.title, sender=sender, save=True)
        await self.update_bridge_info()
    elif isinstance(action, MessageActionChatEditPhoto):
        await self._update_avatar(source, action.photo, sender=sender, save=True)
        await self.update_bridge_info()
    elif isinstance(action, MessageActionChatDeletePhoto):
        await self._update_avatar(source, ChatPhotoEmpty(), sender=sender, save=True)
        await self.update_bridge_info()
    elif isinstance(action, MessageActionChatAddUser):
        for user_id in action.users:
            await self._add_telegram_user(TelegramID(user_id), source)
    elif isinstance(action, (MessageActionChatJoinedByLink, MessageActionChatJoinedByRequest)):
        if sender:
            await self._add_telegram_user(sender.id, source)
        else:
            self.log.warning(
                "Got %s action without a sender, can't add the joined user",
                type(action).__name__,
            )
    elif isinstance(action, MessageActionChatDeleteUser):
        await self.delete_telegram_user(TelegramID(action.user_id), sender)
    elif isinstance(action, MessageActionChatMigrateTo):
        await self._migrate_and_save_telegram(TelegramID(action.channel_id))
        await self._send_message(
            notice_intent(),
            TextMessageEventContent(
                msgtype=MessageType.EMOTE,
                body="upgraded this group to a supergroup",
            ),
        )
        await self.update_bridge_info()
    elif isinstance(action, MessageActionPhoneCall):
        call_type = "Video call" if action.video else "Call"
        end_reason = "ended"
        if isinstance(action.reason, PhoneCallDiscardReasonMissed):
            # A missed call is "cancelled" from the caller's point of view.
            own_call = bool(sender) and sender.tgid == source.tgid
            end_reason = "cancelled" if own_call else "missed"
        elif isinstance(action.reason, PhoneCallDiscardReasonBusy):
            end_reason = "rejected"
        elif isinstance(action.reason, PhoneCallDiscardReasonDisconnect):
            end_reason = "disconnected"
        body = f"{call_type} {end_reason}"
        if action.duration:
            body += f" ({format_duration(action.duration)})"
        await self._send_message(
            notice_intent(),
            TextMessageEventContent(msgtype=MessageType.NOTICE, body=body),
        )
    elif isinstance(action, MessageActionGroupCall):
        await self._send_message(
            notice_intent(),
            TextMessageEventContent(
                msgtype=MessageType.EMOTE,
                body=(
                    "started a video chat"
                    if action.duration is None
                    else f"ended the video chat ({format_duration(action.duration)})"
                ),
            ),
        )
    elif isinstance(action, MessageActionGiftPremium):
        await self._send_message(
            notice_intent(),
            TextMessageEventContent(
                msgtype=MessageType.EMOTE,
                body=(
                    f"gifted Telegram Premium for {action.months} months "
                    f"({action.amount / 100} {action.currency})"
                ),
            ),
        )
    elif isinstance(action, MessageActionBoostApply):
        await self._send_message(
            notice_intent(),
            TextMessageEventContent(
                msgtype=MessageType.EMOTE,
                body=(
                    "boosted the group"
                    if action.boosts == 1
                    else f"boosted the group {action.boosts} times"
                ),
            ),
        )
    elif isinstance(action, MessageActionGameScore):
        # TODO handle game score
        pass
    elif isinstance(action, MessageActionContactSignUp):
        await self.handle_telegram_joined(source, sender, update)
    else:
        self.log.trace("Unhandled Telegram action in %s: %s", self.title, action)
|
|
|
|
async def handle_telegram_joined(
    self,
    source: au.AbstractUser,
    sender: p.Puppet,
    update: MessageService,
    backfill: bool = False,
) -> None:
    """Bridge a "contact joined Telegram" service message as an emote.

    The message is skipped if it is already in the database. When
    ``bridge.always_read_joined_telegram_notice`` is enabled and the source user
    has a real-user (double puppet) intent, the notice is marked as read for them.

    Args:
        source: The user through whom the update was received.
        sender: Puppet of the contact who joined.
        update: The service message; its action must be a contact sign-up.
        backfill: Not used by this method.
    """
    assert isinstance(update.action, MessageActionContactSignUp)

    tgid = TelegramID(update.id)
    existing = await DBMessage.get_one_by_tgid(tgid, source.tgid)
    if existing:
        self.log.debug(
            f"Ignoring new user message {update.id} (src {source.tgid}) as it was already "
            f"handled into {existing.mxid}."
        )
        return

    content = TextMessageEventContent(msgtype=MessageType.EMOTE, body="joined Telegram")
    event_id = await self._send_message(
        sender.intent_for(self), content, timestamp=update.date
    )
    mapping = DBMessage(
        tgid=TelegramID(update.id),
        mx_room=self.mxid,
        mxid=event_id,
        tg_space=source.tgid,
        edit_index=0,
        sender=sender.id,
    )
    await mapping.insert()

    if not self.config["bridge.always_read_joined_telegram_notice"]:
        return
    double_puppet = await p.Puppet.get_by_tgid(source.tgid)
    if double_puppet and double_puppet.is_real_user:
        await double_puppet.intent.mark_read(self.mxid, event_id)
|
|
|
|
async def set_telegram_admin(self, user_id: TelegramID) -> None:
    """Give power level 50 to the Matrix user and the puppet of a Telegram user.

    Whichever of the two exists for ``user_id`` is updated; the power levels are
    written back in either case.
    """
    puppet = await p.Puppet.get_by_tgid(user_id)
    user = await u.User.get_by_tgid(user_id)

    levels = await self.main_intent.get_power_levels(self.mxid)
    for account in (user, puppet):
        if account:
            levels.users[account.mxid] = 50
    await self.main_intent.set_power_levels(self.mxid, levels)
|
|
|
|
async def receive_telegram_pin_ids(
    self, msg_ids: list[TelegramID], receiver: TelegramID, remove: bool
) -> None:
    """Apply a Telegram pin/unpin update to the pinned events of the Matrix room.

    Args:
        msg_ids: Telegram IDs of the affected messages.
        receiver: Telegram ID used as the message space for non-channel portals.
        remove: ``True`` to unpin the messages, ``False`` to pin them.
    """
    async with self._pin_lock:
        tg_space = self.tgid if self.peer_type == "channel" else receiver
        previously_pinned = await self.main_intent.get_pinned_messages(self.mxid)
        # A dict is used as an ordered set so the existing pin order is kept.
        pinned = dict.fromkeys(previously_pinned, True)
        for message in await DBMessage.get_first_by_tgids(msg_ids, tg_space):
            if remove:
                pinned.pop(message.mxid, None)
            else:
                pinned[message.mxid] = True
        currently_pinned = list(pinned)
        if currently_pinned != previously_pinned:
            await self.main_intent.set_pinned_messages(self.mxid, currently_pinned)
|
|
|
|
async def set_telegram_admins_enabled(self, enabled: bool) -> None:
    """Set the power level required for inviting and for changing room name/avatar.

    The required level is 50 when ``enabled`` is true and 10 otherwise.
    """
    required_level = 50 if enabled else 10
    levels = await self.main_intent.get_power_levels(self.mxid)
    levels.invite = required_level
    for event_type in (EventType.ROOM_NAME, EventType.ROOM_AVATAR):
        levels.events[event_type] = required_level
    await self.main_intent.set_power_levels(self.mxid, levels)
|
|
|
|
# endregion
|
|
# region Miscellaneous getters
|
|
|
|
def get_config(self, key: str) -> Any:
    """Get a config value, preferring the portal's local config over the bridge config.

    Falls back to ``bridge.<key>`` in the global config when the local config has
    no value (``None``) for ``key``.
    """
    override = util.recursive_get(self.local_config, key)
    return self.config[f"bridge.{key}"] if override is None else override
|
|
|
|
async def can_user_perform(self, user: u.User, event: str) -> bool:
    """Check whether ``user`` may perform the bridge action named ``event``.

    Admins are always allowed. Otherwise the user needs the power level for the
    ``fi.mau.telegram.<event>`` state event in the portal room; if there is no
    room or the power levels can't be fetched, the answer is ``False``.
    """
    if user.is_admin:
        return True
    if not self.mxid:
        # No room for anybody to perform actions in
        return False
    try:
        # The result is unused; the call only has to succeed before the
        # state store is consulted.
        await self.main_intent.get_power_levels(self.mxid)
    except MatrixRequestError:
        return False
    else:
        evt_type = EventType.find(f"fi.mau.telegram.{event}", t_class=EventType.Class.STATE)
        state_store = self.main_intent.state_store
        return await state_store.has_power_level(self.mxid, user.mxid, evt_type)
|
|
|
|
def get_input_entity(
    self, user: au.AbstractUser
) -> Awaitable[TypeInputPeer | TypeInputChannel]:
    """Resolve this portal's peer into an input entity using ``user``'s client.

    Returns:
        Whatever ``user.client.get_input_entity`` returns for ``self.peer``
        (annotated as an awaitable); it is not awaited here.
    """
    tg_client = user.client
    return tg_client.get_input_entity(self.peer)
|
|
|
|
async def get_entity(
    self, user: au.AbstractUser, client: MautrixTelegramClient | None = None
) -> TypeChat:
    """Fetch the Telegram entity of this portal.

    Args:
        user: The user to fetch through.
        client: Client to use instead of ``user.client``.

    Returns:
        The entity of the portal's peer.

    Raises:
        ValueError: If the entity can't be found. For non-bot users the dialog
            list is searched first as a fallback.
    """
    tg_client = client or user.client
    try:
        return await tg_client.get_entity(self.peer)
    except ValueError:
        if not user.is_bot:
            self.log.warning(
                f"Could not find entity with user {user.tgid}. falling back to get_dialogs."
            )
            async for dialog in tg_client.iter_dialogs():
                if dialog.entity.id == self.tgid:
                    return dialog.entity
        else:
            self.log.warning(f"Could not find entity with bot {user.tgid}. Failing...")
        # Nothing found: re-raise the original lookup error.
        raise
|
|
|
|
async def get_invite_link(
    self,
    user: u.User,
    uses: int | None = None,
    expire: datetime | None = None,
    request_needed: bool = False,
    title: str | None = None,
) -> str:
    """Get a link for joining this chat.

    Chats with a username get their public ``t.me`` link; otherwise a new invite
    link is exported through ``user``'s account with the given options.

    Args:
        user: The user whose client is used to export the link.
        uses: Usage limit of the exported link.
        expire: Expiry date of the exported link.
        request_needed: Passed to the export request as ``request_needed``.
        title: Title of the exported link.

    Raises:
        ValueError: If the portal is a private chat.
    """
    if self.peer_type == "user":
        raise ValueError("You can't invite users to private chats.")
    if self.username:
        return f"https://t.me/{self.username}"
    peer = await self.get_input_entity(user)
    request = ExportChatInviteRequest(
        peer=peer,
        expire_date=expire,
        usage_limit=uses,
        request_needed=request_needed,
        title=title,
    )
    exported = await user.client(request)
    return exported.link
|
|
|
|
# endregion
|
|
# region Matrix room cleanup
|
|
|
|
async def get_authenticated_matrix_users(self) -> list[UserID]:
    """List the Matrix users in the room who are authenticated with the bridge.

    Puppets and the bridge bot are skipped. A user counts as authenticated if the
    portal has a bot and the user is relaybot-whitelisted, or if the user has
    full access (bot logins allowed). Returns an empty list if the member list
    can't be fetched.
    """
    try:
        room_members = await self.main_intent.get_room_members(self.mxid)
    except MatrixRequestError:
        return []
    result: list[UserID] = []
    portal_has_bot = self.has_bot
    for member_mxid in room_members:
        if p.Puppet.get_id_from_mxid(member_mxid) or member_mxid == self.az.bot_mxid:
            continue
        user = await u.User.get_and_start_by_mxid(member_mxid)
        if (portal_has_bot and user.relaybot_whitelisted) or await user.has_full_access(
            allow_bot=True
        ):
            result.append(user.mxid)
    return result
|
|
|
|
async def cleanup_portal(
    self, message: str, puppets_only: bool = False, delete: bool = True
) -> None:
    """Clean up the Matrix room of this portal and optionally delete the portal.

    Args:
        message: Passed on to ``cleanup_room``.
        puppets_only: Passed on to ``cleanup_room``.
        delete: Whether to also delete the portal afterwards.
    """
    if self.username:
        # Alias removal is best effort: failures are logged and cleanup continues.
        try:
            await self.main_intent.remove_room_alias(self.alias_localpart)
        except (MatrixRequestError, IntentError):
            self.log.warning("Failed to remove alias when cleaning up room", exc_info=True)
    await self.cleanup_room(self.main_intent, self.mxid, message, puppets_only)
    if not delete:
        return
    await self.delete()
|
|
|
|
async def delete(self) -> None:
    """Remove this portal from the caches, reset its state and delete its rows.

    The portal is dropped from ``by_tgid`` and ``by_mxid`` (missing entries
    are ignored), its cached metadata fields are reset, and then the portal
    row, its messages and its reactions are deleted before ``deleted`` is
    set to ``True``.
    """
    # A portal that was never cached simply has nothing to evict.
    self.by_tgid.pop(self.tgid_full, None)
    self.by_mxid.pop(self.mxid, None)

    self.name_set = self.avatar_set = False
    self.about = self.next_batch_id = self.first_event_id = None
    self.sponsored_event_id = None
    self.sponsored_event_ts = None
    self.sponsored_msg_random_id = None

    await super().delete()
    # Message and reaction rows are keyed by the room ID, which is still set here.
    room_id = self.mxid
    await DBMessage.delete_all(room_id)
    await DBReaction.delete_all(room_id)
    self.deleted = True
|
|
|
|
# endregion
|
|
# region Class instance lookup
|
|
|
|
async def get_dm_puppet(self) -> p.Puppet | None:
    """Return the puppet looked up by this portal's ``tgid`` for direct chats.

    Returns ``None`` without any lookup when the portal is not a direct chat.
    """
    return await p.Puppet.get_by_tgid(self.tgid) if self.is_direct else None
|
|
|
|
async def postinit(self) -> None:
    """Finish initialisation: pick the main intent and register in the caches.

    Direct chats use the DM puppet's intent for this portal; every other
    portal uses the appservice intent. The portal is then stored in
    ``by_tgid`` and ``by_mxid`` for whichever of its IDs are set.
    """
    dm_puppet = await self.get_dm_puppet()
    if self.is_direct:
        self._main_intent = dm_puppet.intent_for(self)
    else:
        self._main_intent = self.az.intent

    if self.tgid:
        self.by_tgid[self.tgid_full] = self
    if self.mxid:
        self.by_mxid[self.mxid] = self
|
|
|
|
@classmethod
async def _yield_portals(
    cls, query: Awaitable[list[DBPortal]]
) -> AsyncGenerator[Portal, None]:
    """Await ``query`` and yield one portal per returned row.

    For each row the instance already present in ``by_tgid`` is yielded when
    there is one; otherwise the freshly loaded row is run through
    ``postinit()`` and yielded itself.
    """
    for row in await query:
        try:
            yield cls.by_tgid[row.tgid_full]
        except KeyError:
            # Not cached yet: finish initialising the loaded object.
            await row.postinit()
            yield row
|
|
|
|
@classmethod
def all(cls) -> AsyncGenerator[Portal, None]:
    """Return an async generator over the portals from the parent class's ``all()`` query."""
    pending_rows = super().all()
    return cls._yield_portals(pending_rows)
|
|
|
|
@classmethod
def find_private_chats_of(cls, tg_receiver: TelegramID) -> AsyncGenerator[Portal, None]:
    """Return an async generator over the parent class's private-chat results for ``tg_receiver``."""
    pending_rows = super().find_private_chats_of(tg_receiver)
    return cls._yield_portals(pending_rows)
|
|
|
|
@classmethod
def find_private_chats_with(cls, tgid: TelegramID) -> AsyncGenerator[Portal, None]:
    """Return an async generator over the parent class's private-chat results for ``tgid``."""
    pending_rows = super().find_private_chats_with(tgid)
    return cls._yield_portals(pending_rows)
|
|
|
|
@classmethod
@async_getter_lock
async def get_by_mxid(cls, mxid: RoomID, /) -> Portal | None:
    """Look up a portal by its Matrix room ID.

    The ``by_mxid`` cache is consulted first. On a miss the parent class's
    ``get_by_mxid`` is awaited, and a found portal is run through
    ``postinit()`` before being returned.

    Returns:
        The portal, or ``None`` if neither lookup finds one.
    """
    try:
        return cls.by_mxid[mxid]
    except KeyError:
        pass  # Not cached; fall through to the parent-class lookup.

    loaded = cast(cls, await super().get_by_mxid(mxid))
    if not loaded:
        return None
    await loaded.postinit()
    return loaded
|
|
|
|
@classmethod
def get_username_from_mx_alias(cls, alias: str) -> str | None:
    """Return the result of parsing ``alias`` with the class's ``alias_template``."""
    template = cls.alias_template
    return template.parse(alias)
|
|
|
|
@classmethod
async def find_by_username(cls, username: str) -> Portal | None:
    """Find a portal by username, compared case-insensitively.

    The portals already held in ``by_tgid`` are scanned first. If none
    matches, the lowercased username is passed to the parent class's
    ``find_by_username``; a result from there is swapped for the cached
    instance with the same ``tgid_full`` when one exists, and is otherwise
    run through ``postinit()`` and returned.

    Returns:
        The portal, or ``None`` for an empty username or when nothing matches.
    """
    if not username:
        return None

    wanted = username.lower()
    for cached in cls.by_tgid.values():
        if cached.username and cached.username.lower() == wanted:
            return cached

    loaded = cast(cls, await super().find_by_username(wanted))
    if not loaded:
        return None
    try:
        return cls.by_tgid[loaded.tgid_full]
    except KeyError:
        await loaded.postinit()
        return loaded
|
|
|
|
@classmethod
@async_getter_lock
async def get_by_tgid(
    cls,
    tgid: TelegramID,
    /,
    *,
    tg_receiver: TelegramID | None = None,
    peer_type: str | None = None,
) -> Portal | None:
    """Look up a portal by Telegram ID, creating it when a peer type is given.

    Lookup order is the ``by_tgid`` cache, then the parent class's
    ``get_by_tgid``. If both miss and ``peer_type`` is truthy, a new portal
    is constructed, run through ``postinit()`` and inserted.

    Args:
        tgid: Telegram ID of the chat.
        tg_receiver: Second half of the cache key. Falls back to ``tgid``
            when omitted (or falsy).
        peer_type: Peer type to create a missing portal with. When ``None``
            (the default) a missing portal is not created.

    Returns:
        The cached, loaded or newly created portal, or ``None`` if it does
        not exist and ``peer_type`` was not given.

    Raises:
        ValueError: If ``peer_type`` is ``"user"`` and ``tg_receiver`` is
            ``None``.
    """
    if peer_type == "user" and tg_receiver is None:
        raise ValueError('tg_receiver is required when peer_type is "user"')
    tg_receiver = tg_receiver or tgid
    tgid_full = (tgid, tg_receiver)
    try:
        return cls.by_tgid[tgid_full]
    except KeyError:
        pass  # Not cached; fall through to the parent-class lookup.

    portal = cast(cls, await super().get_by_tgid(tgid, tg_receiver))
    if portal:
        await portal.postinit()
        return portal

    if not peer_type:
        # Without a peer type the caller only wanted an existing portal.
        return None

    cls.log.info(f"Creating portal object for {peer_type} {tgid} (receiver {tg_receiver})")
    # TODO enable this for non-release builds
    #      (or add better wrong peer type error handling)
    # if peer_type == "chat":
    #     import traceback
    #     cls.log.info("Chat portal stack trace:\n" + "".join(traceback.format_stack()))
    portal = cls(tgid, peer_type=peer_type, tg_receiver=tg_receiver)
    await portal.postinit()
    await portal.insert()
    return portal
|
|
|
|
@classmethod
async def get_by_entity(
    cls,
    entity: TypeChat | TypePeer | TypeUser | TypeUserFull | TypeInputPeer,
    tg_receiver: TelegramID | None = None,
    create: bool = True,
) -> Portal | None:
    """Resolve a Telegram entity or peer object to its portal.

    The exact type of ``entity`` (subclasses are not matched) decides the
    peer type and which attribute holds the Telegram ID. The lookup itself
    is delegated to ``get_by_tgid``.

    Args:
        entity: The entity, peer or input peer to resolve.
        tg_receiver: Used as the receiver only for user-type entities; for
            chats and channels the entity's own ID is used instead.
        create: When false, no peer type is passed on, so a missing portal
            is not created.

    Raises:
        ValueError: If the type of ``entity`` is not one of the known types.
    """
    # Exact type -> (portal peer type, attribute that carries the Telegram ID).
    id_sources = {
        Chat: ("chat", "id"),
        ChatFull: ("chat", "id"),
        PeerChat: ("chat", "chat_id"),
        InputPeerChat: ("chat", "chat_id"),
        Channel: ("channel", "id"),
        ChannelFull: ("channel", "id"),
        PeerChannel: ("channel", "channel_id"),
        InputPeerChannel: ("channel", "channel_id"),
        InputChannel: ("channel", "channel_id"),
        User: ("user", "id"),
        UserFull: ("user", "id"),
        PeerUser: ("user", "user_id"),
        InputPeerUser: ("user", "user_id"),
        InputUser: ("user", "user_id"),
    }
    entity_type = type(entity)
    source = id_sources.get(entity_type)
    if source is None:
        raise ValueError(f"Unknown entity type {entity_type.__name__}")

    type_name, id_attr = source
    entity_id = getattr(entity, id_attr)
    receiver = tg_receiver if type_name == "user" else entity_id
    return await cls.get_by_tgid(
        TelegramID(entity_id),
        tg_receiver=receiver,
        peer_type=type_name if create else None,
    )
|
|
|
|
# endregion
|