512 lines
18 KiB
512 lines
18 KiB
# Copyright (c) 2022 Tulir Asokan
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import annotations
from typing import Any, Awaitable, Callable, NamedTuple, Type
import asyncio
import logging
import time
import traceback
from mautrix.appservice import AppService, IntentAPI
from mautrix.errors import MForbidden
from mautrix.types import EventID, MessageEventContent, RoomID
from mautrix.util import markdown
from mautrix.util.logging import TraceLogger
from ... import bridge as br
# Registry of command handlers keyed by each handler's primary name.
# Filled in by the command_handler() decorator and looked up (with the
# lower-cased command) by CommandProcessor.handle().
command_handlers: dict[str, CommandHandler] = {}
# Registry of alternative command names. Each alias maps to the same handler
# object that is registered under its primary name; it is consulted only when
# the primary-name lookup finds nothing.
command_aliases: dict[str, CommandHandler] = {}
class HelpSection(NamedTuple):
    """A named group of commands on the help page."""

    # Heading of the section.
    name: str
    # Numeric ordering key (presumably lower values are listed first; the
    # code that sorts sections is not in this file).
    order: int
    # Free-form text describing the section; empty for the built-in sections.
    description: str


class HelpCacheKey(NamedTuple):
    """The sender/room state that decides which commands a help page lists.

    :meth:`CommandHandler.has_permission` reads ``is_management``,
    ``is_admin`` and ``is_logged_in`` from this key.
    """

    # Whether the command was issued in a management room.
    is_management: bool
    # Whether the command was issued in a portal room.
    is_portal: bool
    # Whether the sender is a bridge administrator.
    is_admin: bool
    # Whether the sender is logged in.
    is_logged_in: bool


# Built-in help sections.
SECTION_GENERAL = HelpSection("General", 0, "")
SECTION_AUTH = HelpSection("Authentication", 10, "")
SECTION_ADMIN = HelpSection("Administration", 50, "")
SECTION_RELAY = HelpSection("Relay mode management", 15, "")
def ensure_trailing_newline(s: str) -> str:
    """Return the passed string with a guaranteed trailing newline.

    Args:
        s: The string to normalise. May be empty.

    Returns:
        ``s`` unchanged if it already ends with a newline, otherwise ``s``
        with a single newline appended. An empty string yields a lone newline.
    """
    # str.endswith() is safe on an empty string, unlike indexing s[-1].
    return s if s.endswith("\n") else s + "\n"
