mirror of https://github.com/merbanan/rtl_433.git
286 lines
9.5 KiB
Python
Executable File
286 lines
9.5 KiB
Python
Executable File
#!/usr/bin/env python
|
||
|
||
"""MQTT monitoring relay for rtl_433 communication."""
|
||
|
||
# This program listens on a UDP socket for syslog messages with a json
|
||
# payload, and publishes the data via MQTT. The broker connection is
|
||
# kept open (and automatically reconnects on failure). Each device
|
||
# is mapped to its own topic,
|
||
|
||
# Dependencies:
|
||
# Paho-MQTT; see https://pypi.python.org/pypi/paho-mqtt
|
||
|
||
# Optionally: PEP 3143 - Standard daemon process library
|
||
# (on 2.7, pip install python-daemon)
|
||
|
||
# To enable daemon support, uncomment the following line and adjust
|
||
# run(). Note that print() is still used.
|
||
# import daemon
|
||
|
||
from __future__ import print_function
|
||
from __future__ import with_statement
|
||
|
||
import json
|
||
import logging
|
||
import socket
|
||
import time
|
||
|
||
import paho.mqtt.client as mqtt
|
||
|
||
|
||
# The config class represents a config object. The constructor takes
|
||
# an optional pathname, and will switch on the suffix (.yaml for now)
|
||
# and read a dictionary.
|
||
class rtlconfig(object):
|
||
|
||
# Initialize with default values.
|
||
c = {
|
||
# Log level info (False) or debug (True)
|
||
'DEBUG': False,
|
||
|
||
# Address to listen on for syslog/json messages from rtl_433
|
||
'UDP_IP': "127.0.0.1",
|
||
'UDP_PORT': 1433,
|
||
|
||
# MQTT broker address and credentials
|
||
'MQTT_HOST': "127.0.0.1",
|
||
'MQTT_PORT': 1883,
|
||
'MQTT_USERNAME': None,
|
||
'MQTT_PASSWORD': None,
|
||
'MQTT_TLS': False,
|
||
|
||
# MQTT content
|
||
'MQTT_PREFIX': "sensor/rtl_433",
|
||
'MQTT_DEDUP': True,
|
||
'MQTT_INDIVIDUAL_TOPICS': True,
|
||
'MQTT_JSON_TOPIC': True,
|
||
}
|
||
|
||
def __init__(self, f=None):
|
||
fdict = None
|
||
|
||
# Try to read a dictionary from f.
|
||
if f:
|
||
try:
|
||
# Assume yaml. \todo Check and support other formats
|
||
import yaml
|
||
with open(f) as fh:
|
||
fdict = yaml.safe_load(fh)
|
||
except:
|
||
print('Did not read {f} (no yaml, not found, bad?).'.format(f=f))
|
||
|
||
# Merge fdict into configdict.
|
||
if fdict:
|
||
for (k, v) in fdict.items():
|
||
self.c[k] = v
|
||
|
||
# Support c['name'] references.
|
||
def __getitem__(self, k):
|
||
return self.c[k]
|
||
|
||
class dedup(object):
|
||
""" A dedup class object supports deduping a stream of reports by
|
||
answering if a report is interesting relative to the history. While
|
||
more complicated deduping is allowed by the interface, for now it is
|
||
very simple, keeping track of only the previous interesting object.
|
||
For now, we more or less require that all reports have the same keys. """
|
||
|
||
# \todo Consider a cache with several entries.
|
||
|
||
def __init__(self):
|
||
# Make this long enough to skip repeats, but allow messages
|
||
# every 10s to come through.
|
||
self.duration = 5
|
||
# Exclude reception metadata (time and RF).
|
||
self.boring_keys = ('time', 'freq', 'freq1', 'freq2', 'rssi', 'snr', 'noise', 'raw_msg')
|
||
# Initialize storage for what was last sent.
|
||
(self.last_report, self.last_now) = (None, None)
|
||
|
||
def send_store(self, report, n):
|
||
""" Record report, n as the last report declared interesting, and
|
||
return True (to denote interesting). """
|
||
(self.last_report, self.last_now) = (report, n)
|
||
return True
|
||
|
||
def equiv(self, j1, j2):
|
||
""" Return True if j1 and j2 are the same, except for boring_keys. """
|
||
for (k, v) in j1.items():
|
||
# If in boring, we don't care.
|
||
if k not in self.boring_keys:
|
||
# If in j1 and not j2, they are different.
|
||
if k not in j2:
|
||
logging.debug("equiv: %s in j1 and not j2" % (k))
|
||
return False
|
||
if j1[k] != j2[k]:
|
||
logging.debug("equiv: %s differs j1=%s and j2=%s" % (k, j1[k], j2[k]))
|
||
return False
|
||
# If the lengths are different, they must be different.
|
||
if len(j1) != len(j2):
|
||
logging.debug("equiv: len(j1) %d != len(j2) %d" % (len(j1), len(j2)))
|
||
return False
|
||
|
||
# If we get here, then the lengths are the same, and all
|
||
# non-boring keys in j1 exist in j2, and have the same value.
|
||
# It could be that j2 is missing a boring key and also has a
|
||
# new non-boring key, but boring keys in particular should not
|
||
# be variable.
|
||
return True
|
||
|
||
# report is a python dictionary
|
||
def is_interesting(self, report):
|
||
""" If report is intersting, return True and update records of the
|
||
most recent interesting report. Otherwise return False. """
|
||
n = time.time()
|
||
|
||
# If previous interesting is missing or empty, accept this one.
|
||
if self.last_report is None or self.last_now is None:
|
||
logging.debug("interesting: no previous")
|
||
return self.send_store(report, n)
|
||
|
||
# If previous one was too long ago, accept this one.
|
||
if n - self.last_now > self.duration:
|
||
logging.debug("interesting: time")
|
||
return self.send_store(report, n)
|
||
|
||
if not self.equiv(self.last_report, report):
|
||
logging.debug("interesting: different")
|
||
return self.send_store(report, n)
|
||
|
||
return False
|
||
|
||
# Create a config object, defaults modified by the config file if present.
|
||
c = rtlconfig("rtl_433_mqtt_relay.yaml")
|
||
|
||
# Create a dedup object for later use, even if it's configured off.
|
||
d = dedup()
|
||
|
||
def mqtt_connect(client, userdata, flags, rc):
|
||
"""Handle MQTT connection callback."""
|
||
logging.info("MQTT connected: " + mqtt.connack_string(rc))
|
||
|
||
|
||
def mqtt_disconnect(client, userdata, rc):
|
||
"""Handle MQTT disconnection callback."""
|
||
logging.info("MQTT disconnected: " + mqtt.connack_string(rc))
|
||
|
||
|
||
# Create listener for incoming json string packets.
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
|
||
sock.bind((c['UDP_IP'], c['UDP_PORT']))
|
||
|
||
|
||
# Map characters that will cause problems or be confusing in mqtt
|
||
# topics.
|
||
def sanitize(text):
|
||
"""Sanitize a name for Graphite/MQTT use."""
|
||
return (text
|
||
.replace(" ", "_")
|
||
.replace("/", "_")
|
||
.replace(".", "_")
|
||
.replace("&", ""))
|
||
|
||
|
||
def publish_sensor_to_mqtt(mqttc, data, line):
|
||
"""Publish rtl_433 sensor data to MQTT."""
|
||
|
||
if c['MQTT_DEDUP']:
|
||
# If this data is not novel relative to recent data, just skip it.
|
||
# Otherwise, send it via MQTT.
|
||
if not d.is_interesting(data):
|
||
logging.debug(" not interesting")
|
||
return
|
||
logging.debug( "INTERESTING")
|
||
|
||
# Construct a topic from the information that identifies which
|
||
# device this frame is from.
|
||
# NB: id is only used if channel is not present.
|
||
path = c['MQTT_PREFIX']
|
||
if "model" in data:
|
||
path += "/" + sanitize(data["model"])
|
||
if "channel" in data:
|
||
path += "/" + str(data["channel"])
|
||
if "id" in data:
|
||
path += "/" + str(data["id"])
|
||
|
||
if c['MQTT_INDIVIDUAL_TOPICS']:
|
||
# Publish some specific items on subtopics.
|
||
if "battery_ok" in data:
|
||
mqttc.publish(path + "/battery", data["battery_ok"])
|
||
|
||
if "humidity" in data:
|
||
mqttc.publish(path + "/humidity", data["humidity"])
|
||
|
||
if "temperature_C" in data:
|
||
mqttc.publish(path + "/temperature", data["temperature_C"])
|
||
|
||
if "depth_cm" in data:
|
||
mqttc.publish(path + "/depth", data["depth_cm"])
|
||
|
||
if c['MQTT_JSON_TOPIC']:
|
||
# Publish the entire json string on the main topic.
|
||
mqttc.publish(path, line)
|
||
|
||
def parse_syslog(line):
|
||
"""Try to extract the payload from a syslog line."""
|
||
line = line.decode("ascii") # also UTF-8 if BOM
|
||
if line.startswith("<"):
|
||
# Fields should be "<PRI>VER", timestamp, hostname, command, pid, mid, sdata, payload.
|
||
# The payload might have spaces, so force split to stop after the sixth space.
|
||
fields = line.split(None, 7)
|
||
line = fields[-1]
|
||
else:
|
||
# Hope that the line was just json without the syslog header.
|
||
pass
|
||
return line
|
||
|
||
|
||
def rtl_433_probe():
|
||
"""Run a rtl_433 UDP listener."""
|
||
|
||
## Connect to MQTT
|
||
if hasattr(mqtt, 'CallbackAPIVersion'): # paho >= 2.0.0
|
||
mqttc = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
|
||
else:
|
||
mqttc = mqtt.Client()
|
||
mqttc.on_connect = mqtt_connect
|
||
mqttc.on_disconnect = mqtt_disconnect
|
||
if c['MQTT_USERNAME'] != None:
|
||
mqttc.username_pw_set(c['MQTT_USERNAME'], password=c['MQTT_PASSWORD'])
|
||
if c['MQTT_TLS']:
|
||
mqttc.tls_set()
|
||
mqttc.connect_async(c['MQTT_HOST'], c['MQTT_PORT'], 60)
|
||
mqttc.loop_start()
|
||
|
||
## Receive UDP datagrams, extract json, and publish.
|
||
while True:
|
||
line, addr = sock.recvfrom(1024)
|
||
try:
|
||
line = parse_syslog(line)
|
||
data = json.loads(line)
|
||
logging.debug("received %s" % line)
|
||
publish_sensor_to_mqtt(mqttc, data, line)
|
||
|
||
except ValueError:
|
||
pass
|
||
|
||
|
||
def run():
|
||
"""Run main or daemon."""
|
||
# with daemon.DaemonContext(files_preserve=[sock]):
|
||
# detach_process=True
|
||
# uid
|
||
# gid
|
||
# working_directory
|
||
|
||
# Set up logging at INFO, and change to DEBUG if config asks for that.
|
||
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',datefmt='%Y-%m-%dT%H:%M:%S%z')
|
||
logging.getLogger().setLevel(logging.INFO)
|
||
if c['DEBUG']:
|
||
logging.getLogger().setLevel(logging.DEBUG)
|
||
logging.debug("DEBUG LOGGING ENABLED")
|
||
|
||
rtl_433_probe()
|
||
|
||
if __name__ == "__main__":
|
||
run()
|