rtl_433/examples/rtl_433_mqtt_relay.py

286 lines
9.5 KiB
Python
Executable File
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
"""MQTT monitoring relay for rtl_433 communication."""
# This program listens on a UDP socket for syslog messages with a json
# payload, and publishes the data via MQTT. The broker connection is
# kept open (and automatically reconnects on failure). Each device
# is mapped to its own topic,
# Dependencies:
# Paho-MQTT; see https://pypi.python.org/pypi/paho-mqtt
# Optionally: PEP 3143 - Standard daemon process library
# (on 2.7, pip install python-daemon)
# To enable daemon support, uncomment the following line and adjust
# run(). Note that print() is still used.
# import daemon
from __future__ import print_function
from __future__ import with_statement
import json
import logging
import socket
import time
import paho.mqtt.client as mqtt
# The config class represents a config object. The constructor takes
# an optional pathname, and will switch on the suffix (.yaml for now)
# and read a dictionary.
class rtlconfig(object):
# Initialize with default values.
c = {
# Log level info (False) or debug (True)
'DEBUG': False,
# Address to listen on for syslog/json messages from rtl_433
'UDP_IP': "127.0.0.1",
'UDP_PORT': 1433,
# MQTT broker address and credentials
'MQTT_HOST': "127.0.0.1",
'MQTT_PORT': 1883,
'MQTT_USERNAME': None,
'MQTT_PASSWORD': None,
'MQTT_TLS': False,
# MQTT content
'MQTT_PREFIX': "sensor/rtl_433",
'MQTT_DEDUP': True,
'MQTT_INDIVIDUAL_TOPICS': True,
'MQTT_JSON_TOPIC': True,
}
def __init__(self, f=None):
fdict = None
# Try to read a dictionary from f.
if f:
try:
# Assume yaml. \todo Check and support other formats
import yaml
with open(f) as fh:
fdict = yaml.safe_load(fh)
except:
print('Did not read {f} (no yaml, not found, bad?).'.format(f=f))
# Merge fdict into configdict.
if fdict:
for (k, v) in fdict.items():
self.c[k] = v
# Support c['name'] references.
def __getitem__(self, k):
return self.c[k]
class dedup(object):
""" A dedup class object supports deduping a stream of reports by
answering if a report is interesting relative to the history. While
more complicated deduping is allowed by the interface, for now it is
very simple, keeping track of only the previous interesting object.
For now, we more or less require that all reports have the same keys. """
# \todo Consider a cache with several entries.
def __init__(self):
# Make this long enough to skip repeats, but allow messages
# every 10s to come through.
self.duration = 5
# Exclude reception metadata (time and RF).
self.boring_keys = ('time', 'freq', 'freq1', 'freq2', 'rssi', 'snr', 'noise', 'raw_msg')
# Initialize storage for what was last sent.
(self.last_report, self.last_now) = (None, None)
def send_store(self, report, n):
""" Record report, n as the last report declared interesting, and
return True (to denote interesting). """
(self.last_report, self.last_now) = (report, n)
return True
def equiv(self, j1, j2):
""" Return True if j1 and j2 are the same, except for boring_keys. """
for (k, v) in j1.items():
# If in boring, we don't care.
if k not in self.boring_keys:
# If in j1 and not j2, they are different.
if k not in j2:
logging.debug("equiv: %s in j1 and not j2" % (k))
return False
if j1[k] != j2[k]:
logging.debug("equiv: %s differs j1=%s and j2=%s" % (k, j1[k], j2[k]))
return False
# If the lengths are different, they must be different.
if len(j1) != len(j2):
logging.debug("equiv: len(j1) %d != len(j2) %d" % (len(j1), len(j2)))
return False
# If we get here, then the lengths are the same, and all
# non-boring keys in j1 exist in j2, and have the same value.
# It could be that j2 is missing a boring key and also has a
# new non-boring key, but boring keys in particular should not
# be variable.
return True
# report is a python dictionary
def is_interesting(self, report):
""" If report is intersting, return True and update records of the
most recent interesting report. Otherwise return False. """
n = time.time()
# If previous interesting is missing or empty, accept this one.
if self.last_report is None or self.last_now is None:
logging.debug("interesting: no previous")
return self.send_store(report, n)
# If previous one was too long ago, accept this one.
if n - self.last_now > self.duration:
logging.debug("interesting: time")
return self.send_store(report, n)
if not self.equiv(self.last_report, report):
logging.debug("interesting: different")
return self.send_store(report, n)
return False
# Create a config object, defaults modified by the config file if present.
c = rtlconfig("rtl_433_mqtt_relay.yaml")
# Create a dedup object for later use, even if it's configured off.
d = dedup()
def mqtt_connect(client, userdata, flags, rc):
"""Handle MQTT connection callback."""
logging.info("MQTT connected: " + mqtt.connack_string(rc))
def mqtt_disconnect(client, userdata, rc):
"""Handle MQTT disconnection callback."""
logging.info("MQTT disconnected: " + mqtt.connack_string(rc))
# Create listener for incoming json string packets.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.bind((c['UDP_IP'], c['UDP_PORT']))
# Map characters that will cause problems or be confusing in mqtt
# topics.
def sanitize(text):
"""Sanitize a name for Graphite/MQTT use."""
return (text
.replace(" ", "_")
.replace("/", "_")
.replace(".", "_")
.replace("&", ""))
def publish_sensor_to_mqtt(mqttc, data, line):
"""Publish rtl_433 sensor data to MQTT."""
if c['MQTT_DEDUP']:
# If this data is not novel relative to recent data, just skip it.
# Otherwise, send it via MQTT.
if not d.is_interesting(data):
logging.debug(" not interesting")
return
logging.debug( "INTERESTING")
# Construct a topic from the information that identifies which
# device this frame is from.
# NB: id is only used if channel is not present.
path = c['MQTT_PREFIX']
if "model" in data:
path += "/" + sanitize(data["model"])
if "channel" in data:
path += "/" + str(data["channel"])
if "id" in data:
path += "/" + str(data["id"])
if c['MQTT_INDIVIDUAL_TOPICS']:
# Publish some specific items on subtopics.
if "battery_ok" in data:
mqttc.publish(path + "/battery", data["battery_ok"])
if "humidity" in data:
mqttc.publish(path + "/humidity", data["humidity"])
if "temperature_C" in data:
mqttc.publish(path + "/temperature", data["temperature_C"])
if "depth_cm" in data:
mqttc.publish(path + "/depth", data["depth_cm"])
if c['MQTT_JSON_TOPIC']:
# Publish the entire json string on the main topic.
mqttc.publish(path, line)
def parse_syslog(line):
"""Try to extract the payload from a syslog line."""
line = line.decode("ascii") # also UTF-8 if BOM
if line.startswith("<"):
# Fields should be "<PRI>VER", timestamp, hostname, command, pid, mid, sdata, payload.
# The payload might have spaces, so force split to stop after the sixth space.
fields = line.split(None, 7)
line = fields[-1]
else:
# Hope that the line was just json without the syslog header.
pass
return line
def rtl_433_probe():
"""Run a rtl_433 UDP listener."""
## Connect to MQTT
if hasattr(mqtt, 'CallbackAPIVersion'): # paho >= 2.0.0
mqttc = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
else:
mqttc = mqtt.Client()
mqttc.on_connect = mqtt_connect
mqttc.on_disconnect = mqtt_disconnect
if c['MQTT_USERNAME'] != None:
mqttc.username_pw_set(c['MQTT_USERNAME'], password=c['MQTT_PASSWORD'])
if c['MQTT_TLS']:
mqttc.tls_set()
mqttc.connect_async(c['MQTT_HOST'], c['MQTT_PORT'], 60)
mqttc.loop_start()
## Receive UDP datagrams, extract json, and publish.
while True:
line, addr = sock.recvfrom(1024)
try:
line = parse_syslog(line)
data = json.loads(line)
logging.debug("received %s" % line)
publish_sensor_to_mqtt(mqttc, data, line)
except ValueError:
pass
def run():
"""Run main or daemon."""
# with daemon.DaemonContext(files_preserve=[sock]):
# detach_process=True
# uid
# gid
# working_directory
# Set up logging at INFO, and change to DEBUG if config asks for that.
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',datefmt='%Y-%m-%dT%H:%M:%S%z')
logging.getLogger().setLevel(logging.INFO)
if c['DEBUG']:
logging.getLogger().setLevel(logging.DEBUG)
logging.debug("DEBUG LOGGING ENABLED")
rtl_433_probe()
if __name__ == "__main__":
run()