chia-blockchain/chia/wallet/wallet_action_store.py

141 lines
4.5 KiB
Python

from typing import List, Optional
import aiosqlite
from chia.util.db_wrapper import DBWrapper
from chia.util.ints import uint32
from chia.wallet.util.wallet_types import WalletType
from chia.wallet.wallet_action import WalletAction
class WalletActionStore:
"""
WalletActionStore keeps track of all wallet actions that require persistence.
Used by Colored coins, Atomic swaps, Rate Limited, and Authorized payee wallets
"""
db_connection: aiosqlite.Connection
cache_size: uint32
db_wrapper: DBWrapper
@classmethod
async def create(cls, db_wrapper: DBWrapper):
self = cls()
self.db_wrapper = db_wrapper
self.db_connection = db_wrapper.db
await self.db_connection.execute(
(
"CREATE TABLE IF NOT EXISTS action_queue("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
" name text,"
" wallet_id int,"
" wallet_type int,"
" wallet_callback text,"
" done int,"
" data text)"
)
)
await self.db_connection.execute("CREATE INDEX IF NOT EXISTS name on action_queue(name)")
await self.db_connection.execute("CREATE INDEX IF NOT EXISTS wallet_id on action_queue(wallet_id)")
await self.db_connection.execute("CREATE INDEX IF NOT EXISTS wallet_type on action_queue(wallet_type)")
await self.db_connection.commit()
return self
async def _clear_database(self):
cursor = await self.db_connection.execute("DELETE FROM action_queue")
await cursor.close()
await self.db_connection.commit()
async def get_wallet_action(self, id: int) -> Optional[WalletAction]:
"""
Return a wallet action by id
"""
cursor = await self.db_connection.execute("SELECT * from action_queue WHERE id=?", (id,))
row = await cursor.fetchone()
await cursor.close()
if row is None:
return None
return WalletAction(row[0], row[1], row[2], WalletType(row[3]), row[4], bool(row[5]), row[6])
async def create_action(
self, name: str, wallet_id: int, type: int, callback: str, done: bool, data: str, in_transaction: bool
):
"""
Creates Wallet Action
"""
if not in_transaction:
await self.db_wrapper.lock.acquire()
try:
cursor = await self.db_connection.execute(
"INSERT INTO action_queue VALUES(?, ?, ?, ?, ?, ?, ?)",
(None, name, wallet_id, type, callback, done, data),
)
await cursor.close()
finally:
if not in_transaction:
await self.db_connection.commit()
self.db_wrapper.lock.release()
async def action_done(self, action_id: int):
"""
Marks action as done
"""
action: Optional[WalletAction] = await self.get_wallet_action(action_id)
assert action is not None
async with self.db_wrapper.lock:
cursor = await self.db_connection.execute(
"Replace INTO action_queue VALUES(?, ?, ?, ?, ?, ?, ?)",
(
action.id,
action.name,
action.wallet_id,
action.type.value,
action.wallet_callback,
True,
action.data,
),
)
await cursor.close()
await self.db_connection.commit()
async def get_all_pending_actions(self) -> List[WalletAction]:
"""
Returns list of all pending action
"""
result: List[WalletAction] = []
cursor = await self.db_connection.execute("SELECT * from action_queue WHERE done=?", (0,))
rows = await cursor.fetchall()
await cursor.close()
if rows is None:
return result
for row in rows:
action = WalletAction(row[0], row[1], row[2], WalletType(row[3]), row[4], bool(row[5]), row[6])
result.append(action)
return result
async def get_action_by_id(self, id) -> Optional[WalletAction]:
"""
Return a wallet action by id
"""
cursor = await self.db_connection.execute("SELECT * from action_queue WHERE id=?", (id,))
row = await cursor.fetchone()
await cursor.close()
if row is None:
return None
return WalletAction(row[0], row[1], row[2], WalletType(row[3]), row[4], bool(row[5]), row[6])