293 lines
7.6 KiB
Go
293 lines
7.6 KiB
Go
// Copyright 2024 Joshua Rich <joshua.rich@gmail.com>.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
//go:generate go run github.com/matryer/moq -out worker_mocks_test.go . PollingType EventType OneShotType
|
|
package linux
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/joshuar/go-hass-agent/internal/device/helpers"
|
|
"github.com/joshuar/go-hass-agent/internal/hass/event"
|
|
"github.com/joshuar/go-hass-agent/internal/hass/sensor"
|
|
"github.com/joshuar/go-hass-agent/internal/logging"
|
|
)
|
|
|
|
var ErrUnknownWorker = errors.New("unknown sensor worker type")
|
|
|
|
// PollingSensorType interface represents sensors that are generated on some poll interval.
|
|
type PollingSensorType interface {
|
|
UpdateDelta(delta time.Duration)
|
|
Sensors(ctx context.Context) ([]sensor.Entity, error)
|
|
}
|
|
|
|
// EventSensorType interface represents sensors that are generated on some event
|
|
// trigger, such as D-Bus messages.
|
|
type EventSensorType interface {
|
|
Events(ctx context.Context) (<-chan sensor.Entity, error)
|
|
Sensors(ctx context.Context) ([]sensor.Entity, error)
|
|
}
|
|
|
|
// OneShotSensorType interface represents sensors that are generated only one-time and
|
|
// have no ongoing updates.
|
|
type OneShotSensorType interface {
|
|
Sensors(ctx context.Context) ([]sensor.Entity, error)
|
|
}
|
|
|
|
type EventType interface {
|
|
Events(ctx context.Context) (<-chan event.Event, error)
|
|
}
|
|
|
|
// Worker is a struct embedded in other specific worker structs. It holds
|
|
// a function to cancel the worker and its ID.
|
|
type Worker struct {
|
|
cancelFunc context.CancelFunc
|
|
WorkerID string
|
|
}
|
|
|
|
// ID is a name that can be used as an ID to represent the group of sensors
|
|
// managed by this worker.
|
|
func (w *Worker) ID() string {
|
|
if w != nil {
|
|
return w.WorkerID
|
|
}
|
|
|
|
return "Unknown Worker"
|
|
}
|
|
|
|
// Stop will stop any processing of sensors controlled by this worker.
|
|
func (w *Worker) Stop() error {
|
|
w.cancelFunc()
|
|
return nil
|
|
}
|
|
|
|
// EventSensorWorker is a worker that generates sensors on some kind of
|
|
// event(s).
|
|
type EventSensorWorker struct {
|
|
EventSensorType
|
|
Worker
|
|
}
|
|
|
|
func (w *EventSensorWorker) Disabled() bool {
|
|
return w.EventSensorType == nil
|
|
}
|
|
|
|
func (w *EventSensorWorker) Start(ctx context.Context) (<-chan sensor.Entity, error) {
|
|
// Create a new context for the updates scope.
|
|
updatesCtx, cancelFunc := context.WithCancel(ctx)
|
|
// Save the context cancelFunc in the worker to be used as part of its
|
|
// Stop() method.
|
|
w.cancelFunc = cancelFunc
|
|
// Create a child logger for the worker.
|
|
|
|
return handleSensorEvents(updatesCtx, w.EventSensorType), nil
|
|
}
|
|
|
|
func NewEventSensorWorker(id string) *EventSensorWorker {
|
|
return &EventSensorWorker{
|
|
Worker: Worker{WorkerID: id},
|
|
}
|
|
}
|
|
|
|
// PollingSensorWorker is a worker that requires polling for generating sensors.
|
|
// It has a poll interval and jitter amount.
|
|
type PollingSensorWorker struct {
|
|
PollingSensorType
|
|
Worker
|
|
PollInterval time.Duration
|
|
JitterAmount time.Duration
|
|
}
|
|
|
|
func (w *PollingSensorWorker) Disabled() bool {
|
|
return w.PollingSensorType == nil
|
|
}
|
|
|
|
func (w *PollingSensorWorker) Start(ctx context.Context) (<-chan sensor.Entity, error) {
|
|
// Create a new context for the updates scope.
|
|
updatesCtx, cancelFunc := context.WithCancel(ctx)
|
|
// Save the context cancelFunc in the worker to be used as part of its
|
|
// Stop() method.
|
|
w.cancelFunc = cancelFunc
|
|
// Create a child logger for the worker.
|
|
|
|
return handleSensorPolling(updatesCtx, w.PollInterval, w.JitterAmount, w.PollingSensorType), nil
|
|
}
|
|
|
|
func NewPollingSensorWorker(id string, interval, jitter time.Duration) *PollingSensorWorker {
|
|
return &PollingSensorWorker{
|
|
Worker: Worker{WorkerID: id},
|
|
PollInterval: interval,
|
|
JitterAmount: jitter,
|
|
}
|
|
}
|
|
|
|
// OneShotSensorWorker is a worker that runs one-time, generates sensors, then
|
|
// exits.
|
|
type OneShotSensorWorker struct {
|
|
OneShotSensorType
|
|
Worker
|
|
}
|
|
|
|
func (w *OneShotSensorWorker) Disabled() bool {
|
|
return w.OneShotSensorType == nil
|
|
}
|
|
|
|
func (w *OneShotSensorWorker) Start(ctx context.Context) (<-chan sensor.Entity, error) {
|
|
// Create a new context for the updates scope.
|
|
updatesCtx, cancelFunc := context.WithCancel(ctx)
|
|
// Save the context cancelFunc in the worker to be used as part of its
|
|
// Stop() method.
|
|
w.cancelFunc = cancelFunc
|
|
// Create a child logger for the worker.
|
|
|
|
return handleSensorOneShot(updatesCtx, w.OneShotSensorType), nil
|
|
}
|
|
|
|
func NewOneShotSensorWorker(id string) *OneShotSensorWorker {
|
|
return &OneShotSensorWorker{
|
|
Worker: Worker{WorkerID: id},
|
|
}
|
|
}
|
|
|
|
type EventWorker struct {
|
|
EventType
|
|
Worker
|
|
}
|
|
|
|
func (w *EventWorker) Disabled() bool {
|
|
return w.EventType == nil
|
|
}
|
|
|
|
func (w *EventWorker) Start(ctx context.Context) (<-chan event.Event, error) {
|
|
// Create a new context for the updates scope.
|
|
updatesCtx, cancelFunc := context.WithCancel(ctx)
|
|
// Save the context cancelFunc in the worker to be used as part of its
|
|
// Stop() method.
|
|
w.cancelFunc = cancelFunc
|
|
// Create a child logger for the worker.
|
|
|
|
return handleEvents(updatesCtx, w.EventType), nil
|
|
}
|
|
|
|
func NewEventWorker(id string) *EventWorker {
|
|
return &EventWorker{
|
|
Worker: Worker{WorkerID: id},
|
|
}
|
|
}
|
|
|
|
// handleSensorPolling: create an updater function to run the worker's Sensors
|
|
// function and pass this to the PollSensors helper, using the interval
|
|
// and jitter the worker has requested.
|
|
func handleSensorPolling(ctx context.Context, interval, jitter time.Duration, worker PollingSensorType) <-chan sensor.Entity {
|
|
outCh := make(chan sensor.Entity)
|
|
|
|
updater := func(d time.Duration) {
|
|
// Send the delta (time since last poll) to the worker. Some workers may
|
|
// not use this value and the UpdateDelta for them will be a no-op.
|
|
worker.UpdateDelta(d)
|
|
// Get the updated sensors.
|
|
sensors, err := worker.Sensors(ctx)
|
|
if err != nil {
|
|
logging.FromContext(ctx).
|
|
With(slog.String("worker_type", "polling")).
|
|
Error("Worker error occurred.", slog.Any("error", err))
|
|
|
|
return
|
|
}
|
|
|
|
if len(sensors) == 0 {
|
|
logging.FromContext(ctx).
|
|
With(slog.String("worker_type", "polling")).
|
|
Warn("Worker returned no sensors.")
|
|
|
|
return
|
|
}
|
|
|
|
for _, s := range sensors {
|
|
outCh <- s
|
|
}
|
|
}
|
|
go func() {
|
|
defer close(outCh)
|
|
helpers.PollSensors(ctx, updater, interval, jitter)
|
|
}()
|
|
|
|
return outCh
|
|
}
|
|
|
|
// handleSensorEvents: read sensors from the worker Events function and pass these on.
|
|
func handleSensorEvents(ctx context.Context, worker EventSensorType) <-chan sensor.Entity {
|
|
outCh := make(chan sensor.Entity)
|
|
|
|
go func() {
|
|
defer close(outCh)
|
|
|
|
eventCh, err := worker.Events(ctx)
|
|
if err != nil {
|
|
logging.FromContext(ctx).
|
|
With(slog.String("worker_type", "events")).
|
|
Debug("Unable to retrieve sensor events.", slog.Any("error", err))
|
|
|
|
return
|
|
}
|
|
|
|
for s := range eventCh {
|
|
outCh <- s
|
|
}
|
|
}()
|
|
|
|
return outCh
|
|
}
|
|
|
|
// handleSensorOneShot: run the worker Sensors function to gather the sensors, pass these
|
|
// through the channel, then close it.
|
|
func handleSensorOneShot(ctx context.Context, worker OneShotSensorType) <-chan sensor.Entity {
|
|
outCh := make(chan sensor.Entity)
|
|
|
|
go func() {
|
|
defer close(outCh)
|
|
|
|
sensors, err := worker.Sensors(ctx)
|
|
if err != nil {
|
|
logging.FromContext(ctx).
|
|
With(slog.String("worker_type", "oneshot")).
|
|
Debug("Unable to retrieve sensors.", slog.Any("error", err))
|
|
|
|
return
|
|
}
|
|
|
|
for _, s := range sensors {
|
|
outCh <- s
|
|
}
|
|
}()
|
|
|
|
return outCh
|
|
}
|
|
|
|
// handleEvents: read events from the worker Events function and pass these on.
|
|
func handleEvents(ctx context.Context, worker EventType) <-chan event.Event {
|
|
outCh := make(chan event.Event)
|
|
|
|
go func() {
|
|
defer close(outCh)
|
|
|
|
eventCh, err := worker.Events(ctx)
|
|
if err != nil {
|
|
logging.FromContext(ctx).
|
|
With(slog.String("worker_type", "events")).
|
|
Debug("Unable to retrieve sensor events.", slog.Any("error", err))
|
|
|
|
return
|
|
}
|
|
|
|
for s := range eventCh {
|
|
outCh <- s
|
|
}
|
|
}()
|
|
|
|
return outCh
|
|
}
|