dev work #1

Manually merged
ben merged 18 commits from dev into master 2020-12-18 16:01:46 +00:00
49 changed files with 3197 additions and 1226 deletions

4
.gitignore vendored
View File

@ -1,10 +1,12 @@
config.py
*~
*.pyc
dist/
build/
logs/
MANIFEST
sudoisbot.egg-info/
.#*
\#*
sudoisbot.yml
*-default.yml
notes/

View File

@ -1,14 +1,19 @@
FROM python:2.7
FROM python:3.8
MAINTAINER Benedikt Kristinsson <benedikt@lokun.is>
RUN mkdir /sudoisbot
WORKDIR /sudoisbot
RUN useradd -u 1210 -ms /bin/bash sudoisbot
COPY setup.py /sudoisbot
COPY README.md /sudoisbot
COPY bin /sudoisbot/bin
COPY sudoisbot /sudoisbot/sudoisbot
COPY dist/sudoisbot-latest.tar.gz /opt/sudoisbot.tar.gz
RUN python setup.py install
# should build dependencies first
RUN pip install /opt/sudoisbot.tar.gz
COPY sudoisbot.yml /etc/sudoisbot.yml
ENTRYPOINT ["python", "/usr/local/bin/tglistener.py"]
# idea is to override with bind mounts
# since config.py doesnt do env vars as-is
ENV SUDOISBOT_CONF "/etc/sudoisbot.yml"
ENV SUDOISBOT_LOGFILE "/data/sudoisbot.log"
USER sudoisbot
EXPOSE 5559
EXPOSE 5560

13
Dockerfile.build Normal file
View File

@ -0,0 +1,13 @@
# Build-stage image: runs `poetry build` over the project sources.
# NOTE(review): benediktkr/poetry is used with :latest — pin a tag or
# digest for reproducible builds.
FROM benediktkr/poetry:latest

# MAINTAINER is deprecated (hadolint DL4000); use a label instead.
LABEL maintainer="Benedikt Kristinsson <benedikt@lokun.is>"

# WORKDIR creates the directory itself, so no explicit mkdir is needed.
WORKDIR /builddir

# Dependency manifests first, sources last, for better layer caching.
COPY pyproject.toml poetry.lock ./
COPY sudoisbot/ sudoisbot/

# Default invocation is `poetry build`; CMD can be overridden at run time.
ENTRYPOINT ["poetry"]
CMD ["build"]

14
Dockerfile.old Normal file
View File

@ -0,0 +1,14 @@
# Archived original image definition (superseded by the poetry-based build).
# NOTE(review): python:2.7 is end-of-life; kept only for reference.
FROM python:2.7
# WORKDIR would create this directory by itself; kept as-is in the archive.
RUN mkdir /sudoisbot
WORKDIR /sudoisbot
COPY setup.py /sudoisbot
COPY README.md /sudoisbot
COPY bin /sudoisbot/bin
COPY sudoisbot /sudoisbot/sudoisbot
# Installs the package and its console scripts into system site-packages.
RUN python setup.py install
COPY sudoisbot.yml /etc/sudoisbot.yml
# No USER directive: this image runs the listener as root.
ENTRYPOINT ["python", "/usr/local/bin/tglistener.py"]

30
Dockerfile.test Normal file
View File

@ -0,0 +1,30 @@
FROM python:3.8

# MAINTAINER is deprecated (hadolint DL4000); LABEL replaces it.
LABEL maintainer="Benedikt Kristinsson <benedikt@lokun.is>"

# Dedicated non-root runtime user with a fixed uid.
RUN useradd -u 1210 -ms /bin/bash sudoisbot

# Install poetry once; virtualenvs disabled so deps land in site-packages.
# --no-cache-dir keeps the pip cache out of the layer.
RUN pip install --no-cache-dir poetry && poetry config virtualenvs.create false

# WORKDIR creates /src itself; the previous explicit mkdir was redundant.
WORKDIR /src

# Copy dependency manifests first so the (slow) install layer stays cached
# until pyproject/lock actually change; sources are copied afterwards.
COPY pyproject.toml poetry.lock ./
RUN poetry install --no-root
COPY sudoisbot/ /src/sudoisbot/
RUN poetry build

# should build dependencies first
#COPY dist/sudoisbot-latest.tar.gz /opt/sudoisbot.tar.gz
#RUN pip install /opt/sudoisbot.tar.gz

# idea is to override with bind mounts
# since config.py doesnt do env vars as-is
# key=value form: the legacy space-separated ENV form is deprecated.
ENV SUDOISBOT_CONF="/etc/sudoisbot.yml"
ENV SUDOISBOT_LOGFILE="/data/sudoisbot.log"

USER sudoisbot

# Documentation only; ports are published at `docker run` time.
EXPOSE 5559
EXPOSE 5560

5
Jenkinsfile vendored Normal file
View File

@ -0,0 +1,5 @@
// Minimal declarative pipeline.
// A bare stage{} block is not a valid Jenkinsfile on its own; declarative
// syntax requires it to live inside pipeline { stages { ... } }.
pipeline {
    agent any
    stages {
        stage('build') {
            steps {
                // placeholder step: dump the build environment
                sh 'env'
            }
        }
    }
}

17
Makefile Normal file
View File

@ -0,0 +1,17 @@
SHELL := /bin/bash

# Project name: basename of the checkout directory.
NAME := $(shell basename $(CURDIR))
# `poetry version` prints "<name> <version>".
POETRY_VERSION := $(shell poetry version)
# Length of the project name. The old `$(shell ${#NAME})` tried to run the
# bash length-expansion as a command and always failed.
NAMESIZE := $(shell printf '%s' '$(NAME)' | wc -c)
# Second word of the `poetry version` output is the version number. The old
# `${POETRY_VERSION:8}` was a bash substring that make never expanded.
VERSION := $(word 2,$(POETRY_VERSION))

.PHONY: build poetry-build docker-build docker-tag

build: poetry-build docker-build docker-tag

# `poetry build` takes no version argument; the version comes from
# pyproject.toml (the old `poetry build ${VERSION}` was invalid).
poetry-build:
	poetry build

docker-build:
	docker build -t sudoisbot .

# Tag both :latest and the current version, matching what build.sh does.
docker-tag:
	docker tag sudoisbot benediktkr/sudoisbot:latest
	docker tag sudoisbot benediktkr/sudoisbot:$(VERSION)

22
build Executable file
View File

@ -0,0 +1,22 @@
#!/bin/bash
# Build the sdist and the docker image, tagging the image with the project
# version from pyproject.toml. Pass "push" to also push both tags.
# TODO(original): make this a makefile
set -euo pipefail

arg=${1:-}

# bug fix: this line was `version=$ grep ...)` — the opening paren of the
# command substitution was missing, so $version was always empty.
version=$(grep "^version.*=" pyproject.toml | awk -F'"' '{print $2}')

poetry build -f sdist
# poetry writes its artifacts into dist/ (the old path had no dist/ prefix)
sha1sum "dist/sudoisbot-${version}.tar.gz"

docker build -t sudoisbot .
docker tag sudoisbot benediktkr/sudoisbot:latest
docker tag sudoisbot "benediktkr/sudoisbot:$version"

if [ "$arg" = "push" ]; then
    docker push benediktkr/sudoisbot:latest
    docker push "benediktkr/sudoisbot:$version"
fi

26
build.sh Executable file
View File

@ -0,0 +1,26 @@
#!/bin/bash
# Build & publish script: builds the sdist, bakes the docker image and
# pushes both tags. With no argument the version is read from
# pyproject.toml; with an argument it bumps the poetry version first.
set -e
# TODO(original): set the version in the script and use it for a docker tag too
# TODO(original): make this a makefile
# No argument: reuse the version already recorded in pyproject.toml.
if [ "$1" = "" ]; then
version=$(grep "^version.*=" pyproject.toml | awk -F'"' '{print $2}')
else
# Argument given: treat it as the new version and record it via poetry.
version=$1
poetry version $version
fi
poetry build -f sdist
# keep a stable "latest" filename next to the versioned artifact
cp dist/sudoisbot-${version}.tar.gz dist/sudoisbot-latest.tar.gz
docker build -t sudoisbot .
docker tag sudoisbot benediktkr/sudoisbot:latest
docker tag sudoisbot benediktkr/sudoisbot:$version
# NOTE(review): pushes unconditionally (the sibling `build` script only
# pushes when invoked with "push").
docker push benediktkr/sudoisbot:latest
docker push benediktkr/sudoisbot:$version
#git add pyproject.toml
#git commit -m "version bumped to $version"

18
docker/proxy/Dockerfile Normal file
View File

@ -0,0 +1,18 @@
FROM python:3.8

# MAINTAINER is deprecated (hadolint DL4000); LABEL replaces it.
LABEL maintainer="Benedikt Kristinsson <benedikt@lokun.is>"

#RUN mkdir /sudoisbot
#COPY sudoisbot/ /sudoisbot/sudoisbot/
#COPY pyproject.toml /sudoisbot/pyproject.toml
#COPY poetry.lock /sudoisbot/poetry.lock
#RUN pip install /sudoisbot

# Build-time-only value: ARG keeps it out of the runtime environment
# (it was previously an ENV, which persisted into the container for no
# reason). Override with --build-arg SUDOISBOT_VERSION=...
ARG SUDOISBOT_VERSION="0.2.1"
COPY dist/sudoisbot-${SUDOISBOT_VERSION}.tar.gz /opt/sudoisbot.tar.gz
# install and delete the tarball in the same layer so it doesn't stay in
# the image; --no-cache-dir keeps the pip cache out too
RUN pip install --no-cache-dir /opt/sudoisbot.tar.gz && rm /opt/sudoisbot.tar.gz

# idea is to override with bind mounts
# since config.py doesnt do env vars as-is
ENV SUDOISBOT_CONF="/etc/sudoisbot.yml"

# Documentation only; ports are published at `docker run` time.
EXPOSE 5559
EXPOSE 5560

928
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,48 +1,34 @@
[tool.poetry]
name = "sudoisbot"
version = "0.2.1"
description = ""
version = "0.3.10.post4"
description = "a home automation and monitoring system written to learn zmq"
authors = ["Benedikt Kristinsson <benedikt@lokun.is>"]
repository = "https://github.com/benediktkr/sudoisbot"
[tool.poetry.dependencies]
python = "^3.7"
python-telegram-bot = "*"
PyYAML = "*"
zmq = "^0.0.0"
numpy = "^1.18.4"
matplotlib = "^3.2.1"
loguru = "^0.5.0"
temper = {git = "https://github.com/benediktkr/temper.git"}
requests = "^2.23.0"
peewee = "^3.13.3"
pyzmq = "^19.0.2"
pyyaml = "^5.3.1"
sudoistemper = "^0.1.0"
peewee = {version = "^3.14.0", optional = true}
python-telegram-bot = {version = "^13.1", optional = true}
matplotlib = {version = "^3.3.3", optional = true}
numpy = {version = "^1.19.4", optional = true}
requests = {version = "^2.25.0", optional = true}
PyMySQL = {version = "^0.10.1", optional = true}
python-dateutil = {version = "^2.8.1", optional = true}
[tool.poetry.extras]
graphs = ["numpy", "matplotlib"]
sink = ["peewee", "PyMySql", "requests", "python-telegram-bot"]
utils = ["python-dateutil"]
[tool.poetry.dev-dependencies]
pytest = "^5.2"
[tool.poetry.scripts]
tglistener = "sudoisbot.tglistener:main"
sendtelegram = "sudoisbot.sendtelegram:main"
proxy = "sudoisbot.network.proxy:pubsub_listener"
proxy_pubsub = "sudoisbot.network.proxy:pubsub_listener"
broker = "sudoisbot.network.broker:main"
sink = "sudoisbot.sink.sink:main"
graphtemps = "sudoisbot.sink.graphtemps:main"
temper_sub = "sudoisbot.sink.sink:main"
rain_notify = "sudoisbot.sink.notifier:main"
screen_pub = "sudoisbot.screen.screen_pub:main"
unifi_clients = "sudoisbot.unifi_clients:show_clients"
recenttemps = "sudoisbot.recenttemps:main"
weather_pub = "sudoisbot.weather.weather_pub:main"
temper_pub = "sudoisbot.temps.temp_pub:main"
sudoisbot = "sudoisbot:main"
[build-system]
requires = ["poetry>=0.12"]

View File

@ -1,12 +0,0 @@
---
telegram:
api_key: "your-token"
bot:
me:
username: your-username
id: your-id
authorized_users:
- your-id

View File

@ -1 +1,107 @@
__version__ = '0.1.0'
#!/usr/bin/python3 -u
__version__ = '0.3.0'
import argparse
import os
import sys
from sudoisbot.config import read_config
"""
tglistener = "sudoisbot.tglistener:main"
sendtelegram = "sudoisbot.sendtelegram:main"
# these will pretty much only be run while developing so just use
# poetry run python unifi/clients.py
# or something like that
#recenttemps = "sudoisbot.recenttemps:main"
#unifi_clients = "sudoisbot.unifi_clients:show_clients"
#graphtemps = "sudoisbot.sink.graphtemps:main"
"""
def run_temp_pub(args, config):
    # Start the temperature sensor publisher (args is unused here).
    from sudoisbot.sensors import temp_pub
    return temp_pub.main(config)
def run_sink(args, config):
    # Start the message sink.
    from sudoisbot.sink import sink
    return sink.main(args, config)
def run_proxy(args, config):
    # Start the buffering zmq proxy.
    from sudoisbot.network.proxy import main_buffering
    return main_buffering(args, config)
def run_weather_pub(args, config):
    # Start the weather API publisher (args is unused here).
    from sudoisbot.apis.weather_pub import main as weather_main
    return weather_main(config)
def run_screen_pub(args, config):
    # Start the e-ink/screen publisher.
    from sudoisbot.screen.screen_pub import main as screen_main
    return screen_main(args, config)
def run_unifi_pub(args, config):
    # With --show-clients just list the wifi clients; otherwise start
    # the presence publisher loop.
    from sudoisbot.apis import unifi
    if args.show_clients:
        return unifi.show_clients(config)
    return unifi.main(config)
def run_rain_pub(args, config):
    # Start the rain sensor publisher (args is unused here).
    from sudoisbot.sensors.rain_pub import main as rain_main
    return rain_main(config)
