chia-blockchain/chia/util/ws_message.py

66 lines
1.6 KiB
Python

from secrets import token_bytes
from typing import Any, Dict
from chia.util.json_util import dict_to_json_str
try:
from typings import TypedDict
except ImportError:
from typing_extensions import TypedDict
# Messages must follow this format
# Message = { "command" "command_name",
# "data" : {...},
# "request_id": "bytes_32",
# "destination": "service_name",
# "origin": "service_name"
# }
class WsRpcMessage(TypedDict):
command: str
ack: bool
data: Dict[str, Any]
request_id: str
destination: str
origin: str
def format_response(incoming_msg: WsRpcMessage, response_data: Dict[str, Any]) -> str:
"""
Formats the response into standard format.
"""
response = {
"command": incoming_msg["command"],
"ack": True,
"data": response_data,
"request_id": incoming_msg["request_id"],
"destination": incoming_msg["origin"],
"origin": incoming_msg["destination"],
}
json_str = dict_to_json_str(response)
return json_str
def create_payload(command: str, data: Dict[str, Any], origin: str, destination: str) -> str:
response = create_payload_dict(command, data, origin, destination)
return dict_to_json_str(response)
def create_payload_dict(command: str, data: Dict[str, Any], origin: str, destination: str) -> WsRpcMessage:
return WsRpcMessage(
command=command,
ack=False,
data=data,
request_id=token_bytes().hex(),
destination=destination,
origin=origin,
)
def pong() -> Dict[str, Any]:
response = {"success": True}
return response