class CommandEvent:
    """Holds information about a command issued in a Matrix room.

    When a Matrix command was issued to the bot, CommandEvent will hold
    information regarding the event.

    Attributes:
        room_id: The id of the Matrix room in which the command was issued.
        event_id: The id of the matrix event which contained the command.
        sender: The user who issued the command.
        command: The issued command.
        args: Arguments given with the issued command.
        content: The raw content in the command event.
        portal: The portal the command was sent to.
        is_management: Determines whether the room in which the command was
            issued in is a management room.
        has_bridge_bot: Whether or not the bridge bot is in the room.
    """

    bridge: bridge.Bridge
    az: AppService
    log: TraceLogger
    loop: asyncio.AbstractEventLoop
    config: br.BaseBridgeConfig
    processor: CommandProcessor
    command_prefix: str
    room_id: RoomID
    event_id: EventID
    sender: br.BaseUser
    command: str
    args: list[str]
    content: MessageEventContent
    portal: br.BasePortal | None
    is_management: bool
    has_bridge_bot: bool

    def __init__(
        self,
        processor: CommandProcessor,
        room_id: RoomID,
        event_id: EventID,
        sender: br.BaseUser,
        command: str,
        args: list[str],
        content: MessageEventContent,
        portal: br.BasePortal | None,
        is_management: bool,
        has_bridge_bot: bool,
    ) -> None:
        """Store the event details and copy shared objects from the processor.

        Args:
            processor: The processor handling this command; its ``bridge``,
                ``az``, ``log``, ``loop``, ``config`` and ``command_prefix``
                are copied onto the event.
            room_id: The id of the Matrix room in which the command was issued.
            event_id: The id of the matrix event which contained the command.
            sender: The user who issued the command.
            command: The issued command.
            args: Arguments given with the issued command.
            content: The raw content in the command event.
            portal: The portal the command was sent to, if any.
            is_management: Whether the room is a management room.
            has_bridge_bot: Whether or not the bridge bot is in the room.
        """
        self.bridge = processor.bridge
        self.az = processor.az
        self.log = processor.log
        self.loop = processor.loop
        self.config = processor.config
        self.processor = processor
        self.command_prefix = processor.command_prefix
        self.room_id = room_id
        self.event_id = event_id
        self.sender = sender
        self.command = command
        self.args = args
        self.content = content
        self.portal = portal
        self.is_management = is_management
        self.has_bridge_bot = has_bridge_bot

    # NOTE(review): main_intent and print_error_traceback are read as plain
    # attributes elsewhere in this file and therefore must be properties.
    # Nothing visible here reads is_portal, so it is left as a regular
    # method; confirm whether it was meant to be a @property as well.
    def is_portal(self) -> bool:
        """Return whether the command was sent to a portal room."""
        return self.portal is not None

    async def get_help_key(self) -> HelpCacheKey:
        """
        Get the help cache key for the given CommandEvent.

        Help messages are generated dynamically from the CommandHandlers that have been added so
        that they would only contain relevant commands. The help cache key is tuple-unpacked and
        passed to :meth:`CommandHandler.has_permission` when generating the help page. After the
        first generation, the page is cached using the help cache key.

        If you override this method or :meth:`CommandHandler.has_permission`, make sure to
        override the other too to handle the changes properly.

        When you override this method or otherwise extend CommandEvent, remember to pass the
        extended CommandEvent class when initializing your CommandProcessor.
        """
        # HelpCacheKey has four required fields; all of them must be supplied.
        return HelpCacheKey(
            is_management=self.is_management,
            is_portal=self.portal is not None,
            is_admin=self.sender.is_admin,
            is_logged_in=await self.sender.is_logged_in(),
        )

    @property
    def print_error_traceback(self) -> bool:
        """
        Whether or not the stack traces of unhandled exceptions during the handling of this command
        should be sent to the user. If false, the error message will simply tell the user to check
        the logs.

        Bridges may want to limit tracebacks to bridge admins.
        """
        return self.sender.is_admin

    @property
    def main_intent(self) -> IntentAPI:
        """The portal's main intent if there is a portal, else the bridge bot's intent."""
        return self.portal.main_intent if self.portal else self.az.intent

    async def redact(self, reason: str | None = None) -> None:
        """
        Try to redact the command.

        The bridge bot's intent is used when the bot is in the room, otherwise
        :attr:`main_intent` is used.

        If the redaction fails with M_FORBIDDEN, the error will be logged and ignored.
        Any other exception is logged with its traceback and ignored as well.

        Args:
            reason: Optional reason to attach to the redaction.
        """
        try:
            if self.has_bridge_bot:
                await self.az.intent.redact(self.room_id, self.event_id, reason=reason)
            else:
                await self.main_intent.redact(self.room_id, self.event_id, reason=reason)
        except MForbidden as e:
            self.log.warning(f"Failed to redact command {self.command}: {e}")
        except Exception:
            # Redaction is best-effort: never let it break command handling.
            self.log.warning(f"Failed to redact command {self.command}", exc_info=True)

    def reply(
        self, message: str, allow_html: bool = False, render_markdown: bool = True
    ) -> Awaitable[EventID]:
        """Write a reply to the room in which the command was issued.

        Replaces occurrences of "$cmdprefix" in the message with the command
        prefix and replaces occurrences of "$cmdprefix+sp " with the command
        prefix if the command was not issued in a management room.

        If allow_html and render_markdown are both False, the message will not
        be rendered to html and sending of html is disabled.

        Args:
            message: The message to post in the room.
            allow_html: Passed on to the markdown renderer; if markdown is
                disabled, the message itself is sent as the HTML body.
            render_markdown: Use markdown formatting to render the passed
                message to html.

        Returns:
            The awaitable returned by the intent's ``send_notice``.
        """
        message = self._replace_command_prefix(message)
        html = self._render_message(
            message, allow_html=allow_html, render_markdown=render_markdown
        )
        # Reply as the bridge bot when it is in the room, otherwise fall back
        # to the portal's main intent.
        if self.has_bridge_bot:
            return self.az.intent.send_notice(self.room_id, message, html=html)
        return self.main_intent.send_notice(self.room_id, message, html=html)

    async def mark_read(self) -> None:
        """Marks the command as read by the bot.

        Does nothing when the bridge bot is not in the room.
        """
        if self.has_bridge_bot:
            await self.az.intent.mark_read(self.room_id, self.event_id)

    def _replace_command_prefix(self, message: str) -> str:
        """Returns the string with the proper command prefix entered.

        "$cmdprefix+sp " becomes the prefix followed by a space outside of
        management rooms and is removed inside them; any remaining
        "$cmdprefix" is replaced by the prefix everywhere.
        """
        # The longer placeholder must be handled first, since it contains the
        # shorter one.
        message = message.replace(
            "$cmdprefix+sp ", "" if self.is_management else f"{self.command_prefix} "
        )
        return message.replace("$cmdprefix", self.command_prefix)

    @staticmethod
    def _render_message(message: str, allow_html: bool, render_markdown: bool) -> str | None:
        """Renders the message as HTML.

        Args:
            message: The text to render.
            allow_html: Flag to allow custom HTML in the message.
            render_markdown: If true, markdown styling is applied to the message.

        Returns:
            The message rendered as HTML, with a trailing newline.
            None is returned if no styled output is required.
        """
        html = ""
        if render_markdown:
            html = markdown.render(message, allow_html=allow_html)
        elif allow_html:
            # No markdown pass: the message is already the HTML to send.
            html = message
        return ensure_trailing_newline(html) if html else None
