notflixbot/notflixbot/matrix.py

530 lines
19 KiB
Python

import asyncio
import getpass
import json
import time
import aiohttp.client_exceptions
import click
import zmq.asyncio
from loguru import logger
from markdown import markdown
from nio import AsyncClient, AsyncClientConfig, InviteMemberEvent, JoinError
from nio import LoginError, MatrixRoom, MegolmEvent, ProfileSetAvatarError
from nio import RoomMemberEvent, RoomMessageText, RoomResolveAliasError
from nio.crypto import TrustState
from nio.exceptions import OlmUnverifiedDeviceError
from nio.responses import WhoamiError
from notflixbot import version_dict
from notflixbot.emojis import ROBOT
from notflixbot.errors import ImdbError, MatrixError, NotflixbotError
from notflixbot.notflix import Notflix
from notflixbot.youtube import Youtube
class MatrixClient:
@staticmethod
def catch(f):
async def inner(*args, **kwargs):
try:
return await f(*args, **kwargs)
except NotflixbotError as e:
logger.error(e)
raise SystemExit(2)
except click.exceptions.Abort:
logger.warning("user aborted")
raise SystemExit(1)
return inner
def __init__(self, config, ctx):
self.config = config
self.homeserver = config.homeserver
self.user_id = config.user_id
self.admin_room_ids = list()
try:
self._default_room = self.config.rooms[0]
logger.info(f"default room: {self._default_room}")
except KeyError:
self._default_room = None
logger.warning("no rooms in config, default_room not set")
self._context = ctx
self._socket = self._context.socket(zmq.PAIR)
self._socket.bind("inproc://webhook")
self._poller = zmq.asyncio.Poller()
self._poller.register(self._socket, zmq.POLLIN)
self.nio = AsyncClient(self.homeserver, self.user_id)
self.cmd_handlers = dict()
self.help_text = dict()
self._callbacks()
self._cmd_handlers()
self.notflix = Notflix(config.notflixbot)
self.youtube = Youtube(config.notflixbot)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def close(self):
if self.nio.logged_in:
if self._default_room is not None:
await self.send_msg(self._default_room, "❌ shutting down")
await self.nio.close()
await self._socket.close()
logger.info("exited.")
async def restore_login(self):
if self.config.creds is not None:
logger.warning(f"file exists: '{self.config.credentials_path}'")
if not click.confirm("overwrite?"):
logger.info("doig nothing and exiting")
raise SystemExit(1)
passwd = getpass.getpass()
return await self._login(passwd)
async def auth(self):
if self.config.creds is None:
logger.error(
"no stored credentials found, please run with --restore-login")
raise SystemExit
logger.debug(f"connecting to '{self.config.homeserver}'")
await self._set_creds()
whoami = await self.nio.whoami()
if isinstance(whoami, WhoamiError):
# whoami.status_code ("M_UNKNOWN_TOKEN")
# whoami.message ("Invalid macaroon passed.")
raise MatrixError(whoami)
else:
logger.info(f"matrix bot running as {self.nio.user_id}")
async def sync_forever(self):
"""this starts the event loop, and does the first sync, which
unblocks self._after_first_sync.
but we cant call self._after_first_sync here because then it is
blocked and waiting for the first iteration of the sync loop to start
"""
logger.info("matrix client syncing forever")
while True:
try:
return await self.nio.sync_forever(
timeout=3000, full_state=True)
except (
asyncio.exceptions.TimeoutError,
aiohttp.client_exceptions.ClientOSError
) as e:
logger.error(e)
logger.error("timed out, reconnecting after 10s..")
time.sleep(10.0)
async def start(self):
if not self.nio.logged_in:
await self.auth()
await asyncio.gather(
# order is important here, _after_first_sync awaits for first sync
# then the rest is executed
self._after_first_sync(),
asyncio.get_event_loop().create_task(
self.nio.sync_forever(timeout=3000, full_state=True)
))
async def webhook_poller(self):
logger.info("polling zmq socket")
while True:
# keyboard interrupt?
events = await self._poller.poll(3000)
if self._socket in dict(events):
z_data = await self._socket.recv_string()
m_data = json.loads(z_data)
room = m_data['room']
msg = m_data['msg']
plain = m_data.get('plain')
logger.debug(f"{room}: '{msg}'")
await self.send_msg(room, msg, plain)
async def _room_id(self, room_addr):
if room_addr.startswith('!'):
# this is a room_id
return room_addr
room = await self.nio.room_resolve_alias(room_addr)
if isinstance(room, RoomResolveAliasError):
raise MatrixError(f"cannot resolve: '{room_addr}'")
return room.room_id
async def _after_first_sync(self):
# wait for sync
await self.nio.synced.wait()
joined = await self.nio.joined_rooms()
for room_id in joined.rooms:
await self._trust_all_users_in_room(room_id)
for room_alias in self.config.admin_rooms:
admin_room_id = await self._room_id(room_alias)
self.admin_room_ids.append(admin_room_id)
if self.config.avatar:
await self._avatar()
if self._default_room is not None:
msg = f"{ROBOT} `{version_dict['name']} {version_dict['version']}`"
await self.send_msg(self._default_room, msg)
await self._key_sync()
logger.debug("first sync is done")
async def _set_creds(self):
self.nio.user_id = self.config.creds.user_id
self.nio.access_token = self.config.creds.access_token
self.nio.device_id = self.config.creds.device_id
self.nio.store_path = self.config.storage_path
self.nio.config = AsyncClientConfig(
max_limit_exceeded=0,
max_timeouts=0,
store_sync_tokens=True,
encryption_enabled=True,
)
self.nio.load_store()
async def _avatar(self):
avatar = await self.nio.set_avatar(self.config.avatar)
if isinstance(avatar, ProfileSetAvatarError):
logger.warning(f"error setting avatar: {avatar}")
else:
logger.debug("set avatar")
def _cmd_handlers(self):
self.cmd_handlers['!add'] = self._handle_add
self.help_text["!add"] = "usage: `!add $IMDB_URL`"
self.cmd_handlers['!ruok'] = self._handle_ruok
self.help_text['!ruok'] = "check if the bot is ok"
self.cmd_handlers['!whoami'] = self._handle_whoami
self.help_text['!whoami'] = "show your user id"
self.cmd_handlers['!key_sync'] = self._key_sync
self.help_text["!key_sync"] = "force a key sync"
self.cmd_handlers['!help'] = self._handle_help
self.help_text["!help"] = "this message"
self.cmd_handlers['!crash'] = self._handle_crash
def _callbacks(self):
self.nio.add_event_callback(
self._cb_invite_filtered, (InviteMemberEvent,))
self.nio.add_event_callback(self._cb_message, (RoomMessageText,))
self.nio.add_event_callback(self._cb_room_member, (RoomMemberEvent,))
self.nio.add_event_callback(self._cb_decryption_fail, (MegolmEvent,))
async def _key_sync(self, room=None, event=None):
if self.nio.should_upload_keys:
resp_upload = await self.nio.keys_upload()
logger.info(f"uploaded keys: {resp_upload}")
if room is not None:
await self.send_msg(room.room_id, resp_upload)
if self.nio.should_query_keys:
logger.warning(
f"should query keys for: {self.nio.users_for_key_query}")
resp_query = await self.nio.keys_query()
logger.info(f"queried for keys: {resp_query}")
if room is not None:
await self.send_msg(room.room_id, resp_query)
if self.nio.should_claim_keys:
# for user in self.nio.get_users_for_key_claiming():
# resp_claim = await self.nio.keys_claim(user) # noqa
resp_claim = await self.nio.keys_claim()
logger.warning("claimed keys: '{resp_claim}'")
if room is not None:
await self.send_msg(room.room_id, resp_claim)
if room is not None and event is not None:
await self.send_msg(room.room_id, "key sync: `ok`")
async def _login(self, passwd):
try:
resp = await self.nio.login(
passwd,
device_name=self.config.device_name
)
if isinstance(resp, LoginError):
logger.error(f"failed to login: '{resp.message}'")
raise MatrixError(resp.message)
creds = {
"user_id": resp.user_id,
"device_id": resp.device_id, # 10 uppercase letters
"access_token": resp.access_token
}
self.config.update_creds(creds)
logger.success(resp)
return True
except aiohttp.client_exceptions.ClientError as e:
raise MatrixError(repr(e))
async def _trust_all_users_in_room(self, room):
room_id = await self._room_id(room)
members = await self.nio.joined_members(room_id)
for u in members.members:
await self._trust_user_devices(u.user_id)
async def _trust_user_devices(self, user_id):
if user_id != self.config.user_id and self.config.autotrust:
for dev_id, olm_device in self.nio.device_store[user_id].items():
if olm_device.trust_state != TrustState.verified:
self.nio.verify_device(olm_device)
logger.info(f"trusting {dev_id} from user {user_id}")
else:
logger.debug(f"already trust {dev_id} from user {user_id}")
async def _cb_decryption_fail(self, room: MatrixRoom,
event: MegolmEvent) -> None:
red_x_and_lock_emoji = "❌ 🔐"
logger.warning(f"unable to decrypt message from {event.sender}")
await self.react_to_event(room.room_id, event.event_id,
red_x_and_lock_emoji)
async def _cb_room_member(self, room: MatrixRoom,
event: RoomMemberEvent) -> None:
if event.content['membership'] == "join":
if event.state_key == self.nio.user_id:
# we joined a room
logger.debug("room member event")
await self._trust_all_users_in_room(room.room_id)
async def _cb_invite(self, room: MatrixRoom,
event: InviteMemberEvent) -> None:
"""for when an invite is received, join the room specified in the invite
"""
logger.debug(f"got invite to {room.room_id} from {event.sender}")
result = await self.nio.join(room.room_id)
if isinstance(result, JoinError):
logger.error(f"error joining room {room.room_id}: {result}.")
else:
logger.info(
f"joined {room.canonical_alias} invited by {event.sender}")
async def _cb_invite_filtered(self, room: MatrixRoom,
event: InviteMemberEvent) -> None:
"""InviteMemberEvent is fired for every m.room.member state received
in a sync response's `rooms.invite` section. so we will get
some that are not our own invite events (f.ex. inviter's
membership).
this ignores them and calls Callbacks.invite
with our own invite events.
"""
if event.state_key == self.nio.user_id:
await self._cb_invite(room, event)
else:
logger.debug(f"ignoring invite event: {event}")
async def _cb_message(self, room: MatrixRoom,
event: RoomMessageText) -> None:
# ignore messages from ourselves
if event.sender == self.nio.user_id:
return
# user_displayname = room.user_name(event.sender)
# display_name = room.display_name
user_id = event.sender
room_id = room.room_id
if room.canonical_alias is not None:
room_alias = room.canonical_alias
else:
room_alias = room.room_id
msg = event.body.strip()
# logger.debug(f"room: {room_id}, user_id: {user_id}, msg: '{msg}'")
# "".split(" ")[0] -> ""
cmd = event.body.strip().split(' ')
prefix = cmd[0]
if prefix in self.cmd_handlers:
if room_id not in self.admin_room_ids:
logger.warning(
f"ignored cmd '{msg}' by {user_id} in {room_alias}")
else:
handler_func = self.cmd_handlers[prefix]
await handler_func(room, event)
elif "youtube.com" in msg or "youtu.be" in msg:
yt_unfurl = await self.youtube.unfurl(msg)
if yt_unfurl is not None:
yt_msg = yt_unfurl[0]
yt_plain = yt_unfurl[1]
await self.send_msg(room.room_id, yt_msg, yt_plain)
else:
await self._phrase_respond(room, event)
async def _phrase_respond(self, room, event):
phrases = {
'are you alive?': 'no im a `robot`',
'are you alive': 'no im a `robot`',
'i am a robot': 'FILTHY LIES',
"i'm a robot": 'FILTHY LIES',
'im a robot': 'FILTHY LIES',
'fuck you': '🖕',
'duck you': '🦆',
}
# keywords = {
# 'cheese': 'did someone say 🧀?'
# }
msg = event.body.strip().lower()
response = phrases.get(msg, None)
if response is not None:
await self.send_msg(room.room_id, response)
async def _handle_help(self, room, event):
cmds = "<br>".join([f"`{k}`: {v}" for k, v in self.help_text.items()])
n = version_dict['name']
v = version_dict['version']
await self.send_msg(room.room_id, f"`{n} v{v}` help: \n\n{cmds}")
async def _handle_whoami(self, room, event):
your_id = event.sender
my_id = self.config.creds.user_id
await self.send_msg(
room.room_id, f"i am: `{my_id}` and you are: `{your_id}`")
async def _handle_ruok(self, room, event):
await self.send_msg(room.room_id, "`iamok`")
async def _handle_crash(self, room, event):
# CRASH AND BURN
return 1 / 0
async def _handle_add(self, room, event):
try:
msg = event.body.strip().split(' ')
url = msg[1].strip()
try:
user = event.sender.split(":")[0][1:]
except IndexError as e:
logger.error(f"error parsing user_id '{event.sender}': {e}")
user = "unknown"
added_status, item = self.notflix.add_from_imdb_url(url, user)
if added_status == "added":
await self.send_msg(
room.room_id,
f"added: {item['title']} ({item['release_year']})")
elif added_status == "exists":
await self.send_msg(
room.room_id,
f"already exists: {item['title']} ({item['release_year']})")
else:
try:
errmsg = item[0]['errorMessage']
await self.send_msg(room.room_id, errmsg)
except (IndexError, KeyError):
pass
return item
except IndexError:
logger.error(f"invalid msg from {event.sender}: '{event.body}'")
self.send_msg(room.room_id, "url is missing")
except (NotflixbotError, ImdbError) as e:
logger.warning(e)
await self.send_msg(room.room_id, str(e))
async def send_msg(self, room, msg, plain=None):
"""wrapper function to handle exceptions cleanly
"""
try:
return await self._send_msg(room, msg, plain)
except OlmUnverifiedDeviceError as e:
logger.warning(e)
# self.nio.verify_device(e.devide)
await self._trust_user_devices(e.device.user_id)
return await self._send_msg(room, msg, plain)
async def _send_msg(self, room, msg, plain=None):
# msgtypes:
# * m.notice: looks more grey?
# * m.text: normal?
# * m.room.message: no markdown
if msg is None:
msg = ""
if plain is None:
plain = msg
# strip away simple markdown that i use most commonly
plain = plain.replace('`', '')
try:
room_id = await self._room_id(room)
except MatrixError as e:
# webhook isnt aware of this
logger.error(e)
return
await self.nio.room_send(
room_id,
message_type="m.room.message",
content={
'msgtype': 'm.text',
'format': 'org.matrix.custom.html',
'formatted_body': markdown(msg),
'body': plain
},
ignore_unverified_devices=False
)
logger.debug(f"sent '{msg}' to '{room_id}'")
async def react_to_event(self, room, event, reaction_text):
await self.nio.room_send(
room.room_id,
message_type="m.reaction",
content={
"m.relates_to": {
"rel_type": "m.annotation",
"event_id": event.event_id,
"key": reaction_text
}
},
ignore_unverified_devices=False
)
def markdown_json(msg):
return "\r\n".join(
[f" {a}" for a in json.dumps(msg, indent=2).splitlines()]
)
def make_pill(user_id):
return f'<a href="https://matrix.to/#/{user_id}">{user_id}</a>'
"""
from nio import SyncResponse
self.nio.add_response_callback(self._cb_sync, SyncResponse)
async def _cb_sync(self, response: SyncResponse) -> None:
# called every time `sync_forever` sucessfully syncs with the server
logger.trace(f"synced: {response}")
"""