authentik/authentik/flows/stage.py

274 lines
11 KiB
Python

"""authentik stage Base view"""
from typing import TYPE_CHECKING
from django.contrib.auth.models import AnonymousUser
from django.http import HttpRequest
from django.http.request import QueryDict
from django.http.response import HttpResponse
from django.urls import reverse
from django.views.generic.base import View
from prometheus_client import Histogram
from rest_framework.request import Request
from sentry_sdk.hub import Hub
from structlog.stdlib import BoundLogger, get_logger
from authentik.core.models import User
from authentik.flows.challenge import (
AccessDeniedChallenge,
Challenge,
ChallengeResponse,
ChallengeTypes,
ContextualFlowInfo,
HttpChallengeResponse,
RedirectChallenge,
WithUserInfoChallenge,
)
from authentik.flows.exceptions import StageInvalidException
from authentik.flows.models import InvalidResponseAction
from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_PENDING_USER
from authentik.lib.avatars import DEFAULT_AVATAR, get_avatar
from authentik.lib.utils.reflection import class_to_path
if TYPE_CHECKING:
from authentik.flows.views.executor import FlowExecutorView
PLAN_CONTEXT_PENDING_USER_IDENTIFIER = "pending_user_identifier"
HIST_FLOWS_STAGE_TIME = Histogram(
"authentik_flows_stage_time",
"Duration taken by different parts of stages",
["stage_type", "method"],
)
class StageView(View):
"""Abstract Stage"""
executor: "FlowExecutorView"
request: HttpRequest = None
logger: BoundLogger
def __init__(self, executor: "FlowExecutorView", **kwargs):
self.executor = executor
current_stage = getattr(self.executor, "current_stage", None)
self.logger = get_logger().bind(
stage=getattr(current_stage, "name", None),
stage_view=class_to_path(type(self)),
)
super().__init__(**kwargs)
def get_pending_user(self, for_display=False) -> User:
"""Either show the matched User object or show what the user entered,
based on what the earlier stage (mostly IdentificationStage) set.
_USER_IDENTIFIER overrides the first User, as PENDING_USER is used for
other things besides the form display.
If no user is pending, returns request.user"""
if not self.executor.plan:
return self.request.user
if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in self.executor.plan.context and for_display:
return User(
username=self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER),
email="",
)
if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
return self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
return self.request.user
def cleanup(self):
"""Cleanup session"""
class ChallengeStageView(StageView):
"""Stage view which response with a challenge"""
response_class = ChallengeResponse
def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
"""Return the response class type"""
return self.response_class(None, data=data, stage=self)
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""Return a challenge for the frontend to solve"""
challenge = self._get_challenge(*args, **kwargs)
if not challenge.is_valid():
self.logger.warning(
"f(ch): Invalid challenge",
errors=challenge.errors,
)
return HttpChallengeResponse(challenge)
def post(self, request: Request, *args, **kwargs) -> HttpResponse:
"""Handle challenge response"""
valid = False
try:
challenge: ChallengeResponse = self.get_response_instance(data=request.data)
valid = challenge.is_valid()
except StageInvalidException as exc:
self.logger.debug("Got StageInvalidException", exc=exc)
return self.executor.stage_invalid()
if not valid:
if self.executor.current_binding.invalid_response_action in [
InvalidResponseAction.RESTART,
InvalidResponseAction.RESTART_WITH_CONTEXT,
]:
keep_context = (
self.executor.current_binding.invalid_response_action
== InvalidResponseAction.RESTART_WITH_CONTEXT
)
self.logger.debug(
"f(ch): Invalid response, restarting flow",
keep_context=keep_context,
)
return self.executor.restart_flow(keep_context)
with (
Hub.current.start_span(
op="authentik.flow.stage.challenge_invalid",
description=self.__class__.__name__,
),
HIST_FLOWS_STAGE_TIME.labels(
stage_type=self.__class__.__name__, method="challenge_invalid"
).time(),
):
return self.challenge_invalid(challenge)
with (
Hub.current.start_span(
op="authentik.flow.stage.challenge_valid",
description=self.__class__.__name__,
),
HIST_FLOWS_STAGE_TIME.labels(
stage_type=self.__class__.__name__, method="challenge_valid"
).time(),
):
return self.challenge_valid(challenge)
def format_title(self) -> str:
"""Allow usage of placeholder in flow title."""
if not self.executor.plan:
return self.executor.flow.title
try:
return self.executor.flow.title % {
"app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""),
"user": self.get_pending_user(for_display=True),
}
except Exception as exc:
self.logger.warning("failed to template title", exc=exc)
return self.executor.flow.title
def _get_challenge(self, *args, **kwargs) -> Challenge:
with (
Hub.current.start_span(
op="authentik.flow.stage.get_challenge",
description=self.__class__.__name__,
),
HIST_FLOWS_STAGE_TIME.labels(
stage_type=self.__class__.__name__, method="get_challenge"
).time(),
):
try:
challenge = self.get_challenge(*args, **kwargs)
except StageInvalidException as exc:
self.logger.debug("Got StageInvalidException", exc=exc)
return self.executor.stage_invalid()
with Hub.current.start_span(
op="authentik.flow.stage._get_challenge",
description=self.__class__.__name__,
):
if not hasattr(challenge, "initial_data"):
challenge.initial_data = {}
if "flow_info" not in challenge.initial_data:
flow_info = ContextualFlowInfo(
data={
"title": self.format_title(),
"background": self.executor.flow.background_url,
"cancel_url": reverse("authentik_flows:cancel"),
"layout": self.executor.flow.layout,
}
)
flow_info.is_valid()
challenge.initial_data["flow_info"] = flow_info.data
if isinstance(challenge, WithUserInfoChallenge):
# If there's a pending user, update the `username` field
# this field is only used by password managers.
# If there's no user set, an error is raised later.
if user := self.get_pending_user(for_display=True):
challenge.initial_data["pending_user"] = user.username
challenge.initial_data["pending_user_avatar"] = DEFAULT_AVATAR
if not isinstance(user, AnonymousUser):
challenge.initial_data["pending_user_avatar"] = get_avatar(user, self.request)
return challenge
def get_challenge(self, *args, **kwargs) -> Challenge:
"""Return the challenge that the client should solve"""
raise NotImplementedError
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
"""Callback when the challenge has the correct format"""
raise NotImplementedError
def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse:
"""Callback when the challenge has the incorrect format"""
challenge_response = self._get_challenge()
full_errors = {}
for field, errors in response.errors.items():
for error in errors:
full_errors.setdefault(field, [])
field_error = {
"string": str(error),
}
if hasattr(error, "code"):
field_error["code"] = error.code
full_errors[field].append(field_error)
challenge_response.initial_data["response_errors"] = full_errors
if not challenge_response.is_valid():
self.logger.error(
"f(ch): invalid challenge response",
errors=challenge_response.errors,
)
return HttpChallengeResponse(challenge_response)
class AccessDeniedChallengeView(ChallengeStageView):
"""Used internally by FlowExecutor's stage_invalid()"""
error_message: str | None
def __init__(self, executor: "FlowExecutorView", error_message: str | None = None, **kwargs):
super().__init__(executor, **kwargs)
self.error_message = error_message
def get_challenge(self, *args, **kwargs) -> Challenge:
return AccessDeniedChallenge(
data={
"error_message": str(self.error_message or "Unknown error"),
"type": ChallengeTypes.NATIVE.value,
"component": "ak-stage-access-denied",
}
)
# This can never be reached since this challenge is created on demand and only the
# .get() method is called
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: # pragma: no cover
return self.executor.cancel()
class RedirectStage(ChallengeStageView):
"""Redirect to any URL"""
def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
destination = getattr(
self.executor.current_stage, "destination", reverse("authentik_core:root-redirect")
)
return RedirectChallenge(
data={
"type": ChallengeTypes.REDIRECT.value,
"to": destination,
}
)
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
return HttpChallengeResponse(self.get_challenge())