# Signature of the function that implements a command: it receives the
# CommandEvent and returns an awaitable.
CommandHandlerFunc = Callable[[CommandEvent], Awaitable[Any]]
# Signature of the predicate stored as CommandHandler.is_enabled_for. When it
# returns False for an event, CommandProcessor.handle() treats the command as
# if no handler had been found.
IsEnabledForFunc = Callable[[CommandEvent], bool]
class CommandHandler:
    """A command which can be executed from a Matrix room.

    The command manages its permission and help texts.
    When called, it will check the permission of the command event and execute
    the command or, in case of error, report back to the user.

    Attributes:
        management_only: Whether the command can exclusively be issued in a
            management room.
        name: The name of this command.
        help_section: Section of the help in which this command will appear.
    """

    name: str
    management_only: bool
    needs_admin: bool
    needs_auth: bool
    is_enabled_for: IsEnabledForFunc
    _help_text: str
    _help_args: str
    help_section: HelpSection

    def __init__(
        self,
        handler: CommandHandlerFunc,
        management_only: bool,
        name: str,
        help_text: str,
        help_args: str,
        help_section: HelpSection,
        needs_auth: bool,
        needs_admin: bool,
        is_enabled_for: IsEnabledForFunc = lambda _: True,
        **kwargs: Any,
    ) -> None:
        """
        Args:
            handler: The function handling the execution of this command.
            management_only: Whether the command can exclusively be issued
                in a management room.
            needs_auth: Whether the command needs the bridge to be authed already
            needs_admin: Whether the command needs the issuer to be bridge admin
            name: The name of this command.
            help_text: The text displayed in the help for this command.
            help_args: Help text for the arguments of this command.
            help_section: Section of the help in which this command will appear.
            is_enabled_for: Predicate deciding whether the command is enabled
                for a given event. Defaults to always enabled.
            **kwargs: Extra values, each stored as an attribute of the same
                name on the handler.
        """
        # Extra attributes are applied first so that the explicit arguments
        # below always take precedence over a same-named keyword.
        for key, value in kwargs.items():
            setattr(self, key, value)
        self._handler = handler
        self.management_only = management_only
        self.needs_admin = needs_admin
        self.needs_auth = needs_auth
        self.name = name
        self._help_text = help_text
        self._help_args = help_args
        self.help_section = help_section
        self.is_enabled_for = is_enabled_for

    async def get_permission_error(self, evt: CommandEvent) -> str | None:
        """Returns the reason why the command could not be issued.

        The checks run in order: management room, admin status, login status.
        The login check is only awaited when the earlier checks pass.

        Args:
            evt: The event for which to get the error information.

        Returns:
            A string describing the error or None if there was no error.
        """
        if self.management_only and not evt.is_management:
            return (
                f"`{evt.command}` is a restricted command: "
                "you may only run it in management rooms."
            )
        elif self.needs_admin and not evt.sender.is_admin:
            return "That command is limited to bridge administrators."
        elif self.needs_auth and not await evt.sender.is_logged_in():
            return "That command requires you to be logged in."
        return None

    def has_permission(self, key: HelpCacheKey) -> bool:
        """Checks the permission for this command with the given status.

        Args:
            key: The help cache key. See :meth:`CommandEvent.get_help_key`.

        Returns:
            True if a user with the given state is allowed to issue the
            command.
        """
        return (
            (not self.management_only or key.is_management)
            and (not self.needs_admin or key.is_admin)
            and (not self.needs_auth or key.is_logged_in)
        )

    async def __call__(self, evt: CommandEvent) -> Any:
        """Executes the command if evt was issued with proper rights.

        Args:
            evt: The CommandEvent for which to check permissions.

        Returns:
            The result of the command, or the result of replying with the
            permission error message.
        """
        error = await self.get_permission_error(evt)
        if error is not None:
            return await evt.reply(error)
        return await self._handler(evt)

    # NOTE(review): nothing visible in this file shows how has_help and help
    # are accessed, so they are kept as regular methods; confirm whether they
    # were meant to be @property accessors.
    def has_help(self) -> bool:
        """Returns true if this command has a help text."""
        return bool(self.help_section) and bool(self._help_text)

    def help(self) -> str:
        """Returns the help text to this command."""
        return f"**{self.name}** {self._help_args} - {self._help_text}"
