# ArchiveBox/archivebox/workers/semaphores.py

# 104 lines
# 3.6 KiB
# Python

# import uuid
# from functools import wraps
# from django.db import connection, transaction
# from django.utils import timezone
# from huey.exceptions import TaskLockedException
# from archivebox.config import CONSTANTS
# class SqliteSemaphore:
# def __init__(self, db_path, table_name, name, value=1, timeout=None):
# self.db_path = db_path
# self.table_name = table_name
# self.name = name
# self.value = value
# self.timeout = timeout or 86400 # Max age in seconds for a lock holder; any falsy timeout (None or 0) falls back to 86400 (24 hours)
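# # Ensure the table exists (created through Django's `connection`; db_path is stored above but not used in the code shown here)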
# with connection.cursor() as cursor:
# cursor.execute(f"""
# CREATE TABLE IF NOT EXISTS {self.table_name} (
# id TEXT PRIMARY KEY,
# name TEXT,
# timestamp DATETIME
# )
# """)
# def acquire(self, name=None):
# name = name or str(uuid.uuid4())
# now = timezone.now()
# expiration = now - timezone.timedelta(seconds=self.timeout)
# with transaction.atomic():
# # Remove expired locks
# with connection.cursor() as cursor:
# cursor.execute(f"""
# DELETE FROM {self.table_name}
# WHERE name = %s AND timestamp < %s
# """, [self.name, expiration])
# # Try to acquire the lock
# with connection.cursor() as cursor:
# cursor.execute(f"""
# INSERT INTO {self.table_name} (id, name, timestamp)
# SELECT %s, %s, %s
# WHERE (
# SELECT COUNT(*) FROM {self.table_name}
# WHERE name = %s
# ) < %s
# """, [name, self.name, now, self.name, self.value])
# if cursor.rowcount > 0:
# return name
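# # Reached only when the conditional INSERT above added no row (semaphore full).
# # With a freshly generated id this DELETE matches nothing; NOTE(review): with a
# # caller-supplied id that already holds a slot, it would delete that existing row.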
# with connection.cursor() as cursor:
# cursor.execute(f"""
# DELETE FROM {self.table_name}
# WHERE id = %s AND name = %s
# """, [name, self.name])
# return None
# def release(self, name):
# with connection.cursor() as cursor:
# cursor.execute(f"""
# DELETE FROM {self.table_name}
# WHERE id = %s AND name = %s
# """, [name, self.name])
# return cursor.rowcount > 0
# LOCKS_DB_PATH = CONSTANTS.DATABASE_FILE.parent / 'locks.sqlite3'
# def lock_task_semaphore(db_path, table_name, lock_name, value=1, timeout=None):
# """
# Decorator factory for a counting lock: up to `value` callers may hold it at once (default = 1).
# NOTE: no provisions are made for blocking, waiting, or notifying. When no slot
# is free, the wrapped call raises TaskLockedException immediately instead of waiting.
# Example:
# # Allow up to 3 workers to run this task concurrently. If the task is
# # locked, retry up to 2 times with a delay of 60s.
# @huey.task(retries=2, retry_delay=60)
# @lock_task_semaphore('path/to/db.sqlite3', 'semaphore_locks', 'my-lock', 3)
# def my_task():
# ...
# """
# sem = SqliteSemaphore(db_path, table_name, lock_name, value, timeout)
# def decorator(fn):
# @wraps(fn)
# def inner(*args, **kwargs):
# tid = sem.acquire()
# if tid is None:
# raise TaskLockedException(f'unable to acquire lock {lock_name}')
# try:
# return fn(*args, **kwargs)
# finally:
# sem.release(tid)
# return inner
# return decorator