sudoisbot/sudoisbot/sensors/sensors.py

322 lines
9.7 KiB
Python

#!/usr/bin/python3
from subprocess import check_output, STDOUT, CalledProcessError
import json
from json.decoder import JSONDecodeError
import os.path
from dataclasses import dataclass, asdict, InitVar
import time
import serial
from loguru import logger
from temper.temper import Temper
W1ROOT = "/sys/bus/w1/devices"
W1LIST = "w1_bus_master1/w1_master_slaves"
class SensorDisconnectedError(Exception): pass
class NoSensorDetectedError(Exception): pass
class ArduinoError(Exception): pass
@dataclass
class TempSensor(object):
name: str
kind: str
environment: bool
def as_dict(self):
return asdict(self)
def __str__(self):
return f"<{self.name} [kind: {self.kind}, environment: {self.environment}]>"
@classmethod
def from_kind(cls, **kwargs):
kind = kwargs['kind'].lower()
objname = kind + "sensor"
sensorobjdict = {a.__name__.lower(): a for a in cls.__subclasses__()}
try:
sensorobj = sensorobjdict[objname]
return sensorobj(**kwargs)
except KeyError as e:
e.args += ("unknown sensor kind", )
raise
@classmethod
def autodetect(cls, name):
"""autodetects what kind of sensor is connected, only works
when the syste has one and only one sensor conncted and only supports
the name arg."""
for sensorobj in cls.__subclasses__():
try:
sensr = sensorobj(name)
logger.info(f"found '{sensor.kind}' sensor")
return connected
except (SensorDisconnectedError, NoSensorDetectedError):
continue
else:
raise NoSensorDetectedError("audotdetect found no sensors connected")
@dataclass(init=True)
class TemperSensor(TempSensor):
def _read(self):
# this function is to abstract away some error handling and make
# read() look nicer
try:
data = self._temper.read()
if len(data) == 0:
raise SensorDisconnectedError("temper: no data returned")
if len(data) > 1:
# i just have the model with one sensor. will expand if i get
# a different model at some point.
raise NotImplementedError("only supports Tempers with one sensor")
return data[0]
except FileNotFoundError as e:
msg = f"temper: {e.args[1]}"
logger.error(msg)
raise SensorDisconnectedError(msg) from e
except PermissionError as e:
raise NoSensorDetectedError(e) from e
def read(self):
reading = self._read()
try:
return {
'measurements': {'temp': reading['internal temperature'] }
}
except KeyError:
if 'firmware' in reading:
logger.error(f"temper usb: temp value missing from '{reading}'")
# makes the for loop just not loop over anything
return dict()
else:
raise
def __post_init__(self):
self._temper = Temper()
# so we bail if temper is configured but not connected/functional
# on start
# call .read() because it is doing error handling, some funky errors
# will slip past if youre trying to be smart about the exception stack
try:
firstreading = self._read()
logger.trace(firstreading)
except SensorDisconnectedError as e:
# NoSensorDetected is already raised in ._read()
#raise NoSensorDetectedError("temper: not connected") from e
raise
@dataclass
class DhtSensor(TempSensor):
dht_pin: InitVar[int]
def __post_init__(self, dht_pin):
if dht_pin:
self.dht_cmd = ["dht", str(dht_pin)]
else:
self.dht_cmd = ["dht"]
def read(self):
# the dht.c binary doesnt write to stderr at the moment
# but lets redirect stderr to stdout now in case i change
# that so this wont break
try:
output = check_output(self.dht_cmd, shell=False, stderr=STDOUT)
logger.trace(output)
joutput = json.loads(output)
return {
'measurements': {
'temp': joutput['temp'],
'humidity': joutput['humidity']
}}
except CalledProcessError as e:
raise SensorDisconnectedError("dht disconnected") from e
@dataclass
class Ds18b20Sensor(TempSensor):
sensor_id: str = None
# study: 28-0300a279f70f
# outdoor: 28-0300a279bbc9
def __post_init__(self):
ds18b20s = self._list_detected_ds18b20()
if len(ds18b20s) > 1 and self.sensor_id is None:
raise RuntimeError("need 'sensor_id' when > 1 ds18b20's connected")
elif self.sensor_id is None:
self.sensor_id = ds18b20s[0]
logger.info(f"set ds18b20 sensor_id to '{self.sensor_id}'")
self.sensorpath = os.path.join(W1ROOT, self.sensor_id, "w1_slave")
def _read_sensor(self):
try:
with open(self.sensorpath, 'r') as f:
return f.read().splitlines()
except FileNotFoundError:
raise SensorDisconnectedError(f"ds18b20: '{self.sensorpath}' not found")
def _parse_sensor_data(self):
# YES = checksum matches
data = self._read_sensor()
if len(data) == 0:
# File "sudoisbot/temps/sensors.py", line 94, in _parse_data
# if not data[0].endswith("YES"):
# └ []
raise SensorDisconnectedError(f"ds18b20: no data")
if not data[0].endswith("YES"):
raise SensorDisconnectedError(f"ds18b20: got '{data}'")
tempstr = data[1].rsplit(" ", 1)[1][2:]
return int(tempstr)/1000.0
def _list_detected_ds18b20(self):
w1_listfile = os.path.join(W1ROOT, W1LIST)
with open(w1_listfile, 'r') as f:
w1_ids = f.read().splitlines()
if len(w1_ids) == 0:
raise NoSensorDetectedError("no ds18b20 sensors connected")
if not all(a.startswith("28-") for a in w1_ids):
# something funky is going on, if this error happens
# then investigate
raise NoSensorDetectedError(f"unexpected values in '{w1_listfile}': {w1_ids}")
return w1_ids
def read(self):
return {
'measurements': { 'temp': self._parse_sensor_data() },
'meta': {'sensorid': self.sensor_id }
}
@dataclass
class ArduinoSensor(object):
# ard_loop_time = how often arduino should send a value in seconds
# called 'timeout' in arduino code
# needs a better name
# especially since the next line also has a timeout variable
# but thats the serial read timeout
name: str
kind: str
device: InitVar[str] = "/dev/ttyUSB0" # linux is a sane default
baudrate: InitVar[int] = 9600
ard_loop_timeout: int = 5 # seconds
# device = "/dev/cu.usbserial-A800eGKH"
# device="/dev/ttyUSB0"
def __post_init__(self, device, baudrate):
assert self.kind.startswith("arduino-")
ser_timeout = float(self.ard_loop_timeout) # seconds
logger.debug(f"serial timeout: {ser_timeout}s")
try:
self.ser = serial.Serial(device, baudrate, timeout=ser_timeout)
except serial.SerialException as e:
raise NoSensorDetectedError(e)
def as_dict(self):
return asdict(self)
def hello(self):
for i in range(5):
try:
data = self.ser.readline()
jdata = json.loads(data)
# 'true' is hardcoded..
return jdata['ready']
except (KeyError, JSONDecodeError, UnicodeDecodeError) as e:
# need to polish this when im able to reproduce
# maybe figure out why it happens
logger.warning(f"got invalid json: {data}")
except serial.serialutil.SerialException as e:
logger.error(e)
logger.debug(f"waiting 5s to try again {i}/5")
time.sleep(5.0)
else:
raise NoSensorDetectedError("no data from arduino")
def start(self):
ready = self.hello()
# \n is important !
logger.success(f"{self.name} ready: {ready}")
timeout_ms = self.ard_loop_timeout * 1000
logger.info(f"getting data on {timeout_ms}ms interval")
self.ser.write(f"{timeout_ms}\r\n".encode())
def __enter__(self):
# if i want to use this not as a context manager ill need
# self.started
self.ser.__enter__()
self.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.ser.__exit__(exc_type, exc_value, traceback)
def iter_lines(self):
while True:
line = self.ser.readline()
logger.trace(line)
if line == b"":
continue
try:
jline = json.loads(line)
if "error" in jline:
raise ArduinoError(jline['error'])
yield jline
except JSONDecodeError:
logger.warning(f"discarging garbage: {line}")
@dataclass
class ArduinoRainSensor(ArduinoSensor):
def iter_lines(self):
for jline in super().iter_lines():
rain = jline['digital'] == "LOW"
yield {
'digital': jline['digital'],
'rain': rain
}
@dataclass
class ArduinoCurrentSensor(ArduinoSensor):
def iter_lines(self):
for jline in super().iter_lines():
if jline['on_fire']:
logger.error("sensor is on fire")
yield {'current': jline['value']}