447 lines
14 KiB
Python
447 lines
14 KiB
Python
__package__ = 'archivebox.api'
|
|
|
|
import math
|
|
from uuid import UUID
|
|
from typing import List, Optional, Union, Any
|
|
from datetime import datetime
|
|
|
|
from django.db.models import Q
|
|
from django.core.exceptions import ValidationError
|
|
from django.contrib.auth import get_user_model
|
|
from django.shortcuts import redirect
|
|
|
|
from ninja import Router, Schema, FilterSchema, Field, Query
|
|
from ninja.pagination import paginate, PaginationBase
|
|
from ninja.errors import HttpError
|
|
|
|
from core.models import Snapshot, ArchiveResult, Tag
|
|
from api.models import APIToken, OutboundWebhook
|
|
from api.v1_crawls import CrawlSchema, SeedSchema
|
|
|
|
# from .auth import API_AUTH_METHODS
|
|
|
|
|
|
|
|
router = Router(tags=['Core Models'])
|
|
|
|
|
|
|
|
class CustomPagination(PaginationBase):
|
|
class Input(Schema):
|
|
limit: int = 200
|
|
offset: int = 0
|
|
page: int = 0
|
|
|
|
|
|
class Output(Schema):
|
|
total_items: int
|
|
total_pages: int
|
|
page: int
|
|
limit: int
|
|
offset: int
|
|
num_items: int
|
|
items: List[Any]
|
|
|
|
def paginate_queryset(self, queryset, pagination: Input, **params):
|
|
limit = min(pagination.limit, 500)
|
|
offset = pagination.offset or (pagination.page * limit)
|
|
total = queryset.count()
|
|
total_pages = math.ceil(total / limit)
|
|
current_page = math.ceil(offset / (limit + 1))
|
|
items = queryset[offset : offset + limit]
|
|
return {
|
|
'total_items': total,
|
|
'total_pages': total_pages,
|
|
'page': current_page,
|
|
'limit': limit,
|
|
'offset': offset,
|
|
'num_items': len(items),
|
|
'items': items,
|
|
}
|
|
|
|
|
|
### ArchiveResult #########################################################################
|
|
|
|
class MinimalArchiveResultSchema(Schema):
|
|
TYPE: str = 'core.models.ArchiveResult'
|
|
|
|
id: UUID
|
|
abid: str
|
|
|
|
created_at: datetime | None
|
|
modified_at: datetime | None
|
|
created_by_id: str
|
|
created_by_username: str
|
|
|
|
status: str
|
|
retry_at: datetime | None
|
|
|
|
extractor: str
|
|
cmd_version: str | None
|
|
cmd: list[str] | None
|
|
pwd: str | None
|
|
output: str | None
|
|
|
|
start_ts: datetime | None
|
|
end_ts: datetime | None
|
|
|
|
@staticmethod
|
|
def resolve_created_by_id(obj):
|
|
return str(obj.created_by_id)
|
|
|
|
@staticmethod
|
|
def resolve_created_by_username(obj) -> str:
|
|
User = get_user_model()
|
|
return User.objects.filter(pk=obj.created_by_id).values_list('username', flat=True)[0]
|
|
|
|
@staticmethod
|
|
def resolve_abid(obj):
|
|
return str(obj.ABID)
|
|
|
|
@staticmethod
|
|
def resolve_snapshot_timestamp(obj):
|
|
return obj.snapshot.timestamp
|
|
|
|
@staticmethod
|
|
def resolve_snapshot_url(obj):
|
|
return obj.snapshot.url
|
|
|
|
@staticmethod
|
|
def resolve_snapshot_id(obj):
|
|
return str(obj.snapshot_id)
|
|
|
|
@staticmethod
|
|
def resolve_snapshot_abid(obj):
|
|
return str(obj.snapshot.ABID)
|
|
|
|
@staticmethod
|
|
def resolve_snapshot_tags(obj):
|
|
return sorted(tag.name for tag in obj.snapshot.tags.all())
|
|
|
|
class ArchiveResultSchema(MinimalArchiveResultSchema):
|
|
TYPE: str = 'core.models.ArchiveResult'
|
|
|
|
# ... Extends MinimalArchiveResultSchema fields ...
|
|
|
|
snapshot_id: UUID
|
|
snapshot_abid: str
|
|
snapshot_timestamp: str
|
|
snapshot_url: str
|
|
snapshot_tags: List[str]
|
|
|
|
|
|
class ArchiveResultFilterSchema(FilterSchema):
|
|
id: Optional[str] = Field(None, q=['id__startswith', 'abid__icontains', 'snapshot__id__startswith', 'snapshot__abid__icontains', 'snapshot__timestamp__startswith'])
|
|
|
|
search: Optional[str] = Field(None, q=['snapshot__url__icontains', 'snapshot__title__icontains', 'snapshot__tags__name__icontains', 'extractor', 'output__icontains', 'id__startswith', 'abid__icontains', 'snapshot__id__startswith', 'snapshot__abid__icontains', 'snapshot__timestamp__startswith'])
|
|
snapshot_id: Optional[str] = Field(None, q=['snapshot__id__startswith', 'snapshot__abid__icontains', 'snapshot__timestamp__startswith'])
|
|
snapshot_url: Optional[str] = Field(None, q='snapshot__url__icontains')
|
|
snapshot_tag: Optional[str] = Field(None, q='snapshot__tags__name__icontains')
|
|
|
|
status: Optional[str] = Field(None, q='status')
|
|
output: Optional[str] = Field(None, q='output__icontains')
|
|
extractor: Optional[str] = Field(None, q='extractor__icontains')
|
|
cmd: Optional[str] = Field(None, q='cmd__0__icontains')
|
|
pwd: Optional[str] = Field(None, q='pwd__icontains')
|
|
cmd_version: Optional[str] = Field(None, q='cmd_version')
|
|
|
|
created_at: Optional[datetime] = Field(None, q='created_at')
|
|
created_at__gte: Optional[datetime] = Field(None, q='created_at__gte')
|
|
created_at__lt: Optional[datetime] = Field(None, q='created_at__lt')
|
|
|
|
|
|
@router.get("/archiveresults", response=List[ArchiveResultSchema], url_name="get_archiveresult")
|
|
@paginate(CustomPagination)
|
|
def get_archiveresults(request, filters: ArchiveResultFilterSchema = Query(...)):
|
|
"""List all ArchiveResult entries matching these filters."""
|
|
qs = ArchiveResult.objects.all()
|
|
results = filters.filter(qs).distinct()
|
|
return results
|
|
|
|
|
|
@router.get("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema, url_name="get_archiveresult")
|
|
def get_archiveresult(request, archiveresult_id: str):
|
|
"""Get a specific ArchiveResult by id or abid."""
|
|
return ArchiveResult.objects.get(Q(id__icontains=archiveresult_id) | Q(abid__icontains=archiveresult_id))
|
|
|
|
|
|
# @router.post("/archiveresult", response=ArchiveResultSchema)
|
|
# def create_archiveresult(request, payload: ArchiveResultSchema):
|
|
# archiveresult = ArchiveResult.objects.create(**payload.dict())
|
|
# return archiveresult
|
|
#
|
|
# @router.put("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema)
|
|
# def update_archiveresult(request, archiveresult_id: str, payload: ArchiveResultSchema):
|
|
# archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
|
|
#
|
|
# for attr, value in payload.dict().items():
|
|
# setattr(archiveresult, attr, value)
|
|
# archiveresult.save()
|
|
#
|
|
# return archiveresult
|
|
#
|
|
# @router.delete("/archiveresult/{archiveresult_id}")
|
|
# def delete_archiveresult(request, archiveresult_id: str):
|
|
# archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
|
|
# archiveresult.delete()
|
|
# return {"success": True}
|
|
|
|
|
|
|
|
|
|
|
|
### Snapshot #########################################################################
|
|
|
|
|
|
class SnapshotSchema(Schema):
|
|
TYPE: str = 'core.models.Snapshot'
|
|
|
|
id: UUID
|
|
abid: str
|
|
|
|
created_by_id: str
|
|
created_by_username: str
|
|
created_at: datetime
|
|
modified_at: datetime
|
|
|
|
status: str
|
|
retry_at: datetime | None
|
|
|
|
bookmarked_at: datetime
|
|
downloaded_at: Optional[datetime]
|
|
|
|
url: str
|
|
tags: List[str]
|
|
title: Optional[str]
|
|
timestamp: str
|
|
archive_path: str
|
|
|
|
# url_for_admin: str
|
|
# url_for_view: str
|
|
|
|
num_archiveresults: int
|
|
archiveresults: List[MinimalArchiveResultSchema]
|
|
|
|
@staticmethod
|
|
def resolve_created_by_id(obj):
|
|
return str(obj.created_by_id)
|
|
|
|
@staticmethod
|
|
def resolve_created_by_username(obj):
|
|
User = get_user_model()
|
|
return User.objects.get(id=obj.created_by_id).username
|
|
|
|
@staticmethod
|
|
def resolve_abid(obj):
|
|
return str(obj.ABID)
|
|
|
|
@staticmethod
|
|
def resolve_tags(obj):
|
|
return sorted(tag.name for tag in obj.tags.all())
|
|
|
|
# @staticmethod
|
|
# def resolve_url_for_admin(obj):
|
|
# return f"/admin/core/snapshot/{obj.id}/change/"
|
|
|
|
# @staticmethod
|
|
# def resolve_url_for_view(obj):
|
|
# return f"/{obj.archive_path}"
|
|
|
|
@staticmethod
|
|
def resolve_num_archiveresults(obj, context):
|
|
return obj.archiveresult_set.all().distinct().count()
|
|
|
|
@staticmethod
|
|
def resolve_archiveresults(obj, context):
|
|
if context['request'].with_archiveresults:
|
|
return obj.archiveresult_set.all().distinct()
|
|
return ArchiveResult.objects.none()
|
|
|
|
|
|
class SnapshotFilterSchema(FilterSchema):
|
|
id: Optional[str] = Field(None, q=['id__icontains', 'abid__icontains', 'timestamp__startswith'])
|
|
abid: Optional[str] = Field(None, q='abid__icontains')
|
|
|
|
created_by_id: str = Field(None, q='created_by_id')
|
|
created_by_username: str = Field(None, q='created_by__username__icontains')
|
|
|
|
created_at__gte: datetime = Field(None, q='created_at__gte')
|
|
created_at__lt: datetime = Field(None, q='created_at__lt')
|
|
created_at: datetime = Field(None, q='created_at')
|
|
modified_at: datetime = Field(None, q='modified_at')
|
|
modified_at__gte: datetime = Field(None, q='modified_at__gte')
|
|
modified_at__lt: datetime = Field(None, q='modified_at__lt')
|
|
|
|
search: Optional[str] = Field(None, q=['url__icontains', 'title__icontains', 'tags__name__icontains', 'id__icontains', 'abid__icontains', 'timestamp__startswith'])
|
|
url: Optional[str] = Field(None, q='url')
|
|
tag: Optional[str] = Field(None, q='tags__name')
|
|
title: Optional[str] = Field(None, q='title__icontains')
|
|
timestamp: Optional[str] = Field(None, q='timestamp__startswith')
|
|
|
|
bookmarked_at__gte: Optional[datetime] = Field(None, q='bookmarked_at__gte')
|
|
bookmarked_at__lt: Optional[datetime] = Field(None, q='bookmarked_at__lt')
|
|
|
|
|
|
|
|
@router.get("/snapshots", response=List[SnapshotSchema], url_name="get_snapshots")
|
|
@paginate(CustomPagination)
|
|
def get_snapshots(request, filters: SnapshotFilterSchema = Query(...), with_archiveresults: bool=False):
|
|
"""List all Snapshot entries matching these filters."""
|
|
request.with_archiveresults = with_archiveresults
|
|
|
|
qs = Snapshot.objects.all()
|
|
results = filters.filter(qs).distinct()
|
|
return results
|
|
|
|
@router.get("/snapshot/{snapshot_id}", response=SnapshotSchema, url_name="get_snapshot")
|
|
def get_snapshot(request, snapshot_id: str, with_archiveresults: bool=True):
|
|
"""Get a specific Snapshot by abid or id."""
|
|
request.with_archiveresults = with_archiveresults
|
|
snapshot = None
|
|
try:
|
|
snapshot = Snapshot.objects.get(Q(abid__startswith=snapshot_id) | Q(id__startswith=snapshot_id) | Q(timestamp__startswith=snapshot_id))
|
|
except Snapshot.DoesNotExist:
|
|
pass
|
|
|
|
try:
|
|
snapshot = snapshot or Snapshot.objects.get(Q(abid__icontains=snapshot_id) | Q(id__icontains=snapshot_id))
|
|
except Snapshot.DoesNotExist:
|
|
pass
|
|
|
|
if not snapshot:
|
|
raise Snapshot.DoesNotExist
|
|
|
|
return snapshot
|
|
|
|
|
|
# @router.post("/snapshot", response=SnapshotSchema)
|
|
# def create_snapshot(request, payload: SnapshotSchema):
|
|
# snapshot = Snapshot.objects.create(**payload.dict())
|
|
# return snapshot
|
|
#
|
|
# @router.put("/snapshot/{snapshot_id}", response=SnapshotSchema)
|
|
# def update_snapshot(request, snapshot_id: str, payload: SnapshotSchema):
|
|
# snapshot = get_object_or_404(Snapshot, id=snapshot_id)
|
|
#
|
|
# for attr, value in payload.dict().items():
|
|
# setattr(snapshot, attr, value)
|
|
# snapshot.save()
|
|
#
|
|
# return snapshot
|
|
#
|
|
# @router.delete("/snapshot/{snapshot_id}")
|
|
# def delete_snapshot(request, snapshot_id: str):
|
|
# snapshot = get_object_or_404(Snapshot, id=snapshot_id)
|
|
# snapshot.delete()
|
|
# return {"success": True}
|
|
|
|
|
|
|
|
### Tag #########################################################################
|
|
|
|
|
|
class TagSchema(Schema):
|
|
TYPE: str = 'core.models.Tag'
|
|
|
|
id: UUID
|
|
abid: str
|
|
|
|
modified_at: datetime
|
|
created_at: datetime
|
|
created_by_id: str
|
|
created_by_username: str
|
|
|
|
name: str
|
|
slug: str
|
|
num_snapshots: int
|
|
snapshots: List[SnapshotSchema]
|
|
|
|
@staticmethod
|
|
def resolve_created_by_id(obj):
|
|
return str(obj.created_by_id)
|
|
|
|
@staticmethod
|
|
def resolve_created_by_username(obj):
|
|
User = get_user_model()
|
|
return User.objects.get(id=obj.created_by_id).username
|
|
|
|
@staticmethod
|
|
def resolve_num_snapshots(obj, context):
|
|
return obj.snapshot_set.all().distinct().count()
|
|
|
|
@staticmethod
|
|
def resolve_snapshots(obj, context):
|
|
if context['request'].with_snapshots:
|
|
return obj.snapshot_set.all().distinct()
|
|
return Snapshot.objects.none()
|
|
|
|
@router.get("/tags", response=List[TagSchema], url_name="get_tags")
|
|
@paginate(CustomPagination)
|
|
def get_tags(request):
|
|
request.with_snapshots = False
|
|
request.with_archiveresults = False
|
|
return Tag.objects.all().distinct()
|
|
|
|
@router.get("/tag/{tag_id}", response=TagSchema, url_name="get_tag")
|
|
def get_tag(request, tag_id: str, with_snapshots: bool=True):
|
|
request.with_snapshots = with_snapshots
|
|
request.with_archiveresults = False
|
|
tag = None
|
|
try:
|
|
tag = Tag.objects.get(abid__icontains=tag_id)
|
|
except (Tag.DoesNotExist, ValidationError):
|
|
pass
|
|
|
|
try:
|
|
tag = tag or Tag.objects.get(id__icontains=tag_id)
|
|
except (Tag.DoesNotExist, ValidationError):
|
|
pass
|
|
return tag
|
|
|
|
@router.get("/any/{abid}", response=Union[SnapshotSchema, ArchiveResultSchema, TagSchema, SeedSchema, CrawlSchema], url_name="get_any", summary="Get any object by its ABID or ID (e.g. snapshot, archiveresult, tag, seed, crawl, etc.)")
|
|
def get_any(request, abid: str):
|
|
"""Get any object by its ABID or ID (e.g. snapshot, archiveresult, tag, seed, crawl, etc.)."""
|
|
|
|
request.with_snapshots = False
|
|
request.with_archiveresults = False
|
|
|
|
if abid.startswith(APIToken.abid_prefix):
|
|
raise HttpError(403, 'APIToken objects are not accessible via REST API')
|
|
|
|
if abid.startswith(OutboundWebhook.abid_prefix):
|
|
raise HttpError(403, 'OutboundWebhook objects are not accessible via REST API')
|
|
|
|
response = None
|
|
try:
|
|
response = response or get_snapshot(request, abid)
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
response = response or get_archiveresult(request, abid)
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
response = response or get_tag(request, abid)
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
from api.v1_crawls import get_seed
|
|
response = response or get_seed(request, abid)
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
from api.v1_crawls import get_crawl
|
|
response = response or get_crawl(request, abid)
|
|
except Exception:
|
|
pass
|
|
|
|
if response:
|
|
app_label, model_name = response._meta.app_label, response._meta.model_name
|
|
return redirect(f"/api/v1/{app_label}/{model_name}/{response.abid}?{request.META['QUERY_STRING']}")
|
|
|
|
raise HttpError(404, 'Object with given ABID not found')
|