1080 lines
41 KiB
Python
1080 lines
41 KiB
Python
# Copyright (c) 2022 Tulir Asokan
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
import asyncio
|
|
import logging
|
|
import sys
|
|
import time
|
|
|
|
from mautrix import __optional_imports__
|
|
from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, AppService
|
|
from mautrix.errors import (
|
|
DecryptionError,
|
|
IntentError,
|
|
MatrixError,
|
|
MExclusive,
|
|
MForbidden,
|
|
MUnknownToken,
|
|
SessionNotFound,
|
|
)
|
|
from mautrix.types import (
|
|
BaseRoomEvent,
|
|
BeeperMessageStatusEventContent,
|
|
EncryptedEvent,
|
|
Event,
|
|
EventID,
|
|
EventType,
|
|
MediaRepoConfig,
|
|
Membership,
|
|
MemberStateEventContent,
|
|
MessageEvent,
|
|
MessageEventContent,
|
|
MessageStatus,
|
|
MessageStatusReason,
|
|
MessageType,
|
|
PresenceEvent,
|
|
ReactionEvent,
|
|
ReceiptEvent,
|
|
ReceiptType,
|
|
RedactionEvent,
|
|
RelatesTo,
|
|
RelationType,
|
|
RoomID,
|
|
RoomType,
|
|
SingleReceiptEventContent,
|
|
SpecVersions,
|
|
StateEvent,
|
|
StateUnsigned,
|
|
TextMessageEventContent,
|
|
TrustState,
|
|
TypingEvent,
|
|
UserID,
|
|
Version,
|
|
VersionsResponse,
|
|
)
|
|
from mautrix.util import background_task, markdown
|
|
from mautrix.util.logging import TraceLogger
|
|
from mautrix.util.message_send_checkpoint import (
|
|
CHECKPOINT_TYPES,
|
|
MessageSendCheckpoint,
|
|
MessageSendCheckpointReportedBy,
|
|
MessageSendCheckpointStatus,
|
|
MessageSendCheckpointStep,
|
|
)
|
|
from mautrix.util.opt_prometheus import Histogram
|
|
|
|
from .. import bridge as br
|
|
from . import commands as cmd
|
|
|
|
# Import failures are stashed here so that BaseMatrixHandler.__init__ can attach them
# (via exc_info) to its fatal log when encryption is enabled but dependencies are missing.
encryption_import_error = None
media_encrypt_import_error = None

try:
    from .e2ee import EncryptionManager
except ImportError as e:
    # When __optional_imports__ is truthy the failure is re-raised instead of deferred.
    if __optional_imports__:
        raise
    encryption_import_error = e
    EncryptionManager = None

try:
    from mautrix.crypto.attachments import encrypt_attachment
except ImportError as e:
    # Same deferral policy as for EncryptionManager above.
    if __optional_imports__:
        raise
    media_encrypt_import_error = e
    encrypt_attachment = None

# Histogram fed by BaseMatrixHandler.log_event_handle_duration, labelled by event type.
EVENT_TIME = Histogram(
    "bridge_matrix_event", "Time spent processing Matrix events", ["event_type"]
)
|
|
|
|
|
|
class UnencryptedMessageError(DecryptionError):
    """Decryption-style error used when a plaintext message is dropped.

    ``BaseMatrixHandler.handle_message`` reports this error when an event was not
    encrypted but the handler requires end-to-end encryption.
    """

    def __init__(self) -> None:
        super().__init__("unencrypted message")

    @property
    def human_message(self) -> str:
        """User-facing explanation of the error."""
        explanation = "the message is not encrypted"
        return explanation
|
|
|
|
|
|
class EncryptionUnsupportedError(DecryptionError):
    """Decryption-style error used when an encrypted event arrives without e2ee set up.

    ``BaseMatrixHandler.handle_encrypted`` reports this error when the handler has no
    encryption manager.
    """

    def __init__(self) -> None:
        super().__init__("encryption is not supported")

    @property
    def human_message(self) -> str:
        """User-facing explanation of the error."""
        explanation = "the bridge is not configured to support encryption"
        return explanation
|
|
|
|
|
|
class DeviceUntrustedError(DecryptionError):
    """Decryption-style error for events whose trust state is below the required level.

    The message is ``"your device is not trusted"``, followed by a parenthesised
    explanation when the given trust state has one in the table below.
    """

    def __init__(self, trust: TrustState) -> None:
        reasons = {
            TrustState.BLACKLISTED: "device is blacklisted",
            TrustState.UNVERIFIED: "unverified",
            TrustState.UNKNOWN_DEVICE: "device info not found",
            TrustState.FORWARDED: "keys were forwarded from an unknown device",
            TrustState.CROSS_SIGNED_UNTRUSTED: (
                "cross-signing keys changed after setting up the bridge"
            ),
        }
        detail = reasons.get(trust)
        prefix = "your device is not trusted"
        if detail:
            self.message = f"{prefix} ({detail})"
        else:
            # Trust states without an entry get the bare message.
            self.message = prefix
        super().__init__(self.message)

    @property
    def human_message(self) -> str:
        """User-facing explanation; identical to the exception message."""
        return self.message
|
|
|
|
|
|
class BaseMatrixHandler:
    # Logger shared by all handler methods.
    log: TraceLogger = logging.getLogger("mau.mx")
    # Appservice instance; taken from the bridge in __init__.
    az: AppService
    # Command processor that handle_message dispatches commands to.
    commands: cmd.CommandProcessor
    # Bridge config; taken from the bridge in __init__.
    config: config.BaseBridgeConfig
    bridge: br.Bridge
    # Encryption manager, or None when "bridge.encryption.allow" is off.
    e2ee: EncryptionManager | None
    # When true, handle_message drops events that were not encrypted.
    require_e2ee: bool
    # Starts as a 50 MiB default and is replaced in wait_for_connection when the fetch succeeds.
    media_config: MediaRepoConfig
    # Starts as a placeholder and is replaced by the homeserver's response in wait_for_connection.
    versions: VersionsResponse
    # Oldest spec version accepted by check_versions.
    minimum_spec_version: Version = SpecVersions.V11
    # Per-room locks (a defaultdict of asyncio.Lock) used by handle_puppet_invite.
    room_locks: dict[str, asyncio.Lock]

    # Passed to EncryptionManager in __init__; not assigned anywhere in this class,
    # so presumably subclasses must define them -- TODO confirm.
    user_id_prefix: str
    user_id_suffix: str
