499 lines
13 KiB
TypeScript
499 lines
13 KiB
TypeScript
/**
|
|
* Connection that wraps a socket and provides an interface to interact with
|
|
* the Home Assistant websocket API.
|
|
*/
|
|
import * as messages from "./messages.js";
|
|
import { ERR_INVALID_AUTH, ERR_CONNECTION_LOST } from "./errors.js";
|
|
import { HassEvent, MessageBase } from "./types.js";
|
|
import { HaWebSocket } from "./socket.js";
|
|
import type { Auth } from "./auth.js";
|
|
|
|
const DEBUG = false;
|
|
|
|
export type ConnectionOptions = {
|
|
setupRetry: number;
|
|
auth?: Auth;
|
|
createSocket: (options: ConnectionOptions) => Promise<HaWebSocket>;
|
|
};
|
|
|
|
export type ConnectionEventListener = (
|
|
conn: Connection,
|
|
eventData?: any,
|
|
) => void;
|
|
|
|
type Events = "ready" | "disconnected" | "reconnect-error";
|
|
|
|
type WebSocketPongResponse = {
|
|
id: number;
|
|
type: "pong";
|
|
};
|
|
|
|
type WebSocketEventResponse = {
|
|
id: number;
|
|
type: "event";
|
|
event: HassEvent;
|
|
};
|
|
|
|
type WebSocketResultResponse = {
|
|
id: number;
|
|
type: "result";
|
|
success: true;
|
|
result: any;
|
|
};
|
|
|
|
type WebSocketResultErrorResponse = {
|
|
id: number;
|
|
type: "result";
|
|
success: false;
|
|
error: {
|
|
code: string;
|
|
message: string;
|
|
};
|
|
};
|
|
|
|
type WebSocketResponse =
|
|
| WebSocketPongResponse
|
|
| WebSocketEventResponse
|
|
| WebSocketResultResponse
|
|
| WebSocketResultErrorResponse;
|
|
|
|
type SubscriptionUnsubscribe = () => Promise<void>;
|
|
|
|
interface SubscribeEventCommmandInFlight<T> {
|
|
resolve: (result?: any) => void;
|
|
reject: (err: any) => void;
|
|
callback: (ev: T) => void;
|
|
subscribe: (() => Promise<SubscriptionUnsubscribe>) | undefined;
|
|
unsubscribe: SubscriptionUnsubscribe;
|
|
}
|
|
|
|
type CommandWithAnswerInFlight = {
|
|
resolve: (result?: any) => void;
|
|
reject: (err: any) => void;
|
|
};
|
|
|
|
type CommandInFlight =
|
|
| SubscribeEventCommmandInFlight<any>
|
|
| CommandWithAnswerInFlight;
|
|
|
|
export class Connection {
|
|
options: ConnectionOptions;
|
|
commandId: number;
|
|
commands: Map<number, CommandInFlight>;
|
|
eventListeners: Map<string, ConnectionEventListener[]>;
|
|
closeRequested: boolean;
|
|
suspendReconnectPromise?: Promise<void>;
|
|
|
|
oldSubscriptions?: Map<number, CommandInFlight>;
|
|
|
|
// We use this to queue messages in flight for the first reconnect
|
|
// after the connection has been suspended.
|
|
_queuedMessages?: Array<{
|
|
resolve: (value?: unknown) => unknown;
|
|
reject?: (err: typeof ERR_CONNECTION_LOST) => unknown;
|
|
}>;
|
|
socket?: HaWebSocket;
|
|
/**
|
|
* Version string of the Home Assistant instance. Set to version of last connection while reconnecting.
|
|
*/
|
|
// @ts-ignore: incorrectly claiming it's not set in constructor.
|
|
haVersion: string;
|
|
|
|
constructor(socket: HaWebSocket, options: ConnectionOptions) {
|
|
// connection options
|
|
// - setupRetry: amount of ms to retry when unable to connect on initial setup
|
|
// - createSocket: create a new Socket connection
|
|
this.options = options;
|
|
// id if next command to send
|
|
this.commandId = 2; // socket may send 1 at the start to enable features
|
|
// info about active subscriptions and commands in flight
|
|
this.commands = new Map();
|
|
// map of event listeners
|
|
this.eventListeners = new Map();
|
|
// true if a close is requested by the user
|
|
this.closeRequested = false;
|
|
|
|
this._setSocket(socket);
|
|
}
|
|
|
|
get connected() {
|
|
// Using conn.socket.OPEN instead of WebSocket for better node support
|
|
return (
|
|
this.socket !== undefined && this.socket.readyState == this.socket.OPEN
|
|
);
|
|
}
|
|
|
|
private _setSocket(socket: HaWebSocket) {
|
|
this.socket = socket;
|
|
this.haVersion = socket.haVersion;
|
|
socket.addEventListener("message", this._handleMessage);
|
|
socket.addEventListener("close", this._handleClose);
|
|
|
|
const oldSubscriptions = this.oldSubscriptions;
|
|
if (oldSubscriptions) {
|
|
this.oldSubscriptions = undefined;
|
|
oldSubscriptions.forEach((info) => {
|
|
if ("subscribe" in info && info.subscribe) {
|
|
info.subscribe().then((unsub) => {
|
|
info.unsubscribe = unsub;
|
|
// We need to resolve this in case it wasn't resolved yet.
|
|
// This allows us to subscribe while we're disconnected
|
|
// and recover properly.
|
|
info.resolve();
|
|
});
|
|
}
|
|
});
|
|
}
|
|
const queuedMessages = this._queuedMessages;
|
|
|
|
if (queuedMessages) {
|
|
this._queuedMessages = undefined;
|
|
for (const queuedMsg of queuedMessages) {
|
|
queuedMsg.resolve();
|
|
}
|
|
}
|
|
|
|
this.fireEvent("ready");
|
|
}
|
|
|
|
addEventListener(eventType: Events, callback: ConnectionEventListener) {
|
|
let listeners = this.eventListeners.get(eventType);
|
|
|
|
if (!listeners) {
|
|
listeners = [];
|
|
this.eventListeners.set(eventType, listeners);
|
|
}
|
|
|
|
listeners.push(callback);
|
|
}
|
|
|
|
removeEventListener(eventType: Events, callback: ConnectionEventListener) {
|
|
const listeners = this.eventListeners.get(eventType);
|
|
|
|
if (!listeners) {
|
|
return;
|
|
}
|
|
|
|
const index = listeners.indexOf(callback);
|
|
|
|
if (index !== -1) {
|
|
listeners.splice(index, 1);
|
|
}
|
|
}
|
|
|
|
fireEvent(eventType: Events, eventData?: any) {
|
|
(this.eventListeners.get(eventType) || []).forEach((callback) =>
|
|
callback(this, eventData),
|
|
);
|
|
}
|
|
|
|
suspendReconnectUntil(suspendPromise: Promise<void>) {
|
|
this.suspendReconnectPromise = suspendPromise;
|
|
}
|
|
|
|
suspend() {
|
|
if (!this.suspendReconnectPromise) {
|
|
throw new Error("Suspend promise not set");
|
|
}
|
|
if (this.socket) {
|
|
this.socket.close();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Reconnect the websocket connection.
|
|
* @param force discard old socket instead of gracefully closing it.
|
|
*/
|
|
reconnect(force = false) {
|
|
if (!this.socket) {
|
|
return;
|
|
}
|
|
if (!force) {
|
|
this.socket.close();
|
|
return;
|
|
}
|
|
this.socket.removeEventListener("message", this._handleMessage);
|
|
this.socket.removeEventListener("close", this._handleClose);
|
|
this.socket.close();
|
|
this._handleClose();
|
|
}
|
|
|
|
close() {
|
|
this.closeRequested = true;
|
|
if (this.socket) {
|
|
this.socket.close();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Subscribe to a specific or all events.
|
|
*
|
|
* @param callback Callback to be called when a new event fires
|
|
* @param eventType
|
|
* @returns promise that resolves to an unsubscribe function
|
|
*/
|
|
async subscribeEvents<EventType>(
|
|
callback: (ev: EventType) => void,
|
|
eventType?: string,
|
|
): Promise<SubscriptionUnsubscribe> {
|
|
return this.subscribeMessage(callback, messages.subscribeEvents(eventType));
|
|
}
|
|
|
|
ping() {
|
|
return this.sendMessagePromise(messages.ping());
|
|
}
|
|
|
|
sendMessage(message: MessageBase, commandId?: number): void {
|
|
if (!this.connected) {
|
|
throw ERR_CONNECTION_LOST;
|
|
}
|
|
|
|
if (DEBUG) {
|
|
console.log("Sending", message);
|
|
}
|
|
|
|
if (this._queuedMessages) {
|
|
if (commandId) {
|
|
throw new Error("Cannot queue with commandId");
|
|
}
|
|
this._queuedMessages.push({ resolve: () => this.sendMessage(message) });
|
|
return;
|
|
}
|
|
|
|
if (!commandId) {
|
|
commandId = this._genCmdId();
|
|
}
|
|
message.id = commandId;
|
|
|
|
this.socket!.send(JSON.stringify(message));
|
|
}
|
|
|
|
sendMessagePromise<Result>(message: MessageBase): Promise<Result> {
|
|
return new Promise((resolve, reject) => {
|
|
if (this._queuedMessages) {
|
|
this._queuedMessages!.push({
|
|
reject,
|
|
resolve: async () => {
|
|
try {
|
|
resolve(await this.sendMessagePromise(message));
|
|
} catch (err) {
|
|
reject(err);
|
|
}
|
|
},
|
|
});
|
|
return;
|
|
}
|
|
|
|
const commandId = this._genCmdId();
|
|
this.commands.set(commandId, { resolve, reject });
|
|
this.sendMessage(message, commandId);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Call a websocket command that starts a subscription on the backend.
|
|
*
|
|
* @param message the message to start the subscription
|
|
* @param callback the callback to be called when a new item arrives
|
|
* @param [options.resubscribe] re-established a subscription after a reconnect. Defaults to true.
|
|
* @returns promise that resolves to an unsubscribe function
|
|
*/
|
|
async subscribeMessage<Result>(
|
|
callback: (result: Result) => void,
|
|
subscribeMessage: MessageBase,
|
|
options?: { resubscribe?: boolean },
|
|
): Promise<SubscriptionUnsubscribe> {
|
|
if (this._queuedMessages) {
|
|
await new Promise((resolve, reject) => {
|
|
this._queuedMessages!.push({ resolve, reject });
|
|
});
|
|
}
|
|
|
|
let info: SubscribeEventCommmandInFlight<Result>;
|
|
|
|
await new Promise((resolve, reject) => {
|
|
// Command ID that will be used
|
|
const commandId = this._genCmdId();
|
|
|
|
// We store unsubscribe on info object. That way we can overwrite it in case
|
|
// we get disconnected and we have to subscribe again.
|
|
info = {
|
|
resolve,
|
|
reject,
|
|
callback,
|
|
subscribe:
|
|
options?.resubscribe !== false
|
|
? () => this.subscribeMessage(callback, subscribeMessage)
|
|
: undefined,
|
|
unsubscribe: async () => {
|
|
// No need to unsubscribe if we're disconnected
|
|
if (this.connected) {
|
|
await this.sendMessagePromise(
|
|
messages.unsubscribeEvents(commandId),
|
|
);
|
|
}
|
|
this.commands.delete(commandId);
|
|
},
|
|
};
|
|
this.commands.set(commandId, info);
|
|
|
|
try {
|
|
this.sendMessage(subscribeMessage, commandId);
|
|
} catch (err) {
|
|
// Happens when the websocket is already closing.
|
|
// Don't have to handle the error, reconnect logic will pick it up.
|
|
}
|
|
});
|
|
|
|
return () => info.unsubscribe();
|
|
}
|
|
|
|
private _handleMessage = (event: MessageEvent) => {
|
|
let messageGroup: WebSocketResponse | WebSocketResponse[] = JSON.parse(
|
|
event.data,
|
|
);
|
|
|
|
if (!Array.isArray(messageGroup)) {
|
|
messageGroup = [messageGroup];
|
|
}
|
|
|
|
messageGroup.forEach((message) => {
|
|
if (DEBUG) {
|
|
console.log("Received", message);
|
|
}
|
|
|
|
const info = this.commands.get(message.id);
|
|
|
|
switch (message.type) {
|
|
case "event":
|
|
if (info) {
|
|
(info as SubscribeEventCommmandInFlight<any>).callback(
|
|
message.event,
|
|
);
|
|
} else {
|
|
console.warn(
|
|
`Received event for unknown subscription ${message.id}. Unsubscribing.`,
|
|
);
|
|
this.sendMessagePromise(
|
|
messages.unsubscribeEvents(message.id),
|
|
).catch((err) => {
|
|
if (DEBUG) {
|
|
console.warn(
|
|
` Error unsubsribing from unknown subscription ${message.id}`,
|
|
err,
|
|
);
|
|
}
|
|
});
|
|
}
|
|
break;
|
|
|
|
case "result":
|
|
// No info is fine. If just sendMessage is used, we did not store promise for result
|
|
if (info) {
|
|
if (message.success) {
|
|
info.resolve(message.result);
|
|
|
|
// Don't remove subscriptions.
|
|
if (!("subscribe" in info)) {
|
|
this.commands.delete(message.id);
|
|
}
|
|
} else {
|
|
info.reject(message.error);
|
|
this.commands.delete(message.id);
|
|
}
|
|
}
|
|
break;
|
|
|
|
case "pong":
|
|
if (info) {
|
|
info.resolve();
|
|
this.commands.delete(message.id);
|
|
} else {
|
|
console.warn(`Received unknown pong response ${message.id}`);
|
|
}
|
|
break;
|
|
|
|
default:
|
|
if (DEBUG) {
|
|
console.warn("Unhandled message", message);
|
|
}
|
|
}
|
|
});
|
|
};
|
|
|
|
private _handleClose = async () => {
|
|
const oldCommands = this.commands;
|
|
|
|
// reset to original state except haVersion
|
|
this.commandId = 1;
|
|
this.oldSubscriptions = this.commands;
|
|
this.commands = new Map();
|
|
this.socket = undefined;
|
|
|
|
// Reject in-flight sendMessagePromise requests
|
|
oldCommands.forEach((info) => {
|
|
// We don't cancel subscribeEvents commands in flight
|
|
// as we will be able to recover them.
|
|
if (!("subscribe" in info)) {
|
|
info.reject(messages.error(ERR_CONNECTION_LOST, "Connection lost"));
|
|
}
|
|
});
|
|
|
|
if (this.closeRequested) {
|
|
return;
|
|
}
|
|
|
|
this.fireEvent("disconnected");
|
|
|
|
// Disable setupRetry, we control it here with auto-backoff
|
|
const options = { ...this.options, setupRetry: 0 };
|
|
|
|
const reconnect = (tries: number) => {
|
|
setTimeout(
|
|
async () => {
|
|
if (this.closeRequested) {
|
|
return;
|
|
}
|
|
if (DEBUG) {
|
|
console.log("Trying to reconnect");
|
|
}
|
|
try {
|
|
const socket = await options.createSocket(options);
|
|
this._setSocket(socket);
|
|
} catch (err) {
|
|
if (this._queuedMessages) {
|
|
const queuedMessages = this._queuedMessages;
|
|
this._queuedMessages = undefined;
|
|
for (const msg of queuedMessages) {
|
|
if (msg.reject) {
|
|
msg.reject(ERR_CONNECTION_LOST);
|
|
}
|
|
}
|
|
}
|
|
if (err === ERR_INVALID_AUTH) {
|
|
this.fireEvent("reconnect-error", err);
|
|
} else {
|
|
reconnect(tries + 1);
|
|
}
|
|
}
|
|
},
|
|
Math.min(tries, 5) * 1000,
|
|
);
|
|
};
|
|
|
|
if (this.suspendReconnectPromise) {
|
|
await this.suspendReconnectPromise;
|
|
this.suspendReconnectPromise = undefined;
|
|
// For the first retry after suspend, we will queue up
|
|
// all messages.
|
|
this._queuedMessages = [];
|
|
}
|
|
|
|
reconnect(0);
|
|
};
|
|
|
|
private _genCmdId() {
|
|
return ++this.commandId;
|
|
}
|
|
}
|