def main():
    """Command-line entry point: parse args, read config, dispatch to the
    selected sub-command and exit with its return code."""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    # Config path: $SUDOISBOT_CONF supplies the default, --config overrides.
    parser.add_argument("--config",
                        default=os.environ.get("SUDOISBOT_CONF", None),
                        help="overrides default with $SUDOISBOT_CONF if set")

    subparsers = parser.add_subparsers(dest="cmd")
    subparsers.required = True

    sink_parser = subparsers.add_parser('sink', help="start sink")
    sink_parser.add_argument("--write-path")
    sink_parser.set_defaults(func=run_sink)

    proxy_parser = subparsers.add_parser('proxy', help="start proxy")
    proxy_parser.add_argument('--forwarder', action='store_true')
    proxy_parser.add_argument('--capture', action='store_true')
    proxy_parser.set_defaults(func=run_proxy)

    temp_parser = subparsers.add_parser('temp_pub', help="start temp_publisher")
    temp_parser.set_defaults(func=run_temp_pub)

    rain_parser = subparsers.add_parser('rain_pub', help="start rain_pub")
    rain_parser.set_defaults(func=run_rain_pub)

    screen_parser = subparsers.add_parser('screen_pub', help="start screen_pub")
    screen_parser.add_argument("--no-loop", action="store_true")
    screen_parser.add_argument("--dry-run", action="store_true")
    screen_parser.add_argument("--rotation", type=int)
    screen_parser.add_argument("--statedir")
    screen_parser.set_defaults(func=run_screen_pub)

    weather_parser = subparsers.add_parser('weather_pub', help="start weather_pub")
    weather_parser.set_defaults(func=run_weather_pub)

    unifi_parser = subparsers.add_parser('unifi_pub', help="start unifi_pub")
    unifi_parser.add_argument("--show-clients", action="store_true")
    unifi_parser.set_defaults(func=run_unifi_pub)

    args = parser.parse_args()
    config = read_config(args.config)

    # possible future gating of sub-commands per config file:
    #if args.cmd not in config['allowed_cmds']:
    #    parser.error(f"config {config['file_path']} is not configured for '{cmd}'")

    sys.exit(args.func(args, config))

View File

@ -6,13 +6,54 @@
import json
from urllib.parse import urljoin
from itertools import groupby
from datetime import datetime, timezone
import urllib3
urllib3.disable_warnings()
import requests
from requests.exceptions import RequestException
from loguru import logger
from sudoisbot.common import init
from sudoisbot.network.pub import Publisher
class UnifiPublisher(Publisher):
    """Publishes who-is-home presence data derived from the wifi clients
    currently associated with the Unifi controller."""

    def __init__(self, addr, freq, unifi_config, people, location):
        super().__init__(addr, b"unifi", "unifi", freq)
        self.unifi_config = unifi_config
        self.people = people
        self.location = location

    def publish(self):
        """Query the controller once and emit a 'people' measurement."""
        try:
            # constructor logs in
            api = UnifiApi(self.unifi_config)
            wifi_clients = api.get_client_names()
        except RequestException as e:
            logger.error(e)
            raise  # ???

        # A person is "home" when any of their devices is on the wifi.
        home = {
            initials: any(device in wifi_clients for device in devices)
            for (initials, devices) in self.people.items()
        }

        self.pub({
            'measurement': 'people',
            'time': datetime.now(timezone.utc).isoformat(),
            'tags': {
                'name': 'unifi',
                'frequency': self.frequency,
                'location': self.location
            },
            'fields': home
        })
class UnifiApi(object):
def __init__(self, unifi_config):
@ -80,5 +121,22 @@ class UnifiApi(object):
logger.warning(f"weird client on unifi: {client}")
return names
if __name__ == "__main__":
show_clients()
def main(config):
    """Run the unifi presence publisher forever (60s interval)."""
    name = 'unifi'   # kept for parity with the other publishers; unused
    sleep = 60
    publisher = UnifiPublisher(config['addr'], sleep, config['unifi'],
                               config['people'], config['location'])
    with publisher as pub:
        pub.loop()
def show_clients(config):
    """Log a short listing of the wifi clients known to the controller."""
    api = UnifiApi(config['unifi'])
    for entry in api.get_clients_short():
        logger.info(entry)

View File

@ -0,0 +1,195 @@
#!/usr/bin/python3
# met.no:
#
# tuncate lat/long to 4 decimals
#
# Reponse headers (firefox):
#
# Date Thu, 25 Jun 2020 20:55:23 GMT
# Expires Thu, 25 Jun 2020 21:26:39 GMT
#
# Seems like 30 mins, but check "Expires"
#
# Use "If-Modified-Since" request header
#
# Depending on how i do this, add a random number of mins/secs to
# not do it on the hour/minute
#
# must support redirects and gzip compression (Accept-Encoding: gzip, deflate)
#
# openweatherap:
#
#
# triggers: https://openweathermap.org/triggers
# - polling
# - may as well poll nowcast
#
# ratelimit: 60 calls/minute
#
# weather condition codes: https://openweathermap.org/weather-conditions#Weather-Condition-Codes-2
#
# maybe interesting project: https://github.com/aceisace/Inky-Calendar
from datetime import datetime, timezone
from decimal import Decimal
import time
import json
import requests
from loguru import logger
from requests.exceptions import RequestException
from sudoisbot.network.pub import Publisher
from sudoisbot.config import read_config
# rain_conditions = [
# 'rain',
# 'drizzle',
# 'thunderstorm'
# ]
# # raining = 'rain' in main.lower() or 'rain' in desc.lower()
# # snowing = 'snow' in main.lower() or 'snow' in desc.lower()
# # drizzling = 'drizzle' in main.lower() or 'drizzle' in desc.lower()
# # thunderstorm = 'thunderstorm' in main.lower() or 'thunderstorm' in desc.lower()
# # any_percip = raining or snowing or drizzling or thunderstorm
# # if any_percip:
# # logger.bind(odd=True).trace(json.dumps(w))
# # precipitation = {
# # 'raining': raining,
# # 'snowing': snowing,
# # 'drizzling': drizzling,
# # 'thunderstorm': thunderstorm,
# # 'any': any_percip
# # }
def useragent():
    """Build the HTTP User-Agent string, embedding the installed version."""
    import pkg_resources
    dist = pkg_resources.get_distribution('sudoisbot')
    return f"sudoisbot/{dist.version} github.com/benediktkr/sudoisbot"
OWM_URL = "https://api.openweathermap.org/data/2.5/weather?lat={lat:.4f}&lon={lon:.4f}&appid={token}&sea_level={msl}&units=metric"
class NowcastPublisher(Publisher):
    """Polls the OpenWeatherMap current-weather API for a list of locations
    and publishes the result on the b"weather" topic (plus legacy 'temp'
    messages for temperature/humidity)."""

    def __init__(self, addr, locations, token, frequency):
        super().__init__(addr, b"weather", None, frequency)
        # Normalize lat/lon to Decimal; OWM_URL truncates to 4 decimals.
        self.locations = [{
            'name': a['name'],
            'lat': Decimal(a['lat']),
            'lon': Decimal(a['lon']),
            'msl': a['msl']
        } for a in locations]
        self.token = token
        self.base_url = OWM_URL
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": useragent(),
                                     "Accept": "application/json"})

    def get_nowcast(self, location):
        """Fetch and flatten the current weather for one location.

        Returns a dict of measurement fields; raises on HTTP errors."""
        url = self.base_url.format(token=self.token, **location)
        r = self.session.get(url)
        r.raise_for_status()
        if r.status_code == 203:
            logger.warning("deprecation warning: http 203 returned")
        w = r.json()
        d = dict(
            desc = ', '.join([a['description'] for a in w['weather']]),
            main = ', '.join([a['main'] for a in w['weather']]),
            temp = float(w['main']['temp']),
            feel_like = float(w['main']['feels_like']),
            pressure = float(w['main']['pressure']),
            humidity = float(w['main']['humidity']),
            wind_speed = float(w['wind'].get('speed', 0.0)),
            wind_deg = float(w['wind'].get('deg', 0.0)),
            visibility = w['visibility'],
            cloudiness = w['clouds']['all'],
            dt = w['dt'],
            # misnomer on my behalf
            # .fromtimestamp() -> converts to our tz (from UTC)
            # .utcfromtimestamp() -> returns in UTC
            weather_dt = datetime.fromtimestamp(w['dt']).isoformat()
        )
        # only in the data when it's been raining/snowing.
        # bug fix: these were assigned to unused locals with stray trailing
        # commas (making 1-tuples) and never reached the returned dict.
        if 'rain' in w:
            d['rain_1h'] = w['rain'].get('1h')
            d['rain_3h'] = w['rain'].get('3h')
        if 'snow' in w:
            d['snow_1h'] = w['snow'].get('1h')
            d['snow_3h'] = w['snow'].get('3h')
        return d

    def publish(self):
        """One polling pass over all configured locations."""
        try:
            for location in self.locations:
                nowcast = self.get_nowcast(location)
                self.send(location['name'], nowcast)
                # small pause between API calls (OWM ratelimits per minute)
                time.sleep(0.2)
        except RequestException as e:
            # best-effort: log and let the loop retry next interval
            logger.error(e)

    def send(self, name, weather):
        """Publish the weather dict, plus legacy per-field 'temp' messages."""
        now = datetime.now(timezone.utc).isoformat()
        tags = {
            'name': name,
            'frequency': self.frequency,
            'type': 'weather',
            'kind': 'weather',
            'source': 'api',
            'environment': 'outside',
            'location': name,
        }
        data = {
            'measurement': self.topic.decode(),
            'tags': tags,
            'time': now,
            'fields': weather
        }
        # for legacy and consistency reasons: emit temp/humidity as their
        # own measurements on the b'temp' topic as well
        for measurement in ['temp', 'humidity']:
            data2 = {
                'measurement': measurement,
                'tags': tags,
                'time': now,
                'fields': {'value': weather[measurement] }
            }
            jdata = json.dumps(data2)
            self.socket.send_multipart([b'temp', jdata.encode()])

        msg = self.pub(data)
        logger.trace(msg)
def main(config):
    """Run the nowcast publisher loop with settings from the config dict."""
    publisher = NowcastPublisher(config['addr'], config['locations'],
                                 config['owm_token'], config['frequency'])
    with publisher as pub:
        pub.loop()

View File

@ -2,17 +2,16 @@ import argparse
import copy
import os
import sys
from itertools import islice
from loguru import logger
import yaml
from sudoisbot.sendmsg import send_to_me
def useragent():
import pkg_resources
version = pkg_resources.get_distribution('sudoisbot').version
return f"sudoisbot/{version} github.com/benediktkr/sudoisbot"
def chunk(it, size=10):
it = iter(it)
return list(iter(lambda: list(islice(it, size)), []))
def catch22():
def actual_decorator(decorated_function):
@ -81,6 +80,11 @@ def read_configfile(name, section):
_section.setdefault(s, _d)
return _section
except KeyError:
# throws an error saying e.g. "section temper_pub not found" but the
# actual problem is that "logging" isn't found (the deepcopy stuff raises
# the exception).
#
# really need to rewrite this crap.....
logger.error("Section '{}' not found in '{}'",
section, conffile)
sys.exit(1)
@ -183,6 +187,13 @@ def init(name, argparser=None, fullconfig=False):
else:
raise
except PermissionError as e:
if args.verbose:
pass
else:
logger.error("try running with --verbose")
raise
# NOTE: used to disable deafult logger here
# my defaults have backtrace/diagnose disabled

47
sudoisbot/config.py Normal file
View File

@ -0,0 +1,47 @@
#!/usr/bin/python3
import sys
import os
from loguru import logger
import yaml
def read_config(fullpath=None):
    """Locate and parse the YAML config file.

    Search order: $SUDOISBOT_CONF, then an explicit fullpath, then the
    conventional locations for 'sudoisbot.yml'. Also reconfigures loguru
    when $SUDOISBOT_LOGFILE is set.

    Returns the parsed config dict with 'file_path' added; raises
    SystemExit when no config file is found.
    """
    if 'SUDOISBOT_LOGFILE' in os.environ:
        logfile = os.environ["SUDOISBOT_LOGFILE"]
        loglevel = os.environ.get("SUDOISBOT_LOGLEVEL", "DEBUG")
        logger.remove()
        logger.add(sys.stderr, level=loglevel)
        logger.add(logfile, level=loglevel)
        logger.debug("configured logger for env vars")

    if 'SUDOISBOT_CONF' in os.environ:
        # bug fix: fname was previously unbound on this branch, so the
        # not-found error below raised NameError instead of SystemExit
        fname = os.environ['SUDOISBOT_CONF']
        locations = [fname]
    elif fullpath is not None:
        fname = fullpath
        locations = [fullpath]
    else:
        fname = "sudoisbot.yml"
        locations = [
            os.path.join('/etc/', fname),
            os.path.join('/usr/local/etc', fname),
            os.path.join(os.curdir, fname),
            os.path.join(os.path.expanduser("~"), "." + fname)
        ]

    for conffile in locations:
        try:
            with open(conffile, 'r') as cf:
                config = yaml.safe_load(cf)
                config['file_path'] = conffile
                logger.info(f"config file: {conffile}")
                return config
        except IOError as e:
            # errno 2 == ENOENT: try the next candidate location
            if e.errno == 2:
                continue
            else:
                raise
    else:
        logger.error(f"config file not found: '{fname}', searched: {locations}")
        raise SystemExit("No config file found")

79
sudoisbot/datatypes.py Normal file
View File

@ -0,0 +1,79 @@
#!/usr/bin/python3
from dataclasses import asdict, dataclass
@dataclass
class Typed:
    """Dataclass base that enforces the declared field types at init time."""

    def __post_init__(self):
        # Check every declared annotation against the actual value.
        for field_name, expected in self.__annotations__.items():
            value = self.__dict__[field_name]
            if isinstance(value, expected):
                continue
            current_type = type(value)
            raise ValueError(f"The field '{field_name}' was '{current_type}' instead of '{expected}'")

    def as_dict(self):
        """Plain-dict view (recursively converts nested dataclasses)."""
        return asdict(self)

    def __getitem__(self, key):
        from loguru import logger
        logger.warning("not sure if i will keep this, prob use .as_dict()?")
        return self.__dict__[key]
@dataclass
class Tags(Typed):
    """Tag metadata attached to a published measurement (type-checked by Typed).

    name/location/kind mirror the 'tags' dicts built by the publishers —
    presumably the publisher name, its physical location and sensor kind;
    TODO confirm against the sink.
    """
    name: str
    location: str
    kind: str
@dataclass
class Message(Typed):
    """Envelope for one measurement: time, topic-derived name, fields, tags."""
    time: str
    measurement: str
    fields: dict
    tags: Tags

    @classmethod
    def from_msg(cls, topic, msg):
        """Build a Message (or subclass) from a zmq topic and payload dict."""
        # The 'fields' annotation decides the concrete fields type:
        # plain dict here, RainFields on RainMessage.
        fields_cls = cls.__annotations__['fields']
        return cls(measurement=topic.decode(),
                   time=msg['time'],
                   tags=Tags(**msg['tags']),
                   fields=fields_cls(**msg['fields']))

    @classmethod
    def from_topic(cls, topic, msg):
        """Dispatch to the Message subclass matching the topic."""
        if topic == b'rain':
            return RainMessage.from_msg(topic, msg)
        return Message.from_msg(topic, msg)
@dataclass
class RainFields(Typed):
    """Field payload of a rain message: the boolean reading plus its
    integer form (value_int is what as_csv on RainMessage writes out)."""
    value: bool
    value_int: int
@dataclass
class RainMessage(Message):
    """Rain sensor message; 'fields' is the typed RainFields instead of a
    plain dict (from_msg picks this up via the annotation)."""
    time: str
    measurement: str
    fields: RainFields
    tags: Tags

    def as_csv(self):
        # "time,name,value_int" — presumably consumed by a csv sink writer;
        # TODO confirm against the sink code
        return f"{self.time},{self.tags.name},{self.fields.value_int}"
@dataclass
class InfluxDBDatapoint(Typed):
    """Shape of one InfluxDB point: measurement name, ISO time string, and
    untyped fields/tags dicts."""
    time: str
    measurement: str
    fields: dict
    tags: dict

