458 lines
19 KiB
458 lines
19 KiB
__package__ = 'archivebox.workers'
import os
import sys
import time
import uuid
import json
from typing import ClassVar, Iterable, Type
from pathlib import Path
from rich import print
from django.db import transaction
from django.db.models import QuerySet
from django.utils import timezone
from django.utils.functional import classproperty # type: ignore
from crawls.models import Crawl
from core.models import Snapshot, ArchiveResult
from workers.models import Event, Process, EventDict
# Base class for event-driven workers. A subclass sets `name`, `listens_to`
# and `outputs`, and defines `on_<EVENT_NAME>` handlers; `receive()` looks those
# handlers up by event name and relays the event dicts they yield.
# NOTE(review): this listing carries no indentation and no decorators, and some
# statements look incomplete (block headers without bodies, an `except` without
# a `try`, unclosed brackets). Confirm against the canonical file before
# changing any logic here.
class WorkerType:
# static class attributes
name: ClassVar[str] # e.g. 'log' or 'filesystem' or 'crawl' or 'snapshot' or 'archiveresult' etc.
listens_to: ClassVar[str] # e.g. 'LOG_' or 'FS_' or 'CRAWL_' or 'SNAPSHOT_' or 'ARCHIVERESULT_' etc.
outputs: ClassVar[list[str]] # e.g. ['LOG_', 'FS_', 'CRAWL_', 'SNAPSHOT_', 'ARCHIVERESULT_'] etc.
poll_interval: ClassVar[int] = 1 # how long to wait before polling for new events
# Queryset of events whose name starts with this worker's `listens_to` prefix.
# NOTE(review): it is read as an attribute below (`cls.event_queue.get_next_unclaimed()`),
# so presumably it is wrapped in `classproperty` (imported at the top of the
# file) — no decorator is visible in this listing.
def event_queue(cls) -> QuerySet[Event]:
return Event.objects.filter(name__startswith=cls.listens_to)
# Builds the `archivebox worker <name>` command line and passes it to
# Process.create_and_fork() together with actor_type=<name>.
# NOTE(review): neither `if` below has a visible body; presumably each one
# appends a CLI flag to `cmd` — confirm.
def fork(cls, wait_for_first_event=False, exit_on_idle=True) -> Process:
cmd = ['archivebox', 'worker', cls.name]
if exit_on_idle:
if wait_for_first_event:
return Process.create_and_fork(cmd=cmd, actor_type=cls.name)
# Queryset of Process rows whose actor_type equals this worker's `name`.
def processes(cls) -> QuerySet[Process]:
return Process.objects.filter(actor_type=cls.name)
# Generator main loop. When `wait_for_first_event` is set it first polls the
# queue until an unclaimed event exists. Then, forever: handle the next event,
# or run an idle tick if that produced no output events, and yield whatever
# event dicts were produced.
# NOTE(review): no sleep is visible in the polling loop and `poll_interval` is
# not referenced anywhere in this listing; the `if exit_on_idle:` branch has no
# visible body (presumably it ends the loop) — confirm.
def run(cls, wait_for_first_event=False, exit_on_idle=True):
if wait_for_first_event:
event = cls.event_queue.get_next_unclaimed()
while not event:
event = cls.event_queue.get_next_unclaimed()
while True:
output_events = list(cls.process_next_event()) or list(cls.process_idle_tick()) # process next event, or tick if idle
yield from output_events
if not output_events:
if exit_on_idle:
# Generator: takes the next unclaimed event from the queue, logs it to stderr,
# yields every event dict produced by `receive()`, then records success; a
# BaseException raised along the way is recorded through mark_event_failed().
# `return []` inside a generator simply ends iteration without yielding.
# NOTE(review): no `try:` precedes the `except` in this listing; `output_events`
# is never appended to here, so an empty list reaches both mark_* calls; and no
# call to mark_event_claimed() is visible although mark_event_succeeded()
# asserts that the event is claimed — confirm whether get_next_unclaimed()
# performs the claim.
def process_next_event(cls) -> Iterable[EventDict]:
event = cls.event_queue.get_next_unclaimed()
output_events = []
if not event:
return []
print(f'{cls.__name__}[{Process.current().pid}] {event}', file=sys.stderr)
for output_event in cls.receive(event):
yield output_event
cls.mark_event_succeeded(event, output_events=output_events)
except BaseException as e:
cls.mark_event_failed(event, output_events=output_events, error=e)
# Generator: re-arms an idle Event row via update_or_create() — deliver_at is
# set to now and the claim, finish, error and parent fields are cleared — and
# then runs the normal event-processing path so the idle handler fires.
# NOTE(review): the lookup arguments of update_or_create() and the closing
# brackets are not visible in this listing, so which row is matched cannot be
# determined from here.
def process_idle_tick(cls) -> Iterable[EventDict]:
# reset the idle event to be claimed by the current process
event, _created = Event.objects.update_or_create(
'deliver_at': timezone.now(),
'claimed_proc': None,
'claimed_at': None,
'finished_at': None,
'error': None,
'parent': None,
# then process it like any other event
yield from cls.process_next_event()
# Generator: dispatches `event` to the class attribute named `on_<event.name>`
# and relays everything that handler yields.
# NOTE(review): as listed, the `raise` follows the delegation unconditionally;
# presumably it belongs to an `else:` branch for the no-handler case — confirm.
def receive(cls, event: Event) -> Iterable[EventDict]:
handler_method = getattr(cls, f'on_{event.name}', None)
if handler_method:
yield from handler_method(event)
raise Exception(f'No handler method for event: {event.name}')
# Default idle handler: produces no events.
# NOTE(review): receive() calls handlers as `handler_method(event)`, but this
# signature accepts no arguments, so dispatching an event named 'IDLE' to it
# would raise TypeError.
def on_IDLE() -> Iterable[EventDict]:
return []
# Claims `event` for the current process: a conditional UPDATE, issued under
# transaction.atomic(), that only matches while claimed_proc and claimed_at are
# both NULL. Raises Exception when no row was updated.
# NOTE(review): `self` is neither a parameter nor a local of this function, so
# as listed the print below raises NameError once the claim succeeds.
def mark_event_claimed(event: Event):
proc = Process.current()
with transaction.atomic():
claimed = Event.objects.filter(id=event.id, claimed_proc=None, claimed_at=None).update(claimed_proc=proc, claimed_at=timezone.now())
if not claimed:
raise Exception(f'Event already claimed by another process: {event.claimed_proc}')
print(f'{self}.mark_event_claimed(): Claimed {event} ⛏️')
# process_updated = Process.objects.filter(id=proc.id, active_event=None).update(active_event=event)
# if not process_updated:
# raise Exception(f'Unable to update process.active_event: {proc}.active_event = {event}')
# Marks `event` finished without error. Asserts that the event is claimed by the
# current process, stamps finished_at with a conditional UPDATE (matching the
# same claimed_proc/claimed_at and finished_at NULL), raises Exception if no row
# matched, then dispatches each output event and the optional `on_success`
# callback with `event` as their parent.
def mark_event_succeeded(event: Event, output_events: Iterable[EventDict]):
assert event.claimed_proc, f'Cannot mark event as succeeded if it is not claimed by a process: {event}'
assert (event.claimed_proc == Process.current()), f'Cannot mark event as succeeded if it claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'
with transaction.atomic():
updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now())
if not updated:
raise Exception(f'Event {event} failed to mark as succeeded, it was modified by another process: {event.claimed_proc}')
# process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
# if not process_updated:
# raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
# dispatch any output events
for output_event in output_events:
Event.dispatch(event=output_event, parent=event)
# trigger any callback events
if event.on_success:
Event.dispatch(event=event.on_success, parent=event)
# Marks `event` finished with an error. Same claim assertions and conditional
# UPDATE as mark_event_succeeded(), additionally storing str(error). Unless the
# event name already ends in '_ERROR', a '<name>_ERROR' event describing the
# exception is produced; output events and the optional `on_failure` callback
# are then dispatched with `event` as their parent.
# NOTE(review): the list literal assigned to `output_events` is unclosed in this
# listing and, as shown, replaces the incoming output events rather than
# extending them — confirm which is intended.
def mark_event_failed(event: Event, output_events: Iterable[EventDict]=(), error: BaseException | None = None):
assert event.claimed_proc, f'Cannot mark event as failed if it is not claimed by a process: {event}'
assert (event.claimed_proc == Process.current()), f'Cannot mark event as failed if it claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'
with transaction.atomic():
updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now(), error=str(error))
if not updated:
raise Exception(f'Event {event} failed to mark as failed, it was modified by another process: {event.claimed_proc}')
# process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
# if not process_updated:
# raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
# add dedicated error event to the output events
if not event.name.endswith('_ERROR'):
output_events = [
{'name': f'{event.name}_ERROR', 'msg': f'{type(error).__name__}: {error}'},
# dispatch any output events
for output_event in output_events:
Event.dispatch(event=output_event, parent=event)
# trigger any callback events
if event.on_failure:
Event.dispatch(event=event.on_failure, parent=event)
class OrchestratorWorker(WorkerType):
    """Worker for ``PROC_*`` events (process launch / exit / kill bookkeeping).

    Every ``on_PROC_*`` handler is a generator that receives the triggering
    ``Event`` and yields follow-up event dicts. Handlers take neither ``self``
    nor ``cls``, so they are declared as static methods.
    """

    name = 'orchestrator'
    listens_to = 'PROC_'
    outputs = ['PROC_']

    @staticmethod
    def on_PROC_IDLE(event: Event | None = None) -> Iterable[EventDict]:
        """Emit a ``PROC_LAUNCH`` for the oldest Process that has not been launched.

        ``event`` is accepted (and ignored) because the dispatcher invokes every
        handler as ``handler(event)``; it stays optional so existing zero-argument
        calls keep working.

        Yields nothing when every Process already has ``launched_at`` set.
        """
        # look through all Processes that are not yet launched and launch them
        to_launch = Process.objects.filter(launched_at=None).order_by('created_at').first()
        if not to_launch:
            return
        yield {'name': 'PROC_LAUNCH', 'id': to_launch.id}

    @staticmethod
    def on_PROC_LAUNCH(event: Event) -> Iterable[EventDict]:
        """Fork a new process from ``event.kwargs`` and announce it as ``PROC_LAUNCHED``."""
        process = Process.create_and_fork(**event.kwargs)
        yield {'name': 'PROC_LAUNCHED', 'process_id': process.id}

    @staticmethod
    def on_PROC_EXIT(event: Event) -> Iterable[EventDict]:
        """Look up the Process named by ``event.process_id`` and emit ``PROC_KILLED``.

        NOTE(review): the reviewed code only looks the process up and emits the
        event; no exit handling is visible between the two — confirm.
        """
        process = Process.objects.get(id=event.process_id)
        yield {'name': 'PROC_KILLED', 'process_id': process.id}

    @staticmethod
    def on_PROC_KILL(event: Event) -> Iterable[EventDict]:
        """Look up the Process named by ``event.process_id`` and emit ``PROC_KILLED``.

        NOTE(review): no signal is sent to the process in the reviewed code;
        confirm whether termination is expected to happen here.
        """
        process = Process.objects.get(id=event.process_id)
        yield {'name': 'PROC_KILLED', 'process_id': process.id}
class FileSystemWorker(WorkerType):
    """Worker for ``FS_*`` events (file writes, appends, deletes and rsync copies).

    Every ``on_FS_*`` handler is a generator that receives the triggering
    ``Event`` and yields follow-up event dicts, normally an ``FS_CHANGED`` for
    the affected path.
    """

    name = 'filesystem'
    listens_to = 'FS_'
    outputs = ['FS_']

    @staticmethod
    def on_FS_IDLE(event: Event) -> Iterable[EventDict]:
        """Emit one ``FS_DELETE`` per entry directly under ``/tmp/archivebox``."""
        # check for tmp files that can be deleted
        for tmp_file in Path('/tmp').glob('archivebox/*'):
            yield {'name': 'FS_DELETE', 'path': str(tmp_file)}

    @staticmethod
    def on_FS_WRITE(event: Event) -> Iterable[EventDict]:
        """Overwrite ``event.path`` and emit ``FS_CHANGED`` for it.

        NOTE(review): the statement that writes the payload is absent from the
        reviewed code. Writing ``event.content`` mirrors the ``content`` key that
        every ``FS_WRITE`` producer in this file sends — confirm.
        """
        with open(event.path, 'w') as f:
            f.write(event.content)
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_APPEND(event: Event) -> Iterable[EventDict]:
        """Append to ``event.path`` and emit ``FS_CHANGED`` for it.

        NOTE(review): as with ``on_FS_WRITE``, the write statement is absent from
        the reviewed code; ``event.content`` is assumed by symmetry — confirm.
        """
        with open(event.path, 'a') as f:
            f.write(event.content)
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_DELETE(event: Event) -> Iterable[EventDict]:
        """Emit ``FS_CHANGED`` for ``event.path``.

        NOTE(review): no removal is performed in the reviewed code, and none is
        added here because guessing at a destructive operation is unsafe.
        Confirm what should be deleted, and how, before implementing it.
        """
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_RSYNC(event: Event) -> Iterable[EventDict]:
        """Copy ``event.src`` to ``event.dst`` with ``rsync -av`` and emit ``FS_CHANGED``.

        The command is run from an argument list, without a shell, so characters
        in ``src``/``dst`` are never interpreted as shell syntax; ``--`` stops a
        path that begins with ``-`` from being parsed as an rsync option. Shell
        globbing of ``src`` is consequently no longer performed. The exit status
        is ignored, as before.
        """
        import subprocess  # local import: keeps this block self-contained

        subprocess.run(
            ['rsync', '-av', '--', str(event.src), str(event.dst)],
            check=False,
        )
        yield {'name': 'FS_CHANGED', 'path': event.dst}
# Worker for CRAWL_* events. Each on_CRAWL_* handler receives the triggering
# Event and yields follow-up event dicts (CRAWL_*, FS_* and SNAPSHOT_*).
# NOTE(review): this listing carries no indentation or decorators and some
# statements look incomplete (an unclosed dict comprehension, an `if` without a
# body). Confirm against the canonical file before changing any logic here.
class CrawlWorker(WorkerType):
name = 'crawl'
listens_to = 'CRAWL_'
outputs = ['CRAWL_', 'FS_', 'SNAPSHOT_']
# Idle tick: takes the first Crawl whose retry_at is in the past and emits
# CRAWL_START or CRAWL_SEAL for it, depending on can_start() / can_seal().
# NOTE(review): the payload key used here is 'id', while on_CRAWL_START reads
# `event.crawl_id` — confirm which key is intended.
def on_CRAWL_IDLE(event: Event) -> Iterable[EventDict]:
# check for any stale crawls that can be started or sealed
stale_crawl = Crawl.objects.filter(retry_at__lt=timezone.now()).first()
if not stale_crawl:
return []
if stale_crawl.can_start():
yield {'name': 'CRAWL_START', 'id': stale_crawl.id}
elif stale_crawl.can_seal():
yield {'name': 'CRAWL_SEAL', 'id': stale_crawl.id}
# get_or_create()s the Crawl with id=event.id (passing the event itself as
# `defaults`) and emits CRAWL_UPDATED only when a new row was created.
def on_CRAWL_CREATE(event: Event) -> Iterable[EventDict]:
crawl, created = Crawl.objects.get_or_create(id=event.id, defaults=event)
if created:
yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}
# Loads the Crawl named by the event's 'crawl_id', collects the event items
# whose value differs from the crawl's current attribute, and emits
# CRAWL_UPDATED when there is any difference.
# NOTE(review): the dict comprehension is unclosed in this listing and nothing
# visible applies `diff` to the crawl before the event is emitted — confirm.
def on_CRAWL_UPDATE(event: Event) -> Iterable[EventDict]:
crawl = Crawl.objects.get(id=event.pop('crawl_id'))
diff = {
key: val
for key, val in event.items()
if getattr(crawl, key) != val
if diff:
yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}
# After a crawl changed: asks for its output-dir symlinks to be (re)written.
# NOTE(review): no `on_FS_WRITE_SYMLINKS` handler is visible on
# FileSystemWorker in this listing — confirm that one exists.
def on_CRAWL_UPDATED(event: Event) -> Iterable[EventDict]:
crawl = Crawl.objects.get(id=event.crawl_id)
yield {'name': 'FS_WRITE_SYMLINKS', 'path': crawl.OUTPUT_DIR, 'symlinks': crawl.output_dir_symlinks}
# Seals a STARTED crawl: sets its status to SEALED, then emits an FS_WRITE of
# its index.json followed by CRAWL_UPDATED.
# NOTE(review): `if not crawl:` has no visible body (presumably an early
# return; without one the next line would fail on None), and no save() of the
# new status is visible — confirm.
def on_CRAWL_SEAL(event: Event) -> Iterable[EventDict]:
crawl = Crawl.objects.filter(id=event.id, status=Crawl.StatusChoices.STARTED).first()
if not crawl:
crawl.status = Crawl.StatusChoices.SEALED
yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}
# Starts a crawl: emits SNAPSHOT_CREATE and SNAPSHOT_START for a freshly
# generated root-snapshot UUID (url = crawl.seed.uri), then a CRAWL_UPDATE that
# sets status to 'started' and clears retry_at.
def on_CRAWL_START(event: Event) -> Iterable[EventDict]:
# create root snapshot
crawl = Crawl.objects.get(id=event.crawl_id)
new_snapshot_id = uuid.uuid4()
yield {'name': 'SNAPSHOT_CREATE', 'snapshot_id': new_snapshot_id, 'crawl_id': crawl.id, 'url': crawl.seed.uri}
yield {'name': 'SNAPSHOT_START', 'snapshot_id': new_snapshot_id}
yield {'name': 'CRAWL_UPDATE', 'crawl_id': crawl.id, 'status': 'started', 'retry_at': None}
class SnapshotWorker(WorkerType):
    """Worker for ``SNAPSHOT_*`` events.

    Every ``on_SNAPSHOT_*`` handler is a generator that receives the triggering
    ``Event`` and yields follow-up event dicts. Handlers that address an
    existing snapshot read its primary key from ``event.snapshot_id``, so every
    event emitted towards them carries a ``'snapshot_id'`` key.

    NOTE(review): handlers assign ``snapshot.status`` but no ``save()`` call is
    visible in the reviewed code — confirm where the new status is persisted.
    """

    name = 'snapshot'
    listens_to = 'SNAPSHOT_'
    # ARCHIVERESULT_ is listed because on_SNAPSHOT_START emits ARCHIVERESULT_* events
    outputs = ['SNAPSHOT_', 'FS_', 'ARCHIVERESULT_']

    @staticmethod
    def _index_write_event(snapshot: Snapshot) -> EventDict:
        """Build the ``FS_WRITE`` event that rewrites the snapshot's ``index.json``."""
        return {
            'name': 'FS_WRITE',
            'path': snapshot.OUTPUT_DIR / 'index.json',
            'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True),
        }

    @staticmethod
    def on_SNAPSHOT_IDLE(event: Event) -> Iterable[EventDict]:
        """Emit ``SNAPSHOT_START`` or ``SNAPSHOT_SEAL`` for the first unsealed snapshot.

        Yields nothing when every snapshot is sealed, or when the selected one
        can neither start nor seal.
        """
        # check for any snapshots that can be started or sealed
        snapshot = Snapshot.objects.exclude(status=Snapshot.StatusChoices.SEALED).first()
        if not snapshot:
            return
        # the START/SEAL handlers below look the snapshot up via event.snapshot_id
        if snapshot.can_start():
            yield {'name': 'SNAPSHOT_START', 'snapshot_id': snapshot.id}
        elif snapshot.can_seal():
            yield {'name': 'SNAPSHOT_SEAL', 'snapshot_id': snapshot.id}

    @staticmethod
    def on_SNAPSHOT_CREATE(event: Event) -> Iterable[EventDict]:
        """Create the Snapshot row, then emit its index write and ``SNAPSHOT_UPDATED``."""
        snapshot = Snapshot.objects.create(id=event.snapshot_id, **event.kwargs)
        yield SnapshotWorker._index_write_event(snapshot)
        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}

    @staticmethod
    def on_SNAPSHOT_SEAL(event: Event) -> Iterable[EventDict]:
        """Mark a STARTED snapshot as SEALED and emit its index write and ``SNAPSHOT_UPDATED``.

        Raises ``Snapshot.DoesNotExist`` when no STARTED snapshot has that id, and
        ``AssertionError`` when the snapshot reports it cannot be sealed.
        """
        snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.STARTED)
        assert snapshot.can_seal()

        snapshot.status = Snapshot.StatusChoices.SEALED
        yield SnapshotWorker._index_write_event(snapshot)
        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}

    @staticmethod
    def on_SNAPSHOT_START(event: Event) -> Iterable[EventDict]:
        """Queue one ArchiveResult per extractor, then mark the snapshot STARTED.

        Raises ``Snapshot.DoesNotExist`` when no QUEUED snapshot has that id, and
        ``AssertionError`` when the snapshot reports it cannot be started.
        """
        snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.QUEUED)
        assert snapshot.can_start()

        # create pending archiveresults for each extractor
        for extractor in snapshot.get_extractors():
            new_archiveresult_id = uuid.uuid4()
            yield {'name': 'ARCHIVERESULT_CREATE', 'id': new_archiveresult_id, 'snapshot_id': snapshot.id, 'extractor': extractor.name}
            yield {'name': 'ARCHIVERESULT_START', 'id': new_archiveresult_id}

        snapshot.status = Snapshot.StatusChoices.STARTED
        yield SnapshotWorker._index_write_event(snapshot)
        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}