def command_handler(
    _func: CommandHandlerFunc | None = None,
    management_only: bool = False,
    name: str | None = None,
    help_text: str = "",
    help_args: str = "",
    help_section: HelpSection | None = None,
    aliases: list[str] | None = None,
    _handler_class: Type[CommandHandler] = CommandHandler,
    needs_auth: bool = True,
    needs_admin: bool = False,
    is_enabled_for: IsEnabledForFunc = lambda _: True,
    **kwargs: Any,
) -> Callable[[CommandHandlerFunc], CommandHandler]:
    """Decorator to create CommandHandlers.

    Can be used bare (``@command_handler``) or with arguments
    (``@command_handler(name="foo")``). The created handler is registered in
    ``command_handlers`` under its name and in ``command_aliases`` under each
    alias.

    Args:
        _func: The decorated function when the decorator is used bare.
        management_only: Whether the command can exclusively be issued in a
            management room.
        name: The name of the command. Defaults to the function name with
            underscores replaced by dashes.
        help_text: The text displayed in the help for this command.
        help_args: Help text for the arguments of this command.
        help_section: Section of the help in which this command will appear.
        aliases: Alternative names under which the command is registered.
        _handler_class: The class to instantiate for the handler.
        needs_auth: Whether the command needs the issuer to be logged in.
        needs_admin: Whether the command needs the issuer to be bridge admin.
        is_enabled_for: Predicate deciding whether the command is enabled for
            a given event.
        **kwargs: Extra keyword arguments forwarded to ``_handler_class``.

    Returns:
        The decorator when called with arguments, or the finished handler
        when used bare.
    """

    def decorator(func: CommandHandlerFunc) -> CommandHandler:
        actual_name = name or func.__name__.replace("_", "-")
        handler = _handler_class(
            func,
            management_only=management_only,
            name=actual_name,
            help_text=help_text,
            help_args=help_args,
            help_section=help_section,
            needs_auth=needs_auth,
            needs_admin=needs_admin,
            is_enabled_for=is_enabled_for,
            **kwargs,
        )
        # Register under handler.name rather than actual_name so that a custom
        # handler class may adjust the name it is known by.
        command_handlers[handler.name] = handler
        if aliases:
            for alias in aliases:
                command_aliases[alias] = handler
        return handler

    return decorator if _func is None else decorator(_func)
