166 lines
4.9 KiB
Python
166 lines
4.9 KiB
Python
#!/usr/bin/python3 -u
|
|
|
|
|
|
import os
|
|
import json
|
|
from time import sleep
|
|
import sys
|
|
|
|
from loguru import logger
|
|
import zmq
|
|
|
|
from sudoisbot.network.sub import Subscriber, SubscriberTimedOutError
|
|
from sudoisbot.sink.models import Temperatures, Humidities, People, Weather, dbconnect
|
|
|
|
|
|
def as_bytes(astring):
|
|
if isinstance(astring, bytes):
|
|
return astring
|
|
else:
|
|
return astring.encode()
|
|
|
|
|
|
class ZFluxClient(object):
|
|
def __init__(self, addr=None, topic=None):
|
|
self.addr = addr
|
|
self.topic = as_bytes(topic)
|
|
|
|
self.context = zmq.Context()
|
|
self.socket = self.context.socket(zmq.PUB)
|
|
|
|
if addr:
|
|
self.connect()
|
|
|
|
def connect(self, addr=None):
|
|
if not addr:
|
|
addr = self.addr
|
|
self.socket.connect(addr)
|
|
logger.info(f"connected to: {addr}, emitting on topic: {self.topic}")
|
|
|
|
def disconnect(self):
|
|
self.socket.close()
|
|
self.context.destroy()
|
|
logger.debug("zflux client disconnected")
|
|
|
|
def __enter__(self):
|
|
if self.addr:
|
|
self.connect(self.addr)
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
#self.disconnect()
|
|
pass
|
|
|
|
def send(self, msg):
|
|
self.socket.send_multipart([self.topic, json.dumps(msg).encode()])
|
|
|
|
class Sink(object):
|
|
def __init__(self, topics, write_path, zflux=None):
|
|
self.zflux = zflux
|
|
self.topics = topics
|
|
self.setup_loggers(write_path)
|
|
|
|
def setup_loggers(self, writepath):
|
|
# change to 11 or 19 to show with debug logging
|
|
logger.level("TXT", no=9, color="<yellow>")
|
|
logger.level("SINK", no=11, color="<green>")
|
|
|
|
for topic in self.topics:
|
|
def matcher(topic):
|
|
def inner(arg):
|
|
extra_topic = arg['extra'].get('topic', b"")
|
|
return extra_topic == as_bytes(topic)
|
|
return inner
|
|
|
|
|
|
logger.add(os.path.join(writepath, f"{topic}.txt"),
|
|
level="TXT", format="{message}",
|
|
filter=matcher(topic))
|
|
|
|
def make_subscriber(self, addr):
|
|
return Subscriber(addr, self.topics)
|
|
|
|
def listen(self, addr):
|
|
try:
|
|
# with self.make_subscriber(addr) as sub:
|
|
# for topic, msg in sub.recv():
|
|
# self.handle_msg(topic, msg)
|
|
#
|
|
# commented out because testing to no gracefully disconnected to get
|
|
# publishers to buffer when sink is dead
|
|
sub = self.make_subscriber(addr)
|
|
sub.connect()
|
|
for topic, msg, cached in sub.recv():
|
|
if cached:
|
|
logger.info(f"got a cached {topic} message from {msg['time']}")
|
|
self.handle_msg(topic, msg)
|
|
|
|
except zmq.error.Again:
|
|
logger.info(f"timeout after {sub.rcvtimeo_secs}s..")
|
|
raise SubscriberTimedOutError
|
|
|
|
def handle_msg(self, topic, msg):
|
|
self.log(topic, msg)
|
|
|
|
self.append_file(topic, msg)
|
|
self.update_db(topic, msg) # todo: keep records in sql
|
|
self.send_zflux(msg)
|
|
|
|
def update_db(self, topic, msg):
|
|
if topic == b"temp":
|
|
if msg['measurement'] == "temp":
|
|
Temperatures.insert_msg(msg)
|
|
elif msg['measurement'] == "humidity":
|
|
Humidities.insert_msg(msg)
|
|
elif topic == b'unifi':
|
|
People.update_state_if_changed(msg)
|
|
elif topic == b'weather':
|
|
Weather.insert_msg(msg)
|
|
|
|
|
|
def send_zflux(self, msg):
|
|
if self.zflux:
|
|
self.zflux.send(msg)
|
|
|
|
def append_file(self, topic, msg):
|
|
logger.bind(topic=topic).log("TXT", json.dumps(msg))
|
|
|
|
def log(self, topic, msg):
|
|
measurement = msg['measurement']
|
|
name = msg['tags']['name']
|
|
|
|
if 'value' in msg['fields']:
|
|
value = f": {msg['fields']['value']}"
|
|
else:
|
|
value = ""
|
|
logger.log("SINK", f"{topic}: {measurement} from '{name}'{value}")
|
|
|
|
def main(args, config):
|
|
|
|
db = dbconnect(**config['mysql'])
|
|
|
|
with ZFluxClient(topic=config['zflux']['topic']) as zflux:
|
|
zflux.connect(config['zflux']['addr'])
|
|
|
|
write_path = args.write_path or config['sink']['write_path']
|
|
|
|
sink = Sink(config['sink']['topics'], write_path, zflux)
|
|
while True:
|
|
try:
|
|
addr = config['sink']['addr']
|
|
sink.listen(addr)
|
|
|
|
except SubscriberTimedOutError:
|
|
sleep(1.0)
|
|
logger.info("reconnecting")
|
|
except KeyboardInterrupt:
|
|
logger.info("ok ill leave then")
|
|
return
|
|
|
|
def suicide_snail(timestamp, max_delay):
|
|
# suicide snail (move to common sub code?)
|
|
delay = datetime.now() - datetime.fromisoformat(timestamp)
|
|
if min(delay.seconds, 0) > max_delay:
|
|
logger.error(f"suicide snail: {delay.seconds} secs")
|
|
raise SystemExit("suicide snail")
|