401 lines
16 KiB
Swift
401 lines
16 KiB
Swift
import Communicator
|
|
import Foundation
|
|
import ObjectMapper
|
|
import PromiseKit
|
|
import Shared
|
|
|
|
final class WatchCommunicatorService {
|
|
enum WatchAssistCommunicatorError: Error {
|
|
case pipelinesFetchFailed
|
|
}
|
|
|
|
// Assist
|
|
private var assistService: AssistServiceProtocol?
|
|
private var pendingAudioData: Data?
|
|
|
|
func setup() {
|
|
Current.servers.add(observer: self)
|
|
|
|
// This directly mutates the data structure for observations to avoid race conditions.
|
|
Communicator.State.observations.store[.init(queue: .main)] = { state in
|
|
Current.Log.verbose("Activation state changed: \(state)")
|
|
_ = HomeAssistantAPI.SyncWatchContext()
|
|
}
|
|
|
|
WatchState.observations.store[.init(queue: .main)] = { watchState in
|
|
Current.Log.verbose("Watch state changed: \(watchState)")
|
|
_ = HomeAssistantAPI.SyncWatchContext()
|
|
}
|
|
|
|
Reachability.observations.store[.init(queue: .main)] = { reachability in
|
|
Current.Log.verbose("Reachability changed: \(reachability)")
|
|
}
|
|
|
|
setupMessages()
|
|
|
|
Blob.observations.store[.init(queue: .main)] = { [weak self] blob in
|
|
Current.Log.verbose("Received blob: \(blob.identifier)")
|
|
|
|
if blob.identifier == InteractiveImmediateMessages.assistAudioData.rawValue {
|
|
self?.assistAudioData(blob: blob)
|
|
}
|
|
}
|
|
|
|
Context.observations.store[.init(queue: .main)] = { context in
|
|
Current.Log.verbose("Received context: \(context.content.keys) \(context.content)")
|
|
|
|
if let modelIdentifier = context.content[WatchContext.watchModel.rawValue] as? String {
|
|
Current.crashReporter.setUserProperty(value: modelIdentifier, name: "PairedAppleWatch")
|
|
}
|
|
|
|
Current.apis.forEach({ $0.UpdateSensors(trigger: .watchContext).cauterize() })
|
|
}
|
|
|
|
_ = Communicator.shared
|
|
}
|
|
|
|
private func setupMessages() {
|
|
InteractiveImmediateMessage.observations.store[.init(queue: .main)] = { [weak self] message in
|
|
Current.Log.verbose("Received \(message.identifier) \(message) \(message.content)")
|
|
|
|
guard let self, let messageId = InteractiveImmediateMessages(rawValue: message.identifier) else {
|
|
Current.Log
|
|
.error(
|
|
"Received InteractiveImmediateMessage not mapped in InteractiveImmediateMessages: \(message.identifier)"
|
|
)
|
|
return
|
|
}
|
|
|
|
switch messageId {
|
|
case .ping:
|
|
message.reply(.init(identifier: InteractiveImmediateResponses.pong.rawValue))
|
|
case .watchConfig:
|
|
watchConfig(message: message)
|
|
case .actionRowPressed:
|
|
actionRowPressed(message: message)
|
|
case .pushAction:
|
|
pushAction(message: message)
|
|
case .assistPipelinesFetch:
|
|
assistPipelinesFetch(message: message)
|
|
case .assistAudioData:
|
|
// This will be handled by Blob observation due to amount of data
|
|
break
|
|
case .magicItemPressed:
|
|
magicItemPressed(message: message)
|
|
}
|
|
}
|
|
}
|
|
|
|
private func watchConfig(message: InteractiveImmediateMessage) {
|
|
do {
|
|
if let config: WatchConfig = try Current.database.read({ db in
|
|
try WatchConfig.fetchOne(db)
|
|
}) {
|
|
Current.Log.info("Watch configuration exists, moving forward providing it to watch")
|
|
notifyWatchConfig(message: message, watchConfig: config)
|
|
} else {
|
|
Current.Log.error("No watch config found, notify watch of empty config")
|
|
notifyEmptyWatchConfig(message: message)
|
|
}
|
|
} catch {
|
|
Current.Log.error("Failed to access database (GRDB) for watch config error: \(error.localizedDescription)")
|
|
}
|
|
}
|
|
|
|
private func notifyWatchConfig(message: InteractiveImmediateMessage, watchConfig: WatchConfig) {
|
|
let responseIdentifier = InteractiveImmediateResponses.watchConfigResponse.rawValue
|
|
let magicItemProvider = Current.magicItemProvider()
|
|
magicItemProvider.loadInformation { _ in
|
|
let magicItemsInfo: [MagicItem.Info] = watchConfig.items.compactMap { magicItem in
|
|
magicItemProvider.getInfo(for: magicItem)
|
|
}
|
|
message.reply(.init(identifier: responseIdentifier, content: [
|
|
"config": watchConfig.encodeForWatch(),
|
|
"magicItemsInfo": magicItemsInfo.map({ $0.encodeForWatch() }),
|
|
]))
|
|
}
|
|
}
|
|
|
|
private func notifyEmptyWatchConfig(message: InteractiveImmediateMessage) {
|
|
let responseIdentifier = InteractiveImmediateResponses.emptyWatchConfigResponse.rawValue
|
|
message.reply(.init(identifier: responseIdentifier))
|
|
}
|
|
|
|
private func magicItemPressed(message: InteractiveImmediateMessage) {
|
|
let responseIdentifier = InteractiveImmediateResponses.magicItemRowPressedResponse.rawValue
|
|
guard let itemType = message.content["itemType"] as? String,
|
|
let itemId = message.content["itemId"] as? String,
|
|
let serverId = message.content["serverId"] as? String,
|
|
let server = Current.servers.all.first(where: { $0.identifier.rawValue == serverId }),
|
|
let type = MagicItem.ItemType(rawValue: itemType), let api = Current.api(for: server) else {
|
|
Current.Log.warning("Magic item press did not provide item type or item id")
|
|
message.reply(.init(identifier: responseIdentifier, content: ["fired": false]))
|
|
return
|
|
}
|
|
|
|
switch type {
|
|
case .action:
|
|
firstly {
|
|
api.HandleAction(actionID: itemId, source: .Watch)
|
|
}.done {
|
|
message.reply(.init(identifier: responseIdentifier, content: ["fired": true]))
|
|
}.catch { err in
|
|
Current.Log.error("Error during action event fire: \(err)")
|
|
message.reply(.init(identifier: responseIdentifier, content: ["fired": false]))
|
|
}
|
|
case .script:
|
|
callService(
|
|
server: server,
|
|
message: message,
|
|
magicItemId: itemId,
|
|
domain: .script,
|
|
responseIdentifier: responseIdentifier
|
|
)
|
|
case .scene:
|
|
callService(
|
|
server: server,
|
|
message: message,
|
|
magicItemId: itemId,
|
|
domain: .scene,
|
|
serviceName: "turn_on",
|
|
serviceData: ["entity_id": itemId],
|
|
responseIdentifier: responseIdentifier
|
|
)
|
|
case .entity:
|
|
guard let domain = MagicItem(id: itemId, serverId: "", type: .entity).domain else {
|
|
message.reply(.init(identifier: responseIdentifier, content: ["fired": false]))
|
|
return
|
|
}
|
|
callService(
|
|
server: server,
|
|
message: message,
|
|
magicItemId: itemId,
|
|
domain: domain,
|
|
serviceName: "toggle",
|
|
serviceData: ["entity_id": itemId],
|
|
responseIdentifier: responseIdentifier
|
|
)
|
|
}
|
|
}
|
|
|
|
private func callService(
|
|
server: Server,
|
|
message: InteractiveImmediateMessage,
|
|
magicItemId: String,
|
|
domain: Domain,
|
|
serviceName: String? = nil,
|
|
serviceData: [String: String] = [:],
|
|
responseIdentifier: String
|
|
) {
|
|
guard let api = Current.api(for: server) else {
|
|
message.reply(.init(identifier: responseIdentifier, content: ["fired": false]))
|
|
Current.Log.error("No API available to call service")
|
|
return
|
|
}
|
|
let domain = domain.rawValue
|
|
let serviceName = serviceName ?? magicItemId.replacingOccurrences(of: "\(domain).", with: "")
|
|
api.CallService(
|
|
domain: domain,
|
|
service: serviceName,
|
|
serviceData: serviceData,
|
|
shouldLog: true
|
|
).pipe { result in
|
|
switch result {
|
|
case .fulfilled:
|
|
message.reply(.init(identifier: responseIdentifier, content: ["fired": true]))
|
|
case let .rejected(error):
|
|
Current.Log.error("Failed to run \(domain), error: \(error.localizedDescription)")
|
|
message.reply(.init(identifier: responseIdentifier, content: ["fired": false]))
|
|
}
|
|
}
|
|
}
|
|
|
|
private func actionRowPressed(message: InteractiveImmediateMessage) {
|
|
let responseIdentifier = InteractiveImmediateResponses.actionRowPressedResponse.rawValue
|
|
guard let actionID = message.content["ActionID"] as? String,
|
|
let action = Current.realm().object(ofType: Action.self, forPrimaryKey: actionID),
|
|
let server = Current.servers.server(for: action),
|
|
let api = Current.api(for: server) else {
|
|
Current.Log.warning("ActionID either does not exist or is not a string in the payload")
|
|
message.reply(.init(identifier: responseIdentifier, content: ["fired": false]))
|
|
return
|
|
}
|
|
|
|
firstly {
|
|
api.HandleAction(actionID: actionID, source: .Watch)
|
|
}.done {
|
|
message.reply(.init(identifier: responseIdentifier, content: ["fired": true]))
|
|
}.catch { err in
|
|
Current.Log.error("Error during action event fire: \(err)")
|
|
message.reply(.init(identifier: responseIdentifier, content: ["fired": false]))
|
|
}
|
|
}
|
|
|
|
private func pushAction(message: InteractiveImmediateMessage) {
|
|
let responseIdentifier = InteractiveImmediateResponses.pushActionResponse.rawValue
|
|
|
|
if let infoJSON = message.content["PushActionInfo"] as? [String: Any],
|
|
let info = Mapper<HomeAssistantAPI.PushActionInfo>().map(JSON: infoJSON),
|
|
let serverIdentifier = message.content["Server"] as? String,
|
|
let server = Current.servers.server(forServerIdentifier: serverIdentifier),
|
|
let api = Current.api(for: server) {
|
|
Current.backgroundTask(withName: "watch-push-action") { _ in
|
|
firstly {
|
|
api.handlePushAction(for: info)
|
|
}.ensure {
|
|
message.reply(.init(identifier: responseIdentifier))
|
|
}
|
|
}.catch { error in
|
|
Current.Log.error("error handling push action: \(error)")
|
|
}
|
|
}
|
|
}
|
|
|
|
private func sendMessage(message: ImmediateMessage) {
|
|
Communicator.shared.send(message)
|
|
}
|
|
}
|
|
|
|
// MARK: - Assist
|
|
|
|
extension WatchCommunicatorService {
|
|
private func assistPipelinesFetch(message: InteractiveImmediateMessage) {
|
|
let responseIdentifier = InteractiveImmediateResponses.assistPipelinesFetchResponse.rawValue
|
|
|
|
let serverId = message.content["serverId"] as? String
|
|
guard let server = Current.servers.all.first(where: { $0.identifier.rawValue == serverId }) ?? Current
|
|
.servers.all.first else {
|
|
Current.Log.warning("No server available to execute message \(message)")
|
|
message.reply(.init(identifier: responseIdentifier, content: ["error": true]))
|
|
return
|
|
}
|
|
|
|
initAssistServiceIfNeeded(server: server).fetchPipelines { pipelinesResponse in
|
|
if let pipelines = pipelinesResponse?.pipelines,
|
|
let preferredPipeline = pipelinesResponse?.preferredPipeline {
|
|
message.reply(.init(identifier: responseIdentifier, content: [
|
|
"pipelines": pipelines.map({ pipeline in
|
|
[
|
|
"name": pipeline.name,
|
|
"id": pipeline.id,
|
|
]
|
|
}),
|
|
"preferredPipeline": preferredPipeline,
|
|
]))
|
|
} else {
|
|
Current.Log
|
|
.error("Error during fetch Assist pipelines: \(WatchAssistCommunicatorError.pipelinesFetchFailed)")
|
|
message.reply(.init(identifier: responseIdentifier, content: ["error": true]))
|
|
}
|
|
}
|
|
}
|
|
|
|
private func assistAudioData(blob: Blob) {
|
|
let serverId = blob.metadata?["serverId"] as? String
|
|
guard let server = Current.servers.all.first(where: { $0.identifier.rawValue == serverId }) ?? Current
|
|
.servers.all.first else {
|
|
let errorMessage = "No server available to execute message \(blob.identifier)"
|
|
Current.Log.warning(errorMessage)
|
|
return
|
|
}
|
|
|
|
let pipelineId = blob.metadata?["pipelineId"] as? String
|
|
guard let sampleRate = blob.metadata?["sampleRate"] as? Double else {
|
|
let errorMessage = "No sample rate received in message \(blob.identifier)"
|
|
Current.Log.error(errorMessage)
|
|
return
|
|
}
|
|
let audioData = blob.content
|
|
pendingAudioData = audioData
|
|
initAssistServiceIfNeeded(server: server).assist(source: .audio(
|
|
pipelineId: pipelineId,
|
|
audioSampleRate: sampleRate
|
|
))
|
|
}
|
|
|
|
private func initAssistServiceIfNeeded(server: Server) -> AssistServiceProtocol {
|
|
if let assistService {
|
|
assistService.replaceServer(server: server)
|
|
} else {
|
|
assistService = AssistService(server: server)
|
|
}
|
|
|
|
assistService?.delegate = self
|
|
|
|
return assistService!
|
|
}
|
|
|
|
private func sendPendingAudioData() {
|
|
if let pendingAudioData {
|
|
assistService?.sendAudioData(pendingAudioData)
|
|
/*
|
|
Since Apple watch sends the whole audio all at once, we can notify
|
|
the pipeline that the audio data flow ended
|
|
*/
|
|
assistService?.finishSendingAudio()
|
|
self.pendingAudioData = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// MARK: - AssistServiceDelegate
|
|
|
|
extension WatchCommunicatorService: AssistServiceDelegate {
|
|
func didReceiveEvent(_ event: Shared.AssistEvent) {
|
|
Current.Log.info("Watch Assist received event: \(event)")
|
|
}
|
|
|
|
func didReceiveSttContent(_ content: String) {
|
|
let message = ImmediateMessage(
|
|
identifier: InteractiveImmediateResponses.assistSTTResponse.rawValue,
|
|
content: [
|
|
"content": content,
|
|
]
|
|
)
|
|
sendMessage(message: message)
|
|
}
|
|
|
|
func didReceiveIntentEndContent(_ content: String) {
|
|
let message = ImmediateMessage(
|
|
identifier: InteractiveImmediateResponses.assistIntentEndResponse.rawValue,
|
|
content: [
|
|
"content": content,
|
|
]
|
|
)
|
|
sendMessage(message: message)
|
|
}
|
|
|
|
func didReceiveGreenLightForAudioInput() {
|
|
sendPendingAudioData()
|
|
}
|
|
|
|
func didReceiveTtsMediaUrl(_ mediaUrl: URL) {
|
|
let message = ImmediateMessage(
|
|
identifier: InteractiveImmediateResponses.assistTTSResponse.rawValue,
|
|
content: [
|
|
"mediaURL": mediaUrl.absoluteString,
|
|
]
|
|
)
|
|
sendMessage(message: message)
|
|
}
|
|
|
|
func didReceiveError(code: String, message: String) {
|
|
let message = ImmediateMessage(
|
|
identifier: InteractiveImmediateResponses.assistError.rawValue,
|
|
content: [
|
|
"code": code,
|
|
"message": message,
|
|
]
|
|
)
|
|
sendMessage(message: message)
|
|
}
|
|
}
|
|
|
|
// MARK: - ServerObserver
|
|
|
|
extension WatchCommunicatorService: ServerObserver {
|
|
func serversDidChange(_ serverManager: ServerManager) {
|
|
_ = HomeAssistantAPI.SyncWatchContext()
|
|
}
|
|
}
|