ArchiveBox/archivebox/workers/worker.py

__package__ = 'archivebox.workers'
import os
import sys
import time
import uuid
import json
import shlex
from typing import ClassVar, Iterable, Type
from pathlib import Path
from rich import print
from django.db import transaction
from django.db.models import QuerySet
from django.utils import timezone
from django.utils.functional import classproperty # type: ignore
from crawls.models import Crawl
from core.models import Snapshot, ArchiveResult
from workers.models import Event, Process, EventDict
class WorkerType:
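"""
Base class for ArchiveBox event-loop workers.
Each subclass declares an event-name prefix it listens_to (e.g. 'CRAWL_'), polls the Event queue
for unclaimed events matching that prefix, claims them one at a time, and dispatches each one to
an on_<EVENT_NAME> handler method that can yield follow-up events to be dispatched in turn.
"""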
# static class attributes
name: ClassVar[str] # e.g. 'log' or 'filesystem' or 'crawl' or 'snapshot' or 'archiveresult' etc.
listens_to: ClassVar[str] # e.g. 'LOG_' or 'FS_' or 'CRAWL_' or 'SNAPSHOT_' or 'ARCHIVERESULT_' etc.
outputs: ClassVar[list[str]] # e.g. ['LOG_', 'FS_', 'CRAWL_', 'SNAPSHOT_', 'ARCHIVERESULT_'] etc.
poll_interval: ClassVar[int] = 1 # seconds to wait between polls for new events
@classproperty
def event_queue(cls) -> QuerySet[Event]:
return Event.objects.filter(name__startswith=cls.listens_to)
@classmethod
def fork(cls, wait_for_first_event=False, exit_on_idle=True) -> Process:
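"""Spawn a new `archivebox worker <name>` subprocess for this worker type and return its Process record."""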
cmd = ['archivebox', 'worker', cls.name]
if exit_on_idle:
cmd.append('--exit-on-idle')
if wait_for_first_event:
cmd.append('--wait-for-first-event')
return Process.create_and_fork(cmd=cmd, actor_type=cls.name)
@classproperty
def processes(cls) -> QuerySet[Process]:
return Process.objects.filter(actor_type=cls.name)
@classmethod
def run(cls, wait_for_first_event=False, exit_on_idle=True):
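"""Main worker loop: optionally block until a first matching event arrives, then keep processing events (or idle ticks), yielding their output events, until a pass produces no output (if exit_on_idle) or forever otherwise."""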
if wait_for_first_event:
event = cls.event_queue.get_next_unclaimed()
while not event:
time.sleep(cls.poll_interval)
event = cls.event_queue.get_next_unclaimed()
while True:
output_events = list(cls.process_next_event()) or list(cls.process_idle_tick()) # process next event, or tick if idle
yield from output_events
if not output_events:
if exit_on_idle:
break
else:
time.sleep(cls.poll_interval)
@classmethod
def process_next_event(cls) -> Iterable[EventDict]:
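"""Claim the next unclaimed event for this worker, dispatch it to its handler, yield any output events, and mark the event as succeeded or failed."""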
event = cls.event_queue.get_next_unclaimed()
output_events = []
if not event:
return []
cls.mark_event_claimed(event)
print(f'{cls.__name__}[{Process.current().pid}] {event}', file=sys.stderr)
try:
for output_event in cls.receive(event):
output_events.append(output_event)
yield output_event
cls.mark_event_succeeded(event, output_events=output_events)
except BaseException as e:
cls.mark_event_failed(event, output_events=output_events, error=e)
@classmethod
def process_idle_tick(cls) -> Iterable[EventDict]:
# reset the idle event to be claimed by the current process
event, _created = Event.objects.update_or_create(
name=f'{cls.listens_to}IDLE',
emitted_by=Process.current(),
defaults={
'deliver_at': timezone.now(),
'claimed_proc': None,
'claimed_at': None,
'finished_at': None,
'error': None,
'parent': None,
},
)
# then process it like any other event
yield from cls.process_next_event()
@classmethod
def receive(cls, event: Event) -> Iterable[EventDict]:
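"""Dispatch an event to the on_<EVENT_NAME> handler defined on this class, raising if no handler exists."""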
handler_method = getattr(cls, f'on_{event.name}', None)
if handler_method:
yield from handler_method(event)
else:
raise Exception(f'No handler method for event: {event.name}')
@staticmethod
def on_IDLE(event: Event) -> Iterable[EventDict]:
return []
@staticmethod
def mark_event_claimed(event: Event):
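"""Atomically claim an event for the current process via a conditional UPDATE, raising if another process claimed it first."""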
proc = Process.current()
with transaction.atomic():
claimed = Event.objects.filter(id=event.id, claimed_proc=None, claimed_at=None).update(claimed_proc=proc, claimed_at=timezone.now())
event.refresh_from_db()
if not claimed:
raise Exception(f'Event already claimed by another process: {event.claimed_proc}')
print(f'{proc}.mark_event_claimed(): Claimed {event} ⛏️', file=sys.stderr)
# process_updated = Process.objects.filter(id=proc.id, active_event=None).update(active_event=event)
# if not process_updated:
# raise Exception(f'Unable to update process.active_event: {proc}.active_event = {event}')
@staticmethod
def mark_event_succeeded(event: Event, output_events: Iterable[EventDict]):
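"""Mark a claimed event as finished, then dispatch its output events and any on_success callback event."""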
event.refresh_from_db()
assert event.claimed_proc, f'Cannot mark event as succeeded if it is not claimed by a process: {event}'
assert (event.claimed_proc == Process.current()), f'Cannot mark event as succeeded if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'
with transaction.atomic():
updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now())
event.refresh_from_db()
if not updated:
raise Exception(f'Event {event} could not be marked as succeeded; it was modified by another process: {event.claimed_proc}')
# process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
# if not process_updated:
# raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
# dispatch any output events
for output_event in output_events:
Event.dispatch(event=output_event, parent=event)
# trigger any callback events
if event.on_success:
Event.dispatch(event=event.on_success, parent=event)
@staticmethod
def mark_event_failed(event: Event, output_events: Iterable[EventDict]=(), error: BaseException | None = None):
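"""Mark a claimed event as finished with an error, append a <EVENT_NAME>_ERROR event, and dispatch the output events plus any on_failure callback event."""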
event.refresh_from_db()
assert event.claimed_proc, f'Cannot mark event as failed if it is not claimed by a process: {event}'
assert (event.claimed_proc == Process.current()), f'Cannot mark event as failed if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'
with transaction.atomic():
updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now(), error=str(error))
event.refresh_from_db()
if not updated:
raise Exception(f'Event {event} could not be marked as failed; it was modified by another process: {event.claimed_proc}')
# process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
# if not process_updated:
# raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
# add dedicated error event to the output events
if not event.name.endswith('_ERROR'):
output_events = [
*output_events,
{'name': f'{event.name}_ERROR', 'msg': f'{type(error).__name__}: {error}'},
]
# dispatch any output events
for output_event in output_events:
Event.dispatch(event=output_event, parent=event)
# trigger any callback events
if event.on_failure:
Event.dispatch(event=event.on_failure, parent=event)
class OrchestratorWorker(WorkerType):
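"""Handles PROC_* events: launches queued worker Processes and kills processes in response to exit/kill events."""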
name = 'orchestrator'
listens_to = 'PROC_'
outputs = ['PROC_']
@staticmethod
def on_PROC_IDLE(event: Event) -> Iterable[EventDict]:
# find the oldest Process that has not been launched yet and launch it
to_launch = Process.objects.filter(launched_at=None).order_by('created_at').first()
if not to_launch:
return []
yield {'name': 'PROC_LAUNCH', 'id': to_launch.id}
@staticmethod
def on_PROC_LAUNCH(event: Event) -> Iterable[EventDict]:
process = Process.create_and_fork(**event.kwargs)
yield {'name': 'PROC_LAUNCHED', 'process_id': process.id}
@staticmethod
def on_PROC_EXIT(event: Event) -> Iterable[EventDict]:
process = Process.objects.get(id=event.process_id)
process.kill()
yield {'name': 'PROC_KILLED', 'process_id': process.id}
@staticmethod
def on_PROC_KILL(event: Event) -> Iterable[EventDict]:
process = Process.objects.get(id=event.process_id)
process.kill()
yield {'name': 'PROC_KILLED', 'process_id': process.id}
class FileSystemWorker(WorkerType):
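"""Handles FS_* events: writes, appends, deletes, and rsyncs files on disk, emitting FS_CHANGED for each change."""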
name = 'filesystem'
listens_to = 'FS_'
outputs = ['FS_']
@staticmethod
def on_FS_IDLE(event: Event) -> Iterable[EventDict]:
# check for tmp files that can be deleted
for tmp_file in Path('/tmp').glob('archivebox/*'):
yield {'name': 'FS_DELETE', 'path': str(tmp_file)}
@staticmethod
def on_FS_WRITE(event: Event) -> Iterable[EventDict]:
with open(event.path, 'w') as f:
f.write(event.content)
yield {'name': 'FS_CHANGED', 'path': event.path}
@staticmethod
def on_FS_APPEND(event: Event) -> Iterable[EventDict]:
with open(event.path, 'a') as f:
f.write(event.content)
yield {'name': 'FS_CHANGED', 'path': event.path}
@staticmethod
def on_FS_DELETE(event: Event) -> Iterable[EventDict]:
os.remove(event.path)
yield {'name': 'FS_CHANGED', 'path': event.path}
@staticmethod
def on_FS_RSYNC(event: Event) -> Iterable[EventDict]:
os.system(f'rsync -av {shlex.quote(str(event.src))} {shlex.quote(str(event.dst))}')
yield {'name': 'FS_CHANGED', 'path': event.dst}
class CrawlWorker(WorkerType):
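"""Handles CRAWL_* events: creates, updates, starts, and seals Crawl records, emitting SNAPSHOT_* and FS_* events as side effects."""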
name = 'crawl'
listens_to = 'CRAWL_'
outputs = ['CRAWL_', 'FS_', 'SNAPSHOT_']
@staticmethod
def on_CRAWL_IDLE(event: Event) -> Iterable[EventDict]:
# check for any stale crawls that can be started or sealed
stale_crawl = Crawl.objects.filter(retry_at__lt=timezone.now()).first()
if not stale_crawl:
return []
if stale_crawl.can_start():
yield {'name': 'CRAWL_START', 'crawl_id': stale_crawl.id}
elif stale_crawl.can_seal():
yield {'name': 'CRAWL_SEAL', 'id': stale_crawl.id}
@staticmethod
def on_CRAWL_CREATE(event: Event) -> Iterable[EventDict]:
crawl, created = Crawl.objects.get_or_create(id=event.id, defaults=event)
if created:
yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}
@staticmethod
def on_CRAWL_UPDATE(event: Event) -> Iterable[EventDict]:
crawl = Crawl.objects.get(id=event.pop('crawl_id'))
diff = {
key: val
for key, val in event.items()
if getattr(crawl, key) != val
}
if diff:
crawl.update(**diff)
yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}
@staticmethod
def on_CRAWL_UPDATED(event: Event) -> Iterable[EventDict]:
crawl = Crawl.objects.get(id=event.crawl_id)
yield {'name': 'FS_WRITE_SYMLINKS', 'path': crawl.OUTPUT_DIR, 'symlinks': crawl.output_dir_symlinks}
@staticmethod
def on_CRAWL_SEAL(event: Event) -> Iterable[EventDict]:
crawl = Crawl.objects.filter(id=event.id, status=Crawl.StatusChoices.STARTED).first()
if not crawl:
return
crawl.status = Crawl.StatusChoices.SEALED
crawl.save()
yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}
@staticmethod
def on_CRAWL_START(event: Event) -> Iterable[EventDict]:
# create root snapshot
crawl = Crawl.objects.get(id=event.crawl_id)
new_snapshot_id = uuid.uuid4()
yield {'name': 'SNAPSHOT_CREATE', 'snapshot_id': new_snapshot_id, 'crawl_id': crawl.id, 'url': crawl.seed.uri}
yield {'name': 'SNAPSHOT_START', 'snapshot_id': new_snapshot_id}
yield {'name': 'CRAWL_UPDATE', 'crawl_id': crawl.id, 'status': 'started', 'retry_at': None}
class SnapshotWorker(WorkerType):
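"""Handles SNAPSHOT_* events: creates, starts, and seals Snapshots, queueing ARCHIVERESULT_* events for each extractor and writing index.json updates."""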
name = 'snapshot'
listens_to = 'SNAPSHOT_'
outputs = ['SNAPSHOT_', 'ARCHIVERESULT_', 'FS_']
@staticmethod
def on_SNAPSHOT_IDLE(event: Event) -> Iterable[EventDict]:
# check for any snapshots that can be started or sealed
snapshot = Snapshot.objects.exclude(status=Snapshot.StatusChoices.SEALED).first()
if not snapshot:
return []
if snapshot.can_start():
yield {'name': 'SNAPSHOT_START', 'snapshot_id': snapshot.id}
elif snapshot.can_seal():
yield {'name': 'SNAPSHOT_SEAL', 'snapshot_id': snapshot.id}
@staticmethod
def on_SNAPSHOT_CREATE(event: Event) -> Iterable[EventDict]:
snapshot = Snapshot.objects.create(id=event.snapshot_id, **event.kwargs)
yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}
@staticmethod
def on_SNAPSHOT_SEAL(event: Event) -> Iterable[EventDict]:
snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.STARTED)
assert snapshot.can_seal()
snapshot.status = Snapshot.StatusChoices.SEALED
snapshot.save()
yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}
@staticmethod
def on_SNAPSHOT_START(event: Event) -> Iterable[EventDict]:
snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.QUEUED)
assert snapshot.can_start()
# create pending archiveresults for each extractor
for extractor in snapshot.get_extractors():
new_archiveresult_id = uuid.uuid4()
yield {'name': 'ARCHIVERESULT_CREATE', 'archiveresult_id': new_archiveresult_id, 'snapshot_id': snapshot.id, 'extractor': extractor.name}
yield {'name': 'ARCHIVERESULT_START', 'id': new_archiveresult_id}
snapshot.status = Snapshot.StatusChoices.STARTED
snapshot.save()
yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}
class ArchiveResultWorker(WorkerType):
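"""Handles ARCHIVERESULT_* events: creates, starts, and seals ArchiveResults, triggering extractor commands and index.json updates."""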
name = 'archiveresult'
listens_to = 'ARCHIVERESULT_'
outputs = ['ARCHIVERESULT_', 'FS_']
@staticmethod
def on_ARCHIVERESULT_UPDATE(event: Event) -> Iterable[EventDict]:
archiveresult = ArchiveResult.objects.get(id=event.id)
diff = {
key: val
for key, val in event.items()
if getattr(archiveresult, key) != val
}
if diff:
archiveresult.update(**diff)
yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}
@staticmethod
def on_ARCHIVERESULT_UPDATED(event: Event) -> Iterable[EventDict]:
archiveresult = ArchiveResult.objects.get(id=event.id)
yield {'name': 'FS_WRITE_SYMLINKS', 'path': archiveresult.OUTPUT_DIR, 'symlinks': archiveresult.output_dir_symlinks}
@staticmethod
def on_ARCHIVERESULT_CREATE(event: Event) -> Iterable[EventDict]:
archiveresult, created = ArchiveResult.objects.get_or_create(id=event.pop('archiveresult_id'), defaults=event)
if created:
yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id}
else:
diff = {
key: val
for key, val in event.items()
if getattr(archiveresult, key) != val
}
assert not diff, f'ArchiveResult {archiveresult.id} already exists and has different values, cannot create on top of it: {diff}'
@staticmethod
def on_ARCHIVERESULT_SEAL(event: Event) -> Iterable[EventDict]:
archiveresult = ArchiveResult.objects.get(id=event.id, status=ArchiveResult.StatusChoices.STARTED)
assert archiveresult.can_seal()
yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id, 'status': 'sealed'}
@staticmethod
def on_ARCHIVERESULT_START(event: Event) -> Iterable[EventDict]:
archiveresult = ArchiveResult.objects.get(id=event.id, status=ArchiveResult.StatusChoices.QUEUED)
yield {
'name': 'SHELL_EXEC',
'cmd': archiveresult.EXTRACTOR.get_cmd(),
'cwd': archiveresult.OUTPUT_DIR,
'on_exit': {
'name': 'ARCHIVERESULT_SEAL',
'id': archiveresult.id,
},
}
archiveresult.status = ArchiveResult.StatusChoices.STARTED
archiveresult.save()
yield {'name': 'FS_WRITE', 'path': archiveresult.OUTPUT_DIR / 'index.json', 'content': json.dumps(archiveresult.as_json(), default=str, indent=4, sort_keys=True)}
yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}
@staticmethod
def on_ARCHIVERESULT_IDLE(event: Event) -> Iterable[EventDict]:
stale_archiveresult = ArchiveResult.objects.exclude(status__in=[ArchiveResult.StatusChoices.SUCCEEDED, ArchiveResult.StatusChoices.FAILED]).first()
if not stale_archiveresult:
return []
if stale_archiveresult.can_start():
yield {'name': 'ARCHIVERESULT_START', 'id': stale_archiveresult.id}
elif stale_archiveresult.can_seal():
yield {'name': 'ARCHIVERESULT_SEAL', 'id': stale_archiveresult.id}
WORKER_TYPES = [
OrchestratorWorker,
FileSystemWorker,
CrawlWorker,
SnapshotWorker,
ArchiveResultWorker,
]
def get_worker_type(name: str) -> Type[WorkerType]:
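"""Look up a WorkerType by its verbose name (e.g. 'crawl'), class name (e.g. 'CrawlWorker'), or event prefix (e.g. 'CRAWL_')."""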
for worker_type in WORKER_TYPES:
matches_verbose_name = (worker_type.name == name)
matches_class_name = (worker_type.__name__.lower() == name.lower())
matches_listens_to = (worker_type.listens_to.strip('_').lower() == name.strip('_').lower())
if matches_verbose_name or matches_class_name or matches_listens_to:
return worker_type
raise Exception(f'Worker type not found: {name}')