|
|
|
|
def __init__(
    self,
    command_processor: cmd.CommandProcessor | None = None,
    bridge: br.Bridge | None = None,
) -> None:
    """Wire the handler to the bridge and, if enabled, set up encryption.

    :param command_processor: Processor for bridge commands. When omitted, a
        ``cmd.CommandProcessor`` is created for ``bridge``.
    :param bridge: The bridge instance; its ``az`` and ``config`` are read
        unconditionally, so it must not actually be ``None``.

    Exits the process with status 31 when encryption is enabled in the config but
    the encryption (or media encryption) dependencies failed to import.
    """
    self.az = bridge.az
    self.config = bridge.config
    self.bridge = bridge
    if command_processor:
        self.commands = command_processor
    else:
        self.commands = cmd.CommandProcessor(bridge=bridge)
    # Placeholder values until wait_for_connection() fetches the real ones.
    self.media_config = MediaRepoConfig(upload_size=50 * 1024 * 1024)
    self.versions = VersionsResponse.deserialize({"versions": ["v1.3"]})
    self.az.matrix_event_handler(self.int_handle_event)
    self.room_locks = defaultdict(asyncio.Lock)

    self.e2ee = None
    self.require_e2ee = False
    encryption_allowed = self.config["bridge.encryption.allow"]
    if encryption_allowed:
        if not EncryptionManager:
            self.log.fatal(
                "Encryption enabled in config, but dependencies not installed.",
                exc_info=encryption_import_error,
            )
            sys.exit(31)
        if not encrypt_attachment:
            self.log.fatal(
                "Encryption enabled in config, but media encryption dependencies "
                "not installed.",
                exc_info=media_encrypt_import_error,
            )
            sys.exit(31)
        e2ee_options = dict(
            bridge=bridge,
            user_id_prefix=self.user_id_prefix,
            user_id_suffix=self.user_id_suffix,
            homeserver_address=self.config["homeserver.address"],
            db_url=self.config["appservice.database"],
        )
        self.e2ee = EncryptionManager(**e2ee_options)
        self.require_e2ee = self.config["bridge.encryption.require"]

    default_room_text = {
        "welcome": "Hello, I'm a bridge bot.",
        "welcome_connected": "Use `help` for help.",
        "welcome_unconnected": "Use `help` for help on how to log in.",
    }
    self.management_room_text = self.config.get(
        "bridge.management_room_text", default_room_text
    )
    self.management_room_multiple_messages = self.config.get(
        "bridge.management_room_multiple_messages", False
    )
|
|
|
|
async def check_versions(self) -> None:
    """Abort the process (exit status 18) if the homeserver fails the version checks.

    Two checks are made against ``self.versions``: the server must support at least
    ``minimum_spec_version``, and if the bridge's homeserver software is flagged as
    hungry, the server must advertise ``com.beeper.hungry``.
    """
    server_versions = self.versions
    if not server_versions.supports_at_least(self.minimum_spec_version):
        self.log.fatal(
            "The homeserver is outdated "
            "(server supports Matrix %s, but the bridge requires at least %s)",
            server_versions.latest_version,
            self.minimum_spec_version,
        )
        sys.exit(18)

    claims_hungry = self.bridge.homeserver_software.is_hungry
    if claims_hungry and not server_versions.supports("com.beeper.hungry"):
        self.log.fatal(
            "The config claims the homeserver is hungryserv, "
            "but the /versions response didn't confirm it"
        )
        sys.exit(18)
|
|
|
|
async def wait_for_connection(self) -> None:
    """Block until the homeserver is reachable, then run the startup checks.

    Steps, in order: poll ``/versions`` every 10 seconds until it succeeds, validate
    the versions, call whoami (registering the bot first if it returns M_FORBIDDEN),
    ping the bridge through the homeserver when supported (exit status 16 on
    failure), and finally try to fetch the media repo config.
    """
    self.log.info("Ensuring connectivity to homeserver")
    connected = False
    while not connected:
        try:
            self.versions = await self.az.intent.versions()
        except Exception:
            self.log.exception("Connection to homeserver failed, retrying in 10 seconds")
            await asyncio.sleep(10)
        else:
            connected = True

    await self.check_versions()

    try:
        await self.az.intent.whoami()
    except MForbidden:
        self.log.debug(
            "Whoami endpoint returned M_FORBIDDEN, "
            "trying to register bridge bot before retrying..."
        )
        await self.az.intent.ensure_registered()
        await self.az.intent.whoami()

    can_ping = self.versions.supports(
        "fi.mau.msc2659.stable"
    ) or self.versions.supports_at_least(SpecVersions.V17)
    if not can_ping:
        self.log.debug(
            "Homeserver does not support checking status of homeserver -> bridge connection"
        )
    else:
        try:
            txn_id = self.az.intent.api.get_txn_id()
            duration = await self.az.ping_self(txn_id)
            self.log.debug(
                "Homeserver->bridge connection works, "
                f"roundtrip time is {duration} ms (txn ID: {txn_id})"
            )
        except Exception:
            self.log.exception("Error checking homeserver -> bridge connection")
            sys.exit(16)

    # A failed media config fetch is non-fatal; the default from __init__ stays in place.
    try:
        self.media_config = await self.az.intent.get_media_repo_config()
    except Exception:
        self.log.warning("Failed to fetch media repo config", exc_info=True)
|
|
|
|
async def init_as_bot(self) -> None:
    """Apply the configured display name, avatar and Beeper profile to the bridge bot.

    For the display name and avatar, an empty config value means "leave untouched" and
    the literal value ``"remove"`` clears the field. Failures to set either are logged
    and otherwise ignored.
    """
    self.log.debug("Initializing appservice bot")
    intent = self.az.intent

    displayname = self.config["appservice.bot_displayname"]
    if displayname:
        new_name = "" if displayname == "remove" else displayname
        try:
            await intent.set_displayname(new_name)
        except Exception:
            self.log.exception("Failed to set bot displayname")

    avatar = self.config["appservice.bot_avatar"]
    if avatar:
        new_avatar = "" if avatar == "remove" else avatar
        try:
            await intent.set_avatar_url(new_avatar)
        except Exception:
            self.log.exception("Failed to set bot avatar")

    if self.bridge.homeserver_software.is_hungry and self.bridge.beeper_network_name:
        self.log.debug("Setting contact info on the appservice bot")
        profile = {
            "com.beeper.bridge.service": self.bridge.beeper_service_name,
            "com.beeper.bridge.network": self.bridge.beeper_network_name,
            "com.beeper.bridge.is_bridge_bot": True,
        }
        await intent.beeper_update_profile(profile)
|
|
|
|
async def init_encryption(self) -> None:
    """Start the encryption manager; a no-op when encryption is not set up."""
    if not self.e2ee:
        return
    await self.e2ee.start()
