authentik/authentik/events/models.py

696 lines
25 KiB
Python

"""authentik events models"""
import time
from collections import Counter
from datetime import timedelta
from difflib import get_close_matches
from functools import lru_cache
from inspect import currentframe
from smtplib import SMTPException
from uuid import uuid4
from django.apps import apps
from django.db import connection, models
from django.db.models import Count, ExpressionWrapper, F
from django.db.models.fields import DurationField
from django.db.models.functions import Extract
from django.db.models.manager import Manager
from django.db.models.query import QuerySet
from django.http import HttpRequest
from django.http.request import QueryDict
from django.utils.timezone import now
from django.utils.translation import gettext as _
from requests import RequestException
from rest_framework.serializers import Serializer
from structlog.stdlib import get_logger
from authentik import get_full_version
from authentik.brands.models import Brand
from authentik.brands.utils import DEFAULT_BRAND
from authentik.core.middleware import (
SESSION_KEY_IMPERSONATE_ORIGINAL_USER,
SESSION_KEY_IMPERSONATE_USER,
)
from authentik.core.models import ExpiringModel, Group, PropertyMapping, User
from authentik.events.apps import GAUGE_TASKS, SYSTEM_TASK_STATUS, SYSTEM_TASK_TIME
from authentik.events.context_processors.base import get_context_processors
from authentik.events.utils import (
cleanse_dict,
get_user,
model_to_dict,
sanitize_dict,
sanitize_item,
)
from authentik.lib.models import DomainlessURLValidator, SerializerModel
from authentik.lib.sentry import SentryIgnoredException
from authentik.lib.utils.http import get_http_session
from authentik.lib.utils.time import timedelta_from_string
from authentik.policies.models import PolicyBindingModel
from authentik.root.middleware import ClientIPMiddleware
from authentik.stages.email.utils import TemplateEmailMessage
from authentik.tenants.models import Tenant
LOGGER = get_logger()
DISCORD_FIELD_LIMIT = 25
NOTIFICATION_SUMMARY_LENGTH = 75
def default_event_duration():
"""Default duration an Event is saved.
This is used as a fallback when no brand is available"""
return now() + timedelta(days=365)
def default_brand():
"""Get a default value for brand"""
return sanitize_dict(model_to_dict(DEFAULT_BRAND))
@lru_cache
def django_app_names() -> list[str]:
"""Get a cached list of all django apps' names (not labels)"""
return [x.name for x in apps.app_configs.values()]
class NotificationTransportError(SentryIgnoredException):
"""Error raised when a notification fails to be delivered"""
class EventAction(models.TextChoices):
"""All possible actions to save into the events log"""
LOGIN = "login"
LOGIN_FAILED = "login_failed"
LOGOUT = "logout"
USER_WRITE = "user_write"
SUSPICIOUS_REQUEST = "suspicious_request"
PASSWORD_SET = "password_set" # noqa # nosec
SECRET_VIEW = "secret_view" # noqa # nosec
SECRET_ROTATE = "secret_rotate" # noqa # nosec
INVITE_USED = "invitation_used"
AUTHORIZE_APPLICATION = "authorize_application"
SOURCE_LINKED = "source_linked"
IMPERSONATION_STARTED = "impersonation_started"
IMPERSONATION_ENDED = "impersonation_ended"
FLOW_EXECUTION = "flow_execution"
POLICY_EXECUTION = "policy_execution"
POLICY_EXCEPTION = "policy_exception"
PROPERTY_MAPPING_EXCEPTION = "property_mapping_exception"
SYSTEM_TASK_EXECUTION = "system_task_execution"
SYSTEM_TASK_EXCEPTION = "system_task_exception"
SYSTEM_EXCEPTION = "system_exception"
CONFIGURATION_ERROR = "configuration_error"
MODEL_CREATED = "model_created"
MODEL_UPDATED = "model_updated"
MODEL_DELETED = "model_deleted"
EMAIL_SENT = "email_sent"
UPDATE_AVAILABLE = "update_available"
CUSTOM_PREFIX = "custom_"
class EventQuerySet(QuerySet):
"""Custom events query set with helper functions"""
def get_events_per(
self,
time_since: timedelta,
extract: Extract,
data_points: int,
) -> list[dict[str, int]]:
"""Get event count by hour in the last day, fill with zeros"""
_now = now()
max_since = timedelta(days=60)
# Allow maximum of 60 days to limit load
if time_since.total_seconds() > max_since.total_seconds():
time_since = max_since
date_from = _now - time_since
result = (
self.filter(created__gte=date_from)
.annotate(age=ExpressionWrapper(_now - F("created"), output_field=DurationField()))
.annotate(age_interval=extract("age"))
.values("age_interval")
.annotate(count=Count("pk"))
.order_by("age_interval")
)
data = Counter({int(d["age_interval"]): d["count"] for d in result})
results = []
interval_delta = time_since / data_points
for interval in range(1, -data_points, -1):
results.append(
{
"x_cord": time.mktime((_now + (interval_delta * interval)).timetuple()) * 1000,
"y_cord": data[interval * -1],
}
)
return results
class EventManager(Manager):
"""Custom helper methods for Events"""
def get_queryset(self) -> QuerySet:
"""use custom queryset"""
return EventQuerySet(self.model, using=self._db)
def get_events_per(
self,
time_since: timedelta,
extract: Extract,
data_points: int,
) -> list[dict[str, int]]:
"""Wrap method from queryset"""
return self.get_queryset().get_events_per(time_since, extract, data_points)
class Event(SerializerModel, ExpiringModel):
"""An individual Audit/Metrics/Notification/Error Event"""
event_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
user = models.JSONField(default=dict)
action = models.TextField(choices=EventAction.choices)
app = models.TextField()
context = models.JSONField(default=dict, blank=True)
client_ip = models.GenericIPAddressField(null=True)
created = models.DateTimeField(auto_now_add=True)
brand = models.JSONField(default=default_brand, blank=True)
# Shadow the expires attribute from ExpiringModel to override the default duration
expires = models.DateTimeField(default=default_event_duration)
objects = EventManager()
@staticmethod
def _get_app_from_request(request: HttpRequest) -> str:
if not isinstance(request, HttpRequest):
return ""
return request.resolver_match.app_name
@staticmethod
def new(
action: str | EventAction,
app: str | None = None,
**kwargs,
) -> "Event":
"""Create new Event instance from arguments. Instance is NOT saved."""
if not isinstance(action, EventAction):
action = EventAction.CUSTOM_PREFIX + action
if not app:
current = currentframe()
parent = current.f_back
app = parent.f_globals["__name__"]
# Attempt to match the calling module to the django app it belongs to
# if we can't find a match, keep the module name
django_apps: list[str] = get_close_matches(app, django_app_names(), n=1)
# Also ensure that closest django app has the correct prefix
if len(django_apps) > 0 and django_apps[0].startswith(app):
app = django_apps[0]
cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
event = Event(action=action, app=app, context=cleaned_kwargs)
return event
def set_user(self, user: User) -> "Event":
"""Set `.user` based on user, ensuring the correct attributes are copied.
This should only be used when self.from_http is *not* used."""
self.user = get_user(user)
return self
def from_http(self, request: HttpRequest, user: User | None = None) -> "Event":
"""Add data from a Django-HttpRequest, allowing the creation of
Events independently from requests.
`user` arguments optionally overrides user from requests."""
if request:
from authentik.flows.views.executor import QS_QUERY
self.context["http_request"] = {
"path": request.path,
"method": request.method,
"args": cleanse_dict(QueryDict(request.META.get("QUERY_STRING", ""))),
"user_agent": request.META.get("HTTP_USER_AGENT", ""),
}
# Special case for events created during flow execution
# since they keep the http query within a wrapped query
if QS_QUERY in self.context["http_request"]["args"]:
wrapped = self.context["http_request"]["args"][QS_QUERY]
self.context["http_request"]["args"] = cleanse_dict(QueryDict(wrapped))
if hasattr(request, "tenant"):
tenant: Tenant = request.tenant
# Because self.created only gets set on save, we can't use it's value here
# hence we set self.created to now and then use it
self.created = now()
self.expires = self.created + timedelta_from_string(tenant.event_retention)
if hasattr(request, "brand"):
brand: Brand = request.brand
self.brand = sanitize_dict(model_to_dict(brand))
if hasattr(request, "user"):
original_user = None
if hasattr(request, "session"):
original_user = request.session.get(SESSION_KEY_IMPERSONATE_ORIGINAL_USER, None)
self.user = get_user(request.user, original_user)
if user:
self.user = get_user(user)
# Check if we're currently impersonating, and add that user
if hasattr(request, "session"):
if SESSION_KEY_IMPERSONATE_ORIGINAL_USER in request.session:
self.user = get_user(request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER])
self.user["on_behalf_of"] = get_user(request.session[SESSION_KEY_IMPERSONATE_USER])
# User 255.255.255.255 as fallback if IP cannot be determined
self.client_ip = ClientIPMiddleware.get_client_ip(request)
# Enrich event data
for processor in get_context_processors():
processor.enrich_event(self)
# If there's no app set, we get it from the requests too
if not self.app:
self.app = Event._get_app_from_request(request)
self.save()
return self
def save(self, *args, **kwargs):
if self._state.adding:
LOGGER.info(
"Created Event",
action=self.action,
context=self.context,
client_ip=self.client_ip,
user=self.user,
)
super().save(*args, **kwargs)
@property
def serializer(self) -> type[Serializer]:
from authentik.events.api.events import EventSerializer
return EventSerializer
@property
def summary(self) -> str:
"""Return a summary of this event."""
if "message" in self.context:
return self.context["message"]
return f"{self.action}: {self.context}"
def __str__(self) -> str:
return f"Event action={self.action} user={self.user} context={self.context}"
class Meta:
verbose_name = _("Event")
verbose_name_plural = _("Events")
indexes = [
models.Index(fields=["action"]),
models.Index(fields=["user"]),
models.Index(fields=["app"]),
models.Index(fields=["created"]),
models.Index(fields=["client_ip"]),
models.Index(
models.F("context__authorized_application"), name="authentik_e_ctx_app__idx"
),
]
class TransportMode(models.TextChoices):
"""Modes that a notification transport can send a notification"""
LOCAL = "local", _("authentik inbuilt notifications")
WEBHOOK = "webhook", _("Generic Webhook")
WEBHOOK_SLACK = "webhook_slack", _("Slack Webhook (Slack/Discord)")
EMAIL = "email", _("Email")
class NotificationTransport(SerializerModel):
"""Action which is executed when a Rule matches"""
uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
name = models.TextField(unique=True)
mode = models.TextField(choices=TransportMode.choices, default=TransportMode.LOCAL)
webhook_url = models.TextField(blank=True, validators=[DomainlessURLValidator()])
webhook_mapping = models.ForeignKey(
"NotificationWebhookMapping", on_delete=models.SET_DEFAULT, null=True, default=None
)
send_once = models.BooleanField(
default=False,
help_text=_(
"Only send notification once, for example when sending a webhook into a chat channel."
),
)
def send(self, notification: "Notification") -> list[str]:
"""Send notification to user, called from async task"""
if self.mode == TransportMode.LOCAL:
return self.send_local(notification)
if self.mode == TransportMode.WEBHOOK:
return self.send_webhook(notification)
if self.mode == TransportMode.WEBHOOK_SLACK:
return self.send_webhook_slack(notification)
if self.mode == TransportMode.EMAIL:
return self.send_email(notification)
raise ValueError(f"Invalid mode {self.mode} set")
def send_local(self, notification: "Notification") -> list[str]:
"""Local notification delivery"""
if self.webhook_mapping:
self.webhook_mapping.evaluate(
user=notification.user,
request=None,
notification=notification,
)
notification.save()
return []
def send_webhook(self, notification: "Notification") -> list[str]:
"""Send notification to generic webhook"""
default_body = {
"body": notification.body,
"severity": notification.severity,
"user_email": notification.user.email,
"user_username": notification.user.username,
}
if notification.event and notification.event.user:
default_body["event_user_email"] = notification.event.user.get("email", None)
default_body["event_user_username"] = notification.event.user.get("username", None)
if self.webhook_mapping:
default_body = sanitize_item(
self.webhook_mapping.evaluate(
user=notification.user,
request=None,
notification=notification,
)
)
try:
response = get_http_session().post(
self.webhook_url,
json=default_body,
)
response.raise_for_status()
except RequestException as exc:
raise NotificationTransportError(
exc.response.text if exc.response else str(exc)
) from exc
return [
response.status_code,
response.text,
]
def send_webhook_slack(self, notification: "Notification") -> list[str]:
"""Send notification to slack or slack-compatible endpoints"""
fields = [
{
"title": _("Severity"),
"value": notification.severity,
"short": True,
},
{
"title": _("Dispatched for user"),
"value": str(notification.user),
"short": True,
},
]
if notification.event:
if notification.event.user:
fields.append(
{
"title": _("Event user"),
"value": str(notification.event.user.get("username")),
"short": True,
},
)
for key, value in notification.event.context.items():
if not isinstance(value, str):
continue
# https://birdie0.github.io/discord-webhooks-guide/other/field_limits.html
if len(fields) >= DISCORD_FIELD_LIMIT:
continue
fields.append({"title": key[:256], "value": value[:1024]})
body = {
"username": "authentik",
"icon_url": "https://goauthentik.io/img/icon.png",
"attachments": [
{
"author_name": "authentik",
"author_link": "https://goauthentik.io",
"author_icon": "https://goauthentik.io/img/icon.png",
"title": notification.body,
"color": "#fd4b2d",
"fields": fields,
"footer": f"authentik {get_full_version()}",
}
],
}
if notification.event:
body["attachments"][0]["title"] = notification.event.action
try:
response = get_http_session().post(self.webhook_url, json=body)
response.raise_for_status()
except RequestException as exc:
text = exc.response.text if exc.response else str(exc)
raise NotificationTransportError(text) from exc
return [
response.status_code,
response.text,
]
def send_email(self, notification: "Notification") -> list[str]:
"""Send notification via global email configuration"""
if notification.user.email.strip() == "":
LOGGER.info(
"Discarding notification as user has no email address",
user=notification.user,
notification=notification,
)
return None
subject_prefix = "authentik Notification: "
context = {
"key_value": {
"user_email": notification.user.email,
"user_username": notification.user.username,
},
"body": notification.body,
"title": "",
}
if notification.event and notification.event.user:
context["key_value"]["event_user_email"] = notification.event.user.get("email", None)
context["key_value"]["event_user_username"] = notification.event.user.get(
"username", None
)
if notification.event:
context["title"] += notification.event.action
for key, value in notification.event.context.items():
if not isinstance(value, str):
continue
context["key_value"][key] = value
else:
context["title"] += notification.body[:NOTIFICATION_SUMMARY_LENGTH]
# TODO: improve permission check
if notification.user.is_superuser:
context["source"] = {
"from": self.name,
}
mail = TemplateEmailMessage(
subject=subject_prefix + context["title"],
to=[(notification.user.name, notification.user.email)],
language=notification.user.locale(),
template_name="email/event_notification.html",
template_context=context,
)
# Email is sent directly here, as the call to send() should have been from a task.
try:
from authentik.stages.email.tasks import send_mail
return send_mail(mail.__dict__)
except (SMTPException, ConnectionError, OSError) as exc:
raise NotificationTransportError(exc) from exc
@property
def serializer(self) -> type[Serializer]:
from authentik.events.api.notification_transports import NotificationTransportSerializer
return NotificationTransportSerializer
def __str__(self) -> str:
return f"Notification Transport {self.name}"
class Meta:
verbose_name = _("Notification Transport")
verbose_name_plural = _("Notification Transports")
class NotificationSeverity(models.TextChoices):
"""Severity images that a notification can have"""
NOTICE = "notice", _("Notice")
WARNING = "warning", _("Warning")
ALERT = "alert", _("Alert")
class Notification(SerializerModel):
"""Event Notification"""
uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
severity = models.TextField(choices=NotificationSeverity.choices)
body = models.TextField()
created = models.DateTimeField(auto_now_add=True)
event = models.ForeignKey(Event, on_delete=models.SET_NULL, null=True, blank=True)
seen = models.BooleanField(default=False)
user = models.ForeignKey(User, on_delete=models.CASCADE)
@property
def serializer(self) -> type[Serializer]:
from authentik.events.api.notifications import NotificationSerializer
return NotificationSerializer
def __str__(self) -> str:
body_trunc = (
(self.body[:NOTIFICATION_SUMMARY_LENGTH] + "..")
if len(self.body) > NOTIFICATION_SUMMARY_LENGTH
else self.body
)
return f"Notification for user {self.user_id}: {body_trunc}"
class Meta:
verbose_name = _("Notification")
verbose_name_plural = _("Notifications")
class NotificationRule(SerializerModel, PolicyBindingModel):
"""Decide when to create a Notification based on policies attached to this object."""
name = models.TextField(unique=True)
transports = models.ManyToManyField(
NotificationTransport,
help_text=_(
"Select which transports should be used to notify the user. If none are "
"selected, the notification will only be shown in the authentik UI."
),
blank=True,
)
severity = models.TextField(
choices=NotificationSeverity.choices,
default=NotificationSeverity.NOTICE,
help_text=_("Controls which severity level the created notifications will have."),
)
group = models.ForeignKey(
Group,
help_text=_(
"Define which group of users this notification should be sent and shown to. "
"If left empty, Notification won't ben sent."
),
null=True,
blank=True,
on_delete=models.SET_NULL,
)
@property
def serializer(self) -> type[Serializer]:
from authentik.events.api.notification_rules import NotificationRuleSerializer
return NotificationRuleSerializer
def __str__(self) -> str:
return f"Notification Rule {self.name}"
class Meta:
verbose_name = _("Notification Rule")
verbose_name_plural = _("Notification Rules")
class NotificationWebhookMapping(PropertyMapping):
"""Modify the payload of outgoing webhook requests"""
@property
def component(self) -> str:
return "ak-property-mapping-notification-form"
@property
def serializer(self) -> type[type[Serializer]]:
from authentik.events.api.notification_mappings import NotificationWebhookMappingSerializer
return NotificationWebhookMappingSerializer
def __str__(self):
return f"Webhook Mapping {self.name}"
class Meta:
verbose_name = _("Webhook Mapping")
verbose_name_plural = _("Webhook Mappings")
class TaskStatus(models.TextChoices):
"""Possible states of tasks"""
UNKNOWN = "unknown"
SUCCESSFUL = "successful"
WARNING = "warning"
ERROR = "error"
class SystemTask(SerializerModel, ExpiringModel):
"""Info about a system task running in the background along with details to restart the task"""
uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
name = models.TextField()
uid = models.TextField(null=True)
start_timestamp = models.DateTimeField(default=now)
finish_timestamp = models.DateTimeField(default=now)
duration = models.FloatField(default=0)
status = models.TextField(choices=TaskStatus.choices)
description = models.TextField(null=True)
messages = models.JSONField()
task_call_module = models.TextField()
task_call_func = models.TextField()
task_call_args = models.JSONField(default=list)
task_call_kwargs = models.JSONField(default=dict)
@property
def serializer(self) -> type[Serializer]:
from authentik.events.api.tasks import SystemTaskSerializer
return SystemTaskSerializer
def update_metrics(self):
"""Update prometheus metrics"""
# TODO: Deprecated metric - remove in 2024.2 or later
GAUGE_TASKS.labels(
tenant=connection.schema_name,
task_name=self.name,
task_uid=self.uid or "",
status=self.status.lower(),
).set(self.duration)
SYSTEM_TASK_TIME.labels(
tenant=connection.schema_name,
task_name=self.name,
task_uid=self.uid or "",
).observe(self.duration)
SYSTEM_TASK_STATUS.labels(
tenant=connection.schema_name,
task_name=self.name,
task_uid=self.uid or "",
status=self.status.lower(),
).inc()
def __str__(self) -> str:
return f"System Task {self.name}"
class Meta:
unique_together = (("name", "uid"),)
# Remove "add", "change" and "delete" permissions as those are not used
default_permissions = ["view"]
permissions = [("run_task", _("Run task"))]
verbose_name = _("System Task")
verbose_name_plural = _("System Tasks")