mirror of https://github.com/JOJ0/synadm.git
1417 lines
55 KiB
Python
1417 lines
55 KiB
Python
# -*- coding: utf-8 -*-
|
|
# synadm
|
|
# Copyright (C) 2020-2023 Johannes Tiefenbacher
|
|
#
|
|
# synadm is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# synadm is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
"""Synapse Admin API and regular Matrix API clients
|
|
|
|
Most API calls defined in this module respect the API's defaults and only pass
|
|
what's necessary in the request body.
|
|
|
|
A fully qualified Matrix user ID looks like this: @user:server, where server
|
|
often is a domain name only, e.g @user@example.org
|
|
|
|
See https://github.com/matrix-org/synapse/tree/master/docs/admin_api for
|
|
documentation of the Synapse Admin APIs and the Matrix spec at
|
|
https://matrix.org/docs/spec/#matrix-apis.
|
|
"""
|
|
|
|
import requests
|
|
from http.client import HTTPConnection
|
|
import datetime
|
|
import json
|
|
import urllib.parse
|
|
import re
|
|
|
|
|
|
class ApiRequest:
|
|
"""Basic API request handling and helper utilities
|
|
|
|
This is subclassed by SynapseAdmin and Matrix
|
|
"""
|
|
def __init__(self, log, user, token, base_url, path, timeout, debug,
|
|
verify=None):
|
|
"""Initialize an APIRequest object
|
|
|
|
Args:
|
|
log (logger object): an already initialized logger object
|
|
user (string): the user with access to the API (currently unused).
|
|
This can either be the fully qualified Matrix user ID, or just
|
|
the localpart of the user ID.
|
|
token (string): the API user's token
|
|
base_url (string): URI e.g https://fqdn:port
|
|
path (string): the path to the API endpoint; it's put after
|
|
base_url to form the basis for all API endpoint paths
|
|
timeout (int): requests module timeout used in query method
|
|
debug (bool): enable/disable debugging in requests module
|
|
verify(bool): SSL verification is turned on by default
|
|
and can be turned off using this argument.
|
|
"""
|
|
self.log = log
|
|
self.user = user
|
|
self.token = token
|
|
self.base_url = base_url.strip("/")
|
|
self.path = path.strip("/")
|
|
self.headers = {
|
|
"Accept": "application/json",
|
|
"Authorization": "Bearer " + self.token
|
|
}
|
|
self.timeout = timeout
|
|
if debug:
|
|
HTTPConnection.debuglevel = 1
|
|
self.verify = verify
|
|
|
|
def query(self, method, urlpart, params=None, data=None, token=None,
|
|
base_url_override=None, verify=None, *args, **kwargs):
|
|
"""Generic wrapper around requests methods.
|
|
|
|
Handles requests methods, logging and exceptions, and URL encoding.
|
|
|
|
Args:
|
|
urlpart (string): The path to the API endpoint, excluding
|
|
self.base_url and self.path (the part after
|
|
proto://fqdn:port/path). It will be passed to Python's
|
|
str.format, so the string should not be already formatted
|
|
(as f-strings or with str.format) as to sanitize the URL.
|
|
params (dict, optional): URL parameters (?param1&paarm2). Defaults
|
|
to None.
|
|
data (dict, optional): Request body used in POST, PUT, DELETE
|
|
requests. Defaults to None.
|
|
base_url_override (bool): The default setting of self.base_url set
|
|
on initialization can be overwritten using this argument.
|
|
verify(bool): Mandatory SSL verification is turned on by default
|
|
and can be turned off using this method.
|
|
*args: Arguments that will be URL encoded and passed to Python's
|
|
str.format.
|
|
**kwargs: Keyword arguments that will be URL encoded (only the
|
|
values) and passed to Python's str.format.
|
|
|
|
Returns:
|
|
string or None: Usually a JSON string containing
|
|
the response of the API; responses that are not 200(OK) (usally
|
|
error messages returned by the API) will also be returned as
|
|
JSON strings. On exceptions the error type and description are
|
|
logged and None is returned.
|
|
"""
|
|
args = list(args)
|
|
kwargs = dict(kwargs)
|
|
for i in range(len(args)):
|
|
args[i] = urllib.parse.quote(args[i], safe="")
|
|
for i in kwargs.keys():
|
|
kwargs[i] = urllib.parse.quote(kwargs[i], safe="")
|
|
urlpart = urlpart.format(*args, **kwargs)
|
|
|
|
if base_url_override:
|
|
self.log.debug("base_url override!")
|
|
url = f"{base_url_override}/{self.path}/{urlpart}"
|
|
host_descr = urllib.parse.urlparse(base_url_override).netloc
|
|
else:
|
|
url = f"{self.base_url}/{self.path}/{urlpart}"
|
|
host_descr = "Synapse"
|
|
self.log.info("Querying %s on %s", method, url)
|
|
|
|
if token:
|
|
self.log.debug("Token override! Adjusting headers.")
|
|
self.headers["Authorization"] = "Bearer " + token
|
|
|
|
override_verify = self.verify
|
|
if verify is not None:
|
|
override_verify = verify
|
|
|
|
try:
|
|
resp = requests.request(
|
|
method, url, headers=self.headers, timeout=self.timeout,
|
|
params=params, json=data, verify=override_verify
|
|
)
|
|
if not resp.ok:
|
|
self.log.warning(f"{host_descr} returned status code "
|
|
f"{resp.status_code}")
|
|
return resp.json()
|
|
except Exception as error:
|
|
self.log.error("%s while querying %s: %s",
|
|
type(error).__name__, host_descr, error)
|
|
return None
|
|
|
|
def _timestamp_from_days_ago(self, days):
|
|
"""Get a unix timestamp in ms from days ago
|
|
|
|
Args:
|
|
days (int): number of days
|
|
|
|
Returns:
|
|
int: a unix timestamp in milliseconds (ms)
|
|
"""
|
|
return int((
|
|
datetime.datetime.now() - datetime.timedelta(days=days)
|
|
).timestamp() * 1000)
|
|
|
|
def _timestamp_from_days_ahead(self, days):
|
|
"""Get a unix timestamp in ms for the given number of days ahead
|
|
|
|
Args:
|
|
days (int): number of days
|
|
|
|
Returns:
|
|
int: a unix timestamp in milliseconds (ms)
|
|
"""
|
|
return int((
|
|
datetime.datetime.now() + datetime.timedelta(days=days)
|
|
).timestamp() * 1000)
|
|
|
|
def _timestamp_from_datetime(self, _datetime):
|
|
"""Get a unix timestamp in ms from a datetime object
|
|
|
|
Args:
|
|
_datetime (datetime object): an object built by datetime.datetime
|
|
|
|
Returns:
|
|
int: a unix timestamp in milliseconds (ms)
|
|
"""
|
|
return int(_datetime.timestamp()) * 1000
|
|
|
|
def _datetime_from_timestamp(self, timestamp, as_str=False):
|
|
""" Get a datetime object from a unix timestamp in ms
|
|
|
|
Args:
|
|
timestamp (int): a unix timestamp in milliseconds (ms)
|
|
|
|
Returns:
|
|
datetime object: an object built by datetime.datetime.
|
|
If as_str is set, return a string formatted by
|
|
self._format_datetime() instead.
|
|
"""
|
|
dt_o = datetime.datetime.fromtimestamp(timestamp / 1000)
|
|
if as_str:
|
|
return self._format_datetime(dt_o)
|
|
else:
|
|
return dt_o
|
|
|
|
def _format_datetime(self, datetime_obj):
|
|
""" Get a formatted date as a string.
|
|
|
|
Args:
|
|
datetime_obj (int): A datetime object.
|
|
|
|
Returns:
|
|
string: A date in the format we use it throughout synadm. No sanity
|
|
checking.
|
|
"""
|
|
return datetime_obj.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
class MiscRequest(ApiRequest):
|
|
""" Miscellaneous HTTP requests
|
|
|
|
Inheritance:
|
|
ApiRequest (object): parent class containing general properties and
|
|
methods for requesting REST API's
|
|
"""
|
|
def __init__(self, log, timeout, debug, verify=None):
|
|
"""Initialize the MiscRequest object
|
|
|
|
Args:
|
|
log (logger object): an already initialized logger object
|
|
timeout (int): requests module timeout used in ApiRequest.query
|
|
method
|
|
debug (bool): enable/disable debugging in requests module
|
|
verify(bool): SSL verification is turned on by default
|
|
and can be turned off using this method.
|
|
"""
|
|
super().__init__(
|
|
log, "", "", # Set user and token to empty string
|
|
"", "", # Set base_url and path to empty string
|
|
timeout, debug, verify
|
|
)
|
|
|
|
def federation_uri_well_known(self, base_url):
|
|
"""Retrieve the URI to the Server-Server (Federation) API port via the
|
|
.well-known resource of a Matrix server/domain.
|
|
|
|
Args:
|
|
base_url: proto://name or proto://fqdn
|
|
|
|
Returns:
|
|
string: https://fqdn:port of the delegated server for Server-Server
|
|
communication between Matrix homeservers or None on errors.
|
|
"""
|
|
resp = self.query(
|
|
"get", ".well-known/matrix/server",
|
|
base_url_override=base_url,
|
|
)
|
|
if resp is not None:
|
|
if ":" in resp["m.server"]:
|
|
return "https://" + resp["m.server"]
|
|
else:
|
|
return "https://" + resp["m.server"] + ":8448"
|
|
self.log.error(".well-known/matrix/server could not be fetched.")
|
|
return None
|
|
|
|
|
|
class Matrix(ApiRequest):
|
|
""" Matrix API client
|
|
|
|
Inheritance:
|
|
ApiRequest (object): parent class containing general properties and
|
|
methods for requesting REST API's
|
|
"""
|
|
def __init__(self, log, user, token, base_url, matrix_path,
|
|
timeout, debug, verify):
|
|
"""Initialize the Matrix API object
|
|
|
|
Args:
|
|
log (logger object): an already initialized logger object
|
|
user (string): the user with access to the Matrix API (currently
|
|
unused); This can either be the fully qualified Matrix user ID,
|
|
or just the localpart of the user ID.
|
|
token (string): the Matrix API user's token
|
|
base_url (string): URI e.g https://fqdn:port
|
|
path (string): the path to the API endpoint; it's put after
|
|
base_url and forms the basis for all API endpoint paths
|
|
timeout (int): requests module timeout used in ApiRequest.query
|
|
method
|
|
debug (bool): enable/disable debugging in requests module
|
|
verify(bool): SSL verification is turned on by default
|
|
and can be turned off using this method.
|
|
"""
|
|
super().__init__(
|
|
log, user, token,
|
|
base_url, matrix_path,
|
|
timeout, debug, verify
|
|
)
|
|
self.user = user
|
|
|
|
def user_login(self, user_id, password):
|
|
"""Login as a Matrix user and retrieve an access token
|
|
|
|
Args:
|
|
user_id (string): a fully qualified Matrix user ID
|
|
password (string): the Matrix user's password
|
|
|
|
Returns:
|
|
string: JSON string containing a token suitable to access the
|
|
Matrix API on the user's behalf, a device_id and some more
|
|
details on Matrix server and user.
|
|
"""
|
|
return self.query("post", "client/r0/login", data={
|
|
"password": password,
|
|
"type": "m.login.password",
|
|
"user": f"{user_id}",
|
|
"initial_device_display_name": "synadm matrix login command"
|
|
})
|
|
|
|
def room_get_id(self, room_alias):
|
|
""" Get the room ID for a given room alias
|
|
|
|
Args:
|
|
room_alias (string): A Matrix room alias (#name:example.org)
|
|
|
|
Returns:
|
|
string, dict or None: A dict containing the room ID for the alias.
|
|
If room_id is missing in the response we return the whole
|
|
response as it might contain Synapse's error message.
|
|
"""
|
|
room_directory = self.query(
|
|
"get", "client/r0/directory/room/{room_alias}",
|
|
room_alias=room_alias
|
|
)
|
|
if "room_id" in room_directory:
|
|
return room_directory["room_id"]
|
|
else:
|
|
return room_directory # might contain useful error message
|
|
|
|
def room_get_aliases(self, room_id):
|
|
""" Get a list of room aliases for a given room ID
|
|
|
|
Args:
|
|
room_id (string): A Matrix room ID (!abc123:example.org)
|
|
|
|
Returns:
|
|
dict or None: A dict containing a list of room aliases, Synapse's
|
|
error message or None on exceptions.
|
|
"""
|
|
return self.query(
|
|
"get", "client/r0/rooms/{room_id}/aliases",
|
|
room_id=room_id
|
|
)
|
|
|
|
def raw_request(self, endpoint, method, data, token=None):
|
|
data_dict = {}
|
|
if method != "get":
|
|
self.log.debug("The data we are trying to parse and submit:")
|
|
self.log.debug(data)
|
|
try: # user provided json might be crap
|
|
data_dict = json.loads(data)
|
|
except Exception as error:
|
|
self.log.error("loading data: %s: %s",
|
|
type(error).__name__, error)
|
|
return None
|
|
|
|
return self.query(method, endpoint, data=data_dict, token=token)
|
|
|
|
def server_name_keys_api(self, server_server_uri):
|
|
"""Retrieve the Matrix server's own homeserver name via the
|
|
Server-Server (Federation) API.
|
|
|
|
Args:
|
|
server_server_uri (string): proto://name:port or proto://fqdn:port
|
|
|
|
Returns:
|
|
string: The Matrix server's homeserver name or FQDN, usually
|
|
something like matrix.DOMAIN or DOMAIN
|
|
"""
|
|
resp = self.query(
|
|
"get", "key/v2/server", base_url_override=server_server_uri
|
|
)
|
|
if not resp or not resp.get("server_name"):
|
|
self.log.error("The homeserver name could not be fetched via the "
|
|
"federation API key/v2/server.")
|
|
return None
|
|
return resp['server_name']
|
|
|
|
|
|
class SynapseAdmin(ApiRequest):
|
|
"""Synapse Admin API client
|
|
|
|
Inheritance:
|
|
ApiRequest (object): parent class containing general properties and
|
|
methods for requesting REST API's
|
|
"""
|
|
def __init__(self, log, user, token, base_url, admin_path, timeout, debug,
|
|
verify):
|
|
"""Initialize the SynapseAdmin object
|
|
|
|
Args:
|
|
log (logger object): An already initialized logger object
|
|
user (string): An admin-enabled Synapse user (currently unused).
|
|
This can either be the fully qualified Matrix user ID,
|
|
or just the localpart of the user ID. FIXME is that true?
|
|
token (string): The admin user's token
|
|
base_url (string): URI e.g https://fqdn:port
|
|
path (string): The path to the API endpoint; it's put after
|
|
base_url and the basis for all API endpoint paths
|
|
timeout (int): Requests module timeout used in ApiRequest.query
|
|
method
|
|
debug (bool): enable/disable debugging in requests module
|
|
verify(bool): SSL verification is turned on by default
|
|
and can be turned off using this argument.
|
|
"""
|
|
super().__init__(
|
|
log, user, token,
|
|
base_url, admin_path,
|
|
timeout, debug, verify
|
|
)
|
|
self.user = user
|
|
|
|
def user_list(self, _from, _limit, _guests, _deactivated,
|
|
_name, _user_id, _admin=None):
|
|
"""List and search users
|
|
|
|
Args:
|
|
_from (int): offsets user list by this number, used for pagination
|
|
_limit (int): maximum number of users returned, used for pagination
|
|
_guests (bool): enable/disable fetching of guest users
|
|
_deactivated (bool): enable/disable fetching of deactivated users
|
|
_name (string): user name localpart to search for, see Synapse
|
|
Admin API docs for details
|
|
_user_id (string): fully qualified Matrix user ID to search for
|
|
_admin (bool or None): whether to filter for admins. a None
|
|
does not filter.
|
|
|
|
Returns:
|
|
string: JSON string containing the found users
|
|
"""
|
|
params = {
|
|
"from": _from,
|
|
"limit": _limit,
|
|
"guests": (str(_guests).lower() if isinstance(_guests, bool)
|
|
else None),
|
|
"deactivated": "true" if _deactivated else None,
|
|
"name": _name,
|
|
"user_id": _user_id
|
|
}
|
|
if _admin is not None:
|
|
params["admins"] = str(_admin).lower()
|
|
return self.query("get", "v2/users", params=params)
|
|
|
|
def user_list_paginate(self, _limit, _guests, _deactivated,
|
|
_name, _user_id, _from="0", admin=None):
|
|
# documentation is mostly duplicated from user_list...
|
|
"""Yields API responses for all of the pagination.
|
|
|
|
Args:
|
|
_limit (int): Maximum number of users returned, used for
|
|
pagination.
|
|
_guests (bool): Enable/disable fetching of guest users.
|
|
_deactivated (bool): Enable/disable fetching of deactivated
|
|
users.
|
|
_name (string): User name localpart to search for, see Synapse
|
|
Admin API docs for details.
|
|
_user_id (string): Fully qualified Matrix user ID to search for.
|
|
_from (string): Offsets user list by this number, used for
|
|
pagination.
|
|
|
|
Yields:
|
|
dict: The Admin API response for listing accounts.
|
|
https://element-hq.github.io/synapse/latest/admin_api/user_admin_api.html#list-accounts
|
|
"""
|
|
while _from is not None:
|
|
response = self.user_list(_from, _limit, _guests, _deactivated,
|
|
_name, _user_id, admin)
|
|
yield response
|
|
_from = response.get("next_token", None)
|
|
|
|
def user_membership(self, user_id, return_aliases, matrix_api):
|
|
"""Get a list of rooms the given user is member of
|
|
|
|
Args:
|
|
user_id (string): Fully qualified Matrix user ID
|
|
room_aliases (bool): Return human readable room aliases instead of
|
|
room ID's if applicable.
|
|
matrix_api (object): An initialized Matrix object needs to be
|
|
passes as we need some Matrix API functionality here.
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
"""
|
|
|
|
rooms = self.query("get", "v1/users/{user_id}/joined_rooms",
|
|
user_id=user_id)
|
|
# Translate room ID's into aliases if requested.
|
|
if return_aliases and rooms is not None and "joined_rooms" in rooms:
|
|
for i, room_id in enumerate(rooms["joined_rooms"]):
|
|
aliases = matrix_api.room_get_aliases(room_id)
|
|
if aliases["aliases"] != []:
|
|
rooms["joined_rooms"][i] = " ".join(aliases["aliases"])
|
|
return rooms
|
|
|
|
def user_deactivate(self, user_id, gdpr_erase):
|
|
"""Delete a given user
|
|
|
|
Args:
|
|
user_id (string): fully qualified Matrix user ID
|
|
gdpr_erase (bool): enable/disable gdpr-erasing the user, see
|
|
Synapse Admin API docs for details.
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
"""
|
|
return self.query("post", "v1/deactivate/{user_id}", data={
|
|
"erase": gdpr_erase
|
|
}, user_id=user_id)
|
|
|
|
def user_password(self, user_id, password, no_logout):
|
|
"""Set the user password, and log the user out if requested
|
|
|
|
Args:
|
|
user_id (string): fully qualified Matrix user ID
|
|
password (string): new password that should be set
|
|
no_logout (bool): the API defaults to logging out the user after
|
|
password reset via the Admin API, this option can be used to
|
|
disable this behaviour.
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
"""
|
|
data = {"new_password": password}
|
|
if no_logout:
|
|
data.update({"logout_devices": False})
|
|
return self.query("post", "v1/reset_password/{user_id}", data=data,
|
|
user_id=user_id)
|
|
|
|
def user_details(self, user_id):
|
|
"""Get information about a given user
|
|
|
|
Note that the Admin API docs describe this function as "Query User
|
|
Account".
|
|
|
|
Args:
|
|
user_id (string): fully qualified Matrix user ID
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
|
|
"""
|
|
return self.query("get", "v2/users/{user_id}", user_id=user_id)
|
|
|
|
def user_login(self, user_id, expire_days, expire, _expire_ts):
|
|
"""Get an access token that can be used to authenticate as that user.
|
|
|
|
If one of the args expire_days, expire or _expire_ts is set, the
|
|
valid_until_ms field will be sent to the API endpoint. If this is not
|
|
the case the default of the API would be used. At the time of writing,
|
|
this would be that tokens never expire.
|
|
|
|
Note: If this method is called by the CLI frontend code
|
|
(synadm.cli.user.user_login_cmd), a default expiry date of 1 day (24h)
|
|
is passed.
|
|
|
|
Args:
|
|
user_id (string): fully qualified Matrix user ID
|
|
expire_days (int): token should expire after this number of days
|
|
expire (datetime): token should expire after this date/time - a
|
|
datetime object (e.g. as generated by Click.DateTime())
|
|
_expire_ts (int): token should expire after this date/time - a
|
|
unix timestamp in ms.
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
"""
|
|
expire_ts = None
|
|
if expire_days:
|
|
self.log.debug("Received expire_days: %s", expire_days)
|
|
expire_ts = self._timestamp_from_days_ahead(expire_days)
|
|
elif expire:
|
|
self.log.debug("Received expire: %s", expire)
|
|
expire_ts = self._timestamp_from_datetime(expire)
|
|
elif _expire_ts:
|
|
self.log.debug("Received expire_ts: %s",
|
|
_expire_ts)
|
|
expire_ts = _expire_ts # Click checks for int already
|
|
|
|
data = {}
|
|
if expire_ts is not None:
|
|
data.update({
|
|
"valid_until_ms": expire_ts,
|
|
})
|
|
self.log.info("Token expiry date set to timestamp: %d,",
|
|
expire_ts)
|
|
self.log.info("which is the date/time: %s",
|
|
self._datetime_from_timestamp(expire_ts))
|
|
else:
|
|
self.log.info("Token will never expire.")
|
|
|
|
return self.query("post", "v1/users/{user_id}/login", data=data,
|
|
user_id=user_id)
|
|
|
|
def user_modify(self, user_id, password, display_name, threepid,
|
|
avatar_url, admin, deactivation, user_type, lock):
|
|
""" Create or update information about a given user
|
|
|
|
The threepid argument must be passed as a tuple in a tuple (which is
|
|
what we usually get from a Click multi-arg option)
|
|
"""
|
|
data = {}
|
|
if password:
|
|
data.update({"password": password})
|
|
if display_name:
|
|
data.update({"displayname": display_name})
|
|
if threepid:
|
|
if threepid == (('', ''),): # empty strings clear all threepids
|
|
data.update({"threepids": []})
|
|
else:
|
|
data.update({"threepids": [
|
|
{"medium": m, "address": a} for m, a in threepid
|
|
]})
|
|
if avatar_url:
|
|
data.update({"avatar_url": avatar_url})
|
|
if admin is not None:
|
|
data.update({"admin": admin})
|
|
if lock is not None:
|
|
data.update({"locked": lock})
|
|
if deactivation == "deactivate":
|
|
data.update({"deactivated": True})
|
|
if deactivation == "activate":
|
|
data.update({"deactivated": False})
|
|
if user_type:
|
|
data.update({"user_type": None if user_type == 'null' else
|
|
user_type})
|
|
return self.query("put", "v2/users/{user_id}", data=data,
|
|
user_id=user_id)
|
|
|
|
def user_whois(self, user_id):
|
|
""" Return information about the active sessions for a specific user
|
|
"""
|
|
return self.query("get", "v1/whois/{user_id}", user_id=user_id)
|
|
|
|
def user_devices(self, user_id):
|
|
""" Return information about all devices for a specific user.
|
|
|
|
Args:
|
|
user_id (string): Fully qualified Matrix user ID.
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
"""
|
|
return self.query("get", "v2/users/{user_id}/devices",
|
|
user_id=user_id)
|
|
|
|
def user_devices_get_todelete(self, devices_data, min_days, min_surviving,
|
|
device_id, readable_seen):
|
|
""" Gather a list of devices that possibly could be deleted.
|
|
|
|
This method is used by the 'user prune-devices' command.
|
|
|
|
Args:
|
|
devices_data (list): Containing dicts of all the user's devices, as
|
|
returned by the user_devices method (the user/devices API
|
|
endpoint).
|
|
min_days (int): At least this number of days need to have passed
|
|
from the last time a device was seen for it to be deleted.
|
|
A reasonable default should be sent by the CLI level method.
|
|
min_surviving: At least this amount of devices will be kept alive.
|
|
A reasonable default should be sent by the CLI level method.
|
|
device_id: Only search devices with this ID.
|
|
datetime: When True, 'last seen timestamp' is replaced with a human
|
|
readable format.
|
|
|
|
Returns:
|
|
list: Containing dicts of devices that possibly could be deleted.
|
|
If non apply, an empty list is returned.
|
|
"""
|
|
def _log_kept_min_days(seen, min_days_ts):
|
|
self.log.debug("Keeping device, since it's been used recently:")
|
|
self.log.debug("Last seen: {} / {}".format(
|
|
seen, self._datetime_from_timestamp(
|
|
seen, as_str=True))
|
|
)
|
|
self.log.debug("Delete threshold: {} / {}".format(
|
|
min_days_ts, self._datetime_from_timestamp(
|
|
min_days_ts, as_str=True))
|
|
)
|
|
|
|
devices_todelete = []
|
|
devices_count = devices_data.get("total", 0)
|
|
if devices_count <= min_surviving:
|
|
# Nothing to do but return empty list anyway. Makes sure
|
|
# checks of callers stay valid (eg. len()).
|
|
return devices_todelete
|
|
|
|
devices = devices_data.get("devices", [])
|
|
devices.sort(key=lambda k: k["last_seen_ts"] or 0)
|
|
for device in devices:
|
|
if devices_count-len(devices_todelete) <= min_surviving:
|
|
self.log.debug("Keeping device, since min_surviving threshold "
|
|
"is reached.")
|
|
break
|
|
if device_id:
|
|
if device.get("device_id", None) == device_id:
|
|
# Found device in question. Make last_seen_ts human
|
|
# readable (if requested) and add to deletion list.
|
|
if readable_seen:
|
|
device["last_seen_ts"] = self._datetime_from_timestamp(
|
|
device.get("last_seen_ts", None), as_str=True)
|
|
devices_todelete.append(device)
|
|
break
|
|
else:
|
|
# Continue looking for the device in question.
|
|
continue
|
|
if min_days:
|
|
seen = device.get("last_seen_ts", None) # Get ts or None
|
|
# A device with "null" as last seen was either seen a very long
|
|
# time ago _or_ was created through the matrix API (e.g. via
|
|
# `synadm matrix login`).
|
|
if seen:
|
|
min_days_ts = self._timestamp_from_days_ago(min_days)
|
|
if seen > min_days_ts:
|
|
# Device was seen recently enough, keep it!
|
|
_log_kept_min_days(seen, min_days_ts)
|
|
continue
|
|
# Make seen human readable if requested.
|
|
if readable_seen:
|
|
device["last_seen_ts"] = self._datetime_from_timestamp(
|
|
seen, as_str=True)
|
|
# Finally add to devices deletion list.
|
|
devices_todelete.append(device)
|
|
return devices_todelete
|
|
|
|
def user_devices_delete(self, user_id, devices):
|
|
""" Delete the specified devices for a specific user.
|
|
Returns an empty JSON dict.
|
|
|
|
devices is a list of device IDs
|
|
"""
|
|
return self.query("post", "v2/users/{user_id}/delete_devices",
|
|
data={"devices": devices}, user_id=user_id)
|
|
|
|
def user_auth_provider_search(self, provider, external_id):
|
|
""" Finds a user based on their ID (external id) in auth provider
|
|
represented by auth provider id (provider).
|
|
"""
|
|
return self.query("get",
|
|
"v1/auth_providers/{provider}/users/{external_id}",
|
|
provider=provider, external_id=external_id)
|
|
|
|
def user_3pid_search(self, medium, address):
|
|
""" Finds a user based on their Third Party ID by specifying what kind
|
|
of 3PID it is as medium.
|
|
"""
|
|
return self.query("get", "v1/threepid/{medium}/users/{address}",
|
|
address=address, medium=medium)
|
|
|
|
def room_join(self, room_id_or_alias, user_id):
|
|
""" Allow an administrator to join an user account with a given user_id
|
|
to a room with a given room_id_or_alias
|
|
"""
|
|
data = {"user_id": user_id}
|
|
return self.query("post", "v1/join/{room_id_or_alias}", data=data,
|
|
room_id_or_alias=room_id_or_alias)
|
|
|
|
def room_list(self, _from, limit, name, order_by, reverse):
|
|
""" List and search rooms
|
|
"""
|
|
return self.query("get", "v1/rooms", params={
|
|
"from": _from,
|
|
"limit": limit,
|
|
"search_term": name,
|
|
"order_by": order_by,
|
|
"dir": "b" if reverse else None
|
|
})
|
|
|
|
def room_list_paginate(self, limit, name, order_by, reverse, _from=0):
|
|
""" Yields API responses for room listing.
|
|
|
|
Args:
|
|
limit (int): Maximum number of rooms returned per pagination.
|
|
name (string or None): Search for a room by name. Passed as
|
|
`search_term` in the room list API. Use Python None to avoid
|
|
searching.
|
|
order_by (string): Synapse Room list API specific argument.
|
|
reverse (bool): Whether the results should be
|
|
_from (int): Initial offset in pagination.
|
|
|
|
Yields:
|
|
dict: The Admin API response for listing accounts.
|
|
https://element-hq.github.io/synapse/latest/admin_api/rooms.html#list-room-api
|
|
"""
|
|
while _from is not None:
|
|
response = self.query("get", "v1/rooms", params={
|
|
"from": _from,
|
|
"limit": limit,
|
|
"search_term": name,
|
|
"order_by": order_by,
|
|
"dir": "b" if reverse else None
|
|
})
|
|
yield response
|
|
_from = response.get("next_batch", None)
|
|
self.log.debug(f"room_list_paginate: next from value = {_from}")
|
|
|
|
def room_details(self, room_id):
|
|
""" Get details about a room
|
|
"""
|
|
return self.query("get", "v1/rooms/{room_id}", room_id=room_id)
|
|
|
|
def room_members(self, room_id):
|
|
""" Get a list of room members
|
|
"""
|
|
return self.query("get", "v1/rooms/{room_id}/members", room_id=room_id)
|
|
|
|
def room_state(self, room_id):
|
|
""" Get a list of all state events in a room.
|
|
|
|
Args:
|
|
room_id (string)
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
"""
|
|
return self.query("get", "v1/rooms/{room_id}/state", room_id=room_id)
|
|
|
|
def room_power_levels(self, from_, limit, name, order_by, reverse,
|
|
room_id=None, all_details=True,
|
|
output_format="json"):
|
|
""" Get a list of configured power_levels in all rooms.
|
|
|
|
or a single room.
|
|
|
|
Args:
|
|
room_id (string): If left out, all rooms are fetched.
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
"""
|
|
if room_id:
|
|
# We use the "name search" possibility of the room list API to get
|
|
# a single room via it's ID.
|
|
rooms = self.room_list(from_, limit, room_id, order_by, reverse)
|
|
else:
|
|
rooms = self.room_list(from_, limit, name, order_by, reverse)
|
|
|
|
rooms_w_power_count = 0
|
|
for i, room in enumerate(rooms["rooms"]):
|
|
rooms["rooms"][i]["power_levels"] = {}
|
|
state = self.room_state(room["room_id"])
|
|
for s in state["state"]:
|
|
if s["type"] == "m.room.power_levels":
|
|
if output_format == "human":
|
|
levels_list = [
|
|
f"{u} {l}" for u, l in s["content"]["users"].items() # noqa: E501
|
|
]
|
|
rooms["rooms"][i][
|
|
"power_levels"
|
|
] = "\n".join(levels_list)
|
|
else:
|
|
rooms["rooms"][i][
|
|
"power_levels"
|
|
] = s["content"]["users"]
|
|
rooms_w_power_count += 1
|
|
if not all_details:
|
|
for del_item in ["creator", "encryption", "federatable",
|
|
"guest_access", "history_visibility",
|
|
"join_rules", "joined_local_members",
|
|
"joined_members", "public", "state_events",
|
|
"version"]:
|
|
del (rooms["rooms"][i][del_item])
|
|
|
|
rooms["rooms_w_power_levels_curr_batch"] = rooms_w_power_count
|
|
return rooms
|
|
|
|
def room_delete(self, room_id, new_room_user_id, room_name, message,
|
|
block, no_purge, force_purge):
|
|
""" Delete a room and purge it if requested
|
|
"""
|
|
data = {
|
|
"block": block, # data with proper defaults from cli
|
|
"purge": not bool(no_purge) # should go here
|
|
}
|
|
# everything else is optional and shouldn't even exist in post body
|
|
if new_room_user_id:
|
|
data.update({"new_room_user_id": new_room_user_id})
|
|
if room_name:
|
|
data.update({"room_name": room_name})
|
|
if message:
|
|
data.update({"message": message})
|
|
if force_purge:
|
|
data.update({"force_purge": force_purge})
|
|
return self.query("delete", "v1/rooms/{room_id}", data=data,
|
|
room_id=room_id)
|
|
|
|
def room_delete_v2(self, room_id, new_room_user_id, room_name, message,
|
|
block, purge, force_purge):
|
|
""" Delete a room asynchronously and purge it if requested
|
|
"""
|
|
data = {
|
|
"block": block, # data with proper defaults from cli
|
|
"purge": purge
|
|
}
|
|
# everything else is optional and shouldn't even exist in post body
|
|
if new_room_user_id:
|
|
data.update({"new_room_user_id": new_room_user_id})
|
|
if room_name:
|
|
data.update({"room_name": room_name})
|
|
if message:
|
|
data.update({"message": message})
|
|
if force_purge:
|
|
data.update({"force_purge": force_purge})
|
|
return self.query("delete", "v2/rooms/{room_id}", data=data,
|
|
room_id=room_id)
|
|
|
|
def room_delete_v2_status_by_room_id(self, room_id):
|
|
return self.query("get", "v2/rooms/{room_id}/delete_status",
|
|
room_id=room_id)
|
|
|
|
def room_delete_v2_status_by_delete_id(self, delete_id):
|
|
return self.query("get", "v2/rooms/delete_status/{delete_id}",
|
|
delete_id=delete_id)
|
|
|
|
def block_room(self, room_id, block):
|
|
""" Block or unblock a room.
|
|
|
|
Args:
|
|
room_id (string): Required.
|
|
block (boolean): Whether to block or unblock a room.
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occurred. See Synapse Admin API docs for details.
|
|
"""
|
|
# TODO prevent usage on versions before 1.48
|
|
data = {
|
|
"block": block
|
|
}
|
|
return self.query("put", "v1/rooms/{room_id}/block", data=data,
|
|
room_id=room_id)
|
|
|
|
def room_block_status(self, room_id):
|
|
""" Returns if the room is blocked or not, and who blocked it.
|
|
|
|
Args:
|
|
room_id (string): Fully qualified Matrix room ID.
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
"""
|
|
# TODO prevent usage on versions before 1.48
|
|
return self.query("get", "v1/rooms/{room_id}/block", room_id=room_id)
|
|
|
|
def room_make_admin(self, room_id, user_id):
|
|
""" Grant a user room admin permission. If the user is not in the room,
|
|
and it is not publicly joinable, then invite the user.
|
|
"""
|
|
data = {}
|
|
if user_id:
|
|
data.update({"user_id": user_id})
|
|
return self.query("post", "v1/rooms/{room_id}/make_room_admin",
|
|
data=data, room_id=room_id)
|
|
|
|
def room_media_list(self, room_id):
|
|
""" Get a list of known media in an (unencrypted) room.
|
|
"""
|
|
return self.query("get", "v1/room/{room_id}/media", room_id=room_id)
|
|
|
|
def media_quarantine(self, server_name, media_id):
|
|
""" Quarantine a single piece of local or remote media
|
|
"""
|
|
return self.query(
|
|
"post", "v1/media/quarantine/{server_name}/{media_id}", data={},
|
|
server_name=server_name, media_id=media_id
|
|
)
|
|
|
|
def media_unquarantine(self, server_name, media_id):
|
|
""" Removes a single piece of local or remote media from quarantine.
|
|
"""
|
|
return self.query(
|
|
"post", "v1/media/unquarantine/{server_name}/{media_id}", data={},
|
|
server_name=server_name, media_id=media_id
|
|
)
|
|
|
|
def room_media_quarantine(self, room_id):
|
|
""" Quarantine all local and remote media in a room
|
|
"""
|
|
return self.query(
|
|
"post", "v1/room/{room_id}/media/quarantine", data={},
|
|
room_id=room_id
|
|
)
|
|
|
|
def user_media_quarantine(self, user_id):
|
|
""" Quarantine all local and remote media of a user
|
|
"""
|
|
return self.query(
|
|
"post", "v1/user/{user_id}/media/quarantine", data={},
|
|
user_id=user_id
|
|
)
|
|
|
|
def user_media(self, user_id, _from, limit, order_by, reverse, readable):
|
|
""" Get a user's uploaded media
|
|
"""
|
|
result = self.query("get", "v1/users/{user_id}/media", params={
|
|
"from": _from,
|
|
"limit": limit,
|
|
"order_by": order_by,
|
|
"dir": "b" if reverse else None
|
|
}, user_id=user_id)
|
|
if (readable and result is not None and "media" in result):
|
|
for i, media in enumerate(result["media"]):
|
|
created = media["created_ts"]
|
|
last_access = media["last_access_ts"]
|
|
if created is not None:
|
|
result["media"][i][
|
|
"created_ts"
|
|
] = self._datetime_from_timestamp(created, as_str=True)
|
|
if last_access is not None:
|
|
result["media"][i][
|
|
"last_access_ts"
|
|
] = self._datetime_from_timestamp(last_access, as_str=True)
|
|
return result
|
|
|
|
def media_delete(self, server_name, media_id):
|
|
""" Delete a specific (local) media_id
|
|
"""
|
|
return self.query(
|
|
"delete", "v1/media/{server_name}/{media_id}", data={},
|
|
server_name=server_name, media_id=media_id
|
|
)
|
|
|
|
def media_delete_by_date_or_size(self, before_days, before, _before_ts,
|
|
_size_gt, delete_profiles):
|
|
""" Delete local media by date and/or size FIXME and/or?
|
|
"""
|
|
if before_days:
|
|
self.log.debug("Received --before-days: %s", before_days)
|
|
before_ts = self._timestamp_from_days_ago(before_days)
|
|
elif before:
|
|
self.log.debug("Received --before: %s", before)
|
|
before_ts = self._timestamp_from_datetime(before)
|
|
elif _before_ts is not None: # Could be 0 if it's an older server ;-)
|
|
self.log.debug("Received --before-ts: %s",
|
|
_before_ts)
|
|
before_ts = _before_ts # Click checks for int already
|
|
else:
|
|
self.log.debug("Something wrong in click FIXME")
|
|
|
|
self.log.info("Deleting local media older than timestamp: %d,",
|
|
before_ts)
|
|
self.log.info("which is the date: %s",
|
|
self._datetime_from_timestamp(before_ts))
|
|
params = {
|
|
}
|
|
if before_ts is not None:
|
|
params.update({
|
|
"before_ts": before_ts,
|
|
})
|
|
if _size_gt:
|
|
size_gt = _size_gt * 1024
|
|
self.log.info("Deleting local media greater than %d bytes,",
|
|
size_gt)
|
|
params.update({
|
|
"size_gt": size_gt
|
|
})
|
|
if delete_profiles:
|
|
params.update({
|
|
"keep_profiles": "false"
|
|
})
|
|
return self.query(
|
|
"post", "v1/media/delete", data={}, params=params
|
|
)
|
|
|
|
def media_protect(self, media_id):
|
|
""" Protect a single piece of local or remote media
|
|
|
|
from being quarantined
|
|
"""
|
|
return self.query(
|
|
"post", "v1/media/protect/{media_id}", data={}, media_id=media_id
|
|
)
|
|
|
|
def purge_media_cache(self, before_days, before, _before_ts):
|
|
""" Purge old cached remote media
|
|
"""
|
|
if before_days:
|
|
self.log.debug("Received --before-days: %s", before_days)
|
|
before_ts = self._timestamp_from_days_ago(before_days)
|
|
if before:
|
|
self.log.debug("Received --before: %s", before)
|
|
before_ts = self._timestamp_from_datetime(before)
|
|
if _before_ts:
|
|
self.log.debug("Received --before-ts: %s",
|
|
_before_ts)
|
|
before_ts = _before_ts # Click checks for int already
|
|
|
|
self.log.info("Purging cached remote media older than timestamp: %d,",
|
|
before_ts)
|
|
self.log.info("which is the date: %s",
|
|
self._datetime_from_timestamp(before_ts))
|
|
|
|
return self.query(
|
|
"post", "v1/purge_media_cache", data={}, params={
|
|
"before_ts": str(before_ts)
|
|
}
|
|
)
|
|
|
|
def version(self):
|
|
""" Get the server version
|
|
"""
|
|
return self.query("get", "v1/server_version")
|
|
|
|
def group_delete(self, group_id):
|
|
""" Delete a local group (community)
|
|
"""
|
|
return self.query("post", "v1/delete_group/{group_id}",
|
|
group_id=group_id)
|
|
|
|
def purge_history(self, room_id, before_event_id, before_days, before,
|
|
_before_ts, delete_local):
|
|
""" Purge room history
|
|
"""
|
|
before_ts = None
|
|
if before_days:
|
|
self.log.debug("Received --before-days: %s", before_days)
|
|
before_ts = self._timestamp_from_days_ago(before_days)
|
|
elif before:
|
|
self.log.debug("Received --before: %s", before)
|
|
before_ts = self._timestamp_from_datetime(before)
|
|
elif _before_ts:
|
|
self.log.debug("Received --before-ts: %s",
|
|
_before_ts)
|
|
before_ts = _before_ts # Click checks for int already
|
|
elif before_event_id:
|
|
self.log.debug("Received --event-id: %s",
|
|
before_event_id)
|
|
|
|
data = {}
|
|
if before_ts is not None:
|
|
data.update({
|
|
"purge_up_to_ts": before_ts,
|
|
})
|
|
self.log.info("Purging history older than timestamp: %d,",
|
|
before_ts)
|
|
self.log.info("which is the date/time: %s",
|
|
self._datetime_from_timestamp(before_ts))
|
|
elif before_event_id:
|
|
data.update({
|
|
"purge_up_to_event_id": before_event_id,
|
|
})
|
|
|
|
if delete_local:
|
|
data.update({
|
|
"delete_local_events": True,
|
|
})
|
|
|
|
return self.query("post", "v1/purge_history/{room_id}", data=data,
|
|
room_id=room_id)
|
|
|
|
def purge_history_status(self, purge_id):
|
|
""" Get status of a recent history purge
|
|
|
|
The status will be one of active, complete, or failed.
|
|
"""
|
|
return self.query("get", "v1/purge_history_status/{purge_id}",
|
|
purge_id=purge_id)
|
|
|
|
def regtok_list(self, valid, readable_expiry):
|
|
""" List registration tokens
|
|
|
|
Args:
|
|
valid (bool): List only valid (if True) or invalid (if False)
|
|
tokens. Default is to list all tokens regardless of validity.
|
|
readable_expiry (bool): If True, replace the expiry_time field with
|
|
a human readable datetime. If False, expiry_time will be a unix
|
|
timestamp.
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
|
|
"""
|
|
result = self.query("get", "v1/registration_tokens", params={
|
|
"valid": (str(valid).lower() if isinstance(valid, bool) else None)
|
|
})
|
|
|
|
# Change expiry_time to a human readable format if requested
|
|
if (
|
|
readable_expiry
|
|
and result is not None
|
|
and "registration_tokens" in result
|
|
):
|
|
for i, regtok in enumerate(result["registration_tokens"]):
|
|
expiry_time = regtok["expiry_time"]
|
|
if expiry_time is not None:
|
|
result["registration_tokens"][i][
|
|
"expiry_time"
|
|
] = self._datetime_from_timestamp(expiry_time, as_str=True)
|
|
|
|
return result
|
|
|
|
def regtok_details(self, token, readable_expiry):
|
|
""" Get details about the given registration token
|
|
|
|
Args:
|
|
token (string): The registration token in question
|
|
readable_expiry (bool): If True, replace the expiry_time field with
|
|
a human readable datetime. If False, expiry_time will be a unix
|
|
timestamp.
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
|
|
"""
|
|
result = self.query("get", "v1/registration_tokens/{t}",
|
|
t=token)
|
|
|
|
# Change expiry_time to a human readable format if requested
|
|
if (
|
|
readable_expiry
|
|
and result is not None
|
|
and result.get("expiry_time") is not None
|
|
):
|
|
result["expiry_time"] = self._datetime_from_timestamp(
|
|
result["expiry_time"], as_str=True
|
|
)
|
|
|
|
return result
|
|
|
|
def regtok_new(self, token, length, uses_allowed, expiry_ts, expire_at):
|
|
""" Create a new registration token
|
|
|
|
Args:
|
|
token (string): Registration token to create. Default is randomly
|
|
generated by the server.
|
|
length (int): The length of the token to generate if the token is
|
|
not provided.
|
|
uses_allowed (int): The number of times the token can be used to
|
|
complete a registration before it becomes invalid.
|
|
expiry_ts (int): The latest time the registration token is valid.
|
|
Given as the number of milliseconds since
|
|
1970-01-01 00:00:00 UTC.
|
|
expire_at (click.DateTime): The latest time the registration token
|
|
is valid.
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
|
|
"""
|
|
data = {
|
|
"length": length,
|
|
"uses_allowed": uses_allowed,
|
|
}
|
|
|
|
if expiry_ts:
|
|
self.log.debug(f"Received --expiry-ts: {expiry_ts}")
|
|
data["expiry_time"] = expiry_ts
|
|
elif expire_at:
|
|
self.log.debug(f"Received --expire-at: {expire_at}")
|
|
data["expiry_time"] = self._timestamp_from_datetime(expire_at)
|
|
else:
|
|
data["expiry_time"] = None
|
|
|
|
# The token cannot be null, it must be a string
|
|
if isinstance(token, str):
|
|
data["token"] = token
|
|
|
|
return self.query("post", "v1/registration_tokens/new", data=data)
|
|
|
|
def regtok_update(self, token, uses_allowed, expiry_ts, expire_at):
|
|
""" Update a registration token
|
|
|
|
Args:
|
|
token (string): Registration token to update.
|
|
uses_allowed (int): The number of times the token can be used to
|
|
complete a registration before it becomes invalid.
|
|
expiry_ts (int): The latest time the registration token is valid.
|
|
Given as the number of milliseconds since
|
|
1970-01-01 00:00:00 UTC. -1 indicates no expiry.
|
|
expire_at (click.DateTime): The latest time the registration token
|
|
is valid.
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
|
|
"""
|
|
# If uses_allowed or expiry time were not provided by the user,
|
|
# do not add the corresponding parameter to the request so that
|
|
# the server will not modify its value.
|
|
data = {}
|
|
|
|
if uses_allowed == -1:
|
|
# A null value indicates unlimited uses
|
|
data["uses_allowed"] = None
|
|
elif uses_allowed is not None:
|
|
data["uses_allowed"] = uses_allowed
|
|
|
|
if expiry_ts:
|
|
self.log.debug(f"Received --expiry-ts: {expiry_ts}")
|
|
if expiry_ts == -1:
|
|
# A null value indicates no expiry
|
|
data["expiry_time"] = None
|
|
else:
|
|
data["expiry_time"] = expiry_ts
|
|
elif expire_at:
|
|
self.log.debug(f"Received --expire-at: {expire_at}")
|
|
data["expiry_time"] = self._timestamp_from_datetime(expire_at)
|
|
|
|
return self.query("put", "v1/registration_tokens/{t}", data=data,
|
|
t=token)
|
|
|
|
def regtok_delete(self, token):
|
|
""" Delete a registration token
|
|
|
|
Args:
|
|
token (string): The registration token to delete
|
|
|
|
Returns:
|
|
string: JSON string containing the Admin API's response or None if
|
|
an exception occured. See Synapse Admin API docs for details.
|
|
|
|
"""
|
|
# t because query also accepts token when we want it for the
|
|
# request
|
|
# https://github.com/JOJ0/synadm/issues/110#issuecomment-1590032158
|
|
return self.query("delete", "v1/registration_tokens/{t}",
|
|
t=token)
|
|
|
|
def user_shadow_ban(self, user_id, unban):
|
|
""" Shadow-ban or unban a user.
|
|
|
|
Args:
|
|
user_id (string): The user to be banned/unbanned.
|
|
unban (boolean): Unban the specified user.
|
|
"""
|
|
if unban:
|
|
method = "delete"
|
|
else:
|
|
method = "post"
|
|
return self.query(method, "v1/users/{user_id}/shadow_ban",
|
|
user_id=user_id)
|
|
|
|
def notice_send(self, receivers, content_plain, content_html, paginate,
|
|
regex):
|
|
""" Send server notices.
|
|
|
|
Args:
|
|
receivers (string): Target(s) of the notice. Either localpart or
|
|
regular expression matching localparts.
|
|
content_plain (string): Unformatted text of the notice.
|
|
content_html (string): HTML-formatted text of the notice.
|
|
paginate (int): Limits to how many users the notice is sent at
|
|
once. Users are fetched with the user_list method and using
|
|
its pagination capabilities.
|
|
to_regex (bool): Selects whether receivers should be interpreted as
|
|
a regular expression or a single recipient.
|
|
|
|
Returns:
|
|
list: A list of dictionaries, each containing the response of
|
|
what a single notice Admin API call returned. Usually that is
|
|
an event ID or an error. See Synapse Admin API docs for
|
|
details.
|
|
"""
|
|
data = {
|
|
"user_id": "",
|
|
"content": {
|
|
"msgtype": "m.text",
|
|
"body": content_plain,
|
|
"format": "org.matrix.custom.html",
|
|
"formatted_body": content_html
|
|
}
|
|
}
|
|
|
|
# A regular expression was supplied to match receivers.
|
|
if regex:
|
|
outputs = []
|
|
response = self.user_list(0, paginate, True, False, "", "", None)
|
|
if "users" not in response:
|
|
return
|
|
while True:
|
|
for user in response["users"]:
|
|
if re.match(receivers, user["name"]):
|
|
data["user_id"] = user["name"]
|
|
outputs.append(
|
|
self.query(
|
|
"post", "v1/send_server_notice", data=data
|
|
)
|
|
)
|
|
|
|
if "next_token" not in response:
|
|
return outputs
|
|
response = self.user_list(response["next_token"],
|
|
100, True, False, "", "", None)
|
|
# Only a single user ID was supplied as receiver
|
|
else:
|
|
data["user_id"] = receivers
|
|
return [self.query("post", "v1/send_server_notice", data=data)]
|
|
|
|
def raw_request(self, endpoint, method, data):
|
|
data_dict = {}
|
|
if method != "get":
|
|
self.log.debug("The data we are trying to parse and submit:")
|
|
self.log.debug(data)
|
|
try: # user provided json might be crap
|
|
data_dict = json.loads(data)
|
|
except Exception as error:
|
|
self.log.error("loading data: %s: %s",
|
|
type(error).__name__, error)
|
|
return None
|
|
|
|
return self.query(method, endpoint, data=data_dict)
|