|
|
|
|
async def allow_message(self, user: br.BaseUser) -> bool:
    """Tell whether messages from ``user`` should be processed at all.

    Whitelisted users are always allowed; others only when relay mode is enabled in
    the config and the user is relay-whitelisted.
    """
    whitelisted = user.is_whitelisted
    if whitelisted:
        return whitelisted
    return self.config["bridge.relay.enabled"] and user.relay_whitelisted
|
|
|
|
@staticmethod
|
|
async def allow_command(user: br.BaseUser) -> bool:
|
|
return user.is_whitelisted
|
|
|
|
@staticmethod
|
|
async def allow_bridging_message(user: br.BaseUser, portal: br.BasePortal) -> bool:
|
|
return await user.is_logged_in() or (user.relay_whitelisted and portal.has_relay)
|
|
|
|
@staticmethod
|
|
async def allow_puppet_invite(user: br.BaseUser, puppet: br.BasePuppet) -> bool:
|
|
return await user.is_logged_in()
|
|
|
|
async def handle_leave(self, room_id: RoomID, user_id: UserID, event_id: EventID) -> None:
    """Hook for a user leaving a room on their own; does nothing by default.

    ``int_handle_event`` calls this for a leave whose sender is the affected user and
    whose previous membership was not ban, invite or knock.
    """
|
|
|
|
async def handle_kick(
    self, room_id: RoomID, user_id: UserID, kicked_by: UserID, reason: str, event_id: EventID
) -> None:
    """Hook for a user being removed from a room by someone else; no-op by default.

    ``int_handle_event`` passes the event sender as ``kicked_by`` and the event
    content's reason as ``reason``.
    """
|
|
|
|
async def handle_ban(
    self, room_id: RoomID, user_id: UserID, banned_by: UserID, reason: str, event_id: EventID
) -> None:
    """Hook for a membership change to ban; no-op by default.

    ``int_handle_event`` passes the event sender as ``banned_by``.
    """
|
|
|
|
async def handle_unban(
    self, room_id: RoomID, user_id: UserID, unbanned_by: UserID, reason: str, event_id: EventID
) -> None:
    """Hook for a ban being lifted (ban -> leave); no-op by default.

    ``int_handle_event`` passes the event sender as ``unbanned_by``.
    """
|
|
|
|
async def handle_join(self, room_id: RoomID, user_id: UserID, event_id: EventID) -> None:
    """Hook for a user joining a room; no-op by default.

    ``int_handle_event`` calls this for a join whose previous membership was not
    already join.
    """
|
|
|
|
async def handle_knock(
    self, room_id: RoomID, user_id: UserID, reason: str, event_id: EventID
) -> None:
    """Hook for a membership change to knock; no-op by default."""
|
|
|
|
async def handle_retract_knock(
    self, room_id: RoomID, user_id: UserID, reason: str, event_id: EventID
) -> None:
    """Hook for a user withdrawing their own knock (knock -> leave); no-op by default."""
|
|
|
|
async def handle_reject_knock(
    self, room_id: RoomID, user_id: UserID, sender: UserID, reason: str, event_id: EventID
) -> None:
    """Hook for a knock being turned down by someone else; no-op by default.

    ``sender`` is the sender of the membership event that changed knock to leave.
    """
|
|
|
|
async def handle_accept_knock(
    self, room_id: RoomID, user_id: UserID, sender: UserID, reason: str, event_id: EventID
) -> None:
    """Hook for a knock being accepted (knock -> invite); no-op by default.

    ``sender`` is the sender of the invite event.
    """
|
|
|
|
async def handle_member_info_change(
    self,
    room_id: RoomID,
    user_id: UserID,
    content: MemberStateEventContent,
    prev_content: MemberStateEventContent,
    event_id: EventID,
) -> None:
    """Hook for a join -> join membership event; no-op by default.

    ``int_handle_event`` passes the new member content and the previous one taken
    from the event's unsigned data.
    """
|
|
|
|
async def handle_puppet_group_invite(
    self,
    room_id: RoomID,
    puppet: br.BasePuppet,
    invited_by: br.BaseUser,
    evt: StateEvent,
    members: list[UserID],
) -> None:
    """Handle a ghost that was invited to a non-portal group room.

    The ghost leaves the room again unless the bridge bot is among ``members``.
    """
    if self.az.bot_mxid in members:
        return
    ghost_intent = puppet.default_mxid_intent
    await ghost_intent.leave_room(
        room_id, reason="This ghost does not join multi-user rooms without the bridge bot."
    )
|
|
|
|
async def handle_puppet_dm_invite(
    self, room_id: RoomID, puppet: br.BasePuppet, invited_by: br.BaseUser, evt: StateEvent
) -> None:
    """Handle a ghost that was invited to a non-portal direct chat.

    If the inviter has a portal with the ghost, that portal accepts the room as its
    DM; otherwise the ghost leaves.
    """
    portal = await invited_by.get_portal_with(puppet)
    if not portal:
        await puppet.default_mxid_intent.leave_room(
            room_id, reason="This bridge does not support creating DMs."
        )
        return
    await portal.accept_matrix_dm(room_id, invited_by, puppet)
|
|
|
|
async def handle_puppet_space_invite(
    self, room_id: RoomID, puppet: br.BasePuppet, invited_by: br.BaseUser, evt: StateEvent
) -> None:
    """Handle a ghost that was invited to a space: the ghost always leaves."""
    ghost_intent = puppet.default_mxid_intent
    await ghost_intent.leave_room(room_id, reason="This ghost does not join spaces.")
|
|
|
|
async def handle_puppet_nonportal_invite(
    self, room_id: RoomID, puppet: br.BasePuppet, invited_by: br.BaseUser, evt: StateEvent
) -> None:
    """Join a non-portal room as the ghost and dispatch by the kind of room.

    After joining, the room's create event and member list are fetched. If that fails
    with a ``MatrixError``, the ghost leaves in the background. Otherwise the invite is
    handed to the space, group or DM handler.
    """
    intent = puppet.default_mxid_intent
    await intent.join_room(room_id)
    try:
        create_evt = await intent.get_state_event(room_id, EventType.ROOM_CREATE)
        members = await intent.get_room_members(room_id)
    except MatrixError:
        self.log.exception(f"Failed to get state after joining {room_id} as {intent.mxid}")
        background_task.create(intent.leave_room(room_id, reason="Internal error"))
        return

    if create_evt.type == RoomType.SPACE:
        await self.handle_puppet_space_invite(room_id, puppet, invited_by, evt)
        return
    # Anything with more than two members, or not flagged as direct, counts as a group.
    if len(members) > 2 or not evt.content.is_direct:
        await self.handle_puppet_group_invite(room_id, puppet, invited_by, evt, members)
        return
    await self.handle_puppet_dm_invite(room_id, puppet, invited_by, evt)