class CommandProcessor:
    """Handles the raw commands issued by a user to the Matrix bot."""

    log: TraceLogger = logging.getLogger("mau.commands")
    az: AppService
    config: br.BaseBridgeConfig
    loop: asyncio.AbstractEventLoop
    event_class: Type[CommandEvent]
    bridge: bridge.Bridge
    _ref_no: int

    def __init__(
        self, bridge: bridge.Bridge, event_class: Type[CommandEvent] = CommandEvent
    ) -> None:
        """
        Args:
            bridge: The bridge instance; its ``az``, ``config`` and ``loop``
                are copied onto the processor.
            event_class: The class instantiated for every handled command.
        """
        self.az = bridge.az
        self.config = bridge.config
        self.loop = bridge.loop or asyncio.get_event_loop()
        self.command_prefix = self.config["bridge.command_prefix"]
        self.bridge = bridge
        self.event_class = event_class
        # Seeded with the current unix time; incremented on every ref_no read.
        self._ref_no = int(time.time())

    @property
    def ref_no(self) -> int:
        """
        Reference number for a command handling exception to help sysadmins find the error when
        receiving user reports.

        Every read increments the counter, so read it once and reuse the value.
        """
        self._ref_no += 1
        return self._ref_no

    @staticmethod
    def _run_handler(
        handler: Callable[[CommandEvent], Awaitable[Any]], evt: CommandEvent
    ) -> Awaitable[Any]:
        """Call the handler with the event and return its awaitable."""
        return handler(evt)

    async def handle(
        self,
        room_id: RoomID,
        event_id: EventID,
        sender: br.BaseUser,
        command: str,
        args: list[str],
        content: MessageEventContent,
        portal: br.BasePortal | None,
        is_management: bool,
        has_bridge_bot: bool,
    ) -> None:
        """Handles the raw commands issued by a user to the Matrix bot.

        If the command is not known, it might be a followup command and is
        delegated to a command handler registered for that purpose in the
        senders command_status as "next".

        Args:
            room_id: ID of the Matrix room in which the command was issued.
            event_id: ID of the event by which the command was issued.
            sender: The sender who issued the command.
            command: The issued command, case insensitive.
            args: Arguments given with the command. When the command is
                delegated to a "next" handler, the original command is
                inserted at the front of this list in place.
            content: The raw content in the command event.
            portal: The portal the command was sent to.
            is_management: Whether the room is a management room.
            has_bridge_bot: Whether or not the bridge bot is in the room.

        Returns:
            Always None. Exceptions raised by the handler are logged and
            reported to the user instead of being propagated.

        Raises:
            ValueError: If no "unknown-command" handler has been registered.
        """
        if not command_handlers or "unknown-command" not in command_handlers:
            raise ValueError("command_handlers are not properly initialized.")

        evt = self.event_class(
            processor=self,
            room_id=room_id,
            event_id=event_id,
            sender=sender,
            command=command,
            args=args,
            content=content,
            portal=portal,
            is_management=is_management,
            has_bridge_bot=has_bridge_bot,
        )
        orig_command = command
        command = command.lower()

        handler = command_handlers.get(command, command_aliases.get(command))
        if handler is None or not handler.is_enabled_for(evt):
            if sender.command_status and "next" in sender.command_status:
                # Follow-up input: the "command" is really the first argument.
                # evt.args is this same list object, so the event sees it too.
                args.insert(0, orig_command)
                evt.command = ""
                handler = sender.command_status["next"]
            else:
                handler = command_handlers["unknown-command"]

        try:
            await self._run_handler(handler, evt)
        except Exception:
            # Read the property exactly once: every read bumps the counter.
            ref_no = self.ref_no
            self.log.exception(
                "Unhandled error while handling command "
                f"{evt.command} {' '.join(args)} from {sender.mxid} (ref: {ref_no})"
            )
            if evt.print_error_traceback:
                # NOTE(review): the fenced-block markup after the first line
                # was reconstructed; confirm it matches the intended output.
                await evt.reply(
                    "Unhandled error while handling command:\n\n"
                    "```traceback\n"
                    f"{traceback.format_exc()}"
                    "```"
                )
            else:
                await evt.reply(
                    "Unhandled error while handling command. "
                    f"Check logs for more details (ref: {ref_no})."
                )
        return None