mautrix-instagram/mautrix_instagram/commands/auth.py

260 lines
9.7 KiB
Python

# mautrix-instagram - A Matrix-Instagram puppeting bridge.
# Copyright (C) 2023 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import base64
import hashlib
import hmac
import zlib
from mauigpapi.errors import (
IGBad2FACodeError,
IGChallengeError,
IGChallengeWrongCodeError,
IGLoginBadPasswordError,
IGLoginInvalidCredentialsError,
IGLoginInvalidUserError,
IGLoginTwoFactorRequiredError,
)
from mauigpapi.http import AndroidAPI
from mauigpapi.state import AndroidState
from mauigpapi.types import BaseResponseUser
from mautrix.bridge.commands import HelpSection, command_handler
from mautrix.types import EventID
from .. import user as u
from .typehint import CommandEvent
SECTION_AUTH = HelpSection("Authentication", 10, "")
async def get_login_state(user: u.User, seed: str) -> tuple[AndroidAPI, AndroidState]:
if user.command_status and user.command_status["action"] == "Login":
api: AndroidAPI = user.command_status["api"]
state: AndroidState = user.command_status["state"]
else:
state = AndroidState()
seed = hmac.new(seed.encode("utf-8"), user.mxid.encode("utf-8"), hashlib.sha256).digest()
state.device.generate(seed)
api = AndroidAPI(
state,
log=user.api_log,
proxy_handler=user.proxy_handler,
on_proxy_update=user.on_proxy_update,
on_response_error=user.on_response_error,
)
await api.get_mobile_config()
user.command_status = {
"action": "Login",
"state": state,
"api": api,
}
return api, state
@command_handler(
needs_auth=False,
management_only=True,
help_section=SECTION_AUTH,
help_text="Log into Instagram",
help_args="<_username_> <_password_>",
)
async def login(evt: CommandEvent) -> None:
if await evt.sender.is_logged_in():
await evt.reply("You're already logged in")
return
elif len(evt.args) < 2:
await evt.reply("**Usage:** `$cmdprefix+sp login <username> <password>`")
return
username = evt.args[0]
password = " ".join(evt.args[1:])
await evt.redact()
api, state = await get_login_state(evt.sender, evt.config["instagram.device_seed"])
try:
resp = await api.login(username, password)
except IGLoginTwoFactorRequiredError as e:
tfa_info = e.body.two_factor_info
msg = "Username and password accepted, but you have two-factor authentication enabled.\n"
if tfa_info.totp_two_factor_on:
msg += "Send the code from your authenticator app here."
if tfa_info.sms_two_factor_on:
msg += f" Alternatively, send `resend-sms` to get an SMS code to •••{tfa_info.obfuscated_phone_number}"
elif tfa_info.sms_two_factor_on:
msg += (
f"Send the code sent to •••{tfa_info.obfuscated_phone_number} here."
" You can also send `resend-sms` if you didn't receive the code."
)
else:
msg += (
"Unfortunately, none of your two-factor authentication methods are currently "
"supported by the bridge."
)
return
evt.sender.command_status = {
**evt.sender.command_status,
"next": enter_login_2fa,
"username": tfa_info.username,
"is_totp": tfa_info.totp_two_factor_on,
"has_sms": tfa_info.sms_two_factor_on,
"2fa_identifier": tfa_info.two_factor_identifier,
}
await evt.reply(msg)
except IGChallengeError:
await evt.reply(
"Login challenges aren't currently supported. "
"Please set up real two-factor authentication."
)
await api.challenge_auto()
evt.sender.command_status = {
**evt.sender.command_status,
"next": enter_login_security_code,
}
await evt.reply(
"Username and password accepted, but Instagram wants to verify it's really"
" you. Please confirm the login and enter the security code here."
)
except IGLoginInvalidUserError:
await evt.reply("Invalid username")
except IGLoginBadPasswordError:
await evt.reply("Incorrect password")
except IGLoginInvalidCredentialsError:
await evt.reply("Incorrect username or password")
except Exception as e:
evt.log.exception("Failed to log in")
await evt.reply(f"Failed to log in: {e}")
else:
await _post_login(evt, state, resp.logged_in_user)
async def enter_login_2fa(evt: CommandEvent) -> None:
api: AndroidAPI = evt.sender.command_status["api"]
state: AndroidState = evt.sender.command_status["state"]
identifier = evt.sender.command_status["2fa_identifier"]
username = evt.sender.command_status["username"]
is_totp = evt.sender.command_status["is_totp"]
has_sms = evt.sender.command_status["has_sms"]
code = "".join(evt.args).lower()
if has_sms and code == "resend-sms":
try:
resp = await api.send_two_factor_login_sms(username, identifier=identifier)
except Exception as e:
evt.log.exception("Failed to re-request SMS code")
await evt.reply(f"Failed to re-request SMS code: {e}")
else:
await evt.reply(
f"Re-requested SMS code to {resp.two_factor_info.obfuscated_phone_number}"
)
evt.sender.command_status[
"2fa_identifier"
] = resp.two_factor_info.two_factor_identifier
evt.sender.command_status["is_totp"] = False
return
try:
resp = await api.two_factor_login(
username, code=code, identifier=identifier, is_totp=is_totp
)
except IGBad2FACodeError:
await evt.reply(
"Invalid 2-factor authentication code. Please try again "
"or use `$cmdprefix+sp cancel` to cancel."
)
except IGChallengeError:
await api.challenge_auto(reset=True)
evt.sender.command_status = {
**evt.sender.command_status,
"next": enter_login_security_code,
}
await evt.reply(
"2-factor authentication code accepted, but Instagram wants to verify it's"
" really you. Please confirm the login and enter the security code here."
)
except Exception as e:
evt.log.exception("Failed to log in")
await evt.reply(f"Failed to log in: {e}")
evt.sender.command_status = None
else:
evt.sender.command_status = None
await _post_login(evt, state, resp.logged_in_user)
async def enter_login_security_code(evt: CommandEvent) -> None:
api: AndroidAPI = evt.sender.command_status["api"]
state: AndroidState = evt.sender.command_status["state"]
try:
resp = await api.challenge_send_security_code("".join(evt.args))
except IGChallengeWrongCodeError as e:
await evt.reply(f"Incorrect security code: {e}")
except Exception as e:
evt.log.exception("Failed to log in")
await evt.reply(f"Failed to log in: {e}")
evt.sender.command_status = None
else:
if not resp.logged_in_user:
evt.log.error(
f"Didn't get logged_in_user in challenge response "
f"after entering security code: {resp.serialize()}"
)
await evt.reply("An unknown error occurred. Please check the bridge logs.")
return
evt.sender.command_status = None
await _post_login(evt, state, resp.logged_in_user)
async def _post_login(evt: CommandEvent, state: AndroidState, user: BaseResponseUser) -> None:
evt.sender.state = state
pl = state.device.payload
manufacturer, model = pl["manufacturer"], pl["model"]
await evt.reply(
f"Successfully logged in as {user.full_name} ([@{user.username}]"
f"(https://instagram.com/{user.username}), user ID: {user.pk}).\n\n"
f"The bridge will show up on Instagram as {manufacturer} {model}."
)
await evt.sender.try_connect()
@command_handler(
needs_auth=True,
help_section=SECTION_AUTH,
help_text="Disconnect the bridge from your Instagram account",
)
async def logout(evt: CommandEvent) -> None:
await evt.sender.logout()
await evt.reply("Successfully logged out")
@command_handler(
needs_auth=False,
management_only=True,
help_section=SECTION_AUTH,
help_text="Log into Instagram with a pre-generated session blob",
help_args="<_blob_>",
)
async def login_blob(evt: CommandEvent) -> EventID:
if await evt.sender.is_logged_in():
return await evt.reply("You're already logged in")
elif len(evt.args) < 1:
return await evt.reply("**Usage:** `$cmdprefix+sp login-blob <blob>`")
await evt.redact()
try:
state = AndroidState.parse_json(zlib.decompress(base64.b64decode("".join(evt.args))))
except Exception:
evt.log.exception(f"{evt.sender} provided an invalid login blob")
return await evt.reply("Invalid blob")
evt.sender.state = state
await evt.reply("Connecting...")
await evt.sender.try_connect()
await evt.reply("Maybe connected now, try pinging?")