348 lines
12 KiB
Python
348 lines
12 KiB
Python
# mautrix-telegram - A Matrix-Telegram puppeting bridge
|
|
# Copyright (C) 2021 Tulir Asokan
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta
|
|
import re
|
|
|
|
from telethon.errors import (
|
|
ChatAdminRequiredError,
|
|
RPCError,
|
|
UsernameInvalidError,
|
|
UsernameNotModifiedError,
|
|
UsernameOccupiedError,
|
|
)
|
|
from telethon.helpers import add_surrogate
|
|
from telethon.tl.functions.channels import GetFullChannelRequest
|
|
from telethon.tl.functions.messages import GetExportedChatInvitesRequest, GetFullChatRequest
|
|
from telethon.tl.types import (
|
|
ChatInviteExported,
|
|
InputMessageEntityMentionName,
|
|
InputUserSelf,
|
|
MessageEntityMention,
|
|
TypeInputPeer,
|
|
TypeInputUser,
|
|
)
|
|
from telethon.tl.types.messages import ExportedChatInvites
|
|
|
|
from mautrix.types import EventID
|
|
|
|
from ... import formatter as fmt, portal as po, puppet as pu
|
|
from .. import SECTION_MISC, SECTION_PORTAL_MANAGEMENT, CommandEvent, command_handler
|
|
from .util import user_has_power_level
|
|
|
|
|
|
@command_handler(
|
|
needs_admin=False,
|
|
needs_puppeting=False,
|
|
needs_auth=False,
|
|
help_section=SECTION_MISC,
|
|
help_text="Fetch Matrix room state to ensure the bridge has up-to-date info.",
|
|
)
|
|
async def sync_state(evt: CommandEvent) -> EventID:
|
|
portal = await po.Portal.get_by_mxid(evt.room_id)
|
|
if not portal:
|
|
return await evt.reply("This is not a portal room.")
|
|
elif not await user_has_power_level(evt.room_id, evt.az.intent, evt.sender, "bridge"):
|
|
return await evt.reply(f"You do not have the permissions to synchronize this room.")
|
|
|
|
await portal.main_intent.get_joined_members(portal.mxid)
|
|
await evt.reply("Synchronization complete")
|
|
|
|
|
|
@command_handler(
|
|
needs_admin=False, needs_puppeting=False, needs_auth=False, help_section=SECTION_MISC
|
|
)
|
|
async def sync_full(evt: CommandEvent) -> EventID:
|
|
portal = await po.Portal.get_by_mxid(evt.room_id)
|
|
if not portal:
|
|
return await evt.reply("This is not a portal room.")
|
|
|
|
if len(evt.args) > 0 and evt.args[0] == "--usebot" and evt.sender.is_admin:
|
|
src = evt.tgbot
|
|
else:
|
|
src = evt.tgbot if await evt.sender.needs_relaybot(portal) else evt.sender
|
|
|
|
try:
|
|
if portal.peer_type == "channel":
|
|
res = await src.client(GetFullChannelRequest(portal.peer))
|
|
elif portal.peer_type == "chat":
|
|
res = await src.client(GetFullChatRequest(portal.tgid))
|
|
else:
|
|
return await evt.reply("This is not a channel or chat portal.")
|
|
except (ValueError, RPCError):
|
|
return await evt.reply("Failed to get portal info from Telegram.")
|
|
|
|
await portal.update_matrix_room(src, res.full_chat)
|
|
return await evt.reply("Portal synced successfully.")
|
|
|
|
|
|
@command_handler(
|
|
name="id",
|
|
needs_admin=False,
|
|
needs_puppeting=False,
|
|
needs_auth=False,
|
|
help_section=SECTION_MISC,
|
|
help_text="Get the ID of the Telegram chat where this room is bridged.",
|
|
)
|
|
async def get_id(evt: CommandEvent) -> EventID:
|
|
portal = await po.Portal.get_by_mxid(evt.room_id)
|
|
if not portal:
|
|
return await evt.reply("This is not a portal room.")
|
|
tgid = portal.tgid
|
|
if portal.peer_type == "chat":
|
|
tgid = -tgid
|
|
elif portal.peer_type == "channel":
|
|
tgid = f"-100{tgid}"
|
|
await evt.reply(f"This room is bridged to Telegram chat ID `{tgid}`.")
|
|
|
|
|
|
invite_link_usage = (
|
|
"**Usage:** `$cmdprefix+sp invite-link "
|
|
"[--uses=<amount>] [--expire=<delta>] [--request-needed] -- [title]`"
|
|
"\n\n"
|
|
"* `--uses`: the number of times the invite link can be used."
|
|
" Defaults to unlimited.\n"
|
|
"* `--expire`: the duration after which the link will expire."
|
|
" A number suffixed with d(ay), h(our), m(inute) or s(econd)\n"
|
|
"* `--request-needed`: should the link require admins to approve joins?\n"
|
|
"* `title`: a description of the link (only shown to admins)."
|
|
)
|
|
|
|
|
|
def _parse_flag(args: list[str]) -> tuple[str, str]:
|
|
arg = args.pop(0).lower()
|
|
if arg == "--":
|
|
return "", ""
|
|
value = ""
|
|
if arg.startswith("--"):
|
|
value_start = arg.find("=")
|
|
if value_start > 0:
|
|
flag = arg[2:value_start]
|
|
value = arg[value_start + 1 :]
|
|
else:
|
|
flag = arg[2:]
|
|
if arg not in ("request", "request-needed"):
|
|
value = args.pop(0).lower()
|
|
elif arg.startswith("-"):
|
|
flag = arg[1]
|
|
if len(arg) > 3 and arg[2] == "=":
|
|
value = arg[3:]
|
|
elif arg != "r":
|
|
value = args.pop(0).lower()
|
|
else:
|
|
raise ValueError("invalid flag")
|
|
return flag, value
|
|
|
|
|
|
delta_regex = re.compile(
|
|
"([0-9]+)(w(?:eek)?|d(?:ay)?|h(?:our)?|m(?:in(?:ute)?)?|s(?:ec(?:ond)?)?)"
|
|
)
|
|
|
|
|
|
def _parse_delta(value: str) -> timedelta | None:
|
|
match = delta_regex.fullmatch(value)
|
|
if not match:
|
|
return None
|
|
number = int(match.group(1))
|
|
unit = match.group(2)[0]
|
|
if unit == "w":
|
|
return timedelta(weeks=number)
|
|
elif unit == "d":
|
|
return timedelta(days=number)
|
|
elif unit == "h":
|
|
return timedelta(hours=number)
|
|
elif unit == "m":
|
|
return timedelta(minutes=number)
|
|
elif unit == "s":
|
|
return timedelta(seconds=number)
|
|
else:
|
|
return None
|
|
|
|
|
|
@command_handler(
|
|
help_section=SECTION_PORTAL_MANAGEMENT,
|
|
help_text="Get a Telegram invite link to the current chat.",
|
|
help_args="[--uses=<amount>] [--expire=<time delta, e.g. 1d>] [--request-needed] -- [title]",
|
|
)
|
|
async def invite_link(evt: CommandEvent) -> EventID:
|
|
if not evt.is_portal:
|
|
return await evt.reply("This is not a portal room.")
|
|
|
|
# TODO once we switch to Python 3.9 minimum, use argparse with exit_on_error=False
|
|
uses = None
|
|
expire = None
|
|
request_needed = False
|
|
while evt.args:
|
|
try:
|
|
flag, value = _parse_flag(evt.args)
|
|
except (ValueError, IndexError):
|
|
return await evt.reply(invite_link_usage)
|
|
if not flag:
|
|
break
|
|
elif flag in ("uses", "u"):
|
|
try:
|
|
uses = int(value)
|
|
except ValueError:
|
|
await evt.reply("The number of uses must be an integer")
|
|
elif flag in ("expire", "e"):
|
|
expire_delta = _parse_delta(value)
|
|
if not expire_delta:
|
|
await evt.reply("Invalid format for expiry time delta")
|
|
expire = datetime.now() + expire_delta
|
|
elif flag in ("request", "request-needed", "r"):
|
|
request_needed = True
|
|
title = " ".join(evt.args)
|
|
|
|
if evt.portal.peer_type == "user":
|
|
return await evt.reply("You can't invite users to private chats.")
|
|
|
|
try:
|
|
link = await evt.portal.get_invite_link(
|
|
evt.sender, uses=uses, expire=expire, request_needed=request_needed, title=title
|
|
)
|
|
return await evt.reply(f"Invite link to {evt.portal.title}: {link}")
|
|
except ValueError as e:
|
|
return await evt.reply(e.args[0])
|
|
except ChatAdminRequiredError:
|
|
return await evt.reply("You don't have the permission to create an invite link.")
|
|
|
|
|
|
async def _format_invite_link(link: ChatInviteExported) -> str:
|
|
desc = f"* {link.link}"
|
|
if link.title:
|
|
desc += f" - {link.title}"
|
|
if link.expire_date:
|
|
desc += f" \n Expires at {link.expire_date.isoformat()}"
|
|
if link.usage_limit:
|
|
desc += f" \n Used {link.usage or 0} out of {link.usage_limit} times"
|
|
elif link.usage:
|
|
desc += f" \n Used {link.usage} times"
|
|
else:
|
|
desc += " \n Never used"
|
|
if link.request_needed:
|
|
desc += " \n Join requests enabled - using link requires admin approval"
|
|
return desc
|
|
|
|
|
|
async def _hacky_find_mention(evt: CommandEvent) -> TypeInputUser | TypeInputPeer | None:
|
|
if len(evt.args) == 0:
|
|
return None
|
|
text, entities = await fmt.matrix_to_telegram(
|
|
evt.sender.client, text=evt.content.body, html=evt.content.formatted_body
|
|
)
|
|
for entity in entities:
|
|
if isinstance(entity, MessageEntityMention):
|
|
admin_username = add_surrogate(text)[entity.offset + 1 : entity.offset + entity.length]
|
|
return await evt.sender.client.get_input_entity(admin_username)
|
|
elif isinstance(entity, InputMessageEntityMentionName):
|
|
return entity.user_id
|
|
return None
|
|
|
|
|
|
@command_handler(
|
|
help_section=SECTION_PORTAL_MANAGEMENT,
|
|
help_text="List existing Telegram invite links to the current chat.",
|
|
help_args="[creator]",
|
|
)
|
|
async def list_invite_links(evt: CommandEvent) -> EventID:
|
|
admin_id = InputUserSelf()
|
|
try:
|
|
admin_id = await _hacky_find_mention(evt) or InputUserSelf()
|
|
except Exception:
|
|
pass
|
|
resp: ExportedChatInvites = await evt.sender.client(
|
|
GetExportedChatInvitesRequest(
|
|
peer=await evt.portal.get_input_entity(evt.sender),
|
|
admin_id=admin_id,
|
|
limit=100,
|
|
)
|
|
)
|
|
if resp.count == 0:
|
|
if isinstance(admin_id, InputUserSelf):
|
|
return await evt.reply("You haven't created any invite links to the current chat")
|
|
else:
|
|
return await evt.reply("That user hasn't created any invite links to the current chat")
|
|
formatted_links = "\n".join([await _format_invite_link(link) for link in resp.invites])
|
|
if isinstance(admin_id, InputUserSelf):
|
|
await evt.reply(f"Your links to this chat:\n\n{formatted_links}")
|
|
else:
|
|
puppet = await pu.Puppet.get_by_peer(admin_id)
|
|
await evt.reply(
|
|
f"[{puppet.displayname}](https://matrix.to/#/{puppet.mxid})'s links to this chat:\n\n"
|
|
f"{formatted_links}"
|
|
)
|
|
|
|
|
|
@command_handler(
|
|
help_section=SECTION_PORTAL_MANAGEMENT,
|
|
help_text="Upgrade a normal Telegram group to a supergroup.",
|
|
)
|
|
async def upgrade(evt: CommandEvent) -> EventID:
|
|
portal = await po.Portal.get_by_mxid(evt.room_id)
|
|
if not portal:
|
|
return await evt.reply("This is not a portal room.")
|
|
elif portal.peer_type == "channel":
|
|
return await evt.reply("This is already a supergroup or a channel.")
|
|
elif portal.peer_type == "user":
|
|
return await evt.reply("You can't upgrade private chats.")
|
|
|
|
try:
|
|
await portal.upgrade_telegram_chat(evt.sender)
|
|
return await evt.reply(f"Group upgraded to supergroup. New ID: -100{portal.tgid}")
|
|
except ChatAdminRequiredError:
|
|
return await evt.reply("You don't have the permission to upgrade this group.")
|
|
except ValueError as e:
|
|
return await evt.reply(e.args[0])
|
|
|
|
|
|
@command_handler(
|
|
help_section=SECTION_PORTAL_MANAGEMENT,
|
|
help_args="<_name_|`-`>",
|
|
help_text=(
|
|
"Change the username of a supergroup/channel. To disable, use a dash (`-`) as the name."
|
|
),
|
|
)
|
|
async def group_name(evt: CommandEvent) -> EventID:
|
|
if len(evt.args) == 0:
|
|
return await evt.reply("**Usage:** `$cmdprefix+sp group-name <name/->`")
|
|
|
|
portal = await po.Portal.get_by_mxid(evt.room_id)
|
|
if not portal:
|
|
return await evt.reply("This is not a portal room.")
|
|
elif portal.peer_type != "channel":
|
|
return await evt.reply("Only channels and supergroups have usernames.")
|
|
|
|
try:
|
|
await portal.set_telegram_username(evt.sender, evt.args[0] if evt.args[0] != "-" else "")
|
|
if portal.username:
|
|
return await evt.reply(f"Username of channel changed to {portal.username}.")
|
|
else:
|
|
return await evt.reply(f"Channel is now private.")
|
|
except ChatAdminRequiredError:
|
|
return await evt.reply(
|
|
"You don't have the permission to set the username of this channel."
|
|
)
|
|
except UsernameNotModifiedError:
|
|
if portal.username:
|
|
return await evt.reply("That is already the username of this channel.")
|
|
else:
|
|
return await evt.reply("This channel is already private")
|
|
except UsernameOccupiedError:
|
|
return await evt.reply("That username is already in use.")
|
|
except UsernameInvalidError:
|
|
return await evt.reply("Invalid username")
|