View File

@ -59,10 +59,14 @@ class ConfiguredBotHandlers(object):
def _get_temps(self):
statefile = self.config['listener']['temp_state']
temps = simplestate.get_recent(statefile)
return temps
def _temp_to_string(self, temps):
strs = [f"{k}: `{v['temp']}`C" for (k,v) in temps.items()]
sort = sorted(temps.items(), key=lambda v: v[1].get('type'))
strs = [f"{k}: `{v['temp']:.1f}`C" for (k,v) in temps.items()]
return "\n".join(strs)
def temp1m(self, update, context: CallbackContext):

View File

@ -1,11 +1,16 @@
#!/usr/bin/python3 -u
from collections import deque, defaultdict
import os
import json
import time
import base64
from loguru import logger
import zmq
from sudoisbot.common import init
from sudoisbot.config import read_config
def dealer(dealer_addr, router_addr):
print("dealer")
@ -27,37 +32,273 @@ def dealer(dealer_addr, router_addr):
context.close()
def pubsub(frontend_addr, backend_addr):
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None):
context = zmq.Context()
disk_interval = 3
disk_at = int(time.time()) + disk_interval
def save_cache_to_disk(target_dir="/tmp/proxy_cache/"):
for topic in cache.keys():
filename = topic.decode() + ".cache"
with open(os.path.join(target_dir, filename), 'wb') as f:
for multipart_msg in list(cache[topic]):
parts64 = [base64.b64encode(a) for a in multipart_msg]
#print(parts64)
f.write(b"|".join(parts64))
f.write(b"\n")
def load_cache_from_disk(target_dir="/tmp/proxy_cache"):
files = os.listdir(target_dir)
for filename in files:
fullpath = os.path.join(target_dir, filename)
with open(fullpath, 'rb') as f:
for line in f.readlines():
parts64 = line.split(b"|")
yield [base64.b64decode(a) for a in parts64]
#os.remove(fullpath)
def delete_cache_on_disk(topic, target_dir="/tmp/proxy_cache"):
filename = topic.decode() + ".cache"
fullpath = os.path.join(target_dir, filename)
try:
os.remove(fullpath)
except FileNotFoundError:
logger.warning(f"could not delete disk cache because {fullpath} does not exist")
# facing publishers
frontend = context.socket(zmq.XSUB)
frontend = context.socket(zmq.SUB)
frontend.setsockopt(zmq.SUBSCRIBE, b'')
frontend.bind(frontend_addr)
# facing services (sinks/subscribers)
backend = context.socket(zmq.XPUB)
backend.bind(backend_addr)
# inform publishers of a new sink
#backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
zmq.proxy(frontend, backend)
if capture_addr:
capture = context.socket(zmq.PUB)
capture.bind(capture_addr)
logger.info(f"zmq capture: {capture_addr}")
# we never get here
else:
capture = None
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)
if capture:
poller.register(backend, zmq.POLLIN)
# send \x01 to all publishers when they connect
lvc = dict()
cache = defaultdict(deque)
cache_topics = set()
for item in load_cache_from_disk():
cache[item[0]].append(item)
for topic in cache.keys():
csize = len(cache[topic])
if csize > 0:
logger.warning(f"{topic} - {csize} cached items loaded")
while True:
try:
events = dict(poller.poll(1000))
except KeyboardInterrupt:
logger.info("im leaving")
save_cache_to_disk()
logger.info("saved cache")
break
now = int(time.time())
if now > disk_at:
save_cache_to_disk()
disk_at = now + disk_interval
if capture:
stats = {
'cache_size': {
k.decode(): len(v) for (k, v) in cache.items()
},
'topics': [a.decode() for a in lvc.keys()],
'cache_topics': [a.decode() for a in cache_topics],
'disk_at': disk_at
}
capture.send_multipart([b"meta:stats", json.dumps(stats).encode()])
if frontend in events:
msg = frontend.recv_multipart()
topic = msg[0]
#frontend.send_multipart([b"\x00rain"])
if topic not in lvc:
logger.info(f"caching topic {topic} that hasnt seen a listener yet")
cache_topics.add(topic)
lvc[topic] = msg
if topic in cache_topics:
#logger.debug(f"[o] cached {msg}")
cache[topic].append(msg)
else:
backend.send_multipart(msg)
if capture:
capture.send_multipart(msg)
if backend in events:
msg = backend.recv_multipart()
#logger.warning(f"[x] backend: {msg}")
if msg[0][0] == 0:
topic = msg[0][1:]
cache_topics.add(topic)
logger.info(f"[o] now caching {topic}")
if msg[0][0] == 1: #'\x01'
topic = msg[0][1:]
if topic not in lvc:
# the keys of the topic dir are also a list of "known topics"
logger.success(f"registered {topic}")
lvc[topic] = None
if topic in cache_topics:
csize = len(cache[topic])
if csize > 0:
logger.info(f"draning {csize} messages for {topic}")
while len(cache[topic]) > 0:
buffered = cache[topic].popleft()
backend.send_multipart(buffered)
save_cache_to_disk()
logger.success(f"stopped caching {topic}")
cache_topics.discard(topic)
elif topic in lvc and lvc[topic] is not None:
cached = lvc[topic]
backend.send_multipart(cached + [b"cached"])
logger.success(f"[>] lvc sent for {topic}")
#frontend.send(msg)
#logger.success(f"[>] backend: {msg}")
if capture in events:
logger.warning(f"capture: {capture.recv_mutlipart(msg)}")
#zmq.proxy(frontend, backend, capture)
#while True:
# we never used to get here
frontend.close()
backend.close()
context.close()
def pubsub_listener():
config = init("proxy_pubsub")
def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
context = zmq.Context()
frontend_addr = config['zmq_frontend']
backend_addr = config['zmq_backend']
# facing publishers
#frontend = context.socket(zmq.XSUB)
return pubsub(frontend_addr, backend_addr)
frontend = context.socket(zmq.SUB)
frontend.setsockopt(zmq.SUBSCRIBE, b'')
frontend.connect(frontend_addr)
def dealer_listener():
config = init("proxy_dealer")
# facing services (sinks/subscribers)
backend = context.socket(zmq.XPUB)
backend.bind(backend_addr)
# inform publishers of a new sink
#backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
dealer_addr = config['zmq_dealer']
router_addr = config['zmq_router']
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
return dealer(dealer_addr, router_addr)
if capture_addr:
capture = context.socket(zmq.PUB)
capture.bind(capture_addr)
logger.info(f"zmq capture: {capture_addr}")
zmq.proxy(frontend, backend, capture)
else:
zmq.proxy(frontend, backend)
# we never get here
frontend.close()
backend.close()
if capture:
capture.close()
context.close()
def capture(capture_addr):
capture_port = capture_addr.split(":")[-1]
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b'')
addr = f"tcp://127.0.0.1:{capture_port}"
socket.connect(addr)
logger.info("connecting to " + addr)
import pprint
import sys
while True:
r = socket.recv_multipart()
#pprint.pprint(r[1].decode())
#print(r)
jdata = json.loads(r[1].decode())
if "cache_size" in jdata:
print(r[1].decode(), end="\n")
sys.stdout.flush()
#print("")
def main_forwarder(config):
# config = init("pubsub_forwarder")
# zmq_in_connect = config['zmq_in_connect']
# zmq_frontend = config['zmq_frontend']
# zmq_capture = config['zmq_capture']
zmq_in_connect = "tcp://192.168.1.2:5560"
zmq_backend = "tcp://*:5560"
zmq_capture = "tcp://127.0.0.1:5561"
return forwarder(
config['frontend_addr'], config['backend_addr'], config['capture_addr'])
def main_buffering(args, config):
capture_addr = config.get('capture_addr')
if args.capture:
return capture(capture_addr)
return proxy_buffering(
config['frontend_addr'], config['backend_addr'], capture_addr)

View File

@ -1,6 +1,6 @@
#!/usr/bin/python3
from datetime import datetime
from datetime import datetime, timezone
import json
import time
@ -11,13 +11,10 @@ class Publisher(object):
# have this class be a context manager with the loop?
def __init__(self, addr, topic, name, frequency):
self.addr = addr
self.name = None # this should be phased out
self.topic = topic
self.name = name
self.frequency = frequency
# TODO: decide if this is a good term or not
self.type = self.topic.decode()
# And even though I'm the publisher, I can do the connecting rather
# than the binding
@ -25,6 +22,8 @@ class Publisher(object):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.set_hwm(256000) # 0 is supposdenly no limit
logger.info(f"emitting on {self.topic} every {self.frequency}s")
def __enter__(self):
self.socket.connect(self.addr)
@ -36,37 +35,46 @@ class Publisher(object):
# print(exc_value)
# print(traceback)
logger.debug("closing socket and destroyed context")
self.socket.close()
self.context.destroy()
logger.info("closed socket and destroyed context")
def publish(self):
raise NotImplementedError("base class cant do anything")
def start(self):
raise NotImplementedError("base class cant do anything")
def loop(self):
while True:
try:
self.publish()
time.sleep(self.frequency)
except KeyboardInterrupt:
logger.info("Caught C-c..")
logger.info("ok im leaving")
break
except StopIteration:
break
def message(self, msg={}):
def message(self, data={}):
base = {
'name': self.name,
'timestamp': datetime.now().isoformat(),
'timestamp': datetime.now(timezone.utc).isoformat(),
'frequency': self.frequency,
'type': self.type,
}
return {**msg, **base}
return {**data, **base}
def send(self, temp):
data = self.message(temp)
def pub(self, data):
jdata = json.dumps(data).encode()
logger.debug(jdata)
logger.trace(jdata)
msg = [self.topic, jdata]
self.socket.send_multipart(msg)
return msg
def send(self, values):
# retire this method
raise NotImplementedError("use '.message()' for envelope and then '.pub()'")
#data = self.message(values)
#self.pub(data)

View File

@ -1,32 +1,70 @@
#!/usr/bin/python3
import json
import time
from loguru import logger
import zmq
class SubscriberTimedOutError(Exception): pass
def reconnect(delay=3.0):
def wrapper(f):
while True:
try:
f()
except zmq.error.Again:
logger.info(f"reconnecting after {delay}sec")
time.sleep(delay)
continue
except KeyboardInterrupt:
logger.info("ok fine im leaving")
return
return wrapper
class Subscriber(object):
def __init__(self, addr, topic, timeout=2):
def __init__(self, addr, topics, rcvtimeo=5*60):
if not isinstance(topics, list):
topics = [topics]
self.addr = addr
if isinstance(topic, bytes):
self.topic = topic
else:
self.topic = topic.encode("utf-8")
self.timeout = int(timeout)
self.topics = [t.encode() if isinstance(t, str) else t for t in topics]
self.rcvtimeo_secs = int(rcvtimeo)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.XSUB)
self.socket.setsockopt(zmq.RCVTIMEO, self.rcvtimeo_secs * 1000)
#logger.info(f"RCVTIMEO is {self.rcvtimeo_secs}s")
for topic in self.topics:
#self.socket.setsockopt(zmq.SUBSCRIBE, topic)
self.socket.send_multipart([b"\x01" + topic])
def connect(self, addr=None):
self.socket.connect(self.addr)
logger.info(f"connected to: {self.addr}, topics: {self.topics}")
def __enter__(self):
self.connect()
return self
def connect(self):
self.socket = self.context.socket(zmq.SUB)
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
self.socket.setsockopt(zmq.RCVTIMEO, self.timeout)
self.socket.connect(addr)
def __exit__(self, exc_type, exc_value, traceback):
self.socket.close()
self.context.destroy()
logger.debug("closed socket and destroyed context")
def recv(self):
try:
return self.socket.recv()
while True:
msg = self.socket.recv_multipart()
cached = len(msg) > 2 and msg[2] == b"cached"
yield (msg[0], json.loads(msg[1]), cached)
except zmq.error.Again:
finish_this_file()
logger.warning(f"no messages in {self.rcvtimeo_secs}s")
raise

View File

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2004, 2005 Tristan Grimmer
Copyright (c) 2014 Manchson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,7 @@
basis33 is a fixed-width bitmap font for programming and text editing, which contains Latin, Cyrillic, Greek and Hebrew characters.
It's designed by Manchson, based on the Latin-only font Proggy Clean by Tristan Grimmer.
basis33 is free/libre software, you are welcome to redistribute and/or modify it under the terms of MIT/Expat license; see LICENSE for details.

Binary file not shown.

View File

@ -3,177 +3,199 @@
# ansible for now
import argparse
import json
import time
from datetime import datetime, timedelta
import os
from datetime import datetime, timezone
from os import path
import sys
import random
import time
from dataclasses import dataclass, field, asdict
import dateutil.parser
import zmq
from requests.exceptions import RequestException
from loguru import logger
from sudoisbot.network.pub import Publisher
from sudoisbot.sink import simplestate
from sudoisbot.unifi import UnifiApi
from sudoisbot.common import init, catch
from sudoisbot.common import chunk
def bark():
    """Return between one and three comma-separated 'woof's, prefixed with a space."""
    import random
    count = random.randint(1, 3)
    return " " + ", ".join(["woof"] * count)