|
|
|
|
async def handle_puppet_invite(
    self, room_id: RoomID, puppet: br.BasePuppet, invited_by: br.BaseUser, evt: StateEvent
) -> None:
    """Handle an invite addressed to one of the bridge's ghosts.

    The ghost rejects the invite when the inviter is not allowed to invite it.
    Otherwise, while holding the room's lock: for a portal room the portal decides
    (reject -> leave, ignore -> nothing, otherwise join); for any other room the
    non-portal invite handling runs.
    """
    intent = puppet.default_mxid_intent
    allowed = await self.allow_puppet_invite(invited_by, puppet)
    if not allowed:
        self.log.debug(f"Rejecting invite for {intent.mxid} to {room_id}: user can't invite")
        await intent.leave_room(room_id, reason="You're not allowed to invite this ghost.")
        return

    async with self.room_locks[room_id]:
        portal = await self.bridge.get_portal(room_id)
        if not portal:
            await self.handle_puppet_nonportal_invite(room_id, puppet, invited_by, evt)
            return
        try:
            await portal.handle_matrix_invite(invited_by, puppet)
        except br.RejectMatrixInvite as e:
            await intent.leave_room(room_id, reason=e.message)
        except br.IgnoreMatrixInvite:
            pass
        else:
            await intent.join_room(room_id)
|
|
|
|
async def handle_invite(
    self, room_id: RoomID, user_id: UserID, invited_by: br.BaseUser, evt: StateEvent
) -> None:
    """Hook for invites that target neither the bridge bot nor a ghost; no-op by default."""
|
|
|
|
async def handle_reject(
    self, room_id: RoomID, user_id: UserID, reason: str, event_id: EventID
) -> None:
    """Hook for a user declining their own invite (invite -> leave); no-op by default."""
|
|
|
|
async def handle_disinvite(
    self,
    room_id: RoomID,
    user_id: UserID,
    disinvited_by: UserID,
    reason: str,
    event_id: EventID,
) -> None:
    """Hook for an invite being withdrawn by someone else; no-op by default.

    ``int_handle_event`` passes the event sender as ``disinvited_by``.
    """
|
|
|
|
async def handle_event(self, evt: Event) -> None:
    """
    Fallback hook of :meth:`int_handle_event`; does nothing by default.

    It receives events that are not member, message, sticker, encrypted or encryption
    events and that were not routed to :meth:`handle_state_event` or
    :meth:`handle_ephemeral_event`.

    **N.B.** You may need to add the event class to :attr:`allowed_event_classes`
    or override :meth:`allow_matrix_event` for it to reach here.
    """
|
|
|
|
async def handle_state_event(self, evt: StateEvent) -> None:
    """
    Hook of :meth:`int_handle_event` for state events it has no dedicated handling
    for (i.e. other than m.room.member and m.room.encryption); does nothing by default.

    **N.B.** You may need to add the event class to :attr:`allowed_event_classes`
    or override :meth:`allow_matrix_event` for it to reach here.
    """
|
|
|
|
async def handle_ephemeral_event(
    self, evt: ReceiptEvent | PresenceEvent | TypingEvent
) -> None:
    """Route ephemeral events; only receipts are handled, the rest are ignored."""
    if evt.type != EventType.RECEIPT:
        return
    await self.handle_receipt(evt)
|
|
|
|
async def send_permission_error(self, room_id: RoomID) -> None:
    """Send the "not whitelisted" notice to ``room_id`` as the bridge bot."""
    plain = (
        "You are not whitelisted to use this bridge.\n\n"
        "If you are the owner of this bridge, see the bridge.permissions "
        "section in your config file."
    )
    rich = (
        "<p>You are not whitelisted to use this bridge.</p>"
        "<p>If you are the owner of this bridge, see the "
        "<code>bridge.permissions</code> section in your config file.</p>"
    )
    await self.az.intent.send_notice(room_id, text=plain, html=rich)
|
|
|
|
async def accept_bot_invite(self, room_id: RoomID, inviter: br.BaseUser) -> None:
    """Join a room the bridge bot was invited to and greet the inviter.

    A failed join is logged and ends the handling. Inviters who may not run commands
    get the permission error and the bot leaves again; everyone else gets the welcome
    message.
    """
    bot = self.az.intent
    try:
        await bot.join_room(room_id)
    except Exception:
        self.log.exception(f"Failed to join room {room_id} as bridge bot")
        return

    if await self.allow_command(inviter):
        await self.send_welcome_message(room_id, inviter)
        return

    await self.send_permission_error(room_id)
    await bot.leave_room(room_id)
|
|
|
|
async def send_welcome_message(self, room_id: RoomID, inviter: br.BaseUser) -> None:
    """Send the bridge bot's welcome text to ``room_id``.

    The text always starts with the configured ``welcome`` entry. In a management room
    (exactly two members, one of them the bridge bot) it continues with
    ``welcome_connected`` or ``welcome_unconnected`` depending on whether the inviter
    is logged in, plus ``additional_help`` if configured. In any other room it
    continues with a hint on how to invoke ``help`` with the command prefix.

    Entries missing from ``management_room_text`` are skipped. Depending on
    ``management_room_multiple_messages`` the parts are sent as separate notices or
    as a single combined one.

    :param room_id: The room to send the welcome text to.
    :param inviter: The user who invited the bot; only consulted in management rooms.
    """
    has_two_members, bridge_bot_in_room = await self._is_direct_chat(room_id)
    is_management = has_two_members and bridge_bot_in_room

    texts = self.management_room_text
    welcome_messages = [texts.get("welcome")]

    if is_management:
        if await inviter.is_logged_in():
            welcome_messages.append(texts.get("welcome_connected"))
        else:
            welcome_messages.append(texts.get("welcome_unconnected"))
        welcome_messages.append(texts.get("additional_help"))
    else:
        cmd_prefix = self.commands.command_prefix
        welcome_messages.append(f"Use `{cmd_prefix} help` for help.")

    # A config that overrides management_room_text may omit keys, in which case .get()
    # yields None; joining or rendering None would raise a TypeError, so drop them.
    welcome_messages = [message for message in welcome_messages if message]
    if not welcome_messages:
        return

    if self.management_room_multiple_messages:
        for m in welcome_messages:
            await self.az.intent.send_notice(room_id, text=m, html=markdown.render(m))
    else:
        combined = "\n".join(welcome_messages)
        combined_html = "".join(map(markdown.render, welcome_messages))
        await self.az.intent.send_notice(room_id, text=combined, html=combined_html)
