matrix-nio/nio/rooms.py

554 lines
18 KiB
Python

# -*- coding: utf-8 -*-
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
# Copyright © 2021 Famedly GmbH
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import unicode_literals
from builtins import super
from collections import defaultdict
from enum import Enum
from typing import Any, DefaultDict, Dict, List, NamedTuple, Optional, Tuple, Union
from jsonschema.exceptions import SchemaError, ValidationError
from logbook import Logger
from .events import (
AccountDataEvent,
EphemeralEvent,
Event,
FullyReadEvent,
InviteAliasEvent,
InviteMemberEvent,
InviteNameEvent,
PowerLevels,
PowerLevelsEvent,
Receipt,
ReceiptEvent,
RoomAliasEvent,
RoomAvatarEvent,
RoomCreateEvent,
RoomEncryptionEvent,
RoomGuestAccessEvent,
RoomHistoryVisibilityEvent,
RoomJoinRulesEvent,
RoomMemberEvent,
RoomNameEvent,
RoomTopicEvent,
RoomUpgradeEvent,
TagEvent,
TypingNoticeEvent,
)
from .log import logger_group
from .responses import RoomSummary, UnreadNotifications
logger = Logger("nio.rooms")
logger_group.add_logger(logger)
__all__ = [
"MatrixRoom",
"MatrixInvitedRoom",
"MatrixUser",
]
class MatrixRoom:
"""Represents a Matrix room."""
def __init__(self, room_id, own_user_id, encrypted=False):
# type: (str, str, bool) -> None
"""Initialize a MatrixRoom object."""
# yapf: disable
self.room_id = room_id # type: str
self.own_user_id = own_user_id
self.creator = "" # type: str
self.federate = True # type: bool
self.room_version = "1" # type: str
self.guest_access = "forbidden" # type: str
self.join_rule = "invite" # type: str
self.history_visibility = "shared" # type: str
self.canonical_alias = None # type: Optional[str]
self.topic = None # type: Optional[str]
self.name = None # type: Optional[str]
self.users = dict() # type: Dict[str, MatrixUser]
self.invited_users = dict() # type: Dict[str, MatrixUser]
self.names = defaultdict(list) # type: DefaultDict[str, List[str]]
self.encrypted = encrypted # type: bool
self.power_levels = PowerLevels() # type: PowerLevels
self.typing_users = [] # type: List[str]
self.read_receipts = {} # type: Dict[str, Receipt]
self.summary = None # type: Optional[RoomSummary]
self.room_avatar_url = None # type: Optional[str]
self.fully_read_marker: Optional[str] = None
self.tags: Dict[str, Optional[Dict[str, float]]] = {}
self.unread_notifications: int = 0
self.unread_highlights: int = 0
self.members_synced: bool = False
self.replacement_room: Union[str, None] = None
# yapf: enable
@property
def display_name(self) -> str:
"""Calculate display name for a room.
Prefer returning the room name if it exists, falling back to
a group-style name if not.
Follows:
https://matrix.org/docs/spec/client_server/r0.6.0#id342
"""
return self.named_room_name() or self.group_name()
def named_room_name(self) -> Optional[str]:
"""Return the name of the room if it's a named room, otherwise None."""
return self.name or self.canonical_alias or None
def group_name(self) -> str:
"""Return the group-style name of the room.
In other words, a display name based on the names of room members. This
is used for ad-hoc groups of people (usually direct chats).
"""
empty, user_ids, others = self.group_name_structure()
names = [self.user_name(u) or u for u in user_ids]
if others:
text = f"{', '.join(names)} and {others} other{'' if others == 1 else 's'}"
elif len(names) == 0:
text = ""
elif len(names) == 1:
text = names[0]
else:
text = f"{', '.join(names[:-1])} and {names[-1]}"
if empty and text:
text = f"Empty Room (had {text})"
elif empty:
text = "Empty Room"
return text
def group_name_structure(self) -> Tuple[bool, List[str], int]:
"""Get if room is empty, ID for listed users and the N others count."""
try:
heroes, joined, invited = self._summary_details()
except ValueError:
users = [
u
for u in sorted(self.users, key=lambda u: self.user_name(u))
if u != self.own_user_id
]
empty = not users
if len(users) <= 5:
return (empty, users, 0)
return (empty, users[:5], len(users) - 5)
empty = self.member_count <= 1
if len(heroes) >= self.member_count - 1:
return (empty, heroes, 0)
return (empty, heroes, self.member_count - 1 - len(heroes))
def user_name(self, user_id: str) -> Optional[str]:
"""Get disambiguated display name for a user.
Returns display name of a user if display name is unique or returns
a display name in form "<display name> (<matrix id>)" if there is
more than one user with same display name.
"""
if user_id not in self.users:
return None
user = self.users[user_id]
if len(self.names[user.name]) > 1:
return user.disambiguated_name
return user.name
def user_name_clashes(self, name: str) -> List[str]:
"""Get a list of users that have same display name."""
return self.names[name]
def avatar_url(self, user_id: str) -> Optional[str]:
"""Get avatar url for a user.
Returns a matrix content URI, or None if the user has no avatar.
"""
if user_id not in self.users:
return None
return self.users[user_id].avatar_url
@property
def gen_avatar_url(self) -> Optional[str]:
"""
Get the calculated room's avatar url.
Either the room's avatar if one is set, or the avatar of the
first user that's not ourselves if the room is an unnamed group or
has exactly two users.
"""
if self.room_avatar_url:
return self.room_avatar_url
try:
heroes, _, _ = self._summary_details()
except ValueError:
if self.is_group and len(self.users) == 2:
return self.avatar_url(
next(
u
for u in sorted(self.users, key=lambda u: self.user_name(u))
if u != self.own_user_id
)
)
return None
if self.is_group and self.member_count == 2 and len(heroes) >= 1:
return self.avatar_url(heroes[0])
return None
@property
def machine_name(self) -> str:
"""Calculate an unambiguous, unique machine name for a room.
Either use the more human-friendly canonical alias, if it exists, or
the internal room ID if not.
"""
return self.canonical_alias or self.room_id
@property
def is_named(self) -> bool:
"""Determine whether a room is named.
A named room is a room with either the name or a canonical alias set.
"""
return bool(self.canonical_alias or self.name)
@property
def is_group(self) -> bool:
"""Determine whether a room is an ad-hoc group (often a direct chat).
A group is an unnamed room with no canonical alias.
"""
return not self.is_named
def add_member(
self,
user_id: str,
display_name: Optional[str],
avatar_url: Optional[str],
invited: bool = False,
) -> bool:
if user_id in self.users:
return False
level = self.power_levels.users.get(
user_id,
self.power_levels.defaults.users_default,
)
user = MatrixUser(user_id, display_name, avatar_url, level, invited)
self.users[user_id] = user
if invited:
self.invited_users[user_id] = user
name = display_name if display_name else user_id
self.names[name].append(user_id)
return True
def remove_member(self, user_id: str) -> bool:
user = self.users.pop(user_id, None)
if user:
self.names[user.name].remove(user.user_id)
invited_user = self.invited_users.pop(user_id, None)
if invited_user:
try:
self.names[invited_user.name].remove(invited_user.user_id)
except ValueError:
pass
return bool(user or invited_user)
def handle_membership(
self,
event: Union[RoomMemberEvent, InviteMemberEvent],
) -> bool:
"""Handle a membership event for the room.
Args:
event (RoomMemberEvent): The event that should be handled that
updates the room state.
Returns True if the member list of the room has changed False
otherwise.
"""
target_user = event.state_key
invited = event.membership == "invite"
if event.membership in ("invite", "join"):
# Add member if not already present in self.users,
# or the member is invited but not present in self.invited_users
if target_user not in self.users or (
invited and target_user not in self.invited_users
):
display_name = event.content.get("displayname", None)
avatar_url = event.content.get("avatar_url", None)
return self.add_member(
target_user,
display_name,
avatar_url,
invited,
)
user = self.users[target_user]
# Handle membership change
user.invited = invited
if not invited and target_user in self.invited_users:
del self.invited_users[target_user]
# Handle profile changes
if "displayname" in event.content:
self.names[user.name].remove(user.user_id)
user.display_name = event.content["displayname"]
self.names[user.name].append(user.user_id)
if "avatar_url" in event.content:
user.avatar_url = event.content["avatar_url"]
return False
elif event.membership in ("leave", "ban"):
return self.remove_member(target_user)
return False
def handle_ephemeral_event(self, event: EphemeralEvent) -> None:
if isinstance(event, TypingNoticeEvent):
self.typing_users = event.users
if isinstance(event, ReceiptEvent):
read_receipts = filter(lambda x: x.receipt_type == "m.read", event.receipts)
for read_receipt in read_receipts:
self.read_receipts[read_receipt.user_id] = read_receipt
def handle_event(self, event: Event) -> None:
logger.info(
f"Room {self.room_id} handling event of type {type(event).__name__}"
)
if isinstance(event, RoomCreateEvent):
self.creator = event.creator
self.federate = event.federate
self.room_version = event.room_version
elif isinstance(event, RoomGuestAccessEvent):
self.guest_access = event.guest_access
elif isinstance(event, RoomHistoryVisibilityEvent):
self.history_visibility = event.history_visibility
elif isinstance(event, RoomJoinRulesEvent):
self.join_rule = event.join_rule
elif isinstance(event, RoomNameEvent):
self.name = event.name
elif isinstance(event, RoomAliasEvent):
self.canonical_alias = event.canonical_alias
elif isinstance(event, RoomTopicEvent):
self.topic = event.topic
elif isinstance(event, RoomAvatarEvent):
self.room_avatar_url = event.avatar_url
elif isinstance(event, RoomEncryptionEvent):
self.encrypted = True
elif isinstance(event, RoomUpgradeEvent):
self.replacement_room = event.replacement_room
elif isinstance(event, PowerLevelsEvent):
self.power_levels.update(event.power_levels)
# Update the power levels of the joined users
for user_id, level in self.power_levels.users.items():
if user_id in self.users:
logger.info(
f"Changing power level for user {user_id} from {self.users[user_id].power_level} to {level}"
)
self.users[user_id].power_level = level
def handle_account_data(self, event: AccountDataEvent) -> None:
if isinstance(event, FullyReadEvent):
self.fully_read_marker = event.event_id
if isinstance(event, TagEvent):
self.tags = event.tags
def update_unread_notifications(self, unread: UnreadNotifications) -> None:
if unread.notification_count is not None:
self.unread_notifications = unread.notification_count
if unread.highlight_count is not None:
self.unread_highlights = unread.highlight_count
def update_summary(self, summary: RoomSummary) -> None:
if not self.summary:
self.summary = summary
return
if summary.joined_member_count is not None:
self.summary.joined_member_count = summary.joined_member_count
if summary.invited_member_count is not None:
self.summary.invited_member_count = summary.invited_member_count
if summary.heroes is not None:
self.summary.heroes = summary.heroes
def _summary_details(self) -> Tuple[List[str], int, int]:
"""Return the summary attributes if it can be used for calculations."""
valid = bool(
self.summary is not None
and self.summary.joined_member_count is not None
and self.summary.invited_member_count is not None,
)
if not valid:
raise ValueError("Unusable summary")
return ( # type: ignore
self.summary.heroes or [], # type: ignore
self.summary.joined_member_count, # type: ignore
self.summary.invited_member_count, # type: ignore
)
@property
def joined_count(self) -> int:
try:
return self._summary_details()[1]
except ValueError:
return len(tuple(u for u in self.users.values() if not u.invited))
@property
def invited_count(self) -> int:
try:
return self._summary_details()[2]
except ValueError:
return len(tuple(u for u in self.users.values() if u.invited))
@property
def member_count(self) -> int:
try:
_, joined, invited = self._summary_details()
except ValueError:
return len(self.users)
return joined + invited
class MatrixInvitedRoom(MatrixRoom):
def __init__(self, room_id: str, own_user_id: str) -> None:
self.inviter: Optional[str] = None
super().__init__(room_id, own_user_id)
def handle_membership(
self,
event: Union[RoomMemberEvent, InviteMemberEvent],
) -> bool:
"""Handle a membership event for the invited room.
Args:
event (RoomMemberEvent): The event that should be handled that
updates the room state.
Returns True if the member list of the room has changed False
otherwise.
"""
if event.membership == "invite" and event.state_key == self.own_user_id:
self.inviter = event.sender
return super().handle_membership(event)
def handle_event(self, event: Event) -> None:
logger.info(
f"Room {self.room_id} handling event of type {type(event).__name__}"
)
if isinstance(event, InviteMemberEvent):
self.handle_membership(event)
elif isinstance(event, InviteNameEvent):
self.name = event.name
elif isinstance(event, InviteAliasEvent):
self.canonical_alias = event.canonical_alias
class MatrixUser:
def __init__(
self,
user_id: str,
display_name: Optional[str] = None,
avatar_url: Optional[str] = None,
power_level: int = 0,
invited: bool = False,
presence: str = "offline",
last_active_ago: Optional[int] = None,
currently_active: Optional[bool] = None,
status_msg: Optional[str] = None,
):
# yapf: disable
self.user_id = user_id
self.display_name = display_name
self.avatar_url = avatar_url
self.power_level = power_level
self.invited = invited
self.presence = presence
self.last_active_ago = last_active_ago
self.currently_active = currently_active
self.status_msg = status_msg
# yapf: enable
@property
def name(self) -> str:
return self.display_name or self.user_id
@property
def disambiguated_name(self) -> str:
# as per https://matrix.org/docs/spec/client_server/r0.4.0.html#id346
if self.display_name:
return f"{self.display_name} ({self.user_id})"
return self.user_id