def temps_fmt(state):
t = list()
# weird hack to show weather last
for k, v in sorted(state.items(), key=lambda a: a[1].get('type', '') == "weather"):
temp = v['temp']
if v.get('type', "") == "weather":
desc = v['weather']['desc']
diff = datetime.now() - datetime.fromisoformat(v['timestamp'])
age = diff.seconds // 60
fmt = f"{k}[{age}]: {temp} C - {desc}"
@dataclass
class ScreenPublisher(Publisher):
addr: str
weather_location: str
t.append(fmt)
else:
fmt = f"{k}: {temp} C"
t.append(fmt)
return '\n'.join(t)
freq: int = 60
rotation: int = 0
statedir: str = "/dev/shm"
msgs: list = field(default_factory=list)
def people_home(unifi_config, people):
home = set()
try:
api = UnifiApi(unifi_config)
wifi_clients = api.get_client_names()
except RequestException as e:
logger.error(e)
raise
no_loop: bool = False
dry_run: bool = False
for person, devices in people.items():
for device in devices:
if device in wifi_clients:
home.add(person)
return home
def __post_init__(self):
super().__init__(self.addr, b"eink", "screen_pub", self.freq)
if self.rotation is None:
self.rotation = 0
if self.dry_run:
self.no_loop = True
def people_home_fmt(home):
if home:
return "home: " + ", ".join(home)
else:
return "nobody home"
self.first_loop = True
def publisher(addr, name, sleep, rot, statef, upd_int, people, unifi, noloop):
topic = b"eink"
context = zmq.Context()
socket = context.socket(zmq.PUB)
# just hold the last message in memory
# screen doesnt care about missed updates
#socket.setsockopt(zmq.ZMQ_HWM, 1)
logger.info(f"Connected to {addr}")
socket.connect(addr)
self.halfway = 17
self.msgs = [self.align_center(msg) for msg in self.msgs]
# will force an update on first loop
last_home = set()
while True:
home_update = False
logger.info(f"weather location: {self.weather_location}")
def align_center(self, msg):
    """Center msg within the screen width (2 * self.halfway characters)."""
    if len(msg) >= self.halfway*2:
        # bug fix: the f-prefix was missing, so '{msg}' and '{len(msg)}'
        # were logged literally instead of being interpolated
        logger.warning(f"msg '{msg}' is too long, {len(msg)} chars.")
    # pad on the left so the text lands in the middle; never negative
    msg_padding = max(self.halfway - (len(msg) // 2), 0)
    return " "*msg_padding + msg
def align_right(self, msg):
pad_length = self.halfway * 2 - len(msg)
padding = " "*pad_length
return padding + msg
def get_recent_state(self, measurement='temp'):
state_file = path.join(
self.statedir, f"{measurement}-state.json")
return simplestate.get_recent(state_file)
def make_weather(self):
try:
currently_home = people_home(unifi, people)
home = people_home_fmt(currently_home)
# has anyone come or gone?
if len(currently_home) != len(last_home):
home_update = True
last_home = currently_home
except RequestException:
home = "home: error"
try:
state = simplestate.get_recent(statef)
temps = temps_fmt(state)
state = self.get_recent_state('weather')
weather = state[self.weather_location]['fields']['desc']
except ValueError as e:
logger.error(e)
temps = str(e)
logger.warning(e)
weather = "[err: no recent weather info]"
logger.debug(temps.replace("\n", ", "))
logger.debug(home)
return self.align_center(weather)
rona = " wash hands and shoes off "
woof = " " + bark()
text = temps + '\n' + home + '\n\n' + rona + '\n' + woof
def make_rain(self):
try:
state = self.get_recent_state('rain')
rains = any(v['fields']['value'] for v in state.values())
indicator = "R" if rains else "-"
except ValueError as e:
logger.warning(e)
indicator = "?"
# force more frequent updates for debugging
# 'min_update_interval': 60
data = {
'name': name,
'text': text,
'timestamp': datetime.now().isoformat(),
'rotation': rot,
'home': list(currently_home)
}
# for debugging/dev use
if noloop:
data['min_update_interval'] = 0
logger.warning("forcing update")
# if someone came or left, force update
elif home_update:
logger.info("Someone came/left, forcing update")
data['min_update_interval'] = 0
# prevent getting stuck on forcing updates
home_update = False
# but if nobody is at home then lets just update every 3 hours
elif not last_home:
data['min_update_interval'] = 60*60*3
# otherwise default
else:
data['min_update_interval'] = upd_int
return self.align_right(indicator)
bdata = json.dumps(data).encode()
logger.trace(bdata)
socket.send_multipart([topic, bdata])
def make_people(self):
try:
state = self.get_recent_state('people')['unifi']
if noloop:
break
home = [k for k, v in state['fields'].items() if v]
count = len(home)
indicators = " ".join(home)
except ValueError as e:
logger.warning(e)
indicators = "- - -"
count = 1
return self.align_right(indicators), count
def make_text(self):
return random.choice(self.msgs + [self.align_center(bark())])
def make_temps(self):
l = list()
try:
time.sleep(sleep)
except KeyboardInterrupt:
logger.info("Caught C-c, exiting..")
socket.close()
context.destroy()
return 0
state = self.get_recent_state('temp')
except ValueError as e:
logger.warning(e)
state = dict()
@catch
def main():
for a in ['bedroom', 'study', 'livingroom', 'ls54', 'outdoor']:
# .replace does not mutate original string
shortname = a.replace('room', 'r')
#shortname = a[:min(len(a), 4)]
try:
temp = state[a]['fields']['value']
tempstr = f"{temp:.1f}"
if temp < 10.0:
tempstr = " " + tempstr
l.append(f"{shortname}: {tempstr} C")
except KeyError:
logger.trace(f"no recent temp for '{a}'")
l.append(f"{shortname}: -- C")
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--noloop", action="store_true")
parser.add_argument("--rot", type=int)
fullconfig, args = init(__name__, parser, fullconfig=True)
config = fullconfig["screen_pub"]
unifi = fullconfig["unifi"]
fill = max([len(a) for a in l])
chunks = chunk([a.rjust(fill) for a in l], 2)
temp_rows = list()
for row in chunks:
if len(row) == 1:
temp_rows.append(f"{row[0]} |")
else:
temp_rows.append(" | ".join(row))
return "\n".join(temp_rows)
def publish(self):
woof = " " + bark()
weth = self.make_weather()
temps = self.make_temps()
folk, inhabitants = self.make_people()
text = self.make_text()
rain = self.make_rain()
text = f"{weth}\n{temps}\n{folk}\n{text}\n{rain}"
# add back logic to turn update intervals down pr stop when
# nodody is home
if inhabitants > 0:
update_interval = 15*60 # 15m
else:
update_interval = 66*60*6
data = {
'name': "screen_pub",
'text': text,
'timestamp': datetime.now(timezone.utc).isoformat(),
'rotation': self.rotation,
'min_update_interval': update_interval,
'force_update': self.first_loop or self.no_loop
}
if self.first_loop:
time.sleep(0.3)
self.first_loop = False
if data['force_update'] or self.no_loop:
logger.warning(f"screen should update: \n{data['text']}")
if self.dry_run:
import json
jmsg = json.dumps(data, indent=2)
logger.warning(f"not publishing: \n{jmsg}")
raise StopIteration
self.pub(data)
if self.no_loop:
raise StopIteration
def main(args, config):
name = config['name']
addr = config['addr']
sleep = config['sleep']
rotation = config['rotation'] if not args.rot else args.rot
temp_state_file = config['temp_state_file']
people_home = config['people_home']
update_interval = config['update_interval']
noloop = args.noloop
#people_home = config['people_home']
kwargs = {**config['screen'],
**{
'rotation': args.rotation,
'dry_run': args.dry_run,
'no_loop': args.no_loop,
'statedir': args.statedir,
}}
return publisher(addr,
name,
sleep,
rotation,
temp_state_file,
update_interval,
people_home,
unifi,
noloop)
with ScreenPublisher(addr=addr, **kwargs) as p:
p.loop()
if __name__ == "__main__":
sys.exit(main())
return 0

View File

@ -17,7 +17,7 @@
# sudo systemctl start screen_sub
import argparse
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import json
from time import sleep
@ -36,7 +36,7 @@ def log(text):
# assuming systemd and syslog
print(text)
else:
ts = datetime.now().isoformat()[:19]
ts = datetime.now().isoformat()[5:19]
s = "{}\t{}".format(ts, text)
print(s)
with open("/tmp/screen_sub.log", 'a') as f:
@ -47,9 +47,8 @@ def should_update(last_updated, min_update_interval, debug=False):
if debug:
log("last_updated is False")
return True
# TBB: discard flood messages
now = datetime.now()
now = datetime.now(timezone.utc)
age = now - last_updated
next_ = min_update_interval - age.seconds
@ -71,24 +70,30 @@ def gettext(message):
text = message['text']
have = len(text.strip().split('\n'))
fillers = '\n'*(max(MAX_LINES - have, 0))
timestamp = message['timestamp'].replace("T", " ")[:16]
updated = timestamp
timestamp = datetime.now().isoformat().replace("T", " ")[5:16]
bottom_right = message.get('bottom_right', '')
# doesnt handle too long messagse fix later
return text.strip() + fillers + updated
return text + fillers + timestamp + bottom_right
def inky_write(text, rotation=0):
def inky_write(text, rotation=0, color='black'):
if not have_inky:
print(text)
return
inkyphat.set_colour("red")
inkyphat.set_colour(color)
inkyphat.set_rotation(rotation)
#inkyphat.set_border('black')
#font = inkyphat.ImageFont.truetype(
# inkyphat.fonts.PressStart2P, 8)
font = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf")
xy = (5, 1)
fill = inkyphat.BLACK
#font = ImageFont.truetype("/usr/share/fonts/basis33.ttf", 13)
xy = (0, 0)
if color == "red":
fill = inkyphat.RED
else:
fill = inkyphat.BLACK
inkyphat.clear()
inkyphat.text(xy, text, fill=fill, font=font)
inkyphat.show()
@ -118,31 +123,41 @@ def sub(addr, topic, timeout, debug):
j = json.loads(msg[1].decode())
# shortening mui means what the loop decides to using
# for minimum_update_interval
# minimum allowed unless forced. regulating updater intervals
is the responsibility of screen_pub, this is just to prevent flooding
# since refreshing the display takes a while, a flood would take
# very long to process, and a suicide snail doesnt make much sense
# since 2 min valid information is perfectly fine, and a forced
# update would bypass this anyway
default_mui = 2*60
mui = int(j.get('min_update_interval', default_mui))
if mui == 0:
log("received request to update e-ink screen now")
if debug:
log("received: " + repr(msg[1].decode()))
log("mui: {}".format(mui))
force_update = j.get('force_update', False)
color = j.get('color', 'black')
if should_update(last_updated, mui, debug) or force_update:
# TBB: discard flood messages
ts = datetime.strptime(j['timestamp'][:-6], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=timezone.utc)
now = datetime.now(timezone.utc)
if ts - now > timedelta(seconds=1.0):
log("discarding old message: '{}'".format(j['timestamp']))
elif should_update(last_updated, mui, debug) or force_update:
if mui == 0 or force_update:
log("starting forced update")
if not have_inky:
log("would update e-ink display")
rotation = j.get("rotation", 0)
text = gettext(j)
inky_write(text, rotation)
inky_write(text, rotation, color)
if have_inky:
if mui == 0:
log("e-ink screen updated (forced)")
else:
log("e-ink screen updated")
last_updated = datetime.now()
log("e-ink screen updated")
last_updated = datetime.now(timezone.utc)
else:
pass
@ -172,6 +187,8 @@ if __name__ == "__main__":
log("Have inky: {}".format(have_inky))
inky_write("Starting.. waiting for update \nfrom {}..".format(addr))
sleep(3.0)
while True:
# endless loop to handle reconnects
try:

View File

@ -0,0 +1,78 @@
// based on https://lastminuteengineers.com/rain-sensor-arduino-tutorial/
// Sensor pins
#define sensorPower 7
#define sensorPin 8
long timeout;
long initial_timeout = 300000;
// One-time init: configure pins, then block on serial until the host
// sends the sampling interval ("timeout") as an integer.
void setup() {
  pinMode(sensorPower, OUTPUT);
  pinMode(sensorPin, INPUT);

  // Initially keep the sensor OFF
  digitalWrite(sensorPower, LOW);

  Serial.begin(9600);

  // wait 5 minutes for the other side to tell us what the timeout should be
  Serial.setTimeout(initial_timeout);

  // signal we are ready to receive the timeout value
  Serial.println("{\"ready\": true }");

  // block until we get the timeout value. since we power on the pin for the
  // raindrop sensor when we take the measurement, we dont want it looping
  // needlessly when nothing is reading because it corrodes when theres
  // voltage on the sensor.
  // NOTE: Serial.parseInt() returns 0 on timeout; loop() treats a zero
  // timeout as "never received" and keeps reporting an error instead.
  timeout = Serial.parseInt();
  Serial.setTimeout(timeout);
}
// Main loop: once a nonzero interval has been received, sample the rain
// sensor every `timeout` ms and print one JSON object per reading.
// Otherwise keep printing an error object every minute.
void loop() {
  if (timeout != 0) {
    int val = read_sensor();
    String digital;
    if (val == 1) {
      digital = "HIGH";
    }
    else {
      // water on the rain sensor, resistance has been LOWered
      digital = "LOW";
    }
    // JSON line consumed by the python side (ArduinoRainSensor)
    String json = "{"
      "\"digital\": \"" + digital + "\", " +
      "\"timeout\": " + String(timeout) +
      "}";
    Serial.println(json);
    delay(timeout);
  }
  else {
    String error = "{"
      "\"error\": \"timeout value was not sent in time\", "
      "\"timeout\": " + String(initial_timeout) +
      "}";
    Serial.println(error);
    // print it again every 60 seconds
    delay(60000);
  }
}
// Take one digital reading from the rain sensor, powering the probe only
// for the duration of the read (limits electrolytic corrosion -- see the
// comment in setup()).
int read_sensor() {
  // when the amount of water exceeds the threshold value:
  // - status LED lights up
  // - digital output (DO) returns LOW.
  // - water LOWers resistence
  digitalWrite(sensorPower, HIGH);
  // Allow power to settle
  delay(10);
  int val = digitalRead(sensorPin);
  digitalWrite(sensorPower, LOW);
  return val;
}

92
sudoisbot/sensors/dht.c Normal file
View File

@ -0,0 +1,92 @@
/*
* dht.c:
* read temperature and humidity from DHT11 or DHT22 sensor
*
* depends on 'wiringpi' apt package
*/
#include <wiringPi.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#define MAX_TIMINGS 85
int data[5] = { 0, 0, 0, 0, 0 };
/*
 * Read one sample from a DHT11/DHT22 on wiringPi pin dht_pin by bit-banging
 * the single-wire protocol, and print it to stdout as JSON:
 * {"humidity": h, "temp": c} on success, {"error": "checksum"} otherwise.
 *
 * Timing-sensitive: pulse widths are measured by busy-waiting in 1us steps,
 * so nothing may be added inside the sampling loop.
 */
void print_json(int dht_pin) {
    uint8_t laststate = HIGH;
    uint8_t counter = 0;
    uint8_t j = 0, i;

    data[0] = data[1] = data[2] = data[3] = data[4] = 0;

    /* pull pin down for 18 milliseconds */
    pinMode(dht_pin, OUTPUT);
    digitalWrite(dht_pin, LOW);
    delay(18);

    /* prepare to read the pin */
    pinMode(dht_pin, INPUT);

    /* detect change and read data */
    for (i = 0; i < MAX_TIMINGS; i++) {
        counter = 0;
        /* count 1us ticks until the line changes state; 255 == timed out */
        while ( digitalRead(dht_pin) == laststate ) {
            counter++;
            delayMicroseconds(1);
            if (counter == 255) {
                break;
            }
        }
        laststate = digitalRead(dht_pin);

        if (counter == 255) {
            break;
        }

        /* ignore first 3 transitions */
        if ( (i >= 4) && (i % 2 == 0) ) {
            /* shove each bit into the storage bytes */
            data[j / 8] <<= 1;
            /* a long high pulse (> 16 ticks) is a 1 bit */
            if (counter > 16)
                data[j / 8] |= 1;
            j++;
        }
    }

    /*
     * check we read 40 bits (8bit x 5 ) + verify checksum in the last byte
     * print it out if data is good
     */
    if ((j >= 40) && (data[4] == ((data[0]+data[1]+data[2]+data[3]) & 0xFF))) {
        float h = (float)((data[0] << 8) + data[1]) / 10;
        if ( h > 100 ) {
            h = data[0]; // for DHT11
        }
        float c = (float)(((data[2] & 0x7F) << 8) + data[3]) / 10;
        if ( c > 125 ) {
            c = data[2]; // for DHT11
        }
        /* high bit of byte 2 is the sign bit */
        if ( data[2] & 0x80 ) {
            c = -c;
        }
        printf("{\"humidity\": %.1f, \"temp\": %.1f}\n", h, c);
    } else {
        printf("{\"error\": \"checksum\"}\n" );
    }
}
/*
 * Entry point: optional argv[1] selects the wiringPi pin (decimal, hex or
 * octal via %i); defaults to pin 3 (GPIO-22) when omitted.
 *
 * Exit codes: 1 = wiringPi init failed, 2 = unparseable pin argument.
 */
int main(int argc, char *argv[]) {

    if (wiringPiSetup() == -1) {
        exit(1);
    }

    /* dht_pin 3 is GPIO-22 */
    int dht_pin = 3;

    /* bug fix: original tested argc > 0 (always true, argv[0] is the
     * program name) and then read argv[1], which is NULL when no argument
     * is given -> undefined behavior and an uninitialized pin. Only parse
     * when an argument actually exists. */
    if (argc > 1 && sscanf(argv[1], "%i", &dht_pin) != 1) {
        exit(2);
    }

    print_json(dht_pin);
    return(0);
}

28
sudoisbot/sensors/old.txt Normal file
View File

@ -0,0 +1,28 @@
def read2(self):
""" this code would handle multiple Temper's connected, or
a Temper with both internal/external sensors. but i dont have that
so this isnt used"""
data = self._read()
mapping = {
'internal temperature': 'temp',
'internal humidity': 'humidity',
'external temperature': 'temp',
'external humidity': 'humidity'
}
results = []
for item in data:
# get a dict with the old keys and their values, each of these
# values will be their own dict
sources = [key for key in mapping.keys() if key in item.keys()]
base = {k: v for (k, v) in item.items() if k not in mapping.keys()}
for oldkey in sources:
newkey = mapping[oldkey]
fixed = {newkey: item[oldkey], 'source': oldkey}
results.append({**base, **fixed})
return results

View File

@ -0,0 +1,115 @@
#!/usr/bin/python3
import json
from time import time, sleep
from socket import gethostname
from datetime import datetime, timezone
from loguru import logger
from sudoisbot.network.pub import Publisher
from sudoisbot.sensors.sensors import ArduinoRainSensor, NoSensorDetectedError
class RainPublisher(Publisher):
    """Publishes readings from an arduino rain sensor on the b"rain" topic.

    Emits a message every `freq` seconds, and immediately whenever the
    rain/no-rain state flips.
    """

    def __init__(self, sensor, addr, location, freq):
        # topic is b"rain"; remaining args are passed through to Publisher
        super().__init__(addr, b"rain", None, freq)
        self.sensor = sensor
        self.location = location
        self.freq = freq
        # next scheduled publish time (epoch seconds); "now" makes the
        # first reading publish immediately
        self.pub_at = time()

        # static tags attached to every outgoing measurement
        self.tags = {
            'hostname': gethostname(),
            'name': self.sensor.name,
            'location':self.location,
            'kind': self.sensor.kind,
            'frequency': self.freq,
        }

        logger.info(f"emitting data as '{self.sensor.name}' every {freq}s")

    def publish(self, line):
        """Wrap one parsed sensor line (keys 'rain', 'digital') in a
        measurement envelope and send it."""
        # LOW = water on the rain sensor, resistance has been LOWered
        msg = {
            'measurement': 'rain',
            'time': datetime.now(timezone.utc).isoformat(),
            'tags': self.tags,
            'fields': {
                'value': line['rain'],
                'digital': line['digital'],
                'value_int': int(line['rain'])
            }
        }
        logmsg = f"{msg['tags']['name']} rain: {bool(line['rain'])}"
        logger.log("RAIN", logmsg)
        self.pub(msg)

    def start(self):
        """Run the publish loop until interrupted with C-c."""
        try:
            return self._start()
        except KeyboardInterrupt:
            logger.info("ok im leaving")

    def _start(self):
        # last published state; starting False means rain at startup is
        # treated as a state change and published immediately
        rain_state = False
        for rainline in self.sensor.iter_lines():
            #logger.debug(rainline)
            now = time()
            # publish on schedule, or right away on a state change
            if now >= self.pub_at or rainline['rain'] != rain_state:
                if rainline['rain'] != rain_state:
                    if rainline['rain']:
                        logger.warning("started raining")
                    else:
                        logger.info("stopped raining")
                self.publish(rainline)
                self.pub_at = now + self.freq
                rain_state = rainline['rain']
def main(config):
    """Entry point: wire the configured arduino rain sensor to a publisher."""
    addr = config['broker']
    frequency = config['frequency']
    location = config['location']

    level_no = config.get('sensor_log_no', 9)
    logger.level("RAIN", no=level_no, color="<green>")
    logger.info(f"logging level RAIN on no {level_no}")

    try:
        rain_sensors = config['sensors']['rain']
        if len(rain_sensors) > 1:
            raise NotImplementedError("does not make sense at the moment")
        sensor_conf = rain_sensors[0]

        with ArduinoRainSensor(**sensor_conf) as rain_sensor, \
             RainPublisher(rain_sensor, addr, location, frequency) as pub:
            pub.start()
    except (IndexError, KeyError) as e:
        raise SystemExit("no rain sensors configured, exiting") from e
    except NoSensorDetectedError as e:
        logger.error(e)
        return 1
    except KeyboardInterrupt:
        logger.info("Exiting..")
        return 0

    return 0

View File

@ -0,0 +1,308 @@
#!/usr/bin/python3
from subprocess import check_output, STDOUT, CalledProcessError
import json
from json.decoder import JSONDecodeError
import os.path
from dataclasses import dataclass, asdict, InitVar
import time
import serial
from loguru import logger
from temper.temper import Temper
W1ROOT = "/sys/bus/w1/devices"
W1LIST = "w1_bus_master1/w1_master_slaves"
# Raised when a previously-working sensor stops responding (e.g. USB device
# unplugged, 1-wire sysfs file gone); temp_pub waits and retries on this.
class SensorDisconnectedError(Exception): pass
# Raised when no sensor can be found at construction/autodetection time;
# callers treat this as fatal for the configured sensor.
class NoSensorDetectedError(Exception): pass
@dataclass
class TempSensor(object):
    """Base class for temperature sensors; subclasses implement .read()."""

    name: str          # human-readable sensor name, used in published tags
    kind: str          # sensor kind string, e.g. 'ds18b20', 'dht', 'temper'
    environment: bool  # whether the sensor measures the ambient environment

    def as_dict(self):
        """Return the dataclass fields as a plain dict (used as message tags)."""
        return asdict(self)

    def __str__(self):
        return f"<{self.name} [kind: {self.kind}, environment: {self.environment}]>"

    @classmethod
    def from_kind(cls, **kwargs):
        """Instantiate the subclass matching kwargs['kind'] (case-insensitive,
        '<kind>sensor' naming convention). Re-raises KeyError with an
        explanatory arg appended for an unknown kind."""
        kind = kwargs['kind'].lower()
        objname = kind + "sensor"
        sensorobjdict = {a.__name__.lower(): a for a in cls.__subclasses__()}
        try:
            sensorobj = sensorobjdict[objname]
            return sensorobj(**kwargs)
        except KeyError as e:
            e.args += ("unknown sensor kind", )
            raise

    @classmethod
    def autodetect(cls, name):
        """autodetects what kind of sensor is connected, only works
        when the system has one and only one sensor connected and only
        supports the name arg."""
        for sensorobj in cls.__subclasses__():
            try:
                # bug fix: original assigned 'sensr' but then referenced the
                # undefined names 'sensor' and 'connected', raising NameError
                # on the success path
                # NOTE(review): subclasses are dataclasses that also require
                # 'kind'/'environment' -- confirm single-arg construction
                sensor = sensorobj(name)
                logger.info(f"found '{sensor.kind}' sensor")
                return sensor
            except (SensorDisconnectedError, NoSensorDetectedError):
                continue
        else:
            raise NoSensorDetectedError("autodetect found no sensors connected")
@dataclass(init=True)
class TemperSensor(TempSensor):
    # Wraps a 'Temper' USB thermometer (temper.temper.Temper, 3rd-party lib).

    def _read(self):
        # this function is to abstract away some error handling and make
        # read() look nicer
        try:
            data = self._temper.read()
            if len(data) == 0:
                raise SensorDisconnectedError("temper: no data returned")
            if len(data) > 1:
                # i just have the model with one sensor. will expand if i get
                # a different model at some point.
                raise NotImplementedError("only supports Tempers with one sensor")
            return data[0]
        except FileNotFoundError as e:
            # device node vanished -- treat as a disconnect
            msg = f"temper: {e.args[1]}"
            logger.error(msg)
            raise SensorDisconnectedError(msg) from e
        except PermissionError as e:
            raise NoSensorDetectedError(e) from e

    def read(self):
        """Return the measurement envelope ({'measurements': {'temp': ...}}),
        or an empty dict when the reading has no temp value."""
        reading = self._read()
        try:
            return {
                'measurements': {'temp': reading['internal temperature'] }
            }
        except KeyError:
            # assumes a 'firmware'-only dict means a transient bad read
            # rather than a disconnect -- TODO confirm against temper lib
            if 'firmware' in reading:
                logger.error(f"temper usb: temp value missing from '{reading}'")
                # makes the for loop just not loop over anything
                return dict()
            else:
                raise

    def __post_init__(self):
        self._temper = Temper()
        # so we bail if temper is configured but not connected/functional
        # on start
        # call .read() because it is doing error handling, some funky errors
        # will slip past if youre trying to be smart about the exception stack
        try:
            firstreading = self._read()
            logger.trace(firstreading)
        except SensorDisconnectedError as e:
            # NoSensorDetected is already raised in ._read()
            #raise NoSensorDetectedError("temper: not connected") from e
            raise
@dataclass
class DhtSensor(TempSensor):
    """DHT11/DHT22 sensor read by shelling out to the 'dht' helper binary."""

    dht_pin: InitVar[int]

    def __post_init__(self, dht_pin):
        # pass the pin through to the helper only when one was configured
        self.dht_cmd = ["dht", str(dht_pin)] if dht_pin else ["dht"]

    def read(self):
        # the dht.c binary doesnt write to stderr at the moment
        # but lets redirect stderr to stdout now in case i change
        # that so this wont break
        try:
            raw = check_output(self.dht_cmd, shell=False, stderr=STDOUT)
            logger.trace(raw)
            parsed = json.loads(raw)
            return {
                'measurements': {
                    'temp': parsed['temp'],
                    'humidity': parsed['humidity']
                }}
        except CalledProcessError as e:
            raise SensorDisconnectedError("dht disconnected") from e
@dataclass
class Ds18b20Sensor(TempSensor):
    """1-wire DS18B20 sensor read via the kernel's /sys/bus/w1 interface.

    sensor_id: w1 device id (e.g. '28-0300a279f70f'); autodetected when
    exactly one sensor is connected.
    """
    sensor_id: str = None

    # study: 28-0300a279f70f
    # outdoor: 28-0300a279bbc9

    def __post_init__(self):
        ds18b20s = self._list_detected_ds18b20()
        if len(ds18b20s) > 1 and self.sensor_id is None:
            raise RuntimeError("need 'sensor_id' when > 1 ds18b20's connected")
        elif self.sensor_id is None:
            # exactly one sensor connected: use it
            self.sensor_id = ds18b20s[0]
            logger.info(f"set ds18b20 sensor_id to '{self.sensor_id}'")

        self.sensorpath = os.path.join(W1ROOT, self.sensor_id, "w1_slave")

    def _read_sensor(self):
        # returns the raw lines of the kernel's w1_slave file
        try:
            with open(self.sensorpath, 'r') as f:
                return f.read().splitlines()
        except FileNotFoundError:
            raise SensorDisconnectedError(f"ds18b20: '{self.sensorpath}' not found")

    def _parse_sensor_data(self):
        # YES = checksum matches
        data = self._read_sensor()
        if len(data) == 0:
            # File "sudoisbot/temps/sensors.py", line 94, in _parse_data
            #  if not data[0].endswith("YES"):
            #    └ []
            raise SensorDisconnectedError(f"ds18b20: no data")
        if not data[0].endswith("YES"):
            raise SensorDisconnectedError(f"ds18b20: got '{data}'")
        # second line ends with "t=<millidegrees C>"; strip the "t=" prefix
        tempstr = data[1].rsplit(" ", 1)[1][2:]
        return int(tempstr)/1000.0

    def _list_detected_ds18b20(self):
        # the w1 bus master lists connected slave ids, one per line
        w1_listfile = os.path.join(W1ROOT, W1LIST)
        with open(w1_listfile, 'r') as f:
            w1_ids = f.read().splitlines()
        if len(w1_ids) == 0:
            raise NoSensorDetectedError("no ds18b20 sensors connected")
        if not all(a.startswith("28-") for a in w1_ids):
            # something funky is going on, if this error happens
            # then investigate
            raise NoSensorDetectedError(f"unexpected values in '{w1_listfile}': {w1_ids}")
        return w1_ids

    def read(self):
        """Return the measurement envelope expected by TempPublisher."""
        return {
            'measurements': { 'temp': self._parse_sensor_data() },
            'meta': {'sensorid': self.sensor_id }
        }
@dataclass
class ArduinoSensor(object):
    """Serial-attached arduino that emits one JSON object per line.

    The arduino blocks until we send it a sampling interval (its 'timeout')
    -- see hello()/start() and the matching .ino sketch.
    """
    # ard_loop_time = how often arduino should send a value in seconds
    # called 'timeout' in arduino code
    # needs a better name
    # especially since the next line also has a timeout variable
    # but thats the serial read timeout
    name: str
    kind: str
    device: InitVar[str] = "/dev/ttyUSB0" # linux is a sane default
    baudrate: InitVar[int] = 9600
    ard_loop_timeout: int = 5 # seconds

    # device = "/dev/cu.usbserial-A800eGKH"
    # device="/dev/ttyUSB0"

    def __post_init__(self, device, baudrate):
        assert self.kind == "arduino-rain"
        ser_timeout = float(self.ard_loop_timeout) # seconds
        logger.debug(f"serial timeout: {ser_timeout}s")
        try:
            self.ser = serial.Serial(device, baudrate, timeout=ser_timeout)
        except serial.SerialException as e:
            raise NoSensorDetectedError(e)

    def as_dict(self):
        """Dataclass fields as a plain dict (used as message tags)."""
        return asdict(self)

    def hello(self):
        """Wait (up to 5 tries) for the arduino's {"ready": true} banner."""
        for i in range(5):
            try:
                data = self.ser.readline()
                jdata = json.loads(data)
                # 'true' is hardcoded..
                return jdata['ready']
            except (KeyError, JSONDecodeError, UnicodeDecodeError) as e:
                # need to polish this when im able to reproduce
                # maybe figure out why it happens
                logger.warning(f"got invalid json: {data}")
            except serial.serialutil.SerialException as e:
                logger.error(e)
            # NOTE(review): 'i' is 0-based so this logs 0/5 .. 4/5
            logger.debug(f"waiting 5s to try again {i}/5")
            time.sleep(5.0)
        else:
            raise NoSensorDetectedError("no data from arduino")

    def start(self):
        """Handshake, then send the arduino its loop interval in ms."""
        ready = self.hello()
        # \n is important !
        logger.success(f"{self.name} ready: {ready}")
        timeout_ms = self.ard_loop_timeout * 1000
        logger.info(f"getting data on {timeout_ms}ms interval")
        self.ser.write(f"{timeout_ms}\r\n".encode())

    def __enter__(self):
        # if i want to use this not as a context manager ill need
        # self.started
        self.ser.__enter__()
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.ser.__exit__(exc_type, exc_value, traceback)

    def iter_lines(self):
        """Yield parsed JSON objects from the serial port, forever."""
        while True:
            line = self.ser.readline()
            logger.trace(line)
            if line == b"":
                # serial read timed out with no data; just poll again
                continue
            try:
                yield json.loads(line)
            except JSONDecodeError:
                logger.warning(f"discarging garbage: '{line}'")
@dataclass
class ArduinoRainSensor(ArduinoSensor):
    """ArduinoSensor variant that decodes the rain sketch's JSON lines."""

    def iter_lines(self):
        # "LOW" means water has lowered the probe's resistance, i.e. rain
        for parsed in super().iter_lines():
            digital = parsed['digital']
            yield {
                'digital': digital,
                'rain': digital == "LOW"
            }

View File

@ -0,0 +1,79 @@
#!/usr/bin/python3 -u
import time
from datetime import datetime, timezone
from loguru import logger
from socket import gethostname
from sudoisbot.network.pub import Publisher
from sudoisbot.sensors.sensors import TempSensor
from sudoisbot.sensors.sensors import SensorDisconnectedError, NoSensorDetectedError
class TempPublisher(Publisher):
    """Publishes readings from a list of temp sensors on the b"temp" topic."""

    def __init__(self, addr, freq, location, sensors):
        super().__init__(addr, b"temp", None, freq)
        logger.info(f"HWM: {self.socket.get_hwm()}")

        # might ditch 'frequency' here..
        # static tags; merged with each sensor's own fields per message
        self.tags = {
            'location': location,
            'frequency': freq,
            'hostname': gethostname(),
            'source': 'sensor'
        }
        self.sensors = sensors

    def publish(self):
        """Read every sensor once and publish one message per measurement."""
        for sensor in self.sensors:
            reading = sensor.read()
            # one shared timestamp per sensor read
            now = datetime.now(timezone.utc).isoformat()
            for measurement, value in reading['measurements'].items():
                msg = {
                    'measurement': measurement,
                    'time': now,
                    'tags': {**self.tags, **sensor.as_dict()},
                    'fields': {
                        'value': float(value)
                    }
                }
                logmsg = f"{msg['tags']['name']} {measurement}: {value}"
                logger.log("TEMP", logmsg)
                self.pub(msg)
def main(config):
    """Entry point: build the configured temp sensors and publish until stopped."""
    addr = config['broker']
    frequency = config['frequency']
    location = config['location']

    level_no = config.get('sensor_log_no', 9)
    logger.level("TEMP", no=level_no, color="<green>")
    logger.info(f"logging level TEMP on no {level_no}")

    while True:
        try:
            sensor_configs = config['sensors']['temp']
            sensors = [TempSensor.from_kind(**conf) for conf in sensor_configs]
            for sensor in sensors:
                logger.info(f"emitting data for {sensor}")

            with TempPublisher(addr, frequency, location, sensors) as publisher:
                publisher.loop()
            return 0
        except NoSensorDetectedError as e:
            logger.error(e)
            return 1
        except SensorDisconnectedError as e:
            # especially usb sensors can be unplugged for a short time
            # for various reasons
            logger.error(e)
            logger.info("waiting 30s for sensor to come back")
            time.sleep(30.0)
        except KeyboardInterrupt:
            logger.info("Exiting..")
            return 0

View File

@ -1,28 +1,109 @@
#!/usr/bin/python3
import peewee
from peewee import DateTimeField, TextField, DecimalField, IntegerField
from playhouse.db_url import connect
import json
db = peewee.DatabaseProxy()
import peewee
from peewee import DateTimeField, TextField, DecimalField, CharField
from peewee import MySQLDatabase
from peewee import IntegrityError
from loguru import logger
db_proxy = peewee.DatabaseProxy()
def dbconnect(**mysqlconf):
    """Open a MySQL connection, bind it to the module's proxy, and return it."""
    database = MySQLDatabase(**mysqlconf)
    db_proxy.initialize(database)
    return database
# db = connect(dburl)
# models.db.initialize(db)
# try:
# with models.db:
# models.Temps.create(timestamp=j['timestamp'],
# name=j['name'],
# temp=j['temp'],
# extra=extra)
# except IntegrityError as e:
# logger.error(e)
class BaseModel(peewee.Model):
class Meta:
database = db
database = db_proxy
class Temps(BaseModel):
timestamp = DateTimeField(index=True)
name = TextField()
class Temperatures(BaseModel):
time = DateTimeField(index=True)
name = CharField(max_length=32)
location = TextField()
environment = TextField()
source = TextField()
temp = DecimalField()
humidity = IntegerField(null=True)
extra = TextField(null=True)
json = TextField(null=False)
def as_msg(self):
return json.loads(self.json)
@classmethod
def insert_msg(cls, msg):
name = msg['tags']['name']
try:
return cls.create(
time = msg['time'],
name = name,
location = msg['tags']['location'],
environment = msg['tags']['environment'],
source = msg['tags']['source'],
temp = msg['fields']['value'],
json = json.dumps(msg)
)
except IntegrityError as e:
logger.error(f"error on message from {name}")
logger.error(e)
return None
class Meta:
indexes = (
(('timestamp', 'name'), True),
(('time', 'name'), True),
)
def create_tables(uri):
db.initialize(connect(uri))
with db:
db.create_tables([Temps])
class Humidities(BaseModel):
    # One row per published humidity measurement; `json` keeps the full
    # original message for replay/debugging.
    time = DateTimeField(index=True)
    name = TextField()
    location = TextField()
    environment = TextField()
    source = TextField()
    humidity = DecimalField()
    json = TextField(null=False)

    @classmethod
    def insert_msg(cls, msg):
        """Insert one sink message; returns the row, or None on IntegrityError."""
        name = msg['tags']['name']
        try:
            return cls.create(
                time = msg['time'],
                name = msg['tags']['name'],
                location = msg['tags']['location'],
                environment = msg['tags']['environment'],
                source = msg['tags']['source'],
                humidity = msg['fields']['value'],
                json = json.dumps(msg)
            )
        except IntegrityError as e:
            # duplicate (per unique index) or constraint violation: log and skip
            logger.error(f"error on message from {name}")
            logger.error(e)
            return None

    class Meta:
        # NOTE(review): unique index on (time, humidity) looks like a
        # copy-paste slip -- Temperatures uses (time, name), and two sensors
        # reporting the same humidity at the same time would collide here.
        # Confirm intent before changing the schema.
        indexes = (
            (('time', 'humidity'), True),
        )
class Sensor(BaseModel):
    # Registry of known sensors (metadata only).
    # NOTE(review): no reads/writes of this table are visible in this file.
    name = TextField()
    sensortype = TextField()
    host = TextField()
    comment = TextField()
    created = DateTimeField()

View File

@ -2,12 +2,34 @@
import sys
import os
from sudoistemps import sink
from peewee import MySQLDatabase, ProgrammingError
from loguru import logger
from sudoisbot.sink import models
from sudoisbot.config import read_config
sqlitefile = sys.argv[1]
conf_file = sys.argv[1]
if os.path.exists(sqlitefile):
raise SystemExit(f"file '{sqlitefile}' exists, not doing anything")
config = read_config(conf_file)
with MySQLDatabase(**config['mysql']) as db:
models.db_proxy.initialize(db)
sink.create_tables("sqlite:///" + sqlitefile)
should_exist = [models.Temperatures, models.Humidities]
create = []
for table in should_exist:
try:
count = table.select().count()
if count > 0:
logger.info(f"{table} table has {count} rows, ignoring")
continue
except ProgrammingError as e:
if not e.args[1].endswith("doesn't exist"):
raise
create.append(table)
if len(create) > 0:
db.create_tables(create)
logger.info(f"created {create}")
else:
logger.warning("did nothing")

View File

@ -1,19 +1,37 @@
#!/usr/bin/env python3
import json
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from loguru import logger
import sudoisbot.datatypes
def get_recent(statefile, grace=10):
state = get_state(statefile)
now = datetime.now()
now = datetime.now(timezone.utc)
temps = dict()
for name, values in state.items():
okdiff = timedelta(minutes=grace, seconds=int(values['frequency']))
dt = datetime.fromisoformat(values['timestamp'])
if now - dt < okdiff:
temps[name] = values
for name, data in state.items():
okdiff = timedelta(minutes=grace, seconds=int(data['tags'].get('frequency', 240)))
dt = datetime.fromisoformat(data['time'])
try:
diff = now - dt
is_recent = diff < okdiff
except TypeError as e:
if "offset-naive and offset-aware" in e.args[0]:
logger.warning(f"record for '{name}' doesnt have a tz")
continue
else:
raise
if is_recent:
logger.trace(f"age of '{name}' state: {diff}")
temps[name] = data
else:
logger.warning(f"record for '{name}' is too old (diff {diff})")
if not any(temps.values()):
raise ValueError("no recent temp data was found")
else:
@ -46,12 +64,20 @@ def get_state(statename):
logger.warning(f"possible race condition: '{e}'")
time.sleep(1.0)
def update_state(update, statename, key=""):
state = get_state(statename)
def update_state(updatemsg, statefilename, key=""):
if isinstance(updatemsg, sudoisbot.datatypes.Message):
logger.warning("i sholdnt be called often and should be removed if this hacking session is fruitful")
updatemsg = updatemsg.as_dict()
name = updatemsg['tags']['name']
state = get_state(statefilename)
try:
name = update['name']
state[update['name']] = update
except TypeError:
state[key] = update
with open(statename, 'w') as f:
state[name].update(updatemsg)
except KeyError:
logger.info(f"adding '{name}' to state {statefilename}")
state[name] = updatemsg
with open(statefilename, 'w') as f:
f.write(json.dumps(state, indent=4))

View File

@ -1,111 +1,175 @@
#!/usr/bin/python3 -u
import os
import json
import sys
from time import sleep
from datetime import datetime
import sys
from loguru import logger
import zmq
from sudoisbot.common import init, catch
from sudoisbot.sink.simplestate import update_state
from sudoisbot.sink import simplestate
from sudoisbot.network.sub import Subscriber, SubscriberTimedOutError
from sudoisbot.sink.models import Temperatures, Humidities, dbconnect
def as_bytes(astring):
    """Coerce a str to utf-8 bytes; bytes input is passed through unchanged."""
    return astring if isinstance(astring, bytes) else astring.encode()
class ZFluxClient(object):
    """Thin ZMQ PUB wrapper that forwards measurement dicts to a zflux
    relay as [topic, json] multipart frames.

    NOTE(review): topic=None would make as_bytes(None) fail with
    AttributeError on .encode(); every call site in this chunk passes a
    topic — confirm the default is ever exercised.
    """
    def __init__(self, addr=None, topic=None):
        self.addr = addr
        self.topic = as_bytes(topic)
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.PUB)
        # connect eagerly only when an address was supplied; otherwise the
        # caller must call connect(addr) (or use the context manager) later
        if addr:
            self.connect()
    def connect(self, addr=None):
        # fall back to the address given at construction time
        if not addr:
            addr = self.addr
        self.socket.connect(addr)
        logger.info(f"connected to: {addr}, emitting on topic: {self.topic}")
    def disconnect(self):
        self.socket.close()
        self.context.destroy()
        logger.debug("zflux client disconnected")
    def __enter__(self):
        # NOTE(review): when addr was set, __init__ already connected, so
        # entering the context manager connects a second time — harmless
        # for ZMQ but worth confirming it is intentional
        if self.addr:
            self.connect(self.addr)
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        # deliberately NOT disconnecting: leaving the PUB socket open lets
        # ZMQ buffer pending messages instead of dropping them on teardown
        #self.disconnect()
        pass
    def send(self, msg):
        # multipart frame: [topic, json-encoded payload]
        self.socket.send_multipart([self.topic, json.dumps(msg).encode()])
class Sink(object):
def __init__(self, topics, write_path, zflux=None):
self.zflux = zflux
self.topics = topics
self.setup_loggers(write_path)
self.state_dir = write_path
def setup_loggers(self, writepath):
# change to 11 or 19 to show with debug logging
logger.level("TXT", no=9, color="<yellow>")
logger.level("SINK", no=11, color="<green>")
for topic in self.topics:
def matcher(topic):
def inner(arg):
extra_topic = arg['extra'].get('topic', b"")
return extra_topic == as_bytes(topic)
return inner
logger.add(os.path.join(writepath, f"{topic}.txt"),
level="TXT", format="{message}",
filter=matcher(topic))
def make_subscriber(self, addr):
return Subscriber(addr, self.topics)
def listen(self, addr):
try:
# with self.make_subscriber(addr) as sub:
# for topic, msg in sub.recv():
# self.handle_msg(topic, msg)
#
# commented out because testing to no gracefully disconnected to get
# publishers to buffer when sink is dead
sub = self.make_subscriber(addr)
sub.connect()
for topic, msg, cached in sub.recv():
if cached:
logger.info(f"got a cached {topic} message from {msg['time']}")
self.handle_msg(topic, msg)
except zmq.error.Again:
logger.info(f"timeout after {sub.rcvtimeo_secs}s..")
raise SubscriberTimedOutError
def handle_msg(self, topic, msg):
self.log(topic, msg)
self.append_file(topic, msg)
self.update_db(topic, msg) # todo: keep records in sql
self.send_zflux(msg)
self.update_state(topic, msg)
def update_db(self, topic, msg):
if topic == b"temp":
if msg['measurement'] == "temp":
Temperatures.insert_msg(msg)
elif msg['measurement'] == "humidity":
Humidities.insert_msg(msg)
def update_state(self, topic, newstate):
measurement = newstate['measurement']
filename = os.path.join(self.state_dir, f"{measurement}-state.json")
simplestate.update_state(newstate, filename)
def send_zflux(self, msg):
if self.zflux:
self.zflux.send(msg)
def append_file(self, topic, msg):
logger.bind(topic=topic).log("TXT", json.dumps(msg))
def log(self, topic, msg):
measurement = msg['measurement']
name = msg['tags']['name']
if 'value' in msg['fields']:
value = f": {msg['fields']['value']}"
else:
value = ""
logger.log("SINK", f"{topic}: {measurement} from '{name}'{value}")
def main(args, config):
db = dbconnect(**config['mysql'])
with ZFluxClient(topic=config['zflux']['topic']) as zflux:
zflux.connect(config['zflux']['addr'])
write_path = args.write_path or config['sink']['write_path']
sink = Sink(config['sink']['topics'], write_path, zflux)
while True:
try:
addr = config['sink']['addr']
sink.listen(addr)
except SubscriberTimedOutError:
sleep(1.0)
logger.info("reconnecting")
except KeyboardInterrupt:
logger.info("ok ill leave then")
return
def suicide_snail(timestamp, max_delay):
    """Kill the process if `timestamp` (ISO-8601 str) is more than
    `max_delay` seconds in the past — a stale message means this
    subscriber is lagging hopelessly and should die rather than keep
    processing old data.

    Exits with status 13 on a stale message; returns None otherwise.

    Bug fix: the original condition was `min(delay.seconds, 0) > max_delay`,
    which is never true (min(x, 0) <= 0), so the snail could never commit
    suicide; `.seconds` also ignores the days component of the delta.
    `total_seconds()` fixes both.
    """
    delay = datetime.now() - datetime.fromisoformat(timestamp)
    if delay.total_seconds() > max_delay:
        logger.error(f"suicide snail: {delay.total_seconds()} secs")
        sys.exit(13)
def msg2csv(msg):
    """Render a temp message dict as 'timestamp,name,temp', truncating the
    timestamp to whole seconds (first 19 chars of ISO-8601, dropping any
    fractional-second suffix)."""
    seconds_ts = msg['timestamp'][:19]
    return ",".join((seconds_ts, msg['name'], str(msg['temp'])))
def sink(addr, timeout, max_delay, state_file):
    """Subscribe to b"temp" messages on `addr`, log each one as csv under
    the custom "TEMPS" level, and (optionally) maintain a json state file
    of the latest reading per sensor.

    Raises zmq.error.Again (after tearing the socket down) when nothing
    arrives within `timeout` milliseconds, so the caller can reconnect.
    Raises SystemExit if the state file cannot be written.

    NOTE(review): max_delay is accepted but never used in this body — the
    suicide_snail() staleness check is not wired in here.
    """
    topic = b"temp"
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, topic)
    socket.setsockopt(zmq.RCVTIMEO, timeout)
    # Even though I'm the subscriber, I'm allowed to get this party
    # started with `bind`
    #socket.bind('tcp://*:5000')
    socket.connect(addr)
    logger.info(f"Connected to: '{addr}'")
    while True:
        try:
            msg = socket.recv_multipart()
        except zmq.error.Again:
            # nothing received inside the timeout window: close the socket
            # before re-raising so the caller can rebuild a clean one
            secs = timeout // 1000
            logger.warning(f"no messages after {secs} seconds")
            socket.close()
            context.destroy()
            raise
        # frame 0 is the topic, frame 1 the json payload
        j = json.loads(msg[1])
        csv = msg2csv(j)
        # the csv file sink filters on the bound `csv` extra
        logger.bind(csv=True).log("TEMPS", csv)
        if state_file:
            try:
                update_state(j, state_file)
            except PermissionError as e:
                logger.error(e)
                raise SystemExit