|
|
|
|
async def int_handle_invite(self, evt: StateEvent) -> None:
    """Dispatch an invite event to the bot, ghost or generic invite handler.

    The inviter is looked up through the bridge. If no user object is found the event
    is dropped with an error log. Invites for the bridge bot go to
    :meth:`accept_bot_invite`, invites for a ghost go to :meth:`handle_puppet_invite`,
    and everything else goes to :meth:`handle_invite`.

    :param evt: The member state event whose membership is invite.
    """
    self.log.debug(f"{evt.sender} invited {evt.state_key} to {evt.room_id}")
    inviter = await self.bridge.get_user(evt.sender)
    if inviter is None:
        # No exception is being handled here, so log.exception() would only append a
        # meaningless "NoneType: None" traceback; a plain error record is what's meant.
        self.log.error(f"Failed to find user with Matrix ID {evt.sender}")
        return
    elif evt.state_key == self.az.bot_mxid:
        await self.accept_bot_invite(evt.room_id, inviter)
        return

    puppet = await self.bridge.get_puppet(UserID(evt.state_key))
    if puppet:
        await self.handle_puppet_invite(evt.room_id, puppet, inviter, evt)
        return

    await self.handle_invite(evt.room_id, UserID(evt.state_key), inviter, evt)
|
|
|
|
def is_command(self, message: MessageEventContent) -> tuple[bool, str]:
    """Check whether a message starts with the configured command prefix.

    :param message: The message content; its ``body`` is inspected.
    :return: A ``(is_command, text)`` tuple. ``text`` is the body with the prefix and
        any whitespace after it removed when the message is a command, and the
        unmodified body otherwise.
    """
    text = message.body
    prefix = self.config["bridge.command_prefix"]
    is_command = text.startswith(prefix)
    if is_command:
        # Cut off exactly the prefix and let lstrip() remove the separating whitespace.
        # Cutting len(prefix) + 1 characters assumed a separator was always present and
        # swallowed the first character of the command when it was not.
        text = text[len(prefix) :].lstrip()
    return is_command, text
|
|
|
|
async def _send_mss(
    self,
    evt: Event,
    status: MessageStatus,
    reason: MessageStatusReason | None = None,
    error: str | None = None,
    message: str | None = None,
) -> None:
    """Send a Beeper message status event referencing ``evt`` into its room.

    Nothing is sent unless ``bridge.message_status_events`` is enabled in the config
    (default: disabled).
    """
    enabled = self.config.get("bridge.message_status_events", False)
    if not enabled:
        return
    reference = RelatesTo(rel_type=RelationType.REFERENCE, event_id=evt.event_id)
    status_content = BeeperMessageStatusEventContent(
        network="",  # TODO set network properly
        relates_to=reference,
        status=status,
        reason=reason,
        error=error,
        message=message,
    )
    await self.az.intent.send_message_event(
        evt.room_id, EventType.BEEPER_MESSAGE_STATUS, status_content
    )
|
|
|
|
async def _send_crypto_status_error(
    self,
    evt: Event,
    err: DecryptionError | None = None,
    retry_num: int = 0,
    is_final: bool = True,
    edit: EventID | None = None,
    wait_for: int | None = None,
) -> EventID | None:
    """Report a decryption failure for ``evt`` through every available channel.

    A DECRYPTED checkpoint is sent, then (unless ``bridge.delivery_error_reports`` is
    disabled) a notice in the room, then a message status event.

    :param err: The failure being reported.
    :param retry_num: Retry counter forwarded to the checkpoint.
    :param is_final: Whether the bridge has given up; selects a permanent checkpoint
        and the RETRIABLE (rather than PENDING) message status.
    :param edit: ID of an earlier notice that the new notice should edit.
    :param wait_for: If set, the notice mentions that the bridge retries for this
        many seconds.
    :return: The ID of the notice that was sent, or ``None`` if none was sent.
    """
    if isinstance(err, (SessionNotFound, UnencryptedMessageError)):
        msg = err.human_message
    else:
        msg = str(err)
    self._send_message_checkpoint(
        evt, MessageSendCheckpointStep.DECRYPTED, msg, permanent=is_final, retry_num=retry_num
    )

    if wait_for:
        msg += f". The bridge will retry for {wait_for} seconds"
    if isinstance(err, EncryptionUnsupportedError):
        full_msg = "🔒️ This bridge has not been configured to support encryption"
    else:
        full_msg = f"\u26a0 Your message was not bridged: {msg}."

    event_id = None
    reports_enabled = self.config.get("bridge.delivery_error_reports", True)
    if reports_enabled:
        try:
            content = TextMessageEventContent(msgtype=MessageType.NOTICE, body=full_msg)
            if edit:
                content.set_edit(edit)
            event_id = await self.az.intent.send_message(evt.room_id, content)
        except IntentError:
            self.log.debug("IntentError while sending encryption error", exc_info=True)
            self.log.error(
                "Got IntentError while trying to send encryption error message. "
                "This likely means the bridge bot is not in the room, which can "
                "happen if you force-enable e2ee on the homeserver without enabling "
                "it by default on the bridge (bridge -> encryption -> default)."
            )

    mss_status = MessageStatus.RETRIABLE if is_final else MessageStatus.PENDING
    await self._send_mss(
        evt,
        status=mss_status,
        reason=MessageStatusReason.UNDECRYPTABLE,
        error=str(err),
        message=err.human_message if err else None,
    )

    return event_id
