joshuar-go-hass-agent/internal/linux/disk/ioWorker.go

147 lines
3.9 KiB
Go

// Copyright (c) 2024 Joshua Rich <joshua.rich@gmail.com>
//
// This software is released under the MIT License.
// https://opensource.org/licenses/MIT
//revive:disable:unused-receiver
package disk
import (
"context"
"fmt"
"log/slog"
"strings"
"sync"
"time"
"github.com/joshuar/go-hass-agent/internal/hass/sensor"
"github.com/joshuar/go-hass-agent/internal/linux"
"github.com/joshuar/go-hass-agent/internal/logging"
)
const (
ratesUpdateInterval = 5 * time.Second
ratesUpdateJitter = time.Second
ratesWorkerID = "disk_rates_sensors"
totalsID = "total"
)
// ioWorker creates sensors for disk IO counts and rates per device. It
// maintains an internal map of devices being tracked.
type ioWorker struct {
boottime time.Time
devices map[string][]*diskIOSensor
linux.PollingSensorWorker
delta time.Duration
mu sync.Mutex
}
// addDevice adds a new device to the tracker map. If sthe device is already
// being tracked, it will not be added again. The bool return indicates whether
// a device was added (true) or not (false).
func (w *ioWorker) addDevice(dev *device) {
w.mu.Lock()
defer w.mu.Unlock()
if _, found := w.devices[dev.id]; !found {
w.devices[dev.id] = newDeviceSensors(w.boottime, dev)
}
}
// updateDevice will update a tracked device's stats. For rates, it will
// recalculate based on the given time delta.
func (w *ioWorker) updateDevice(id string, stats map[stat]uint64, delta time.Duration) []sensor.Entity {
w.mu.Lock()
defer w.mu.Unlock()
sensors := make([]sensor.Entity, len(w.devices[id]))
if _, found := w.devices[id]; found && stats != nil {
for idx := range w.devices[id] {
w.devices[id][idx].update(stats, delta)
sensors[idx] = *w.devices[id][idx].Entity
}
}
return sensors
}
func (w *ioWorker) UpdateDelta(delta time.Duration) {
w.delta = delta
}
func (w *ioWorker) Sensors(ctx context.Context) ([]sensor.Entity, error) {
// Get valid devices.
deviceNames, err := getDeviceNames()
if err != nil {
return nil, fmt.Errorf("could not fetch disk devices: %w", err)
}
var sensors []sensor.Entity
statsTotals := make(map[stat]uint64)
// Get the current device info and stats for all valid devices.
for _, name := range deviceNames {
dev, stats, err := getDevice(name)
if err != nil {
logging.FromContext(ctx).
With(slog.String("worker", ratesWorkerID)).
Debug("Unable to read device stats.", slog.Any("error", err))
continue
}
// Add device (if it isn't already tracked).
w.addDevice(dev)
// Update device stats and return updated sensors.
sensors = append(sensors, w.updateDevice(dev.id, stats, w.delta)...)
// Don't include "aggregate" devices in totals.
if strings.HasPrefix(dev.id, "dm") || strings.HasPrefix(dev.id, "md") {
continue
}
// Add device stats to the totals.
for stat, value := range stats {
statsTotals[stat] += value
}
}
// Update total stats.
sensors = append(sensors, w.updateDevice(totalsID, statsTotals, w.delta)...)
return sensors, nil
}
func NewIOWorker(ctx context.Context) (*linux.PollingSensorWorker, error) {
worker := linux.NewPollingSensorWorker(ratesWorkerID, ratesUpdateInterval, ratesUpdateJitter)
boottime, found := linux.CtxGetBoottime(ctx)
if !found {
return worker, fmt.Errorf("%w: no boottime value", linux.ErrInvalidCtx)
}
// Add sensors for a pseudo "total" device which tracks total values from
// all devices.
devices := make(map[string][]*diskIOSensor)
devices["total"] = newDeviceSensors(boottime, &device{id: totalsID})
worker.PollingSensorType = &ioWorker{
devices: devices,
boottime: boottime,
}
return worker, nil
}
func newDeviceSensors(boottime time.Time, dev *device) []*diskIOSensor {
return []*diskIOSensor{
newDiskIOSensor(dev, diskReads, boottime),
newDiskIOSensor(dev, diskWrites, boottime),
newDiskIOSensor(dev, diskReadRate, boottime),
newDiskIOSensor(dev, diskWriteRate, boottime),
newDiskIOSensor(dev, diskIOInProgress, boottime),
}
}