@catch()
def main():
#config = init(__name__)
config = init("temper_sub")
addr = config['addr']
state_file = config.get("state_file", "")
csv_file = config.get("csv_file", False)
timeout = config.get("timeout", 1000*60*5) # 5 minutes
max_delay = config.get('max_delay', 2) # seconds
if state_file:
logger.info(f"Maintaining state file: {state_file}")
else:
logger.info("Not maintaining a state file")
# adding a new log level. INFO is 20, temps should not be logged
# by an INFO logger
logger.level("TEMPS", no=19, color="<yellow>", icon="🌡️")
if csv_file:
# adding a logger to write the rotating csv files
# no logger timestamp since thats part of the csv data
try:
logger.add(csv_file,
level="TEMPS",
format="{message}",
rotation=config['csv_file_rotation'],
filter=lambda a: "csv" in a['extra'])
logger.info(f"Saving csv to: {csv_file}")
except PermissionError as e:
logger.error(e)
raise SystemExit
else:
logger.info("Not saving csv files")
logger.info(f"max_delay: {max_delay} secs")
while True:
# endless loop to handle reconnects
try:
sink(addr, timeout, max_delay, state_file)
except zmq.error.Again:
logger.info("reconnecting after 10 seconds")
sleep(10.0)
continue
if __name__ == "__main__":
main()
raise SystemExit("suicide snail")