# Worker for ARCHIVERESULT_* events. Each on_ARCHIVERESULT_* handler receives
# the triggering Event and yields follow-up event dicts.
# NOTE(review): this listing carries no indentation or decorators and several
# dict literals are unclosed. Confirm against the canonical file before
# changing any logic here.
class ArchiveResultWorker(WorkerType):
name = 'archiveresult'
listens_to = 'ARCHIVERESULT_'
outputs = ['ARCHIVERESULT_', 'FS_']
# Loads the ArchiveResult with id=event.id, collects the event items whose
# value differs from its current attribute, and emits ARCHIVERESULT_UPDATED
# when there is any difference.
# NOTE(review): the dict comprehension is unclosed in this listing and nothing
# visible applies `diff` to the row before the event is emitted — confirm.
def on_ARCHIVERESULT_UPDATE(event: Event) -> Iterable[EventDict]:
archiveresult = ArchiveResult.objects.get(id=event.id)
diff = {
key: val
for key, val in event.items()
if getattr(archiveresult, key) != val
if diff:
yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}
# After an ArchiveResult changed: asks for its output-dir symlinks to be
# (re)written.
def on_ARCHIVERESULT_UPDATED(event: Event) -> Iterable[EventDict]:
archiveresult = ArchiveResult.objects.get(id=event.id)
yield {'name': 'FS_WRITE_SYMLINKS', 'path': archiveresult.OUTPUT_DIR, 'symlinks': archiveresult.output_dir_symlinks}
# get_or_create()s the ArchiveResult named by the event's 'archiveresult_id'
# and emits ARCHIVERESULT_UPDATE when a row was created. The diff/assert that
# follows rejects an existing row whose values differ from the event's.
# NOTE(review): SnapshotWorker emits ARCHIVERESULT_CREATE with an 'id' key, not
# 'archiveresult_id'; the diff/assert presumably belongs to an `else:` branch
# and its dict comprehension is unclosed in this listing — confirm.
def on_ARCHIVERESULT_CREATE(event: Event) -> Iterable[EventDict]:
archiveresult, created = ArchiveResult.objects.get_or_create(id=event.pop('archiveresult_id'), defaults=event)
if created:
yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id}
diff = {
key: val
for key, val in event.items()
if getattr(archiveresult, key) != val
assert not diff, f'ArchiveResult {archiveresult.id} already exists and has different values, cannot create on top of it: {diff}'
# Seals a STARTED ArchiveResult by emitting an ARCHIVERESULT_UPDATE that sets
# status to 'sealed'; asserts can_seal() first.
def on_ARCHIVERESULT_SEAL(event: Event) -> Iterable[EventDict]:
archiveresult = ArchiveResult.objects.get(id=event.id, status=ArchiveResult.StatusChoices.STARTED)
assert archiveresult.can_seal()
yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id, 'status': 'sealed'}
# Starts a QUEUED ArchiveResult: emits a SHELL_EXEC event carrying the
# extractor's command and working directory, sets status to STARTED, then emits
# an FS_WRITE of index.json and ARCHIVERESULT_UPDATED.
# NOTE(review): the SHELL_EXEC payload (including its 'on_exit' dict) is
# unclosed in this listing, 'SHELL_' is not among this class's `outputs`, and
# no save() of the new status is visible — confirm.
def on_ARCHIVERESULT_START(event: Event) -> Iterable[EventDict]:
archiveresult = ArchiveResult.objects.get(id=event.id, status=ArchiveResult.StatusChoices.QUEUED)
yield {
'name': 'SHELL_EXEC',
'cmd': archiveresult.EXTRACTOR.get_cmd(),
'cwd': archiveresult.OUTPUT_DIR,
'on_exit': {
'id': archiveresult.id,
archiveresult.status = ArchiveResult.StatusChoices.STARTED
yield {'name': 'FS_WRITE', 'path': archiveresult.OUTPUT_DIR / 'index.json', 'content': json.dumps(archiveresult.as_json(), default=str, indent=4, sort_keys=True)}
yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}
# Idle tick: takes the first ArchiveResult whose status is neither SUCCEEDED nor
# FAILED and emits ARCHIVERESULT_START and/or ARCHIVERESULT_SEAL for it. The two
# checks are independent `if`s here, unlike the if/elif used by the crawl and
# snapshot idle handlers.
def on_ARCHIVERESULT_IDLE(event: Event) -> Iterable[EventDict]:
stale_archiveresult = ArchiveResult.objects.exclude(status__in=[ArchiveResult.StatusChoices.SUCCEEDED, ArchiveResult.StatusChoices.FAILED]).first()
if not stale_archiveresult:
return []
if stale_archiveresult.can_start():
yield {'name': 'ARCHIVERESULT_START', 'id': stale_archiveresult.id}
if stale_archiveresult.can_seal():
yield {'name': 'ARCHIVERESULT_SEAL', 'id': stale_archiveresult.id}
def get_worker_type(name: str) -> Type[WorkerType]:
    """Return the first registered worker class that ``name`` identifies.

    A class in ``WORKER_TYPES`` matches when any of the following holds:

    * its ``name`` attribute equals ``name`` exactly,
    * its class name equals ``name``, ignoring case,
    * its ``listens_to`` prefix equals ``name``, ignoring case and any leading
      or trailing underscores on either side.

    Raises:
        Exception: if no registered worker class matches.
    """
    for candidate in WORKER_TYPES:
        # all three comparisons are evaluated for every candidate before testing
        checks = (
            candidate.name == name,
            candidate.__name__.lower() == name.lower(),
            candidate.listens_to.strip('_').lower() == name.strip('_').lower(),
        )
        if any(checks):
            return candidate
    raise Exception(f'Worker type not found: {name}')