|
|
|
|
async def handle_message(self, evt: MessageEvent, was_encrypted: bool = False) -> None:
    """Process a message event: bridge it through its portal or run it as a command.

    :param evt: The message event (``int_handle_event`` also routes stickers here).
    :param was_encrypted: Whether the event arrived encrypted and was decrypted.

    Order of checks: encryption requirement, sender whitelist, portal bridging
    (non-command messages in portal rooms), then command handling.
    """
    room_id = evt.room_id
    user_id = evt.sender
    event_id = evt.event_id
    message = evt.content

    # Plaintext messages are rejected outright when encryption is required.
    if not was_encrypted and self.require_e2ee:
        self.log.warning(f"Dropping {event_id} from {user_id} as it's not encrypted!")
        await self._send_crypto_status_error(evt, UnencryptedMessageError(), 0)
        return

    sender = await self.bridge.get_user(user_id)
    if not sender or not await self.allow_message(sender):
        self.log.debug(
            f"Ignoring message {event_id} from {user_id} to {room_id}:"
            " user is not whitelisted."
        )
        self._send_message_checkpoint(
            evt, MessageSendCheckpointStep.BRIDGE, "user is not whitelisted"
        )
        return
    self.log.debug(f"Received Matrix event {event_id} from {sender.mxid} in {room_id}")
    self.log.trace("Event %s content: %s", event_id, message)

    if isinstance(message, TextMessageEventContent):
        message.trim_reply_fallback()

    is_command, text = self.is_command(message)
    portal = await self.bridge.get_portal(room_id)
    # Non-command messages in portal rooms are bridged (or refused) and never reach
    # the command handling below.
    if not is_command and portal:
        if await self.allow_bridging_message(sender, portal):
            await portal.handle_matrix_message(sender, message, event_id)
        else:
            self.log.debug(
                f"Ignoring event {event_id} from {sender.mxid}:"
                " not allowed to send to portal"
            )
            self._send_message_checkpoint(
                evt,
                MessageSendCheckpointStep.BRIDGE,
                "user is not allowed to send to the portal",
            )
        return

    # From here on the message can only be a command, which must be plain text.
    if message.msgtype != MessageType.TEXT:
        self.log.debug(
            f"Ignoring event {event_id}: not a portal room and not a m.text message"
        )
        self._send_message_checkpoint(
            evt, MessageSendCheckpointStep.BRIDGE, "not a portal room and not a m.text message"
        )
        return
    elif not await self.allow_command(sender):
        self.log.debug(
            f"Ignoring command {event_id} from {sender.mxid}: not allowed to run commands"
        )
        self._send_message_checkpoint(
            evt, MessageSendCheckpointStep.COMMAND, "not allowed to run commands"
        )
        return

    has_two_members, bridge_bot_in_room = await self._is_direct_chat(room_id)
    is_management = has_two_members and bridge_bot_in_room

    # In a management room every text message is treated as a command, prefix or not.
    if is_command or is_management:
        try:
            command, arguments = text.split(" ", 1)
            args = arguments.split(" ")
        except ValueError:
            # Not enough values to unpack, i.e. no arguments
            command = text
            args = []

        try:
            await self.commands.handle(
                room_id,
                event_id,
                sender,
                command,
                args,
                message,
                portal,
                is_management,
                bridge_bot_in_room,
            )
        except Exception as e:
            self.log.debug(f"Error handling command {command} from {sender}: {e}")
            self._send_message_checkpoint(evt, MessageSendCheckpointStep.COMMAND, e)
            await self._send_mss(
                evt,
                status=MessageStatus.FAIL,
                reason=MessageStatusReason.GENERIC_ERROR,
                error="",
                message="Command execution failed",
            )
        else:
            # The success checkpoint is built and awaited inline here instead of going
            # through _send_message_checkpoint (which sends in the background).
            await MessageSendCheckpoint(
                event_id=event_id,
                room_id=room_id,
                step=MessageSendCheckpointStep.COMMAND,
                timestamp=int(time.time() * 1000),
                status=MessageSendCheckpointStatus.SUCCESS,
                reported_by=MessageSendCheckpointReportedBy.BRIDGE,
                event_type=EventType.ROOM_MESSAGE,
                message_type=message.msgtype,
            ).send(
                self.bridge.config["homeserver.message_send_checkpoint_endpoint"],
                self.az.as_token,
                self.log,
            )
            await self._send_mss(evt, status=MessageStatus.SUCCESS)
    else:
        self.log.debug(
            f"Ignoring event {event_id} from {sender.mxid}:"
            " not a command and not a portal room"
        )
        self._send_message_checkpoint(
            evt, MessageSendCheckpointStep.COMMAND, "not a command and not a portal room"
        )
        await self._send_mss(
            evt,
            status=MessageStatus.FAIL,
            reason=MessageStatusReason.UNSUPPORTED,
            error="Unknown room",
            message="Unknown room",
        )
|
|
|
|
async def _is_direct_chat(self, room_id: RoomID) -> tuple[bool, bool]:
    """Inspect the member list of ``room_id`` as seen by the bridge bot.

    :return: ``(has_two_members, bridge_bot_in_room)``; ``(False, False)`` when the
        member list cannot be fetched because of a ``MatrixError``.
    """
    try:
        members = await self.az.intent.get_room_members(room_id)
    except MatrixError:
        return False, False
    has_two_members = len(members) == 2
    bot_present = self.az.bot_mxid in members
    return has_two_members, bot_present
|
|
|
|
async def handle_receipt(self, evt: ReceiptEvent) -> None:
    """Forward the read receipts in ``evt`` to :meth:`handle_read_receipt`.

    A receipt is skipped when its user is unknown or not logged in, or when the room
    has no portal. Receipts marked as double-puppeted by this bridge for a user that
    has a double puppet are also skipped, but only after the portal's disappearing
    messages have been scheduled.
    """
    for event_id, receipts in evt.content.items():
        read_receipts = receipts.get(ReceiptType.READ, {})
        for user_id, data in read_receipts.items():
            user = await self.bridge.get_user(user_id, create=False)
            if not (user and await user.is_logged_in()):
                continue

            portal = await self.bridge.get_portal(evt.room_id)
            if not portal:
                continue

            await portal.schedule_disappearing()

            marked_by_bridge = data.get(DOUBLE_PUPPET_SOURCE_KEY) == self.az.bridge_name
            if marked_by_bridge and await self.bridge.get_double_puppet(user_id) is not None:
                continue

            await self.handle_read_receipt(user, portal, event_id, data)
|
|
|
|
async def handle_read_receipt(
    self,
    user: br.BaseUser,
    portal: br.BasePortal,
    event_id: EventID,
    data: SingleReceiptEventContent,
) -> None:
    """Hook for a read receipt that passed the filters in :meth:`handle_receipt`.

    Does nothing by default.
    """
|
|
|
|
async def try_handle_sync_event(self, evt: Event) -> None:
    """Handle an event received from sync without letting exceptions escape.

    Receipt, presence and typing events go to :meth:`handle_ephemeral_event`; any
    other event is only trace-logged. Exceptions are logged and swallowed.
    """
    ephemeral_classes = (ReceiptEvent, PresenceEvent, TypingEvent)
    try:
        if not isinstance(evt, ephemeral_classes):
            self.log.trace("Unknown event type received from sync: %s", evt)
        else:
            await self.handle_ephemeral_event(evt)
    except Exception:
        self.log.exception("Error handling manually received Matrix event")
