151 lines
5.2 KiB
Python
151 lines
5.2 KiB
Python
"""Serializer for tenants models"""
|
|
|
|
from datetime import timedelta
|
|
from hmac import compare_digest
|
|
|
|
from django.apps import apps
|
|
from django.http import HttpResponseNotFound
|
|
from django.http.request import urljoin
|
|
from django.utils.timezone import now
|
|
from drf_spectacular.utils import OpenApiResponse, extend_schema
|
|
from rest_framework import permissions
|
|
from rest_framework.authentication import get_authorization_header
|
|
from rest_framework.decorators import action
|
|
from rest_framework.fields import CharField, IntegerField
|
|
from rest_framework.filters import OrderingFilter, SearchFilter
|
|
from rest_framework.request import Request
|
|
from rest_framework.response import Response
|
|
from rest_framework.serializers import DateTimeField, ModelSerializer
|
|
from rest_framework.views import View
|
|
from rest_framework.viewsets import ModelViewSet
|
|
|
|
from authentik.api.authentication import validate_auth
|
|
from authentik.core.api.utils import PassiveSerializer
|
|
from authentik.core.models import User
|
|
from authentik.lib.config import CONFIG
|
|
from authentik.recovery.lib import create_admin_group, create_recovery_token
|
|
from authentik.tenants.models import Tenant
|
|
|
|
|
|
class TenantApiKeyPermission(permissions.BasePermission):
    """Authentication based on tenants.api_key"""

    def has_permission(self, request: Request, view: View) -> bool:
        """Allow the request only if its token equals the configured API key.

        Returns False when ``tenants.api_key`` is unset/empty or when
        ``validate_auth`` yields no token for the Authorization header.
        """
        key = CONFIG.get("tenants.api_key", "")
        if not key:
            return False
        token = validate_auth(get_authorization_header(request))
        if token is None:
            return False
        # compare_digest() raises TypeError when given str arguments that
        # contain non-ASCII characters. The token is client-controlled, so
        # compare UTF-8 bytes instead: such a token is then simply rejected
        # rather than surfacing as an unhandled server error.
        token_bytes = token.encode("utf-8") if isinstance(token, str) else token
        key_bytes = key.encode("utf-8") if isinstance(key, str) else key
        return compare_digest(token_bytes, key_bytes)
|
|
|
|
|
|
class TenantSerializer(ModelSerializer):
    """Model serializer exposing the public attributes of a ``Tenant``."""

    class Meta:
        # Only these model fields are part of the API representation.
        model = Tenant
        fields = ["tenant_uuid", "schema_name", "name", "ready"]
|
|
|
|
|
|
class TenantAdminGroupRequestSerializer(PassiveSerializer):
    """Request body for creating a tenant's admin group."""

    # Looked up as a username by the viewset action that consumes this serializer.
    user = CharField()
|
|
|
|
|
|
class TenantRecoveryKeyRequestSerializer(PassiveSerializer):
    """Request body for creating a recovery key for a tenant user."""

    # Looked up as a username by the viewset action that consumes this serializer.
    user = CharField()
    # Validity of the recovery key, in days.
    # NOTE(review): `initial` is not a default; as far as DRF documents it, the
    # field is still required in the request body. Confirm that is intended.
    duration_days = IntegerField(initial=365)
|
|
|
|
|
|
class TenantRecoveryKeyResponseSerializer(PassiveSerializer):
    """Response body returned after a recovery key has been created."""

    # When the recovery token expires.
    expiry = DateTimeField()
    # Recovery link handed back to the caller.
    url = CharField()
|
|
|
|
|
|
class TenantViewSet(ModelViewSet):
    """Tenant Viewset"""

    queryset = Tenant.objects.all()
    serializer_class = TenantSerializer
    search_fields = [
        "name",
        "schema_name",
        "domains__domain",
    ]
    ordering = ["schema_name"]
    # No authentication classes: access is decided by the API-key permission alone.
    authentication_classes = []
    permission_classes = [TenantApiKeyPermission]
    filter_backends = [OrderingFilter, SearchFilter]
    filterset_fields = []

    def dispatch(self, request, *args, **kwargs):
        """Answer every request with 404 unless the enterprise app is enabled."""
        # This only checks the license in the default tenant, which is what we want
        if not apps.get_app_config("authentik_enterprise").enabled():
            return HttpResponseNotFound()
        return super().dispatch(request, *args, **kwargs)

    @extend_schema(
        request=TenantAdminGroupRequestSerializer(),
        responses={
            # The action answers with 200 on success; keep the schema in sync
            # with the status code that is actually returned.
            200: OpenApiResponse(description="Group created successfully."),
            400: OpenApiResponse(description="Bad request"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(detail=True, pagination_class=None, methods=["POST"])
    def create_admin_group(self, request: Request, pk: str) -> Response:
        """Create admin group and add user to it.

        Responds with 400 when the request body is invalid, 404 when no user
        with the given username exists, and an empty 200 on success.
        """
        tenant = self.get_object()
        # Everything below runs inside the tenant's context manager
        # (presumably so the queries target this tenant's schema - confirm).
        with tenant:
            data = TenantAdminGroupRequestSerializer(data=request.data)
            if not data.is_valid():
                return Response(data.errors, status=400)
            user = User.objects.filter(username=data.validated_data.get("user")).first()
            if not user:
                return Response(status=404)

            # Refers to the imported helper, not this method; its return value is not needed.
            create_admin_group(user)

            return Response(status=200)

    @extend_schema(
        request=TenantRecoveryKeyRequestSerializer(),
        responses={
            200: TenantRecoveryKeyResponseSerializer(),
            400: OpenApiResponse(description="Bad request"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(detail=True, pagination_class=None, methods=["POST"])
    def create_recovery_key(self, request: Request, pk: str) -> Response:
        """Create recovery key for user.

        Responds with 400 when the request body is invalid or the requested
        duration cannot be represented as a date, 404 when no user with the
        given username exists, and otherwise with the token expiry and an
        absolute recovery URL.
        """
        tenant = self.get_object()
        # Everything below runs inside the tenant's context manager
        # (presumably so the queries target this tenant's schema - confirm).
        with tenant:
            data = TenantRecoveryKeyRequestSerializer(data=request.data)
            if not data.is_valid():
                return Response(data.errors, status=400)
            user = User.objects.filter(username=data.validated_data.get("user")).first()
            if not user:
                return Response(status=404)

            try:
                expiry = now() + timedelta(days=data.validated_data.get("duration_days"))
            except OverflowError:
                # The serializer puts no bounds on duration_days; values that
                # overflow timedelta/datetime are a client error, not a crash.
                return Response(
                    {"duration_days": ["Duration is out of range."]},
                    status=400,
                )

            token, url = create_recovery_token(user, expiry, "tenants API")

            # Prefer the tenant's primary domain; fall back to the host of
            # the current request when the tenant has none.
            domain = tenant.get_primary_domain()
            host = domain.domain if domain else request.get_host()

            url = urljoin(f"{request.scheme}://{host}", url)

            serializer = TenantRecoveryKeyResponseSerializer({"expiry": token.expires, "url": url})
            return Response(serializer.data)
|