joshuar-go-hass-agent/internal/agent/runners.go

325 lines
9.2 KiB
Go

// Copyright (c) 2024 Joshua Rich <joshua.rich@gmail.com>
//
// This software is released under the MIT License.
// https://opensource.org/licenses/MIT
package agent
import (
"context"
"sync"
"github.com/robfig/cron/v3"
"github.com/rs/zerolog/log"
mqttapi "github.com/joshuar/go-hass-anything/v9/pkg/mqtt"
"github.com/joshuar/go-hass-agent/internal/commands"
"github.com/joshuar/go-hass-agent/internal/device"
"github.com/joshuar/go-hass-agent/internal/hass"
"github.com/joshuar/go-hass-agent/internal/hass/sensor"
"github.com/joshuar/go-hass-agent/internal/linux"
"github.com/joshuar/go-hass-agent/internal/preferences"
"github.com/joshuar/go-hass-agent/internal/scripts"
)
// WorkerController represents an object that manages one or more Workers.
//
// Within this file, runWorkers uses a controller by calling StartAll,
// consuming the returned channel of sensor updates, and calling StopAll once
// its context has been cancelled.
type WorkerController interface {
	// ActiveWorkers returns the names of all currently active Workers.
	ActiveWorkers() []string
	// InactiveWorkers returns the names of all currently inactive Workers.
	InactiveWorkers() []string
	// Start starts the named Worker. It returns a receive-only channel of
	// sensor.Details (presumably the updates produced by that Worker; confirm
	// against the implementations) or a non-nil error.
	Start(ctx context.Context, name string) (<-chan sensor.Details, error)
	// Stop stops the named Worker, returning a non-nil error on failure.
	Stop(name string) error
	// StartAll starts all Workers that this controller manages and returns a
	// receive-only channel of sensor.Details. runWorkers only logs a non-nil
	// error ("Some ... workers could not be started.") and still consumes the
	// returned channel.
	StartAll(ctx context.Context) (<-chan sensor.Details, error)
	// StopAll stops all Workers that this controller manages, returning a
	// non-nil error if stopping reported a problem.
	StopAll() error
}
// Worker represents an object that is responsible for controlling the
// publishing of one or more sensors.
type Worker interface {
	// Name is the collective name of the sensors this worker controls.
	Name() string
	// Description is a longer text describing which particular sensors are
	// gathered and where they come from.
	Description() string
	// Sensors returns a slice with the current value of all sensors, or a
	// non-nil error if this is not possible.
	Sensors(ctx context.Context) ([]sensor.Details, error)
	// Updates returns a receive-only channel on which updates to sensors will
	// be published when they become available, or a non-nil error.
	Updates(ctx context.Context) (<-chan sensor.Details, error)
}
// MQTTWorker represents an object that is responsible for controlling the
// publishing of one or more commands over MQTT.
//
// Within this file, runMQTTWorker collects the Subscriptions and Configs of
// each worker it set up successfully and hands them to the MQTT client when
// it is created.
type MQTTWorker interface {
	// Subscriptions is a list of MQTT subscriptions this object wants to
	// establish on the MQTT broker.
	Subscriptions() []*mqttapi.Subscription
	// Configs are MQTT messages sent to the broker that Home Assistant will use
	// to set up entities.
	Configs() []*mqttapi.Msg
	// Msgs returns a channel on which this object will send MQTT messages on
	// certain events. runMQTTWorker publishes every message it receives from
	// the device controller's channel through the MQTT client.
	Msgs() chan *mqttapi.Msg
	// Setup provides a way to perform any required initialisation of the
	// object. runMQTTWorker calls it on the device controller before using
	// that controller's Subscriptions and Configs, and skips them when Setup
	// returns a non-nil error.
	Setup(_ context.Context) error
}
// runWorkers starts every sensor worker defined for the operating system and
// for the device, forwards each sensor update they emit to the tracker, and
// blocks until ctx is cancelled, at which point all workers are stopped.
//
// A failure to start some workers is only logged; updates from whatever did
// start are still processed.
func runWorkers(ctx context.Context, trk SensorTracker, reg sensor.Registry) {
	// Operating system workers.
	osWorkers := createSensorWorkers()
	sensorUpdates, err := osWorkers.StartAll(ctx)
	if err != nil {
		log.Debug().Err(err).Msg("Some OS workers could not be started.")
	}

	// Device workers.
	deviceWorkers := device.CreateSensorWorkers()
	deviceUpdates, err := deviceWorkers.StartAll(ctx)
	if err != nil {
		log.Debug().Err(err).Msg("Some device workers could not be started.")
	}

	// publish sends a single update to the tracker and logs the outcome.
	publish := func(details sensor.Details) {
		if updateErr := trk.UpdateSensor(ctx, reg, details); updateErr != nil {
			log.Warn().Err(updateErr).Str("id", details.ID()).Msg("Update failed.")
			return
		}
		log.Debug().
			Str("name", details.Name()).
			Str("id", details.ID()).
			Interface("state", details.State()).
			Str("units", details.Units()).
			Msg("Sensor updated.")
	}

	// Fan in the updates of all workers; every update is handled in its own
	// goroutine so that a slow update does not hold up the others.
	go func() {
		log.Debug().Msg("Listening for sensor updates.")
		for details := range sensor.MergeSensorCh(ctx, sensorUpdates, deviceUpdates) {
			go publish(details)
		}
	}()

	// Block until the context is cancelled, then stop all sensor workers.
	<-ctx.Done()
	if stopErr := osWorkers.StopAll(); stopErr != nil {
		log.Debug().Err(stopErr).Msg("Stopping OS workers reported an error.")
	}
	if stopErr := deviceWorkers.StopAll(); stopErr != nil {
		log.Debug().Err(stopErr).Msg("Stopping device workers reported an error.")
	}
}
// runScripts will retrieve all scripts that the agent can run from path and
// queue them up to be run on their defined schedule using the cron scheduler.
// It also listens on the output channel of every scheduled script and sends
// the resulting sensor objects to the sensor tracker.
//
// Scripts without a schedule are ignored. A script whose schedule cannot be
// added to the scheduler is logged and skipped; the remaining scripts are
// still scheduled. The function blocks until ctx is cancelled, then stops the
// scheduler and waits on the context returned by its Stop method.
func runScripts(ctx context.Context, path string, trk SensorTracker, reg sensor.Registry) {
	allScripts, err := scripts.FindScripts(path)
	switch {
	case err != nil:
		log.Error().Err(err).Msg("Error getting scripts.")
		return
	case len(allScripts) == 0:
		log.Debug().Msg("Could not find any script files.")
		return
	}

	scheduler := cron.New()
	outCh := make([]<-chan sensor.Details, 0, len(allScripts))
	for _, script := range allScripts {
		schedule := script.Schedule()
		if schedule == "" {
			continue
		}
		if _, err := scheduler.AddJob(schedule, script); err != nil {
			log.Warn().Err(err).Str("script", script.Path()).
				Msg("Unable to schedule script.")
			// One unschedulable script must not prevent the scripts after it
			// from being scheduled, so skip it rather than leaving the loop.
			continue
		}
		outCh = append(outCh, script.Output)
		log.Debug().Str("schedule", schedule).Str("script", script.Path()).
			Msg("Added script sensor.")
	}

	log.Debug().Msg("Starting cron scheduler for script sensors.")
	scheduler.Start()

	// Forward the output of all scheduled scripts to the tracker. Each update
	// is handled in its own goroutine.
	go func() {
		for scriptUpdates := range sensor.MergeSensorCh(ctx, outCh...) {
			go func(update sensor.Details) {
				if err := trk.UpdateSensor(ctx, reg, update); err != nil {
					log.Warn().Err(err).Str("id", update.ID()).Msg("Update sensor failed.")
				} else {
					log.Debug().
						Str("name", update.Name()).
						Str("id", update.ID()).
						Interface("state", update.State()).
						Str("units", update.Units()).
						Msg("Sensor updated.")
				}
			}(scriptUpdates)
		}
	}()

	<-ctx.Done()
	log.Debug().Msg("Stopping cron scheduler for script sensors.")
	cronCtx := scheduler.Stop()
	<-cronCtx.Done()
}
// runNotificationsWorker will run a goroutine that is listening for
// notification messages from Home Assistant on a websocket connection. Any
// received notifications will be displayed on the device running the agent.
//
// The function blocks until ctx is cancelled. Should the notification channel
// be closed before that, the handler stops reading from it and simply waits
// for the cancellation.
func (agent *Agent) runNotificationsWorker(ctx context.Context) {
	log.Debug().Msg("Listening for notifications.")
	notifyCh := hass.StartWebsocket(ctx)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				log.Debug().Msg("Stopping notification handler.")
				return
			case n, ok := <-notifyCh:
				if !ok {
					// A closed channel is always ready to receive and yields
					// zero values, which would make this loop spin and
					// display empty notifications. A nil channel is never
					// ready, so from here on only ctx.Done() can fire.
					notifyCh = nil
					continue
				}
				agent.ui.DisplayNotification(n)
			}
		}
	}()
	wg.Wait()
}
// runMQTTWorker will set up a connection to MQTT and listen on topics for
// controlling this device from Home Assistant.
//
// It does nothing when the preferences cannot be loaded or MQTT is disabled.
// Otherwise it collects the subscriptions and configs of the device
// controller and the commands controller (skipping whichever could not be set
// up), connects to the broker with them, publishes every message the device
// controller emits, and blocks until ctx is cancelled.
func runMQTTWorker(ctx context.Context, commandsFile string) {
	prefs, err := preferences.Load()
	switch {
	case err != nil:
		log.Error().Err(err).Msg("Could not load MQTT preferences.")
		return
	case !prefs.MQTTEnabled:
		return
	}

	mqttCtx, mqttCancel := context.WithCancel(ctx)
	defer mqttCancel()

	var (
		subscriptions []*mqttapi.Subscription
		configs       []*mqttapi.Msg
	)
	// register records the subscriptions and configs of a worker that was set
	// up successfully.
	register := func(worker MQTTWorker) {
		subscriptions = append(subscriptions, worker.Subscriptions()...)
		configs = append(configs, worker.Configs()...)
	}

	// Create an MQTT device for this operating system and run its Setup.
	var deviceController MQTTWorker = newMQTTDevice(mqttCtx)
	if err = deviceController.Setup(mqttCtx); err == nil {
		register(deviceController)
	} else {
		log.Error().Err(err).Msg("Could not set up device MQTT functionality.")
	}

	// Create the controller for the commands defined in commandsFile.
	var commandController MQTTWorker
	if commandController, err = commands.NewCommandsController(ctx, commandsFile, linux.MQTTDevice()); err == nil {
		register(commandController)
	} else {
		log.Warn().Err(err).Msg("Could not set up commands MQTT functionality.")
	}

	// Create a new connection to the MQTT broker, handing it the collected
	// subscriptions and configs.
	client, err := mqttapi.NewClient(mqttCtx, prefs, subscriptions, configs)
	if err != nil {
		log.Error().Err(err).Msg("Could not connect to MQTT broker.")
		return
	}

	// Publish every message emitted by the device controller until the parent
	// context is cancelled.
	go func() {
		log.Debug().Msg("Listening for messages to publish to MQTT.")
		for {
			select {
			case msg := <-deviceController.Msgs():
				if publishErr := client.Publish(msg); publishErr != nil {
					log.Warn().Err(publishErr).Msg("Unable to publish message to MQTT.")
				}
			case <-ctx.Done():
				mqttCancel()
				log.Debug().Msg("Stopped listening for messages to publish to MQTT.")
				return
			}
		}
	}()

	<-ctx.Done()
}
// resetMQTTWorker unpublishes the config messages of this device's MQTT
// device from the broker. It does nothing when the preferences cannot be
// loaded or MQTT is disabled; failures are only logged.
func resetMQTTWorker(ctx context.Context) {
	cfg, err := preferences.Load()
	switch {
	case err != nil:
		log.Error().Err(err).Msg("Could not load MQTT preferences.")
		return
	case !cfg.MQTTEnabled:
		return
	}

	dev := newMQTTDevice(ctx)

	// No subscriptions or configs are needed on this connection; it is only
	// used to unpublish.
	broker, err := mqttapi.NewClient(ctx, cfg, nil, nil)
	if err != nil {
		log.Error().Err(err).Msg("Could not connect to MQTT broker.")
		return
	}

	err = broker.Unpublish(dev.Configs()...)
	if err != nil {
		log.Error().Err(err).Msg("Failed to reset MQTT.")
	}
}