1030 lines
36 KiB
Python
1030 lines
36 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import uuid
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, TextIO, Tuple, cast
|
|
|
|
from websockets import ConnectionClosedOK, WebSocketException, WebSocketServerProtocol, serve
|
|
|
|
from chia.cmds.init_funcs import chia_init
|
|
from chia.daemon.windows_signal import kill
|
|
from chia.server.server import ssl_context_for_root, ssl_context_for_server
|
|
from chia.ssl.create_ssl import get_mozilla_ca_crt
|
|
from chia.util.chia_logging import initialize_logging
|
|
from chia.util.config import load_config
|
|
from chia.util.json_util import dict_to_json_str
|
|
from chia.util.path import mkdir
|
|
from chia.util.service_groups import validate_service
|
|
from chia.util.setproctitle import setproctitle
|
|
from chia.util.ws_message import WsRpcMessage, create_payload, format_response
|
|
|
|
io_pool_exc = ThreadPoolExecutor()
|
|
|
|
try:
|
|
from aiohttp import ClientSession, web
|
|
except ModuleNotFoundError:
|
|
print("Error: Make sure to run . ./activate from the project folder before starting Chia.")
|
|
quit()
|
|
|
|
try:
|
|
import fcntl
|
|
|
|
has_fcntl = True
|
|
except ImportError:
|
|
has_fcntl = False
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
service_plotter = "chia plots create"
|
|
|
|
|
|
async def fetch(url: str):
|
|
async with ClientSession() as session:
|
|
try:
|
|
mozilla_root = get_mozilla_ca_crt()
|
|
ssl_context = ssl_context_for_root(mozilla_root)
|
|
response = await session.get(url, ssl=ssl_context)
|
|
if not response.ok:
|
|
log.warning("Response not OK.")
|
|
return None
|
|
return await response.text()
|
|
except Exception as e:
|
|
log.error(f"Exception while fetching {url}, exception: {e}")
|
|
return None
|
|
|
|
|
|
class PlotState(str, Enum):
|
|
SUBMITTED = "SUBMITTED"
|
|
RUNNING = "RUNNING"
|
|
REMOVING = "REMOVING"
|
|
FINISHED = "FINISHED"
|
|
|
|
|
|
class PlotEvent(str, Enum):
|
|
LOG_CHANGED = "log_changed"
|
|
STATE_CHANGED = "state_changed"
|
|
|
|
|
|
# determine if application is a script file or frozen exe
|
|
if getattr(sys, "frozen", False):
|
|
name_map = {
|
|
"chia": "chia",
|
|
"chia_wallet": "start_wallet",
|
|
"chia_full_node": "start_full_node",
|
|
"chia_harvester": "start_harvester",
|
|
"chia_farmer": "start_farmer",
|
|
"chia_introducer": "start_introducer",
|
|
"chia_timelord": "start_timelord",
|
|
"chia_timelord_launcher": "timelord_launcher",
|
|
"chia_full_node_simulator": "start_simulator",
|
|
}
|
|
|
|
def executable_for_service(service_name: str) -> str:
|
|
application_path = os.path.dirname(sys.executable)
|
|
if sys.platform == "win32" or sys.platform == "cygwin":
|
|
executable = name_map[service_name]
|
|
path = f"{application_path}/{executable}.exe"
|
|
return path
|
|
else:
|
|
path = f"{application_path}/{name_map[service_name]}"
|
|
return path
|
|
|
|
|
|
else:
|
|
application_path = os.path.dirname(__file__)
|
|
|
|
def executable_for_service(service_name: str) -> str:
|
|
return service_name
|
|
|
|
|
|
async def ping() -> Dict[str, Any]:
|
|
response = {"success": True, "value": "pong"}
|
|
return response
|
|
|
|
|
|
class WebSocketServer:
|
|
def __init__(self, root_path: Path, ca_crt_path: Path, ca_key_path: Path, crt_path: Path, key_path: Path):
|
|
self.root_path = root_path
|
|
self.log = log
|
|
self.services: Dict = dict()
|
|
self.plots_queue: List[Dict] = []
|
|
self.connections: Dict[str, List[WebSocketServerProtocol]] = dict() # service_name : [WebSocket]
|
|
self.remote_address_map: Dict[WebSocketServerProtocol, str] = dict() # socket: service_name
|
|
self.ping_job: Optional[asyncio.Task] = None
|
|
self.net_config = load_config(root_path, "config.yaml")
|
|
self.self_hostname = self.net_config["self_hostname"]
|
|
self.daemon_port = self.net_config["daemon_port"]
|
|
self.websocket_server = None
|
|
self.ssl_context = ssl_context_for_server(ca_crt_path, ca_key_path, crt_path, key_path)
|
|
self.shut_down = False
|
|
|
|
async def start(self):
|
|
self.log.info("Starting Daemon Server")
|
|
|
|
def master_close_cb():
|
|
asyncio.create_task(self.stop())
|
|
|
|
try:
|
|
asyncio.get_running_loop().add_signal_handler(signal.SIGINT, master_close_cb)
|
|
asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, master_close_cb)
|
|
except NotImplementedError:
|
|
self.log.info("Not implemented")
|
|
|
|
self.websocket_server = await serve(
|
|
self.safe_handle,
|
|
self.self_hostname,
|
|
self.daemon_port,
|
|
max_size=50 * 1000 * 1000,
|
|
ping_interval=500,
|
|
ping_timeout=300,
|
|
ssl=self.ssl_context,
|
|
)
|
|
self.log.info("Waiting Daemon WebSocketServer closure")
|
|
|
|
def cancel_task_safe(self, task: Optional[asyncio.Task]):
|
|
if task is not None:
|
|
try:
|
|
task.cancel()
|
|
except Exception as e:
|
|
self.log.error(f"Error while canceling task.{e} {task}")
|
|
|
|
async def stop(self) -> Dict[str, Any]:
|
|
self.shut_down = True
|
|
self.cancel_task_safe(self.ping_job)
|
|
await self.exit()
|
|
if self.websocket_server is not None:
|
|
self.websocket_server.close()
|
|
return {"success": True}
|
|
|
|
async def safe_handle(self, websocket: WebSocketServerProtocol, path: str):
|
|
service_name = ""
|
|
try:
|
|
async for message in websocket:
|
|
try:
|
|
decoded = json.loads(message)
|
|
if "data" not in decoded:
|
|
decoded["data"] = {}
|
|
response, sockets_to_use = await self.handle_message(websocket, decoded)
|
|
except Exception as e:
|
|
tb = traceback.format_exc()
|
|
self.log.error(f"Error while handling message: {tb}")
|
|
error = {"success": False, "error": f"{e}"}
|
|
response = format_response(decoded, error)
|
|
sockets_to_use = []
|
|
if len(sockets_to_use) > 0:
|
|
for socket in sockets_to_use:
|
|
try:
|
|
await socket.send(response)
|
|
except Exception as e:
|
|
tb = traceback.format_exc()
|
|
self.log.error(f"Unexpected exception trying to send to websocket: {e} {tb}")
|
|
self.remove_connection(socket)
|
|
await socket.close()
|
|
except Exception as e:
|
|
tb = traceback.format_exc()
|
|
service_name = "Unknown"
|
|
if websocket in self.remote_address_map:
|
|
service_name = self.remote_address_map[websocket]
|
|
if isinstance(e, ConnectionClosedOK):
|
|
self.log.info(f"ConnectionClosedOk. Closing websocket with {service_name} {e}")
|
|
elif isinstance(e, WebSocketException):
|
|
self.log.info(f"Websocket exception. Closing websocket with {service_name} {e} {tb}")
|
|
else:
|
|
self.log.error(f"Unexpected exception in websocket: {e} {tb}")
|
|
finally:
|
|
self.remove_connection(websocket)
|
|
await websocket.close()
|
|
|
|
def remove_connection(self, websocket: WebSocketServerProtocol):
|
|
service_name = None
|
|
if websocket in self.remote_address_map:
|
|
service_name = self.remote_address_map[websocket]
|
|
self.remote_address_map.pop(websocket)
|
|
if service_name in self.connections:
|
|
after_removal = []
|
|
for connection in self.connections[service_name]:
|
|
if connection == websocket:
|
|
continue
|
|
else:
|
|
after_removal.append(connection)
|
|
self.connections[service_name] = after_removal
|
|
|
|
async def ping_task(self) -> None:
|
|
restart = True
|
|
await asyncio.sleep(30)
|
|
for remote_address, service_name in self.remote_address_map.items():
|
|
if service_name in self.connections:
|
|
sockets = self.connections[service_name]
|
|
for socket in sockets:
|
|
if socket.remote_address[1] == remote_address:
|
|
try:
|
|
self.log.info(f"About to ping: {service_name}")
|
|
await socket.ping()
|
|
except asyncio.CancelledError:
|
|
self.log.info("Ping task received Cancel")
|
|
restart = False
|
|
break
|
|
except Exception as e:
|
|
self.log.info(f"Ping error: {e}")
|
|
self.log.warning("Ping failed, connection closed.")
|
|
self.remove_connection(socket)
|
|
await socket.close()
|
|
if restart is True:
|
|
self.ping_job = asyncio.create_task(self.ping_task())
|
|
|
|
async def handle_message(
|
|
self, websocket: WebSocketServerProtocol, message: WsRpcMessage
|
|
) -> Tuple[Optional[str], List[Any]]:
|
|
"""
|
|
This function gets called when new message is received via websocket.
|
|
"""
|
|
|
|
command = message["command"]
|
|
destination = message["destination"]
|
|
if destination != "daemon":
|
|
destination = message["destination"]
|
|
if destination in self.connections:
|
|
sockets = self.connections[destination]
|
|
return dict_to_json_str(message), sockets
|
|
|
|
return None, []
|
|
|
|
data = message["data"]
|
|
commands_with_data = [
|
|
"start_service",
|
|
"start_plotting",
|
|
"stop_plotting",
|
|
"stop_service",
|
|
"is_running",
|
|
"register_service",
|
|
]
|
|
if len(data) == 0 and command in commands_with_data:
|
|
response = {"success": False, "error": f'{command} requires "data"'}
|
|
elif command == "ping":
|
|
response = await ping()
|
|
elif command == "start_service":
|
|
response = await self.start_service(cast(Dict[str, Any], data))
|
|
elif command == "start_plotting":
|
|
response = await self.start_plotting(cast(Dict[str, Any], data))
|
|
elif command == "stop_plotting":
|
|
response = await self.stop_plotting(cast(Dict[str, Any], data))
|
|
elif command == "stop_service":
|
|
response = await self.stop_service(cast(Dict[str, Any], data))
|
|
elif command == "is_running":
|
|
response = await self.is_running(cast(Dict[str, Any], data))
|
|
elif command == "exit":
|
|
response = await self.stop()
|
|
elif command == "register_service":
|
|
response = await self.register_service(websocket, cast(Dict[str, Any], data))
|
|
elif command == "get_status":
|
|
response = self.get_status()
|
|
else:
|
|
self.log.error(f"UK>> {message}")
|
|
response = {"success": False, "error": f"unknown_command {command}"}
|
|
|
|
full_response = format_response(message, response)
|
|
return full_response, [websocket]
|
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
response = {"success": True, "genesis_initialized": True}
|
|
return response
|
|
|
|
def plot_queue_to_payload(self, plot_queue_item, send_full_log: bool) -> Dict[str, Any]:
|
|
error = plot_queue_item.get("error")
|
|
has_error = error is not None
|
|
|
|
item = {
|
|
"id": plot_queue_item["id"],
|
|
"queue": plot_queue_item["queue"],
|
|
"size": plot_queue_item["size"],
|
|
"parallel": plot_queue_item["parallel"],
|
|
"delay": plot_queue_item["delay"],
|
|
"state": plot_queue_item["state"],
|
|
"error": str(error) if has_error else None,
|
|
"deleted": plot_queue_item["deleted"],
|
|
"log_new": plot_queue_item.get("log_new"),
|
|
}
|
|
|
|
if send_full_log:
|
|
item["log"] = plot_queue_item.get("log")
|
|
return item
|
|
|
|
def prepare_plot_state_message(self, state: PlotEvent, id):
|
|
message = {
|
|
"state": state,
|
|
"queue": self.extract_plot_queue(id),
|
|
}
|
|
return message
|
|
|
|
def extract_plot_queue(self, id=None) -> List[Dict]:
|
|
send_full_log = id is None
|
|
data = []
|
|
for item in self.plots_queue:
|
|
if id is None or item["id"] == id:
|
|
data.append(self.plot_queue_to_payload(item, send_full_log))
|
|
return data
|
|
|
|
async def _state_changed(self, service: str, message: Dict[str, Any]):
|
|
"""If id is None, send the whole state queue"""
|
|
if service not in self.connections:
|
|
return None
|
|
|
|
websockets = self.connections[service]
|
|
|
|
if message is None:
|
|
return None
|
|
|
|
response = create_payload("state_changed", message, service, "wallet_ui")
|
|
|
|
for websocket in websockets:
|
|
try:
|
|
await websocket.send(response)
|
|
except Exception as e:
|
|
tb = traceback.format_exc()
|
|
self.log.error(f"Unexpected exception trying to send to websocket: {e} {tb}")
|
|
websockets.remove(websocket)
|
|
await websocket.close()
|
|
|
|
def state_changed(self, service: str, message: Dict[str, Any]):
|
|
asyncio.create_task(self._state_changed(service, message))
|
|
|
|
async def _watch_file_changes(self, config, fp: TextIO, loop: asyncio.AbstractEventLoop):
|
|
id = config["id"]
|
|
final_words = ["Renamed final file"]
|
|
|
|
while True:
|
|
new_data = await loop.run_in_executor(io_pool_exc, fp.readline)
|
|
|
|
if config["state"] is not PlotState.RUNNING:
|
|
return None
|
|
|
|
if new_data not in (None, ""):
|
|
config["log"] = new_data if config["log"] is None else config["log"] + new_data
|
|
config["log_new"] = new_data
|
|
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.LOG_CHANGED, id))
|
|
|
|
if new_data:
|
|
for word in final_words:
|
|
if word in new_data:
|
|
return None
|
|
else:
|
|
time.sleep(0.5)
|
|
|
|
async def _track_plotting_progress(self, config, loop: asyncio.AbstractEventLoop):
|
|
file_path = config["out_file"]
|
|
with open(file_path, "r") as fp:
|
|
await self._watch_file_changes(config, fp, loop)
|
|
|
|
def _build_plotting_command_args(self, request: Any, ignoreCount: bool) -> List[str]:
|
|
service_name = request["service"]
|
|
|
|
k = request["k"]
|
|
n = 1 if ignoreCount else request["n"]
|
|
t = request["t"]
|
|
t2 = request["t2"]
|
|
d = request["d"]
|
|
b = request["b"]
|
|
u = request["u"]
|
|
r = request["r"]
|
|
f = request.get("f")
|
|
p = request.get("p")
|
|
c = request.get("c")
|
|
a = request.get("a")
|
|
e = request["e"]
|
|
x = request["x"]
|
|
override_k = request["overrideK"]
|
|
|
|
command_args: List[str] = []
|
|
command_args += service_name.split(" ")
|
|
command_args.append(f"-k{k}")
|
|
command_args.append(f"-n{n}")
|
|
command_args.append(f"-t{t}")
|
|
command_args.append(f"-2{t2}")
|
|
command_args.append(f"-d{d}")
|
|
command_args.append(f"-b{b}")
|
|
command_args.append(f"-u{u}")
|
|
command_args.append(f"-r{r}")
|
|
|
|
if a is not None:
|
|
command_args.append(f"-a{a}")
|
|
|
|
if f is not None:
|
|
command_args.append(f"-f{f}")
|
|
|
|
if p is not None:
|
|
command_args.append(f"-p{p}")
|
|
|
|
if c is not None:
|
|
command_args.append(f"-c{c}")
|
|
|
|
if e is True:
|
|
command_args.append("-e")
|
|
|
|
if x is True:
|
|
command_args.append("-x")
|
|
|
|
if override_k is True:
|
|
command_args.append("--override-k")
|
|
|
|
self.log.debug(f"command_args are {command_args}")
|
|
|
|
return command_args
|
|
|
|
def _is_serial_plotting_running(self, queue: str = "default") -> bool:
|
|
response = False
|
|
for item in self.plots_queue:
|
|
if item["queue"] == queue and item["parallel"] is False and item["state"] is PlotState.RUNNING:
|
|
response = True
|
|
return response
|
|
|
|
def _get_plots_queue_item(self, id: str):
|
|
config = next(item for item in self.plots_queue if item["id"] == id)
|
|
return config
|
|
|
|
def _run_next_serial_plotting(self, loop: asyncio.AbstractEventLoop, queue: str = "default"):
|
|
next_plot_id = None
|
|
|
|
if self._is_serial_plotting_running(queue) is True:
|
|
return None
|
|
|
|
for item in self.plots_queue:
|
|
if item["queue"] == queue and item["state"] is PlotState.SUBMITTED and item["parallel"] is False:
|
|
next_plot_id = item["id"]
|
|
|
|
if next_plot_id is not None:
|
|
loop.create_task(self._start_plotting(next_plot_id, loop, queue))
|
|
|
|
async def _start_plotting(self, id: str, loop: asyncio.AbstractEventLoop, queue: str = "default"):
|
|
current_process = None
|
|
try:
|
|
log.info(f"Starting plotting with ID {id}")
|
|
config = self._get_plots_queue_item(id)
|
|
|
|
if config is None:
|
|
raise Exception(f"Plot queue config with ID {id} does not exist")
|
|
|
|
state = config["state"]
|
|
if state is not PlotState.SUBMITTED:
|
|
raise Exception(f"Plot with ID {id} has no state submitted")
|
|
|
|
id = config["id"]
|
|
delay = config["delay"]
|
|
await asyncio.sleep(delay)
|
|
|
|
if config["state"] is not PlotState.SUBMITTED:
|
|
return None
|
|
|
|
service_name = config["service_name"]
|
|
command_args = config["command_args"]
|
|
self.log.debug(f"command_args before launch_plotter are {command_args}")
|
|
self.log.debug(f"self.root_path before launch_plotter is {self.root_path}")
|
|
process, pid_path = launch_plotter(self.root_path, service_name, command_args, id)
|
|
|
|
current_process = process
|
|
|
|
config["state"] = PlotState.RUNNING
|
|
config["out_file"] = plotter_log_path(self.root_path, id).absolute()
|
|
config["process"] = process
|
|
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))
|
|
|
|
if service_name not in self.services:
|
|
self.services[service_name] = []
|
|
|
|
self.services[service_name].append(process)
|
|
|
|
await self._track_plotting_progress(config, loop)
|
|
|
|
config["state"] = PlotState.FINISHED
|
|
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))
|
|
|
|
except (subprocess.SubprocessError, IOError):
|
|
log.exception(f"problem starting {service_name}")
|
|
error = Exception("Start plotting failed")
|
|
config["state"] = PlotState.FINISHED
|
|
config["error"] = error
|
|
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))
|
|
raise error
|
|
|
|
finally:
|
|
if current_process is not None:
|
|
self.services[service_name].remove(current_process)
|
|
current_process.wait() # prevent zombies
|
|
self._run_next_serial_plotting(loop, queue)
|
|
|
|
async def start_plotting(self, request: Dict[str, Any]):
|
|
service_name = request["service"]
|
|
|
|
delay = request.get("delay", 0)
|
|
parallel = request.get("parallel", False)
|
|
size = request.get("k")
|
|
count = request.get("n", 1)
|
|
queue = request.get("queue", "default")
|
|
|
|
if ("p" in request) and ("c" in request):
|
|
response = {
|
|
"success": False,
|
|
"service_name": service_name,
|
|
"error": "Choose one of pool_contract_address and pool_public_key",
|
|
}
|
|
return response
|
|
|
|
for k in range(count):
|
|
id = str(uuid.uuid4())
|
|
config = {
|
|
"id": id,
|
|
"size": size,
|
|
"queue": queue,
|
|
"service_name": service_name,
|
|
"command_args": self._build_plotting_command_args(request, True),
|
|
"parallel": parallel,
|
|
"delay": delay * k if parallel is True else delay,
|
|
"state": PlotState.SUBMITTED,
|
|
"deleted": False,
|
|
"error": None,
|
|
"log": None,
|
|
"process": None,
|
|
}
|
|
|
|
self.plots_queue.append(config)
|
|
|
|
# notify GUI about new plot queue item
|
|
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))
|
|
|
|
# only first item can start when user selected serial plotting
|
|
can_start_serial_plotting = k == 0 and self._is_serial_plotting_running(queue) is False
|
|
|
|
if parallel is True or can_start_serial_plotting:
|
|
log.info(f"Plotting will start in {config['delay']} seconds")
|
|
loop = asyncio.get_event_loop()
|
|
loop.create_task(self._start_plotting(id, loop, queue))
|
|
else:
|
|
log.info("Plotting will start automatically when previous plotting finish")
|
|
|
|
response = {
|
|
"success": True,
|
|
"service_name": service_name,
|
|
}
|
|
|
|
return response
|
|
|
|
async def stop_plotting(self, request: Dict[str, Any]) -> Dict[str, Any]:
|
|
id = request["id"]
|
|
config = self._get_plots_queue_item(id)
|
|
if config is None:
|
|
return {"success": False}
|
|
|
|
id = config["id"]
|
|
state = config["state"]
|
|
process = config["process"]
|
|
queue = config["queue"]
|
|
|
|
if config["state"] is PlotState.REMOVING:
|
|
return {"success": False}
|
|
|
|
try:
|
|
run_next = False
|
|
if process is not None and state == PlotState.RUNNING:
|
|
run_next = True
|
|
config["state"] = PlotState.REMOVING
|
|
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))
|
|
await kill_process(process, self.root_path, service_plotter, id)
|
|
|
|
config["state"] = PlotState.FINISHED
|
|
config["deleted"] = True
|
|
|
|
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))
|
|
|
|
self.plots_queue.remove(config)
|
|
|
|
if run_next:
|
|
loop = asyncio.get_event_loop()
|
|
self._run_next_serial_plotting(loop, queue)
|
|
|
|
return {"success": True}
|
|
except Exception as e:
|
|
log.error(f"Error during killing the plot process: {e}")
|
|
config["state"] = PlotState.FINISHED
|
|
config["error"] = str(e)
|
|
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))
|
|
return {"success": False}
|
|
|
|
async def start_service(self, request: Dict[str, Any]):
|
|
service_command = request["service"]
|
|
|
|
error = None
|
|
success = False
|
|
testing = False
|
|
if "testing" in request:
|
|
testing = request["testing"]
|
|
|
|
if not validate_service(service_command):
|
|
error = "unknown service"
|
|
|
|
if service_command in self.services:
|
|
service = self.services[service_command]
|
|
r = service is not None and service.poll() is None
|
|
if r is False:
|
|
self.services.pop(service_command)
|
|
error = None
|
|
else:
|
|
error = f"Service {service_command} already running"
|
|
|
|
if error is None:
|
|
try:
|
|
exe_command = service_command
|
|
if testing is True:
|
|
exe_command = f"{service_command} --testing=true"
|
|
process, pid_path = launch_service(self.root_path, exe_command)
|
|
self.services[service_command] = process
|
|
success = True
|
|
except (subprocess.SubprocessError, IOError):
|
|
log.exception(f"problem starting {service_command}")
|
|
error = "start failed"
|
|
|
|
response = {"success": success, "service": service_command, "error": error}
|
|
return response
|
|
|
|
async def stop_service(self, request: Dict[str, Any]) -> Dict[str, Any]:
|
|
service_name = request["service"]
|
|
result = await kill_service(self.root_path, self.services, service_name)
|
|
response = {"success": result, "service_name": service_name}
|
|
return response
|
|
|
|
async def is_running(self, request: Dict[str, Any]) -> Dict[str, Any]:
|
|
service_name = request["service"]
|
|
|
|
if service_name == service_plotter:
|
|
processes = self.services.get(service_name)
|
|
is_running = processes is not None and len(processes) > 0
|
|
response = {
|
|
"success": True,
|
|
"service_name": service_name,
|
|
"is_running": is_running,
|
|
}
|
|
else:
|
|
process = self.services.get(service_name)
|
|
is_running = process is not None and process.poll() is None
|
|
response = {
|
|
"success": True,
|
|
"service_name": service_name,
|
|
"is_running": is_running,
|
|
}
|
|
|
|
return response
|
|
|
|
async def exit(self) -> Dict[str, Any]:
|
|
jobs = []
|
|
for k in self.services.keys():
|
|
jobs.append(kill_service(self.root_path, self.services, k))
|
|
if jobs:
|
|
await asyncio.wait(jobs)
|
|
self.services.clear()
|
|
|
|
# TODO: fix this hack
|
|
asyncio.get_event_loop().call_later(5, lambda *args: sys.exit(0))
|
|
log.info("chia daemon exiting in 5 seconds")
|
|
|
|
response = {"success": True}
|
|
return response
|
|
|
|
async def register_service(self, websocket: WebSocketServerProtocol, request: Dict[str, Any]) -> Dict[str, Any]:
|
|
self.log.info(f"Register service {request}")
|
|
service = request["service"]
|
|
if service not in self.connections:
|
|
self.connections[service] = []
|
|
self.connections[service].append(websocket)
|
|
|
|
response: Dict[str, Any] = {"success": True}
|
|
if service == service_plotter:
|
|
response = {
|
|
"success": True,
|
|
"service": service,
|
|
"queue": self.extract_plot_queue(),
|
|
}
|
|
else:
|
|
self.remote_address_map[websocket] = service
|
|
if self.ping_job is None:
|
|
self.ping_job = asyncio.create_task(self.ping_task())
|
|
self.log.info(f"registered for service {service}")
|
|
log.info(f"{response}")
|
|
return response
|
|
|
|
|
|
def daemon_launch_lock_path(root_path: Path) -> Path:
|
|
"""
|
|
A path to a file that is lock when a daemon is launching but not yet started.
|
|
This prevents multiple instances from launching.
|
|
"""
|
|
return root_path / "run" / "start-daemon.launching"
|
|
|
|
|
|
def service_launch_lock_path(root_path: Path, service: str) -> Path:
|
|
"""
|
|
A path to a file that is lock when a service is running.
|
|
"""
|
|
service_name = service.replace(" ", "-").replace("/", "-")
|
|
return root_path / "run" / f"{service_name}.lock"
|
|
|
|
|
|
def pid_path_for_service(root_path: Path, service: str, id: str = "") -> Path:
|
|
"""
|
|
Generate a path for a PID file for the given service name.
|
|
"""
|
|
pid_name = service.replace(" ", "-").replace("/", "-")
|
|
return root_path / "run" / f"{pid_name}{id}.pid"
|
|
|
|
|
|
def plotter_log_path(root_path: Path, id: str):
|
|
return root_path / "plotter" / f"plotter_log_{id}.txt"
|
|
|
|
|
|
def launch_plotter(root_path: Path, service_name: str, service_array: List[str], id: str):
|
|
# we need to pass on the possibly altered CHIA_ROOT
|
|
os.environ["CHIA_ROOT"] = str(root_path)
|
|
service_executable = executable_for_service(service_array[0])
|
|
|
|
# Swap service name with name of executable
|
|
service_array[0] = service_executable
|
|
startupinfo = None
|
|
if os.name == "nt":
|
|
startupinfo = subprocess.STARTUPINFO() # type: ignore
|
|
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore
|
|
|
|
# Windows-specific.
|
|
# If the current process group is used, CTRL_C_EVENT will kill the parent and everyone in the group!
|
|
try:
|
|
creationflags: int = subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore
|
|
except AttributeError: # Not on Windows.
|
|
creationflags = 0
|
|
|
|
plotter_path = plotter_log_path(root_path, id)
|
|
|
|
if plotter_path.parent.exists():
|
|
if plotter_path.exists():
|
|
plotter_path.unlink()
|
|
else:
|
|
mkdir(plotter_path.parent)
|
|
outfile = open(plotter_path.resolve(), "w")
|
|
log.info(f"Service array: {service_array}")
|
|
process = subprocess.Popen(
|
|
service_array,
|
|
shell=False,
|
|
stderr=outfile,
|
|
stdout=outfile,
|
|
startupinfo=startupinfo,
|
|
creationflags=creationflags,
|
|
)
|
|
|
|
pid_path = pid_path_for_service(root_path, service_name, id)
|
|
try:
|
|
mkdir(pid_path.parent)
|
|
with open(pid_path, "w") as f:
|
|
f.write(f"{process.pid}\n")
|
|
except Exception:
|
|
pass
|
|
return process, pid_path
|
|
|
|
|
|
def launch_service(root_path: Path, service_command) -> Tuple[subprocess.Popen, Path]:
|
|
"""
|
|
Launch a child process.
|
|
"""
|
|
# set up CHIA_ROOT
|
|
# invoke correct script
|
|
# save away PID
|
|
|
|
# we need to pass on the possibly altered CHIA_ROOT
|
|
os.environ["CHIA_ROOT"] = str(root_path)
|
|
|
|
log.debug(f"Launching service with CHIA_ROOT: {os.environ['CHIA_ROOT']}")
|
|
|
|
lockfile = singleton(service_launch_lock_path(root_path, service_command))
|
|
if lockfile is None:
|
|
logging.error(f"{service_command}: already running")
|
|
raise subprocess.SubprocessError
|
|
|
|
# Insert proper e
|
|
service_array = service_command.split()
|
|
service_executable = executable_for_service(service_array[0])
|
|
service_array[0] = service_executable
|
|
startupinfo = None
|
|
if os.name == "nt":
|
|
startupinfo = subprocess.STARTUPINFO() # type: ignore
|
|
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore
|
|
|
|
# CREATE_NEW_PROCESS_GROUP allows graceful shutdown on windows, by CTRL_BREAK_EVENT signal
|
|
if sys.platform == "win32" or sys.platform == "cygwin":
|
|
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP
|
|
else:
|
|
creationflags = 0
|
|
environ_copy = os.environ.copy()
|
|
process = subprocess.Popen(
|
|
service_array, shell=False, startupinfo=startupinfo, creationflags=creationflags, env=environ_copy
|
|
)
|
|
pid_path = pid_path_for_service(root_path, service_command)
|
|
try:
|
|
mkdir(pid_path.parent)
|
|
with open(pid_path, "w") as f:
|
|
f.write(f"{process.pid}\n")
|
|
except Exception:
|
|
pass
|
|
return process, pid_path
|
|
|
|
|
|
async def kill_process(
|
|
process: subprocess.Popen, root_path: Path, service_name: str, id: str, delay_before_kill: int = 15
|
|
) -> bool:
|
|
pid_path = pid_path_for_service(root_path, service_name, id)
|
|
|
|
if sys.platform == "win32" or sys.platform == "cygwin":
|
|
log.info("sending CTRL_BREAK_EVENT signal to %s", service_name)
|
|
# pylint: disable=E1101
|
|
kill(process.pid, signal.SIGBREAK) # type: ignore
|
|
|
|
else:
|
|
log.info("sending term signal to %s", service_name)
|
|
process.terminate()
|
|
|
|
count: float = 0
|
|
while count < delay_before_kill:
|
|
if process.poll() is not None:
|
|
break
|
|
await asyncio.sleep(0.5)
|
|
count += 0.5
|
|
else:
|
|
process.kill()
|
|
log.info("sending kill signal to %s", service_name)
|
|
r = process.wait()
|
|
log.info("process %s returned %d", service_name, r)
|
|
try:
|
|
pid_path_killed = pid_path.with_suffix(".pid-killed")
|
|
if pid_path_killed.exists():
|
|
pid_path_killed.unlink()
|
|
os.rename(pid_path, pid_path_killed)
|
|
except Exception:
|
|
pass
|
|
|
|
return True
|
|
|
|
|
|
async def kill_service(
|
|
root_path: Path, services: Dict[str, subprocess.Popen], service_name: str, delay_before_kill: int = 15
|
|
) -> bool:
|
|
process = services.get(service_name)
|
|
if process is None:
|
|
return False
|
|
del services[service_name]
|
|
|
|
result = await kill_process(process, root_path, service_name, "", delay_before_kill)
|
|
return result
|
|
|
|
|
|
def is_running(services: Dict[str, subprocess.Popen], service_name: str) -> bool:
|
|
process = services.get(service_name)
|
|
return process is not None and process.poll() is None
|
|
|
|
|
|
def create_server_for_daemon(root_path: Path):
|
|
routes = web.RouteTableDef()
|
|
|
|
services: Dict = dict()
|
|
|
|
@routes.get("/daemon/ping/")
|
|
async def ping(request: web.Request) -> web.Response:
|
|
return web.Response(text="pong")
|
|
|
|
@routes.get("/daemon/service/start/")
|
|
async def start_service(request: web.Request) -> web.Response:
|
|
service_name = request.query.get("service")
|
|
if service_name is None or not validate_service(service_name):
|
|
r = f"{service_name} unknown service"
|
|
return web.Response(text=str(r))
|
|
|
|
if is_running(services, service_name):
|
|
r = f"{service_name} already running"
|
|
return web.Response(text=str(r))
|
|
|
|
try:
|
|
process, pid_path = launch_service(root_path, service_name)
|
|
services[service_name] = process
|
|
r = f"{service_name} started"
|
|
except (subprocess.SubprocessError, IOError):
|
|
log.exception(f"problem starting {service_name}")
|
|
r = f"{service_name} start failed"
|
|
|
|
return web.Response(text=str(r))
|
|
|
|
@routes.get("/daemon/service/stop/")
|
|
async def stop_service(request: web.Request) -> web.Response:
|
|
service_name = request.query.get("service")
|
|
if service_name is None:
|
|
r = f"{service_name} unknown service"
|
|
return web.Response(text=str(r))
|
|
r = str(await kill_service(root_path, services, service_name))
|
|
return web.Response(text=str(r))
|
|
|
|
@routes.get("/daemon/service/is_running/")
|
|
async def is_running_handler(request: web.Request) -> web.Response:
|
|
service_name = request.query.get("service")
|
|
if service_name is None:
|
|
r = f"{service_name} unknown service"
|
|
return web.Response(text=str(r))
|
|
|
|
r = str(is_running(services, service_name))
|
|
return web.Response(text=str(r))
|
|
|
|
@routes.get("/daemon/exit/")
|
|
async def exit(request: web.Request):
|
|
jobs = []
|
|
for k in services.keys():
|
|
jobs.append(kill_service(root_path, services, k))
|
|
if jobs:
|
|
await asyncio.wait(jobs)
|
|
services.clear()
|
|
|
|
# we can't await `site.stop()` here because that will cause a deadlock, waiting for this
|
|
# request to exit
|
|
|
|
|
|
def singleton(lockfile: Path, text: str = "semaphore") -> Optional[TextIO]:
|
|
"""
|
|
Open a lockfile exclusively.
|
|
"""
|
|
|
|
if not lockfile.parent.exists():
|
|
mkdir(lockfile.parent)
|
|
|
|
try:
|
|
if has_fcntl:
|
|
f = open(lockfile, "w")
|
|
fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
else:
|
|
if lockfile.exists():
|
|
lockfile.unlink()
|
|
fd = os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
|
|
f = open(fd, "w")
|
|
f.write(text)
|
|
except IOError:
|
|
return None
|
|
return f
|
|
|
|
|
|
async def async_run_daemon(root_path: Path) -> int:
|
|
chia_init(root_path)
|
|
config = load_config(root_path, "config.yaml")
|
|
setproctitle("chia_daemon")
|
|
initialize_logging("daemon", config["logging"], root_path)
|
|
lockfile = singleton(daemon_launch_lock_path(root_path))
|
|
crt_path = root_path / config["daemon_ssl"]["private_crt"]
|
|
key_path = root_path / config["daemon_ssl"]["private_key"]
|
|
ca_crt_path = root_path / config["private_ssl_ca"]["crt"]
|
|
ca_key_path = root_path / config["private_ssl_ca"]["key"]
|
|
sys.stdout.flush()
|
|
json_msg = dict_to_json_str(
|
|
{
|
|
"message": "cert_path",
|
|
"success": True,
|
|
"cert": f"{crt_path}",
|
|
"key": f"{key_path}",
|
|
"ca_crt": f"{ca_crt_path}",
|
|
}
|
|
)
|
|
sys.stdout.write("\n" + json_msg + "\n")
|
|
sys.stdout.flush()
|
|
if lockfile is None:
|
|
print("daemon: already launching")
|
|
return 2
|
|
|
|
# TODO: clean this up, ensuring lockfile isn't removed until the listen port is open
|
|
create_server_for_daemon(root_path)
|
|
ws_server = WebSocketServer(root_path, ca_crt_path, ca_key_path, crt_path, key_path)
|
|
await ws_server.start()
|
|
assert ws_server.websocket_server is not None
|
|
await ws_server.websocket_server.wait_closed()
|
|
log.info("Daemon WebSocketServer closed")
|
|
return 0
|
|
|
|
|
|
def run_daemon(root_path: Path) -> int:
|
|
return asyncio.get_event_loop().run_until_complete(async_run_daemon(root_path))
|
|
|
|
|
|
def main() -> int:
|
|
from chia.util.default_root import DEFAULT_ROOT_PATH
|
|
|
|
return run_daemon(DEFAULT_ROOT_PATH)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|