__package__ = 'archivebox.workers'

import os
import sys
import time
import uuid
import json
import subprocess

from typing import ClassVar, Iterable, Type
from pathlib import Path

from rich import print

from django.db import transaction
from django.db.models import QuerySet
from django.utils import timezone
from django.utils.functional import classproperty  # type: ignore

from crawls.models import Crawl
from core.models import Snapshot, ArchiveResult

from workers.models import Event, Process, EventDict
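
# The classes below implement a small event-driven worker system: each
# WorkerType subclass polls the Event table for events whose name matches its
# `listens_to` prefix, claims each one atomically, runs the matching
# `on_<EVENT_NAME>` handler, and dispatches whatever output events the handler
# yields. A rough usage sketch (assuming the `archivebox worker` CLI
# subcommand wires through to WorkerType.run(), as fork() below implies):
#
#   $ archivebox worker crawl --exit-on-idle
#
# or, driven directly from Python:
#
#   for output_event in CrawlWorker.run(exit_on_idle=True):
#       print(output_event)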


class WorkerType:
    # static class attributes
    name: ClassVar[str]           # e.g. 'log' or 'filesystem' or 'crawl' or 'snapshot' or 'archiveresult' etc.

    listens_to: ClassVar[str]     # e.g. 'LOG_' or 'FS_' or 'CRAWL_' or 'SNAPSHOT_' or 'ARCHIVERESULT_' etc.
    outputs: ClassVar[list[str]]  # e.g. ['LOG_', 'FS_', 'CRAWL_', 'SNAPSHOT_', 'ARCHIVERESULT_'] etc.

    poll_interval: ClassVar[int] = 1  # seconds to wait between polls for new events

    @classproperty
    def event_queue(cls) -> QuerySet[Event]:
        return Event.objects.filter(name__startswith=cls.listens_to)

    @classmethod
    def fork(cls, wait_for_first_event=False, exit_on_idle=True) -> Process:
        cmd = ['archivebox', 'worker', cls.name]
        if exit_on_idle:
            cmd.append('--exit-on-idle')
        if wait_for_first_event:
            cmd.append('--wait-for-first-event')
        return Process.create_and_fork(cmd=cmd, actor_type=cls.name)

    @classproperty
    def processes(cls) -> QuerySet[Process]:
        return Process.objects.filter(actor_type=cls.name)

    @classmethod
    def run(cls, wait_for_first_event=False, exit_on_idle=True) -> Iterable[EventDict]:
        if wait_for_first_event:
            event = cls.event_queue.get_next_unclaimed()
            while not event:
                time.sleep(cls.poll_interval)
                event = cls.event_queue.get_next_unclaimed()

        while True:
            output_events = list(cls.process_next_event()) or list(cls.process_idle_tick())  # process next event, or tick if idle
            yield from output_events
            if not output_events:
                if exit_on_idle:
                    break
                else:
                    time.sleep(cls.poll_interval)

    @classmethod
    def process_next_event(cls) -> Iterable[EventDict]:
        event = cls.event_queue.get_next_unclaimed()
        output_events = []

        if not event:
            return

        cls.mark_event_claimed(event)
        print(f'{cls.__name__}[{Process.current().pid}] {event}', file=sys.stderr)
        try:
            for output_event in cls.receive(event):
                output_events.append(output_event)
                yield output_event
            cls.mark_event_succeeded(event, output_events=output_events)
        except BaseException as e:
            cls.mark_event_failed(event, output_events=output_events, error=e)

    @classmethod
    def process_idle_tick(cls) -> Iterable[EventDict]:
        # reset the idle event to be claimed by the current process
        event, _created = Event.objects.update_or_create(
            name=f'{cls.listens_to}IDLE',
            emitted_by=Process.current(),
            defaults={
                'deliver_at': timezone.now(),
                'claimed_proc': None,
                'claimed_at': None,
                'finished_at': None,
                'error': None,
                'parent': None,
            },
        )

        # then process it like any other event
        yield from cls.process_next_event()
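
    # NOTE: the idle tick reuses the normal event machinery: a synthetic
    # `<listens_to>IDLE` event is reset to unclaimed, then claimed and handled
    # like any other event, so idle housekeeping gets the same locking,
    # success/failure bookkeeping, and output-event dispatch as real events.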

    @classmethod
    def receive(cls, event: Event) -> Iterable[EventDict]:
        handler_method = getattr(cls, f'on_{event.name}', None)
        if handler_method:
            yield from handler_method(event)
        else:
            raise Exception(f'No handler method for event: {event.name}')

    @staticmethod
    def on_IDLE(event: Event) -> Iterable[EventDict]:
        # default no-op idle handler; receive() always calls handlers with the event
        return []

    @staticmethod
    def mark_event_claimed(event: Event):
        proc = Process.current()

        with transaction.atomic():
            claimed = Event.objects.filter(id=event.id, claimed_proc=None, claimed_at=None).update(claimed_proc=proc, claimed_at=timezone.now())
            event.refresh_from_db()
            if not claimed:
                raise Exception(f'Event already claimed by another process: {event.claimed_proc}')

        print(f'{proc}.mark_event_claimed(): Claimed {event} ⛏️', file=sys.stderr)

        # process_updated = Process.objects.filter(id=proc.id, active_event=None).update(active_event=event)
        # if not process_updated:
        #     raise Exception(f'Unable to update process.active_event: {proc}.active_event = {event}')
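
    # NOTE: the filter(claimed_proc=None, ...).update(...) above acts as a
    # compare-and-swap: the UPDATE only matches rows that are still unclaimed,
    # so when two workers race for the same event exactly one UPDATE returns 1
    # and the loser raises instead of processing the event a second time.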

    @staticmethod
    def mark_event_succeeded(event: Event, output_events: Iterable[EventDict]):
        event.refresh_from_db()
        assert event.claimed_proc, f'Cannot mark event as succeeded if it is not claimed by a process: {event}'
        assert (event.claimed_proc == Process.current()), f'Cannot mark event as succeeded if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'

        with transaction.atomic():
            updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now())
            event.refresh_from_db()
            if not updated:
                raise Exception(f'Event {event} failed to mark as succeeded, it was modified by another process: {event.claimed_proc}')

        # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
        # if not process_updated:
        #     raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')

        # dispatch any output events
        for output_event in output_events:
            Event.dispatch(event=output_event, parent=event)

        # trigger any callback events
        if event.on_success:
            Event.dispatch(event=event.on_success, parent=event)

    @staticmethod
    def mark_event_failed(event: Event, output_events: Iterable[EventDict] = (), error: BaseException | None = None):
        event.refresh_from_db()
        assert event.claimed_proc, f'Cannot mark event as failed if it is not claimed by a process: {event}'
        assert (event.claimed_proc == Process.current()), f'Cannot mark event as failed if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'

        with transaction.atomic():
            updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now(), error=str(error))
            event.refresh_from_db()
            if not updated:
                raise Exception(f'Event {event} failed to mark as failed, it was modified by another process: {event.claimed_proc}')

        # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
        # if not process_updated:
        #     raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')

        # add a dedicated error event to the output events
        if not event.name.endswith('_ERROR'):
            output_events = [
                *output_events,
                {'name': f'{event.name}_ERROR', 'msg': f'{type(error).__name__}: {error}'},
            ]

        # dispatch any output events
        for output_event in output_events:
            Event.dispatch(event=output_event, parent=event)

        # trigger any callback events
        if event.on_failure:
            Event.dispatch(event=event.on_failure, parent=event)
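
    # Example of the error envelope above: if on_CRAWL_START raises
    # ValueError('bad seed'), the claimed CRAWL_START event is finalized with
    # its error field set, and this extra event is dispatched alongside any
    # partial output_events:
    #
    #   {'name': 'CRAWL_START_ERROR', 'msg': 'ValueError: bad seed'}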


class OrchestratorWorker(WorkerType):
    name = 'orchestrator'
    listens_to = 'PROC_'
    outputs = ['PROC_']

    @staticmethod
    def on_PROC_IDLE(event: Event) -> Iterable[EventDict]:
        # look through all Processes that are not yet launched and launch them
        to_launch = Process.objects.filter(launched_at=None).order_by('created_at').first()
        if not to_launch:
            return

        yield {'name': 'PROC_LAUNCH', 'id': to_launch.id}

    @staticmethod
    def on_PROC_LAUNCH(event: Event) -> Iterable[EventDict]:
        process = Process.create_and_fork(**event.kwargs)
        yield {'name': 'PROC_LAUNCHED', 'process_id': process.id}

    @staticmethod
    def on_PROC_EXIT(event: Event) -> Iterable[EventDict]:
        process = Process.objects.get(id=event.process_id)
        process.kill()
        yield {'name': 'PROC_KILLED', 'process_id': process.id}

    @staticmethod
    def on_PROC_KILL(event: Event) -> Iterable[EventDict]:
        process = Process.objects.get(id=event.process_id)
        process.kill()
        yield {'name': 'PROC_KILLED', 'process_id': process.id}
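
    # A kill request from elsewhere in the codebase would be dispatched as a
    # plain event dict (a sketch; `some_process` is a stand-in for any Process
    # row, and passing parent=None mirrors how parentless events are stored):
    #
    #   Event.dispatch(event={'name': 'PROC_KILL', 'process_id': some_process.id}, parent=None)
    #
    # which this worker then claims and handles via on_PROC_KILL() above.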


class FileSystemWorker(WorkerType):
    name = 'filesystem'
    listens_to = 'FS_'
    outputs = ['FS_']

    @staticmethod
    def on_FS_IDLE(event: Event) -> Iterable[EventDict]:
        # check for tmp files that can be deleted
        for tmp_file in Path('/tmp').glob('archivebox/*'):
            yield {'name': 'FS_DELETE', 'path': str(tmp_file)}

    @staticmethod
    def on_FS_WRITE(event: Event) -> Iterable[EventDict]:
        with open(event.path, 'w') as f:
            f.write(event.content)
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_APPEND(event: Event) -> Iterable[EventDict]:
        with open(event.path, 'a') as f:
            f.write(event.content)
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_DELETE(event: Event) -> Iterable[EventDict]:
        os.remove(event.path)
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_RSYNC(event: Event) -> Iterable[EventDict]:
        # use subprocess with an argument list instead of os.system, so paths in
        # the event payload can't be shell-injected and a nonzero exit fails the event
        subprocess.run(['rsync', '-av', str(event.src), str(event.dst)], check=True)
        yield {'name': 'FS_CHANGED', 'path': event.dst}
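
    # Example payloads consumed by the handlers above (paths are illustrative);
    # every mutation yields an FS_CHANGED event for downstream listeners:
    #
    #   {'name': 'FS_WRITE',  'path': '/data/index.json', 'content': '{...}'}
    #   {'name': 'FS_APPEND', 'path': '/data/worker.log', 'content': 'line\n'}
    #   {'name': 'FS_DELETE', 'path': '/tmp/archivebox/some.tmp'}
    #   {'name': 'FS_RSYNC',  'src': '/data/archive/', 'dst': '/backup/archive/'}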


class CrawlWorker(WorkerType):
    name = 'crawl'
    listens_to = 'CRAWL_'
    outputs = ['CRAWL_', 'FS_', 'SNAPSHOT_']

    @staticmethod
    def on_CRAWL_IDLE(event: Event) -> Iterable[EventDict]:
        # check for any stale crawls that can be started or sealed
        stale_crawl = Crawl.objects.filter(retry_at__lt=timezone.now()).first()
        if not stale_crawl:
            return

        # use 'crawl_id' (not 'id') so the key matches what on_CRAWL_START reads
        # and can't shadow the Event's own id
        if stale_crawl.can_start():
            yield {'name': 'CRAWL_START', 'crawl_id': stale_crawl.id}

        elif stale_crawl.can_seal():
            yield {'name': 'CRAWL_SEAL', 'crawl_id': stale_crawl.id}

    @staticmethod
    def on_CRAWL_CREATE(event: Event) -> Iterable[EventDict]:
        crawl, created = Crawl.objects.get_or_create(id=event.pop('crawl_id'), defaults=event)
        if created:
            yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}

    @staticmethod
    def on_CRAWL_UPDATE(event: Event) -> Iterable[EventDict]:
        crawl = Crawl.objects.get(id=event.pop('crawl_id'))
        diff = {
            key: val
            for key, val in event.items()
            if getattr(crawl, key) != val
        }
        if diff:
            crawl.update(**diff)
            yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}

    @staticmethod
    def on_CRAWL_UPDATED(event: Event) -> Iterable[EventDict]:
        crawl = Crawl.objects.get(id=event.crawl_id)
        yield {'name': 'FS_WRITE_SYMLINKS', 'path': crawl.OUTPUT_DIR, 'symlinks': crawl.output_dir_symlinks}

    @staticmethod
    def on_CRAWL_SEAL(event: Event) -> Iterable[EventDict]:
        crawl = Crawl.objects.filter(id=event.crawl_id, status=Crawl.StatusChoices.STARTED).first()
        if not crawl:
            return
        crawl.status = Crawl.StatusChoices.SEALED
        crawl.save()
        yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}

    @staticmethod
    def on_CRAWL_START(event: Event) -> Iterable[EventDict]:
        # create root snapshot
        crawl = Crawl.objects.get(id=event.crawl_id)
        new_snapshot_id = uuid.uuid4()
        yield {'name': 'SNAPSHOT_CREATE', 'snapshot_id': new_snapshot_id, 'crawl_id': crawl.id, 'url': crawl.seed.uri}
        yield {'name': 'SNAPSHOT_START', 'snapshot_id': new_snapshot_id}
        yield {'name': 'CRAWL_UPDATE', 'crawl_id': crawl.id, 'status': 'started', 'retry_at': None}
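
    # Typical crawl lifecycle as driven by the handlers above:
    #
    #   CRAWL_CREATE -> CRAWL_START -> SNAPSHOT_CREATE + SNAPSHOT_START
    #     -> (snapshot + archiveresult workers do their work) -> CRAWL_SEAL
    #     -> CRAWL_UPDATED
    #
    # with FS_WRITE / FS_WRITE_SYMLINKS events emitted along the way to keep
    # the on-disk index in sync with the database.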


class SnapshotWorker(WorkerType):
    name = 'snapshot'
    listens_to = 'SNAPSHOT_'
    outputs = ['SNAPSHOT_', 'FS_']

    @staticmethod
    def on_SNAPSHOT_IDLE(event: Event) -> Iterable[EventDict]:
        # check for any snapshots that can be started or sealed
        snapshot = Snapshot.objects.exclude(status=Snapshot.StatusChoices.SEALED).first()
        if not snapshot:
            return

        # use 'snapshot_id' (not 'id') to match what the START/SEAL handlers read
        if snapshot.can_start():
            yield {'name': 'SNAPSHOT_START', 'snapshot_id': snapshot.id}
        elif snapshot.can_seal():
            yield {'name': 'SNAPSHOT_SEAL', 'snapshot_id': snapshot.id}
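
    @staticmethod
    def on_SNAPSHOT_UPDATED(event: Event) -> Iterable[EventDict]:
        # The handlers below emit SNAPSHOT_UPDATED, but no handler for it was
        # defined in this file (receive() raises on unhandled events), so this
        # is a minimal sketch mirroring on_CRAWL_UPDATED / on_ARCHIVERESULT_UPDATED.
        # It assumes Snapshot exposes OUTPUT_DIR and output_dir_symlinks the
        # same way Crawl and ArchiveResult do.
        snapshot = Snapshot.objects.get(id=event.snapshot_id)
        yield {'name': 'FS_WRITE_SYMLINKS', 'path': snapshot.OUTPUT_DIR, 'symlinks': snapshot.output_dir_symlinks}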

    @staticmethod
    def on_SNAPSHOT_CREATE(event: Event) -> Iterable[EventDict]:
        snapshot = Snapshot.objects.create(id=event.snapshot_id, **event.kwargs)
        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'SNAPSHOT_UPDATED', 'snapshot_id': snapshot.id}

    @staticmethod
    def on_SNAPSHOT_SEAL(event: Event) -> Iterable[EventDict]:
        snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.STARTED)
        assert snapshot.can_seal()
        snapshot.status = Snapshot.StatusChoices.SEALED
        snapshot.save()
        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'SNAPSHOT_UPDATED', 'snapshot_id': snapshot.id}

    @staticmethod
    def on_SNAPSHOT_START(event: Event) -> Iterable[EventDict]:
        snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.QUEUED)
        assert snapshot.can_start()

        # create pending archiveresults for each extractor
        # ('archiveresult_id' matches the key on_ARCHIVERESULT_CREATE pops)
        for extractor in snapshot.get_extractors():
            new_archiveresult_id = uuid.uuid4()
            yield {'name': 'ARCHIVERESULT_CREATE', 'archiveresult_id': new_archiveresult_id, 'snapshot_id': snapshot.id, 'extractor': extractor.name}
            yield {'name': 'ARCHIVERESULT_START', 'archiveresult_id': new_archiveresult_id}

        snapshot.status = Snapshot.StatusChoices.STARTED
        snapshot.save()
        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'SNAPSHOT_UPDATED', 'snapshot_id': snapshot.id}


class ArchiveResultWorker(WorkerType):
    name = 'archiveresult'
    listens_to = 'ARCHIVERESULT_'
    outputs = ['ARCHIVERESULT_', 'FS_']

    @staticmethod
    def on_ARCHIVERESULT_UPDATE(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.pop('archiveresult_id'))
        diff = {
            key: val
            for key, val in event.items()
            if getattr(archiveresult, key) != val
        }
        if diff:
            archiveresult.update(**diff)
            yield {'name': 'ARCHIVERESULT_UPDATED', 'archiveresult_id': archiveresult.id}

    @staticmethod
    def on_ARCHIVERESULT_UPDATED(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.archiveresult_id)
        yield {'name': 'FS_WRITE_SYMLINKS', 'path': archiveresult.OUTPUT_DIR, 'symlinks': archiveresult.output_dir_symlinks}

    @staticmethod
    def on_ARCHIVERESULT_CREATE(event: Event) -> Iterable[EventDict]:
        archiveresult, created = ArchiveResult.objects.get_or_create(id=event.pop('archiveresult_id'), defaults=event)
        if created:
            yield {'name': 'ARCHIVERESULT_UPDATE', 'archiveresult_id': archiveresult.id}
        else:
            diff = {
                key: val
                for key, val in event.items()
                if getattr(archiveresult, key) != val
            }
            assert not diff, f'ArchiveResult {archiveresult.id} already exists and has different values, cannot create on top of it: {diff}'

    @staticmethod
    def on_ARCHIVERESULT_SEAL(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.archiveresult_id, status=ArchiveResult.StatusChoices.STARTED)
        assert archiveresult.can_seal()
        yield {'name': 'ARCHIVERESULT_UPDATE', 'archiveresult_id': archiveresult.id, 'status': 'sealed'}

    @staticmethod
    def on_ARCHIVERESULT_START(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.archiveresult_id, status=ArchiveResult.StatusChoices.QUEUED)

        yield {
            'name': 'SHELL_EXEC',
            'cmd': archiveresult.EXTRACTOR.get_cmd(),
            'cwd': archiveresult.OUTPUT_DIR,
            'on_exit': {
                'name': 'ARCHIVERESULT_SEAL',
                'archiveresult_id': archiveresult.id,
            },
        }

        archiveresult.status = ArchiveResult.StatusChoices.STARTED
        archiveresult.save()
        yield {'name': 'FS_WRITE', 'path': archiveresult.OUTPUT_DIR / 'index.json', 'content': json.dumps(archiveresult.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'ARCHIVERESULT_UPDATED', 'archiveresult_id': archiveresult.id}
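
    # NOTE: SHELL_EXEC is emitted above but has no handler in this file; it is
    # presumably consumed by a shell-executor worker defined elsewhere that
    # runs `cmd` in `cwd` and dispatches the `on_exit` event
    # (ARCHIVERESULT_SEAL here) once the command finishes.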

    @staticmethod
    def on_ARCHIVERESULT_IDLE(event: Event) -> Iterable[EventDict]:
        # check for any stale archiveresults that can be started or sealed
        stale_archiveresult = ArchiveResult.objects.exclude(status__in=[ArchiveResult.StatusChoices.SUCCEEDED, ArchiveResult.StatusChoices.FAILED]).first()
        if not stale_archiveresult:
            return
        if stale_archiveresult.can_start():
            yield {'name': 'ARCHIVERESULT_START', 'archiveresult_id': stale_archiveresult.id}
        elif stale_archiveresult.can_seal():
            yield {'name': 'ARCHIVERESULT_SEAL', 'archiveresult_id': stale_archiveresult.id}


WORKER_TYPES = [
    OrchestratorWorker,
    FileSystemWorker,
    CrawlWorker,
    SnapshotWorker,
    ArchiveResultWorker,
]


def get_worker_type(name: str) -> Type[WorkerType]:
    for worker_type in WORKER_TYPES:
        matches_verbose_name = (worker_type.name == name)
        matches_class_name = (worker_type.__name__.lower() == name.lower())
        matches_listens_to = (worker_type.listens_to.strip('_').lower() == name.strip('_').lower())
        if matches_verbose_name or matches_class_name or matches_listens_to:
            return worker_type
    raise Exception(f'Worker type not found: {name}')
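
# e.g. get_worker_type('crawl'), get_worker_type('CrawlWorker'), and
# get_worker_type('CRAWL_') all resolve to CrawlWorker.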