View File

@ -1,6 +0,0 @@
#!/usr/bin/python3
class SensorDisconnectedError(Exception): pass  # a sensor was present but stopped responding / was unplugged
class NoSensorDetectedError(Exception): pass  # no supported sensor could be found on this host at all

View File

@ -1,142 +0,0 @@
#!/usr/bin/python3
from loguru import logger
from temper.temper import Temper
import os.path
from sudoisbot.temps.exceptions import *
W1ROOT = "/sys/bus/w1/devices"
W1LIST = "w1_bus_master1/w1_master_slaves"
class TempSensorBase(object):
pass
class TemperSensor(Temper, TempSensorBase):
    """USB TEMPer sensor, wrapping the external `temper` library and
    normalizing its readings and failure modes."""
    # identifier used by the supported_sensors registry / detect_sensor()
    sensortype = "temper"
    @classmethod
    def get(cls):
        """Return a sensor instance, or raise NoSensorDetectedError if no
        device is plugged in."""
        if cls.is_connected():
            return cls()
        else:
            raise NoSensorDetectedError
    @classmethod
    def is_connected(cls):
        """Probe for a plugged-in TEMPer device. Returns False on
        SensorDisconnectedError; other exceptions propagate."""
        try:
            temper = cls()
            return len(temper.read()) > 0
        except SensorDisconnectedError:
            return False
    def _read(self):
        # wrap Temper.read() and translate its failure modes (no data,
        # missing device node, permission problems) into
        # SensorDisconnectedError so callers handle a single exception type
        try:
            data = super().read()
            if len(data) == 0: raise SensorDisconnectedError("temper: no data")
            return data
        except FileNotFoundError as e:
            msg = f"temper: {e.args[1]}"
            logger.error(msg)
            raise SensorDisconnectedError(msg)
        except PermissionError as e:
            msg = f"temper found but got: {e}"
            logger.error(msg)
            raise SensorDisconnectedError(msg)
    def read(self):
        """Read and normalize: returns a list of dicts where the temper
        library's 'internal/external temperature/humidity' keys become
        'temp'/'humidity' — one output dict per matched key, carrying a
        'source' field with the original key plus all unmatched keys
        (firmware etc) from the raw reading."""
        data = self._read()
        mapping = {'internal temperature': 'temp',
                   'internal humidity': 'humidity',
                   'external temperature': 'temp',
                   'external humidity': 'humidity'}
        results = []
        for item in data:
            # get a dict with the old keys and their values, each of these
            # values will be their own dict
            sources = [key for key in mapping.keys() if key in item.keys()]
            base = {k: v for (k, v) in item.items() if k not in mapping.keys()}
            for oldkey in sources:
                newkey = mapping[oldkey]
                fixed = {newkey: item[oldkey], 'source': oldkey}
                results.append({**base, **fixed})
        return results
