134 lines
5.4 KiB
Python
134 lines
5.4 KiB
Python
# mautrix-instagram - A Matrix-Instagram puppeting bridge.
|
|
# Copyright (C) 2022 Tulir Asokan
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
from __future__ import annotations
|
|
|
|
from ..errors import IGChallengeWrongCodeError, IGResponseError
|
|
from ..types import ChallengeStateResponse
|
|
from .base import BaseAndroidAPI
|
|
|
|
|
|
class ChallengeAPI(BaseAndroidAPI):
|
|
@property
|
|
def __path(self) -> str:
|
|
return f"/api/v1{self.state.challenge_path}"
|
|
|
|
async def challenge_get_state(self) -> ChallengeStateResponse:
|
|
query = {
|
|
"guid": self.state.device.uuid,
|
|
"device_id": self.state.device.id,
|
|
}
|
|
if self.state.challenge_context:
|
|
query["challenge_context"] = self.state.challenge_context.json()
|
|
self.log.debug("Fetching current challenge state")
|
|
return self.__handle_resp(
|
|
await self.std_http_get(self.__path, query=query, response_type=ChallengeStateResponse)
|
|
)
|
|
|
|
async def challenge_select_method(
|
|
self, choice: str, is_replay: bool = False
|
|
) -> ChallengeStateResponse:
|
|
path = self.__path
|
|
if is_replay:
|
|
path = path.replace("/challenge/", "/challenge/replay/")
|
|
req = {
|
|
"choice": choice,
|
|
"_csrftoken": self.state.cookies.csrf_token,
|
|
"guid": self.state.device.uuid,
|
|
"device_id": self.state.device.id,
|
|
}
|
|
self.log.debug(f"Selecting challenge method {choice} (replay: {is_replay})")
|
|
return self.__handle_resp(
|
|
await self.std_http_post(path, data=req, response_type=ChallengeStateResponse)
|
|
)
|
|
|
|
async def challenge_delta_review(self, was_me: bool = True) -> ChallengeStateResponse:
|
|
return await self.challenge_select_method("0" if was_me else "1")
|
|
|
|
async def challenge_send_phone_number(self, phone_number: str) -> ChallengeStateResponse:
|
|
req = {
|
|
"phone_number": phone_number,
|
|
"_csrftoken": self.state.cookies.csrf_token,
|
|
"guid": self.state.device.uuid,
|
|
"device_id": self.state.device.id,
|
|
}
|
|
self.log.debug("Sending challenge phone number")
|
|
return self.__handle_resp(
|
|
await self.std_http_post(self.__path, data=req, response_type=ChallengeStateResponse)
|
|
)
|
|
|
|
async def challenge_send_security_code(self, code: str | int) -> ChallengeStateResponse:
|
|
req = {
|
|
"security_code": code,
|
|
"_csrftoken": self.state.cookies.csrf_token,
|
|
"guid": self.state.device.uuid,
|
|
"device_id": self.state.device.id,
|
|
}
|
|
try:
|
|
self.log.debug("Sending challenge security code")
|
|
return self.__handle_resp(
|
|
await self.std_http_post(
|
|
self.__path, data=req, response_type=ChallengeStateResponse
|
|
)
|
|
)
|
|
except IGResponseError as e:
|
|
if e.response.status == 400:
|
|
raise IGChallengeWrongCodeError((await e.response.json())["message"]) from e
|
|
raise
|
|
|
|
async def challenge_reset(self) -> ChallengeStateResponse:
|
|
req = {
|
|
"_csrftoken": self.state.cookies.csrf_token,
|
|
"guid": self.state.device.uuid,
|
|
"device_id": self.state.device.id,
|
|
}
|
|
self.log.debug("Resetting challenge")
|
|
return self.__handle_resp(
|
|
await self.std_http_post(
|
|
self.__path.replace("/challenge/", "/challenge/reset/"),
|
|
data=req,
|
|
response_type=ChallengeStateResponse,
|
|
)
|
|
)
|
|
|
|
async def challenge_auto(self, reset: bool = False) -> ChallengeStateResponse:
|
|
if reset:
|
|
await self.challenge_reset()
|
|
challenge = self.state.challenge or await self.challenge_get_state()
|
|
if challenge.step_name == "select_verify_method":
|
|
self.log.debug(
|
|
"Got select_verify_method challenge step, "
|
|
f"auto-selecting {challenge.step_data.choice}"
|
|
)
|
|
return await self.challenge_select_method(challenge.step_data.choice)
|
|
elif challenge.step_name == "delta_login_review":
|
|
self.log.debug("Got delta_login_review challenge step, auto-selecting was_me=True")
|
|
return await self.challenge_delta_review(was_me=True)
|
|
else:
|
|
self.log.debug(f"Got unknown challenge step {challenge.step_name}, not doing anything")
|
|
return challenge
|
|
|
|
def __handle_resp(self, resp: ChallengeStateResponse) -> ChallengeStateResponse:
|
|
if resp.action == "close":
|
|
self.log.debug(
|
|
f"Challenge closed (step: {resp.step_name}, has user: {bool(resp.logged_in_user)})"
|
|
)
|
|
self.state.challenge = None
|
|
self.state.challenge_context = None
|
|
self.state.challenge_path = None
|
|
else:
|
|
self.state.challenge = resp
|
|
return resp
|