# ArchiveBox/archivebox/index/schema.py

"""
WARNING: THIS FILE IS ALL LEGACY CODE TO BE REMOVED.
DO NOT ADD ANY NEW FEATURES TO THIS FILE, NEW CODE GOES HERE: core/models.py
These are the old types we used to use before ArchiveBox v0.4 (before we switched to Django).
"""
__package__ = 'archivebox.index'
from pathlib import Path
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Optional, Union, ClassVar
from pydantic import BaseModel, ConfigDict, Field, field_validator, computed_field
from benedict import benedict
from archivebox.config import ARCHIVE_DIR, CONSTANTS
from archivebox.misc.util import parse_date


class ArchiveError(Exception):
    def __init__(self, message, hints=None):
        super().__init__(message)
        self.hints = hints


# Type aliases
LinkDict = Dict[str, Any]
ArchiveOutput = Union[str, Exception, None]


class ArchiveResult(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    TYPE: str = 'index.schema.ArchiveResult'
    cmd: list[str]
    pwd: str | None = None
    cmd_version: str | None = None
    output: ArchiveOutput | None = None
    status: str
    start_ts: datetime
    end_ts: datetime
    index_texts: list[str] | None = None

    # Class variables for compatibility
    _field_names: ClassVar[list[str] | None] = None

    @field_validator('status')
    @classmethod
    def validate_status(cls, v: str) -> str:
        if not v:
            raise ValueError('status must be a non-empty string')
        return v

    @field_validator('cmd')
    @classmethod
    def validate_cmd(cls, v: List[str]) -> List[str]:
        if not all(isinstance(arg, str) and arg for arg in v):
            raise ValueError('all command arguments must be non-empty strings')
        return v

    @field_validator('pwd')
    @classmethod
    def validate_pwd(cls, v: Optional[str]) -> Optional[str]:
        if v == '':  # Convert empty string to None for consistency
            return None
        return v

    @field_validator('cmd_version')
    @classmethod
    def validate_cmd_version(cls, v: Optional[str]) -> Optional[str]:
        if v == '':  # Convert empty string to None for consistency
            return None
        return v

    def model_dump(self, **kwargs) -> dict:
        """Backwards compatible with _asdict()"""
        return super().model_dump(**kwargs)

    @classmethod
    def field_names(cls) -> List[str]:
        """Get all field names of the model"""
        if cls._field_names is None:
            cls._field_names = list(cls.model_fields.keys())
        return cls._field_names

    @classmethod
    def guess_ts(cls, dict_info: dict) -> tuple[datetime, datetime]:
        """Guess timestamps from dictionary info"""
        parsed_timestamp = parse_date(dict_info["timestamp"])
        start_ts = parsed_timestamp
        end_ts = parsed_timestamp + timedelta(seconds=int(dict_info["duration"]))
        return start_ts, end_ts

    @classmethod
    def from_json(cls, json_info: dict, guess: bool = False) -> 'ArchiveResult':
        """Create instance from JSON data"""
        info = {
            key: val
            for key, val in json_info.items()
            if key in cls.field_names()
        }
        if guess:
            if "start_ts" not in info:
                info["start_ts"], info["end_ts"] = cls.guess_ts(json_info)
            else:
                info['start_ts'] = parse_date(info['start_ts'])
                info['end_ts'] = parse_date(info['end_ts'])
            if "pwd" not in info:
                info["pwd"] = str(ARCHIVE_DIR / json_info["timestamp"])
            if "cmd_version" not in info:
                info["cmd_version"] = "Undefined"
            if "cmd" not in info:
                info["cmd"] = []
        else:
            info['start_ts'] = parse_date(info['start_ts'])
            info['end_ts'] = parse_date(info['end_ts'])
            info['cmd_version'] = info.get('cmd_version')

        # Handle string command as list
        if isinstance(info.get("cmd"), str):
            info["cmd"] = [info["cmd"]]
        return cls(**info)

    def to_dict(self, *keys: str) -> dict:
        """Convert to dictionary, optionally filtering by keys"""
        data = self.model_dump()
        if keys:
            return {k: v for k, v in data.items() if k in keys}
        return data

    def to_json(self, indent: int = 4, sort_keys: bool = True) -> str:
        """Convert to JSON string"""
        return self.model_dump_json(indent=indent, exclude_none=True)

    def to_csv(self, cols: Optional[List[str]] = None, separator: str = ',', ljust: int = 0) -> str:
        """Convert to CSV string"""
        data = self.model_dump()
        cols = cols or self.field_names()
        return separator.join(str(data.get(col, '')).ljust(ljust) for col in cols)

    @computed_field
    def duration(self) -> int:
        """Calculate duration in seconds between start and end timestamps"""
        return int((self.end_ts - self.start_ts).total_seconds())
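
# Illustrative round-trip through ArchiveResult.from_json() / to_json().
# The dict below is a made-up example, not data from a real index, and it relies on
# parse_date() accepting an epoch string the same way guess_ts() already assumes:
#
#   result = ArchiveResult.from_json({
#       'cmd': ['wget', '--mirror', 'https://example.com'],
#       'status': 'succeeded',
#       'timestamp': '1544005455.0',
#       'duration': 3,
#   }, guess=True)
#   result.duration    # -> 3 (computed from the guessed start_ts/end_ts)
#   result.to_json()   # -> JSON string suitable for writing back into index.json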


class Link(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    TYPE: str = 'index.schema.Link'
    timestamp: str
    url: str
    title: str | None = None
    tags: str | None = None
    sources: list[str] = Field(default_factory=list)
    history: dict[str, list[ArchiveResult]] = Field(default_factory=dict)
    downloaded_at: datetime | None = None

    # Class variables for compatibility
    _field_names: ClassVar[list[str] | None] = None

    def __str__(self) -> str:
        return f'[{self.timestamp}] {self.url} "{self.title}"'

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, Link):
            return NotImplemented
        return self.url == other.url

    def __gt__(self, other: Any) -> bool:
        if not isinstance(other, Link):
            return NotImplemented
        if not self.timestamp or not other.timestamp:
            return NotImplemented
        return float(self.timestamp) > float(other.timestamp)

    @field_validator('timestamp')
    @classmethod
    def validate_timestamp(cls, v: str) -> str:
        if not v:
            raise ValueError('timestamp must be a non-empty string')
        if not v.replace('.', '').isdigit():
            raise ValueError('timestamp must be a float str')
        return v

    @field_validator('url')
    @classmethod
    def validate_url(cls, v: str) -> str:
        if not v or '://' not in v:
            raise ValueError('url must be a valid URL string')
        return v

    @field_validator('title')
    @classmethod
    def validate_title(cls, v: Optional[str]) -> Optional[str]:
        if v is not None and not v:
            raise ValueError('title must be a non-empty string if provided')
        return v

    @field_validator('sources')
    @classmethod
    def validate_sources(cls, v: List[str]) -> List[str]:
        if not all(isinstance(source, str) and source for source in v):
            raise ValueError('all sources must be non-empty strings')
        return v

    # Backwards compatibility methods
    def _asdict(self, extended: bool = False) -> dict:
        return benedict(self)

    def overwrite(self, **kwargs) -> 'Link':
        """Pure functional version of dict.update that returns a new instance"""
        current_data = self.model_dump()
        current_data.update(kwargs)
        return Link(**current_data)

    @classmethod
    def field_names(cls) -> list[str]:
        if cls._field_names is None:
            cls._field_names = list(cls.model_fields.keys())
        return cls._field_names

    @classmethod
    def from_json(cls, json_info: dict, guess: bool = False) -> 'Link':
        info = {
            key: val
            for key, val in json_info.items()
            if key in cls.field_names()
        }
        # Handle downloaded_at (older indexes store this under the legacy 'updated' key,
        # which is filtered out of `info` above, so read it from json_info directly)
        info['downloaded_at'] = cls._parse_date(json_info.get('updated') or json_info.get('downloaded_at'))
        info['sources'] = info.get('sources') or []
        # Handle history
        json_history = info.get('history') or {}
        cast_history = {}
        for method, method_history in json_history.items():
            cast_history[method] = []
            for json_result in method_history:
                assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts'
                cast_result = ArchiveResult.from_json(json_result, guess)
                cast_history[method].append(cast_result)
        info['history'] = cast_history
        return cls(**info)

    def to_json(self, indent: int = 4, sort_keys: bool = True) -> str:
        return self.model_dump_json(indent=indent)

    def to_csv(self, cols: Optional[List[str]] = None, separator: str = ',', ljust: int = 0) -> str:
        data = self.model_dump()
        cols = cols or self.field_names()
        return separator.join(str(data.get(col, '')).ljust(ljust) for col in cols)

    # Properties for compatibility
    @property
    def link_dir(self) -> str:
        return str(ARCHIVE_DIR / self.timestamp)

    @property
    def archive_path(self) -> str:
        return f'{CONSTANTS.ARCHIVE_DIR_NAME}/{self.timestamp}'

    @computed_field
    def bookmarked_date(self) -> Optional[str]:
        max_ts = (datetime.now(timezone.utc) + timedelta(days=30)).timestamp()
        if self.timestamp and self.timestamp.replace('.', '').isdigit():
            if 0 < float(self.timestamp) < max_ts:
                return self._ts_to_date_str(datetime.fromtimestamp(float(self.timestamp)))
            return str(self.timestamp)
        return None

    @computed_field
    def downloaded_datestr(self) -> Optional[str]:
        return self._ts_to_date_str(self.downloaded_at) if self.downloaded_at else None
    @property
    def archive_dates(self) -> list[datetime]:
        # start_ts is already a datetime on ArchiveResult, so no string parsing is needed here
        return [
            result.start_ts
            for results in self.history.values()
            for result in results
        ]
    @property
    def oldest_archive_date(self) -> Optional[datetime]:
        dates = self.archive_dates
        return min(dates) if dates else None

    @property
    def newest_archive_date(self) -> Optional[datetime]:
        dates = self.archive_dates
        return max(dates) if dates else None

    @property
    def num_outputs(self) -> int:
        try:
            return self.as_snapshot().num_outputs
        except Exception:
            return 0

    @property
    def num_failures(self) -> int:
        return sum(
            1 for results in self.history.values()
            for result in results
            if result.status == 'failed')
    def latest_outputs(self, status: Optional[str] = None) -> dict[str, Any]:
        """Get the latest output that each archive method produced for link"""
        ARCHIVE_METHODS = (
            'title', 'favicon', 'wget', 'warc', 'singlefile', 'pdf',
            'screenshot', 'dom', 'git', 'media', 'archive_org',
        )
        latest: Dict[str, Any] = {}
        for archive_method in ARCHIVE_METHODS:
            # get most recent successful result in history for each archive method
            history = self.history.get(archive_method) or []
            history = list(filter(lambda result: result.output, reversed(history)))
            if status is not None:
                history = list(filter(lambda result: result.status == status, history))
            latest[archive_method] = history[0].output if history else None
        return latest
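
    # Illustrative shape of the dict returned by latest_outputs() (hypothetical
    # values, not taken from a real archive; keys are the ARCHIVE_METHODS above):
    #
    #   {
    #       'title': 'Example Page Title',
    #       'favicon': 'favicon.ico',
    #       'wget': 'example.com/index.html',
    #       'pdf': None,   # None when a method has no (matching) result yet
    #       ...
    #   }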
    def canonical_outputs(self) -> Dict[str, Optional[str]]:
        """Predict the expected output paths that should be present after archiving"""
        # You'll need to implement the actual logic based on your requirements
        # TODO: banish this awful duplication from the codebase and import these
        # from their respective extractor files
        from abx_plugin_favicon.config import FAVICON_CONFIG
        canonical = {
            'index_path': 'index.html',
            'favicon_path': 'favicon.ico',
            'google_favicon_path': FAVICON_CONFIG.FAVICON_PROVIDER.format(self.domain),
            'wget_path': f'warc/{self.timestamp}',
            'warc_path': 'warc/',
            'singlefile_path': 'singlefile.html',
            'readability_path': 'readability/content.html',
            'mercury_path': 'mercury/content.html',
            'htmltotext_path': 'htmltotext.txt',
            'pdf_path': 'output.pdf',
            'screenshot_path': 'screenshot.png',
            'dom_path': 'output.html',
            'archive_org_path': f'https://web.archive.org/web/{self.base_url}',
            'git_path': 'git/',
            'media_path': 'media/',
            'headers_path': 'headers.json',
        }
        if self.is_static:
            static_path = f'warc/{self.timestamp}'
            canonical.update({
                'title': self.basename,
                'wget_path': static_path,
                'pdf_path': static_path,
                'screenshot_path': static_path,
                'dom_path': static_path,
                'singlefile_path': static_path,
                'readability_path': static_path,
                'mercury_path': static_path,
                'htmltotext_path': static_path,
            })
        return canonical
    # URL helper properties
    @property
    def url_hash(self) -> str:
        # Implement your URL hashing logic here
        from hashlib import sha256
        return sha256(self.url.encode()).hexdigest()[:8]

    @property
    def scheme(self) -> str:
        return self.url.split('://')[0]

    @property
    def domain(self) -> str:
        return self.url.split('://')[1].split('/')[0]

    @property
    def path(self) -> str:
        parts = self.url.split('://', 1)
        return '/' + parts[1].split('/', 1)[1] if len(parts) > 1 and '/' in parts[1] else '/'

    @property
    def basename(self) -> str:
        return self.path.split('/')[-1]

    @property
    def extension(self) -> str:
        basename = self.basename
        return basename.split('.')[-1] if '.' in basename else ''

    @property
    def base_url(self) -> str:
        return f'{self.scheme}://{self.domain}'

    @property
    def is_static(self) -> bool:
        static_extensions = {'.pdf', '.jpg', '.jpeg', '.png', '.gif', '.webp', '.svg', '.mp4', '.mp3', '.wav', '.webm'}
        return any(self.url.lower().endswith(ext) for ext in static_extensions)

    @property
    def is_archived(self) -> bool:
        output_paths = (
            self.domain,
            'output.html',
            'output.pdf',
            'screenshot.png',
            'singlefile.html',
            'readability/content.html',
            'mercury/content.html',
            'htmltotext.txt',
            'media',
            'git',
        )
        return any((Path(ARCHIVE_DIR) / self.timestamp / path).exists() for path in output_paths)
    def as_snapshot(self):
        """Implement this based on your Django model requirements"""
        from core.models import Snapshot
        return Snapshot.objects.get(url=self.url)

    # Helper methods
    @staticmethod
    def _ts_to_date_str(dt: Optional[datetime]) -> Optional[str]:
        return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else None

    @staticmethod
    def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
        if not date_str:
            return None
        try:
            return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        except ValueError:
            try:
                return datetime.fromtimestamp(float(date_str))
            except (ValueError, TypeError):
                return None
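

# Quick sketch of the URL helper properties above, using a hypothetical link
# (the URL and timestamp are invented values, shown only to illustrate the parsing):
#
#   link = Link(timestamp='1544005455.0', url='https://example.com/docs/page.html')
#   link.scheme     # -> 'https'
#   link.domain     # -> 'example.com'
#   link.path       # -> '/docs/page.html'
#   link.basename   # -> 'page.html'
#   link.extension  # -> 'html'
#   link.base_url   # -> 'https://example.com'
#   link.is_static  # -> False ('.html' is not in static_extensions)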