class Ds18b20Sensor(TempSensorBase):
    """DS18B20 1-wire temperature sensors read via the sysfs w1 interface."""
    # identifier used by the supported_sensors registry / detect_sensor()
    sensortype = "ds18b20"
    def __init__(self, sensor_ids):
        # map each w1 slave id to its sysfs data file
        def w1path(sensor_id):
            return os.path.join(W1ROOT, sensor_id, "w1_slave")
        self.sensors = [(a, w1path(a)) for a in sensor_ids]
    def _read_sensor(self, sensor):
        """Return the raw w1_slave file contents as a list of lines."""
        try:
            with open(sensor, 'r') as f:
                return f.read().splitlines()
        except FileNotFoundError:
            # the sysfs node vanished -> the device was detached
            raise SensorDisconnectedError(sensor)
    def _parse_data(self, data):
        """Parse the two-line w1_slave format into degrees celsius.

        Line 1 ends with "YES" when the on-wire CRC was good; line 2 ends
        with "t=<millidegrees>".
        """
        if not data[0].endswith("YES"):
            raise SensorDisconnectedError
        tempstr = data[1].rsplit(" ", 1)[1][2:]
        return int(tempstr)/1000.0
    def read(self):
        """Yield {'temp': celsius} for every attached sensor.

        Bug fix: the original had an `else:` clause on the for loop; a
        for-else runs whenever the loop finishes WITHOUT `break`, so it
        raised SensorDisconnectedError after successfully yielding every
        reading (and NameError'd on `sensorid` when the list was empty).
        Now only an empty sensor list raises.
        """
        if not self.sensors:
            raise SensorDisconnectedError("no ds18b20 sensors attached")
        for sensorid, sensorpath in self.sensors:
            data = self._read_sensor(sensorpath)
            temp = self._parse_data(data)
            # figure out the rest and do checksums in the future
            yield {'temp': temp }
    @classmethod
    def get(cls):
        """Enumerate w1 slaves from the bus master's device list."""
        with open(os.path.join(W1ROOT, W1LIST), 'r') as f:
            w1_ids = f.read().splitlines()
        # 28- is the DS18B20 1-wire family code
        if not all(a.startswith("28-") for a in w1_ids) and len(w1_ids) > 0:
            raise NoSensorDetectedError
        return cls(w1_ids)
    @classmethod
    def is_connected(cls):
        """Probe for sensors without raising: returns False when the w1
        sysfs tree is missing (no 1-wire support on this host) or when
        get() finds only foreign devices.

        Fix: the original propagated NoSensorDetectedError (from get())
        and FileNotFoundError, but detect_sensor() relies on this method
        returning a boolean.
        """
        try:
            return len(cls.get().sensors) > 0
        except (NoSensorDetectedError, FileNotFoundError):
            return False
def detect_sensor(sensortype=None):
    """Return an instance of the first connected supported sensor.

    When `sensortype` is given, detection is skipped and that sensor class
    is instantiated directly. Raises NoSensorDetectedError (from here or
    from .get()) when nothing usable is found.
    """
    if sensortype:
        logger.info(f"skipping detection, attempting to use '{sensortype}'")
        return supported_sensors[sensortype].get()
    for candidate in supported_sensors.values():
        if not candidate.is_connected():
            continue
        logger.info(f"found '{candidate.sensortype}' sensor")
        return candidate.get()
    raise NoSensorDetectedError
supported_sensors = {a.sensortype: a for a in TempSensorBase.__subclasses__()}

View File

@ -1,103 +0,0 @@
#!/usr/bin/python3 -u
import argparse
import time
import sys
import zmq
from temper.temper import Temper as TemperBase
from loguru import logger
from sudoisbot.common import init, catch
from sudoisbot.network.pub import Publisher
from sudoisbot.temps.sensors import TemperSensor, Ds18b20Sensor
from sudoisbot.temps.sensors import supported_sensors, detect_sensor
from sudoisbot.temps.exceptions import *
# TODO:
# use tmpfs on raspi for state
# set up ntp on raspbi
class TempPublisher(Publisher):
    """Publishes readings from a local temperature sensor on the b"temp"
    topic via the Publisher base class."""
    def __init__(self, addr, name, freq, sensor=None):
        super().__init__(addr, b"temp", name, freq)
        self.sensor = sensor
        # NOTE(review): despite the sensor=None default, a sensor is
        # effectively required — the next line raises AttributeError on None
        self.sensortype = self.sensor.sensortype
        logger.info(f"emitting data from a {self.sensortype} as '{self.name}'")
    def publish(self):
        """Read the sensor once and send a reading.

        NOTE(review): the `return` inside the for loop means only the
        FIRST dict yielded by sensor.read() is published — on a temper,
        the humidity reading is silently dropped. The KeyError handler
        below suggests the first item sometimes lacks 'temp'; confirm
        the single-reading behavior is intended before changing it.
        """
        try:
            temp = self.sensor.read()
            for t in temp:
                data = { 'temp': t['temp'],
                         'metadata': { 'sensortype': self.sensortype,
                                       'firmware': t.get('firmware') } }
                # adds name, timestamp, frequency, type
                return self.send(data)
        except KeyError as e:
            if self.sensortype == "temper" and e.args[0] == 'temp':
                # seems to happen intermittently
                logger.error(t)
            else:
                raise
        except SensorDisconnectedError:
            # temper was most likely unplugged
            # disconnect handled by __exit__
            logger.warning(f"{self.sensortype} sensor unplugged, disconnecting")
            raise
def wait_for_sensor(sensortype=None):
    """Block until detect_sensor() succeeds, retrying every 15 minutes.

    Logs the "sleep mode" announcement only once, on the first failure.
    """
    announced = False
    while True:
        try:
            return detect_sensor(sensortype)
        except NoSensorDetectedError:
            if not announced:
                logger.info("entering sleep mode, checking for sensors every 15m")
                announced = True
            time.sleep(15.0*60)
@catch
def main():
    """CLI entry point: parse flags, load config, wait for a sensor and
    publish readings forever. Returns a process exit code.

    Bug fix: --sleep previously had default=240, so the expression
    `config['sleep'] if not args.sleep else args.sleep` could never reach
    the config value (240 is truthy) — the config key was dead. Flags now
    default to None and the config is consulted when a flag is absent,
    falling back to the old hard-coded 240.
    """
    parser = argparse.ArgumentParser(
        description="emit temp data from therm sensor",
        add_help=False)
    parser.add_argument("--name", help="set temper name")
    parser.add_argument("--sleep", help="publish interval", type=int,
                        default=None)
    parser.add_argument("--sensortype", choices=supported_sensors.keys())
    config, args = init("temper_pub", parser)
    addr = config['addr']
    # CLI flags override config; config falls back to the old default
    name = args.name if args.name else config['name']
    sleep = args.sleep if args.sleep is not None else config.get('sleep', 240)
    while True:
        try:
            # blocks (possibly for a long time) until a sensor shows up
            sensor = wait_for_sensor(args.sensortype)
            with TempPublisher(addr, name, sleep, sensor) as publisher:
                publisher.loop()
            return 0
        except SensorDisconnectedError as e:
            # especially usb sensors can be unplugged for a short time
            # for various reasons
            logger.info("waiting 30s for sensor to come back")
            time.sleep(30.0)
            continue
        except PermissionError as e:
            logger.error(e)
            return 2
        except KeyboardInterrupt:
            logger.info("Exiting..")
            return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -1,12 +0,0 @@
#!/usr/bin/env python3
from loguru import logger
from sudoisbot.unifi import UnifiApi
from sudoisbot.common import init
def show_clients():
    """Log a short description of every client the unifi controller
    (configured under the "unifi" config section) currently knows about."""
    config = init("unifi")
    api = UnifiApi(config)
    for client in api.get_clients_short():
        logger.info(client)

View File

@ -112,5 +112,6 @@ if __name__ == "__main__":
logger.error(e)
logger.info(f"duplicates: {len(dups)}")
logger.info(f"imported {len(imported)} rows from '{args.csv}'")
logger.info(f"database: '{args.db}'")
logger.info(f"from: {imported[0].timestamp}")
logger.info(f"to: {imported[-1].timestamp}")

View File

@ -0,0 +1,169 @@
#!/usr/bin/python3
import sys
import argparse
from datetime import datetime
import os
import json
import dateutil.parser
from datetime import timezone
import fileinput
from loguru import logger
#from influxdb import InfluxDBClient
import requests.exceptions
#from sudoisbot.sink import models
from sudoisbot.sink.sink import ZFluxClient
from sudoisbot.config import read_config
from sudoisbot.common import init
def mkbody(dt, name, temp):
    """Build an influx-style point dict for one csv row.

    `dt` is parsed with dateutil (naive or tz-aware strings accepted) and
    normalized to a UTC ISO-8601 string. `temp` is coerced to float and
    rounded to two decimals — round() replaces the original
    float(f"{temp:.2f}") string round-trip (flagged "ugh......" by the
    author), which produced the same value the long way around.
    """
    dt = dateutil.parser.parse(dt).astimezone(timezone.utc).isoformat()
    return {
        "measurement": "temp",
        "tags": {
            "name": name
        },
        "time": dt,
        "fields": {
            "value": round(float(temp), 2)
        }
    }