|
|
|
|
async def _post_decrypt(
    self, evt: Event, retry_num: int = 0, error_event_id: EventID | None = None
) -> None:
    """Enforce the trust requirement on a decrypted event, then process it.

    Events whose trust state is below the encryption manager's ``min_send_trust`` are
    dropped with a crypto status error. Otherwise a DECRYPTED checkpoint is sent, an
    earlier error notice (if any) is redacted, and the event re-enters
    :meth:`int_handle_event` marked as encrypted.

    :param retry_num: Retry counter forwarded to checkpoints and error reports.
    :param error_event_id: ID of an earlier error notice to edit or redact.
    """
    trust_state = evt["mautrix"]["trust_state"]
    if trust_state < self.e2ee.min_send_trust:
        self.log.warning(
            f"Dropping {evt.event_id} from {evt.sender} due to insufficient verification level"
            f" (event: {trust_state}, required: {self.e2ee.min_send_trust})"
        )
        untrusted = DeviceUntrustedError(trust_state)
        await self._send_crypto_status_error(
            evt, retry_num=retry_num, err=untrusted, edit=error_event_id
        )
        return

    self._send_message_checkpoint(
        evt, MessageSendCheckpointStep.DECRYPTED, retry_num=retry_num
    )
    if error_event_id:
        await self.az.intent.redact(evt.room_id, error_event_id)
    await self.int_handle_event(evt, was_encrypted=True)
|
|
|
|
async def handle_encrypted(self, evt: EncryptedEvent) -> None:
    """Decrypt an encrypted event and pass the result on for processing.

    Without an encryption manager the event is answered with an "encryption
    unsupported" error. A missing session triggers the longer wait in
    :meth:`_handle_encrypted_wait`; any other decryption error is reported as final.
    """
    if not self.e2ee:
        self.log.debug(
            "Got encrypted message %s from %s, but encryption is not enabled",
            evt.event_id,
            evt.sender,
        )
        await self._send_crypto_status_error(evt, EncryptionUnsupportedError())
        return

    try:
        decrypted = await self.e2ee.decrypt(evt, wait_session_timeout=3)
    except SessionNotFound as missing:
        await self._handle_encrypted_wait(evt, missing, wait=22)
    except DecryptionError as failure:
        self.log.warning(f"Failed to decrypt {evt.event_id}: {failure}")
        self.log.trace("%s decryption traceback:", evt.event_id, exc_info=True)
        await self._send_crypto_status_error(evt, failure)
    else:
        await self._post_decrypt(decrypted)
|
|
|
|
async def _handle_encrypted_wait(
    self, evt: EncryptedEvent, err: SessionNotFound, wait: int
) -> None:
    """Wait up to ``wait`` seconds for a missing session and retry decryption once.

    A room key request is started in the background and a non-final error notice is
    sent. If the session arrives, decryption is retried; the result either continues
    through :meth:`_post_decrypt` or replaces the notice with the new error. If the
    session never arrives, the notice is replaced with a final error.
    """
    self.log.debug(
        f"Couldn't find session {err.session_id} trying to decrypt {evt.event_id},"
        " waiting even longer"
    )
    key_request = self.e2ee.crypto.request_room_key(
        evt.room_id,
        evt.content.sender_key,
        evt.content.session_id,
        from_devices={evt.sender: [evt.content.device_id]},
    )
    background_task.create(key_request)
    event_id = await self._send_crypto_status_error(evt, err, is_final=False, wait_for=wait)
    got_keys = await self.e2ee.crypto.wait_for_session(
        evt.room_id, err.session_id, timeout=wait
    )
    if not got_keys:
        self.log.warning(f"Didn't get {err.session_id}, giving up on {evt.event_id}")
        await self._send_crypto_status_error(
            evt, SessionNotFound(err.session_id), retry_num=1, edit=event_id
        )
        return

    self.log.debug(
        f"Got session {err.session_id} after waiting more, "
        f"trying to decrypt {evt.event_id} again"
    )
    try:
        decrypted = await self.e2ee.decrypt(evt, wait_session_timeout=0)
    except DecryptionError as e:
        await self._send_crypto_status_error(evt, e, retry_num=1, edit=event_id)
        self.log.warning(f"Failed to decrypt {evt.event_id}: {e}")
        self.log.trace("%s decryption traceback:", evt.event_id, exc_info=True)
    else:
        await self._post_decrypt(decrypted, retry_num=1, error_event_id=event_id)
|
|
|
|
async def handle_encryption(self, evt: StateEvent) -> None:
    """React to encryption being enabled in a room.

    The event content is stored in the appservice state store. If the room is a
    portal, the portal is marked encrypted and saved, and direct portals additionally
    get DM encryption enabled.
    """
    await self.az.state_store.set_encryption_info(evt.room_id, evt.content)
    portal = await self.bridge.get_portal(evt.room_id)
    if not portal:
        return
    portal.encrypted = True
    await portal.save()
    if not portal.is_direct:
        return
    portal.log.debug("Received encryption event in direct portal: %s", evt.content)
    await portal.enable_dm_encryption()
|
|
|
|
def _send_message_checkpoint(
    self,
    evt: Event,
    step: MessageSendCheckpointStep,
    err: Exception | str | None = None,
    permanent: bool = True,
    retry_num: int = 0,
) -> None:
    """Report a message send checkpoint for ``evt`` in the background.

    Nothing happens when no checkpoint endpoint is configured or the event type is
    not one of ``CHECKPOINT_TYPES``.

    :param step: The step being reported.
    :param err: Failure description; when falsy the checkpoint reports success.
    :param permanent: With ``err`` set, whether the failure is permanent (otherwise
        it is reported as "will retry").
    :param retry_num: Retry counter included in the checkpoint.
    """
    endpoint = self.bridge.config["homeserver.message_send_checkpoint_endpoint"]
    if not endpoint or evt.type not in CHECKPOINT_TYPES:
        return

    self.log.debug(f"Sending message send checkpoint for {evt.event_id} (step: {step})")
    if not err:
        status = MessageSendCheckpointStatus.SUCCESS
    elif permanent:
        status = MessageSendCheckpointStatus.PERM_FAILURE
    else:
        status = MessageSendCheckpointStatus.WILL_RETRY

    # Only m.room.message events carry a msgtype worth reporting.
    message_type = evt.content.msgtype if evt.type == EventType.ROOM_MESSAGE else None
    checkpoint = MessageSendCheckpoint(
        event_id=evt.event_id,
        room_id=evt.room_id,
        step=step,
        timestamp=int(time.time() * 1000),
        status=status,
        reported_by=MessageSendCheckpointReportedBy.BRIDGE,
        event_type=evt.type,
        message_type=message_type,
        info=str(err) if err else None,
        retry_num=retry_num,
    )
    background_task.create(checkpoint.send(endpoint, self.az.as_token, self.log))