if __name__ == "__main__":
config = read_config()
zflux = ZFluxClient(topic=config['zflux']['topic'])
zflux.connect(config['zflux']['addr'])
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--csv")
parser.add_argument("--last", type=int)
args = parser.parse_args()
# -csv /srv/temps/temps.csv --last 9500 &&
#config, args = init("csv2influx", parser, fullconfig=True)
#print(os.environ['GRAFANAPASS'])
# logger.info("creating influxdb client")
# client = InfluxDBClient(
# host='ingest.sudo.is',
# port=443,
# username='sudoisbot',
# password=os.environ['GRAFANAPASS'],
# ssl=True,
# verify_ssl=True,
# database='sudoisbot'
# )
if not args.csv:
logger.info("sleeping")
import time
time.sleep(3.0)
logger.info("waiting for stdin data")
try:
for line in fileinput.input():
text = line.strip()
dt, name, temp = text.split(",")
body = mkbody(dt, name, temp)
try:
zflux.send(body)
print(json.dumps(body))
# socket.gaierror: [Errno -2] Name or service not known
# urllib3.exceptions.NewConnectionError
# urllib3.exceptions.MaxRetryError: HTTPSConnectionPool
# requests.exceptions.ConnectionError: HTTPSConnectionPool(host='ingest.sudo.is',
# port=443): Max retries exceeded with url: /write?db=sudoisbot&precision=m (Cause
# d by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0xb56a243
# 0>: Failed to establish a new connection: [Errno -2] Name or service not known'))
# 20.09.2020
# influxdb.exceptions.InfluxDBServerError: b'<html>\r\n<head><title>504 Gateway Time-out</title></head>\r\n<body>\r\n<center><h1>504 Gateway Time-out</h1></center>\r\n<hr><center>nginx/1.18.0 (Ubuntu)</center>\r\n</body>\r\n</html>\r\n'
except requests.exceptions.ConnectionError as e:
raise SystemExit(f"fatal error: {e}")
except KeyboardInterrupt:
logger.info("ok ok im leaving!")
raise SystemExit
import time
time.sleep(10.0)
logger.info('done sleeping')
l = list()
name_input = ""
logger.info(f"reading {args.csv}...")
with open(args.csv, 'r') as f:
for line in f.readlines():
d = dict()
items = line.strip().split(",")
if len(items) == 2:
# before i was smart enough to log the name
if not name_input:
name_input = input("enter name: ")
dt, d['temp'] = items
d['name'] = name_input
else:
dt, name, temp = items
d['name'], d['temp'] = name, temp
#d['timestamp'] = datetime.fromisoformat(dt)
d['timestamp'] = dt
body = mkbody(dt, name, temp)
# import json
# print(json.dumps(body, indent=2))
# raise SystemExit
l.append(body)
logger.info("finished reading file")
# send to influx
logger.info("sending to zflux")
if args.last:
sendthis = l[-args.last:]
logger.info(f"just sending last {args.last} measurements")
#client.write_points(sendthis, batch_size=100, time_precision='m')
for item in sendthis:
zflux.send(item)
print(json.dumps(sendthis[0], indent=2))
print(json.dumps(sendthis[-1], indent=2))
#print(len([a for a in sendthis if a['tags']['name'] == 'bedroom']))
else:
raise NotImplementedError
#logger.info("sending all measurements from csv file")
#client.write_points(l, batch_size=100, time_precision='m')
# try:
# record = models.Temps.create(**d)
# imported.append(record)
# except IntegrityError as e:
# if e.args[0].startswith("UNIQUE"):
# dups.append(line)
# if not args.ignore_dup:
# # still ignore them per say, put still print
# # a warning if we're not expecting them
# logger.warning(f"{e}: '{line}'")
# else:
# logger.error(e)
# logger.info(f"duplicates: {len(dups)}")
# logger.info(f"imported {len(imported)} rows from '{args.csv}'")
# logger.info(f"database: '{args.db}'")
# logger.info(f"from: {imported[0].timestamp}")
# logger.info(f"to: {imported[-1].timestamp}")

View File

@ -0,0 +1,79 @@
#!/usr/bin/python3
import sys
import argparse
from datetime import datetime
import os
import json
import dateutil.parser
from datetime import timezone
import fileinput
from loguru import logger
import requests.exceptions
#from sudoisbot.sink import models
from sudoisbot.config import read_config
if __name__ == "__main__":
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("csv")
parser.add_argument("json")
parser.add_argument("--state")
args = parser.parse_args()
with open(args.state, 'r') as f:
state = json.load(f)
def mktags(name):
if name == "inside":
return {
'name': name,
'environment': 'inside',
'kind': 'temper',
'source': 'sensor',
'frequency': 240
}
else:
tags = state[name]['tags']
tags['frequency'] = 240
return tags
def mkjson(dt, name, temp):
dt = dateutil.parser.parse(dt).astimezone(timezone.utc).isoformat()
return json.dumps({
"measurement": "temp",
"tags": mktags(name),
"time": dt,
"fields": {
"value": float(f"{float(temp):.2f}") # ugh......
}
})
name_input = ""
with open(args.csv, 'r') as f:
with open(args.json, 'w') as j:
for line in f.readlines():
d = dict()
items = line.strip().split(",")
if len(items) == 2:
# before i was smart enough to log the name
if not name_input:
name_input = input("enter name: ")
dt, d['temp'] = items
d['name'] = name_input
else:
dt, name, temp = items
d['name'], d['temp'] = name, temp
#d['timestamp'] = datetime.fromisoformat(dt)
d['timestamp'] = dt
j.write(mkjson(dt, name, temp))
j.write("\n")

View File

@ -0,0 +1,59 @@
#!/usr/bin/python3
import argparse
import json
from time import sleep
from loguru import logger
from sudoisbot.sink.sink import ZFluxClient
from sudoisbot.config import read_config
if __name__ == "__main__":
config = read_config('/usr/local/etc/sudoisbot-sink.yml')
parser = argparse.ArgumentParser()
parser.add_argument("--json-file", required=True)
parser.add_argument("--last", type=int)
args = parser.parse_args()
zflux = ZFluxClient(topic=config['zflux']['topic'])
zflux.connect(config['zflux']['addr'])
logger.info(f"reading {args.json_file}...")
l = list()
with open(args.json_file, 'r') as f:
for line in f.readlines():
jline = json.loads(line)
l.append(jline)
if args.last:
sendthis = l[-args.last:]
else:
sendthis = l
logger.info(f"read: {len(l)}, sending: {len(sendthis)}")
logger.info("sleeping to avoid the late joiner syndrome")
sleep(1.0)
for item in sendthis:
tochange = [k for k, v in item['fields'].items() if isinstance(v, int)]
if tochange:
n = item['tags']['name']
m = item['measurement']
for k in tochange:
logger.warning(f"field: '{k}', measurement: {m}, name: {n} to float")
tosend = {
'measurement': item['measurement'],
'fields': {
k: float(v) if isinstance(v, int) else v
for k, v in item['fields'].items()
},
'tags': item['tags'],
'time': item['time'],
}
zflux.send(tosend)
print(f"oldets sent: {sendthis[0]['time']}")
print(f"newestsent: {sendthis[-1]['time']}")

View File

@ -0,0 +1,55 @@
#!/usr/bin/python3
import sys
import argparse
from datetime import datetime
import os
import json
import dateutil.parser
from datetime import timezone
import fileinput
from peewee import IntegrityError
from loguru import logger
import requests.exceptions
#from sudoisbot.sink import models
from sudoisbot.config import read_config
from sudoisbot.sink.models import Temperatures, Humidities, dbconnect
if __name__ == "__main__":
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("json")
parser.add_argument("--config")
parser.add_argument("--ignore-dups")
args = parser.parse_args()
config = read_config(args.config)
db = dbconnect(**config['mysql'])
temp_count = Temperatures.select().count()
humi_count = Humidities.select().count()
logger.info(f"temp count: {temp_count}")
logger.info(f"humi count: {humi_count}")
with open(args.json, 'r') as j:
for line in j.readlines():
msg = json.loads(line)
msg['tags'].setdefault('location', 'unknown')
try:
if msg['measurement'] == "temp":
Temperatures.insert_msg(msg)
elif msg['measurement'] == "humidity":
Humidities.insert_msg(msg)
except IntegrityError as e:
if e.args[1].startswith("Duplicate") and args.ignore_dups:
name = msg['tags']['name']
time = msg['time']
logger.info(f"ignoring from {name} on {time}")
pass
else:
raise
temp_count = Temperatures.select().count()
humi_count = Humidities.select().count()
logger.success(f"temp count: {temp_count}")
logger.success(f"humi count: {humi_count}")

View File

@ -1,6 +1,7 @@
#!/usr/bin/python3
import argparse
import sys
import zmq
from loguru import logger
@ -11,14 +12,15 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--topic", default="")
# just get the config, so logger is just default config
config, args = init('suball', parser, fullconfig=True)
parser.add_argument("--broker", default="broker.s21.sudo.is")
args = parser.parse_args()
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, args.topic.encode())
socket.setsockopt(zmq.SUBSCRIBE, b'') #args.topic.encode())
addr = config['temper_sub']['addr']
addr = f"tcp://{args.broker}:5560"
socket.connect(addr)
logger.info(f"connected to '{addr}'")

View File

@ -1,187 +0,0 @@
#!/usr/bin/python3
# S21 msl: 40
# S21 lat long: (52.5167654, 13.4656278)
#
# met.no:
#
# tuncate lat/long to 4 decimals
#
# Reponse headers (firefox):
#
# Date Thu, 25 Jun 2020 20:55:23 GMT
# Expires Thu, 25 Jun 2020 21:26:39 GMT
#
# Seems like 30 mins, but check "Expires"
#
# Use "If-Modified-Since" request header
#
# Depending on how i do this, add a random number of mins/secs to
# not do it on the hour/minute
#
# must support redirects and gzip compression (Accept-Encoding: gzip, deflate)
#
# openweatherap:
#
#
# triggers: https://openweathermap.org/triggers
# - polling
# - may as well poll nowcast
#
# ratelimit: 60 calls/minute
#
# weather condition codes: https://openweathermap.org/weather-conditions#Weather-Condition-Codes-2
#
# maybe interesting project: https://github.com/aceisace/Inky-Calendar
from datetime import datetime
from decimal import Decimal
import os
import time
import json
import requests
from loguru import logger
from requests.exceptions import RequestException
from sudoisbot.network.pub import Publisher
from sudoisbot.common import init, catch, useragent
#user_agent2 = f"{user_agent} schedule: 60m. this is a manual run for development, manually run by my author. hello to anyone reading, contact info on github"
lat_lon = ('52.5167654', '13.4656278')
lat, lon = map(Decimal, lat_lon)
msl = 40
owm_url = f"https://api.openweathermap.org/data/2.5/weather?lat={lat:.4f}&lon={lon:.4f}&appid={owm_token}&sea_level={msl}&units=metric"
rain_conditions = [
'rain',
'drizzle',
'thunderstorm'
]
class NowcastPublisher(Publisher):
    """Polls the openweathermap current-weather ("nowcast") API and
    publishes the result on the b"temp" topic, shaped like a sensor
    reading so downstream sinks treat it uniformly."""
    def __init__(self, addr, name, freq, location, msl, config):
        topic = b"temp"
        super().__init__(addr, topic, name, freq)
        self.type = "weather"
        self.lat, self.lon = map(Decimal, location)
        # NOTE(review): the `config` and `msl` parameters are ignored; the
        # module-level owm_url/owm_token are used instead, and owm_token is
        # not defined anywhere in this file (presumably stripped with the
        # secret) — this module would NameError at import time. Confirm.
        #self.token = config['token']
        #self.url = config['url']
        self.token = owm_token
        self.url = owm_url.format(lat=self.lat, lon=self.lon)
        logger.debug(self.url)
        # for debugging and understanding the data
        logger.add("/tmp/owm_odd.json",
                   format="{message}",
                   filter=lambda x: 'odd' in x['extra'], level="TRACE")
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": useragent(),
                                     "Accept": "application/json"})
    # def message(self, weather):
    #     super().message()
    def send(self, weather):
        # base envelope (name/timestamp/etc) comes from Publisher.message();
        # the full weather dict plus flattened temp/humidity are layered on
        data = self.message()
        data['weather'] = weather
        data['temp'] = weather['temp']
        data['humidity'] = weather['humidity']
        bytedata = json.dumps(data).encode()
        logger.debug(bytedata)
        # parent class has debug logger
        self.socket.send_multipart([self.topic, bytedata])
    def query_api(self):
        """GET the nowcast endpoint; raises requests.HTTPError on 4xx/5xx."""
        r = self.session.get(self.url)
        r.raise_for_status()
        if r.status_code == 203:
            logger.warning("deprecation warning: http 203 returned")
        return r.json()
    def get_nowcast(self):
        """Query the API and flatten the response into the dict send()
        expects; multi-condition responses are joined with commas and
        the raw payload is traced to the odd-data log."""
        w = self.query_api()
        if len(w['weather']) > 1:
            logger.warning(f"got {len(w['weather'])} conditions")
            logger.warning(f"{w['weather']}")
            logger.bind(odd=True).trace(json.dumps(w))
        desc = ', '.join([a['description'] for a in w['weather']])
        main = ', '.join([a['main'] for a in w['weather']])
        raining = 'rain' in main.lower() or 'rain' in desc.lower()
        snowing = 'snow' in main.lower() or 'snow' in desc.lower()
        drizzling = 'drizzle' in main.lower() or 'drizzle' in desc.lower()
        thunderstorm = 'thunderstorm' in main.lower() or 'thunderstorm' in desc.lower()
        any_percip = raining or snowing or drizzling or thunderstorm
        if any_percip:
            # keep the raw payload around whenever precipitation shows up
            logger.bind(odd=True).trace(json.dumps(w))
        precipitation = {
            'raining': raining,
            'snowing': snowing,
            'drizzling': drizzling,
            'thunderstorm': thunderstorm,
            'any': any_percip
        }
        temp = w['main']['temp']
        humidity = w['main']['humidity']
        pressure = w['main']['pressure']
        wind = w.get('wind', {})
        # this is the rain/snow volume for the last 1h and 3h
        rain = w.get('rain', {})
        snow = w.get('snow', {})
        dt = w['dt']
        # misnomer on my behalf
        # .fromtimestamp() -> converts to our tz (from UTC)
        # .utcfromtimestamp() -> returns in UTC
        # NOTE(review): weather_dt is computed but never used or returned
        weather_dt = datetime.fromtimestamp(dt).isoformat()
        return {
            'temp': temp,
            'desc': desc,
            'humidity': humidity,
            'wind': wind,
            'rain': rain,
            'main': main,
            'snow': snow,
            'pressure': pressure,
            'precipitation': precipitation
        }
    def publish(self):
        # called by Publisher.loop(); network errors are logged and the
        # tick is skipped rather than crashing the loop
        try:
            nowcast = self.get_nowcast()
            return self.send(nowcast)
        except RequestException as e:
            logger.error(e)
def pub(addr):
freq = 60 * 5 # 5 mins
with NowcastPublisher(addr, "fhain", freq, lat_lon, msl, {}) as publisher:
publisher.loop()
@catch
def main():
config = init("weather_pub", fullconfig=True)
addr = config['temper_pub']['addr']
pub(addr)