|
|
|
|
# Event classes that allow_matrix_event() lets through; instances of any other class
# are ignored before reaching the handlers.
allowed_event_classes: tuple[type, ...] = (
    MessageEvent,
    StateEvent,
    ReactionEvent,
    EncryptedEvent,
    RedactionEvent,
    ReceiptEvent,
    TypingEvent,
    PresenceEvent,
)
|
|
|
|
async def allow_matrix_event(self, evt: Event) -> bool:
    """Decide whether an incoming event should be handled at all.

    Rejected are: events whose class is not in :attr:`allowed_event_classes`, room
    events sent by the bridge bot or a bridge ghost, and room events marked as
    double-puppeted by this bridge whose sender really has a double puppet.
    """
    if not isinstance(evt, self.allowed_event_classes):
        return False
    # Only room events can have originated from the bridge itself.
    if not isinstance(evt, BaseRoomEvent):
        return True

    sender = evt.sender
    if sender == self.az.bot_mxid or self.bridge.is_bridge_ghost(sender):
        return False

    # The double puppet marker alone is not trusted; it must be backed by an actual
    # double puppet for that user ID.
    marked_by_bridge = evt.content.get(DOUBLE_PUPPET_SOURCE_KEY) == self.az.bridge_name
    if marked_by_bridge and await self.bridge.get_double_puppet(evt.sender) is not None:
        return False
    return True
|
|
|
|
async def int_handle_event(self, evt: Event, was_encrypted: bool = False) -> None:
    """Entry point for incoming events: filter, dispatch by type, record the duration.

    Registered as the appservice's Matrix event handler in ``__init__`` and re-entered
    by ``_post_decrypt`` with ``was_encrypted=True`` for decrypted events.

    :param evt: The incoming event.
    :param was_encrypted: Whether ``evt`` is the decrypted form of an encrypted event.
    """
    # The encryption manager sees member events even if they are filtered out below.
    if isinstance(evt, StateEvent) and evt.type == EventType.ROOM_MEMBER and self.e2ee:
        await self.e2ee.handle_member_event(evt)
    if not await self.allow_matrix_event(evt):
        return
    self.log.trace("Received event: %s", evt)

    # Decrypted events skip the BRIDGE checkpoint on re-entry.
    if not was_encrypted:
        self._send_message_checkpoint(evt, MessageSendCheckpointStep.BRIDGE)
    start_time = time.time()

    if evt.type == EventType.ROOM_MEMBER:
        evt: StateEvent
        unsigned = evt.unsigned or StateUnsigned()
        prev_content = unsigned.prev_content or MemberStateEventContent()
        prev_membership = prev_content.membership if prev_content else Membership.JOIN
        # The hook is picked from the (previous membership -> new membership) transition
        # and, for leaves, from whether the sender is the affected user.
        if evt.content.membership == Membership.INVITE:
            if prev_membership == Membership.KNOCK:
                await self.handle_accept_knock(
                    evt.room_id,
                    UserID(evt.state_key),
                    evt.sender,
                    evt.content.reason,
                    evt.event_id,
                )
            else:
                await self.int_handle_invite(evt)
        elif evt.content.membership == Membership.LEAVE:
            if prev_membership == Membership.BAN:
                await self.handle_unban(
                    evt.room_id,
                    UserID(evt.state_key),
                    evt.sender,
                    evt.content.reason,
                    evt.event_id,
                )
            elif prev_membership == Membership.INVITE:
                if evt.sender == evt.state_key:
                    await self.handle_reject(
                        evt.room_id, UserID(evt.state_key), evt.content.reason, evt.event_id
                    )
                else:
                    await self.handle_disinvite(
                        evt.room_id,
                        UserID(evt.state_key),
                        evt.sender,
                        evt.content.reason,
                        evt.event_id,
                    )
            elif prev_membership == Membership.KNOCK:
                if evt.sender == evt.state_key:
                    await self.handle_retract_knock(
                        evt.room_id, UserID(evt.state_key), evt.content.reason, evt.event_id
                    )
                else:
                    await self.handle_reject_knock(
                        evt.room_id,
                        UserID(evt.state_key),
                        evt.sender,
                        evt.content.reason,
                        evt.event_id,
                    )

            elif evt.sender == evt.state_key:
                await self.handle_leave(evt.room_id, UserID(evt.state_key), evt.event_id)
            else:
                await self.handle_kick(
                    evt.room_id,
                    UserID(evt.state_key),
                    evt.sender,
                    evt.content.reason,
                    evt.event_id,
                )
        elif evt.content.membership == Membership.BAN:
            await self.handle_ban(
                evt.room_id,
                UserID(evt.state_key),
                evt.sender,
                evt.content.reason,
                evt.event_id,
            )
        elif evt.content.membership == Membership.JOIN:
            if prev_membership != Membership.JOIN:
                await self.handle_join(evt.room_id, UserID(evt.state_key), evt.event_id)
            else:
                # join -> join is treated as a change of the member's info.
                await self.handle_member_info_change(
                    evt.room_id, UserID(evt.state_key), evt.content, prev_content, evt.event_id
                )
        elif evt.content.membership == Membership.KNOCK:
            await self.handle_knock(
                evt.room_id,
                UserID(evt.state_key),
                evt.content.reason,
                evt.event_id,
            )
    elif evt.type in (EventType.ROOM_MESSAGE, EventType.STICKER):
        evt: MessageEvent
        # Stickers get the event type as their msgtype so handle_message can treat
        # them like messages.
        if evt.type != EventType.ROOM_MESSAGE:
            evt.content.msgtype = MessageType(str(evt.type))
        await self.handle_message(evt, was_encrypted=was_encrypted)
    elif evt.type == EventType.ROOM_ENCRYPTED:
        await self.handle_encrypted(evt)
    elif evt.type == EventType.ROOM_ENCRYPTION:
        await self.handle_encryption(evt)
    else:
        if evt.type.is_state and isinstance(evt, StateEvent):
            await self.handle_state_event(evt)
        elif evt.type.is_ephemeral and isinstance(
            evt, (PresenceEvent, TypingEvent, ReceiptEvent)
        ):
            await self.handle_ephemeral_event(evt)
        else:
            await self.handle_event(evt)

    # Not reached when a handler raises, so only completed events are measured.
    await self.log_event_handle_duration(evt, time.time() - start_time)
|
|
|
|
async def log_event_handle_duration(self, evt: Event, duration: float) -> None:
    """Record ``duration`` in the ``EVENT_TIME`` histogram under the event's type."""
    label = str(evt.type)
    EVENT_TIME.labels(event_type=label).observe(duration)
|