216 lines
6.2 KiB
Python
216 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from time import time
|
|
import json
|
|
|
|
from collections import deque
|
|
from itertools import islice
|
|
|
|
import zmq
|
|
from loguru import logger
|
|
from influxdb import InfluxDBClient
|
|
|
|
# exceptions influxdb throws
|
|
from socket import gaierror
|
|
from requests.exceptions import RequestException
|
|
from influxdb.exceptions import InfluxDBServerError, InfluxDBClientError
|
|
|
|
def exc_str(exception):
    """Format an exception as ``"<ClassName>: <message>"`` for log output."""
    kind = type(exception).__name__
    return "{}: {}".format(kind, exception)
|
|
|
|
class Zflux(object):
    """Relay JSON messages from a ZeroMQ SUB socket into InfluxDB in batches.

    Messages are collected in an in-memory deque and written with
    ``InfluxDBClient.write_points``. An optional REP socket answers
    ``ruok`` / ``metrics`` requests for health checking.
    """

    def __init__(self, topic, batch=4, max_age=5, poll_secs=2):
        """
        topic: subscription prefix for the SUB socket (bytes; a str is
               UTF-8 encoded before subscribing)
        batch: how many messages to send to influxdb at a time
        max_age: how long a msg is kept in memory before trying to send it
        poll_secs: duration of polling loop

        messages are sent when it has been max_age seconds since last attempt OR
        there are batch messages in the buffer
        """
        # Set before any socket exists so close() (and therefore __del__) is
        # safe even when construction fails part-way through.
        self._closed = False

        self.topic = topic
        self.poll_secs = poll_secs
        self.batch = batch
        self.max_age = max_age

        # Earliest time at which the next time-based flush is attempted.
        self.influx_at = time() + self.max_age
        self.buffer = deque()

        # SUB socket (PUSH/PULL is round-robin)
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        # zmq.SUBSCRIBE takes bytes; accept a str topic for convenience.
        subscription = topic.encode() if isinstance(topic, str) else topic
        self.socket.setsockopt(zmq.SUBSCRIBE, subscription)

        # health checks and metrics (see handle_recv)
        self.metrics = self.context.socket(zmq.REP)

        # Only the data socket is polled until listen_metrics() is called.
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        return self.close()

    def close(self):
        """Close both sockets and terminate the context.

        Idempotent: it is invoked from both ``__exit__`` and ``__del__``, and
        it tolerates an instance whose ``__init__`` did not run to completion.
        """
        if getattr(self, "_closed", True):
            return
        self._closed = True

        logger.info("closing sockets and exiting")
        for attr in ("socket", "metrics"):
            sock = getattr(self, attr, None)
            if sock is not None:
                sock.close()

        context = getattr(self, "context", None)
        if context is not None:
            context.term()

    def connect(self, addr):
        """Connect the data socket to ``addr`` and remember it as ``connect_addr``."""
        self.socket.connect(addr)

        logger.info(f"connected to: {addr}")
        self.connect_addr = addr

    def listen_metrics(self, addr):
        """Bind the REP socket to ``addr`` and start polling it for requests."""
        self.metrics.bind(addr)
        self.poller.register(self.metrics, zmq.POLLIN)

        logger.info(f"metrics: {addr}")
        self.metrics_addr = addr

    def bind(self, addr, metrics=None):
        """Bind the data socket to ``addr`` and remember it as ``bind_addr``.

        ``metrics`` is accepted for backward compatibility and is not used;
        call ``listen_metrics`` to enable the metrics socket.
        """
        self.socket.bind(addr)

        logger.info(f"bind: {addr}")
        self.bind_addr = addr

    def influxdb_setup(self, host, db, user, passwd, timeout=2, precision='m',
                       port=443):
        """Create the InfluxDB client (SSL with certificate verification).

        host, db, user, passwd: connection target and credentials
        timeout: passed to InfluxDBClient as ``timeout``
        precision: passed as ``time_precision`` on every write
        port: server port (defaults to 443, the previously hard-coded value)
        """
        self.influxdb_client = InfluxDBClient(
            host=host,
            port=port,
            ssl=True,
            verify_ssl=True,
            timeout=timeout,
            database=db,
            username=user,
            password=passwd)
        self.precision = precision

    def influxdb_write(self, msgs):
        """Write one chunk of at most ``self.batch`` points.

        Returns the number of points written. Raises ValueError when given
        more than ``self.batch`` points or when the client reports failure;
        exceptions from the client itself propagate unchanged.
        """
        if len(msgs) > self.batch:
            raise ValueError("just send one chunk")

        write = self.influxdb_client.write_points(
            msgs,
            time_precision=self.precision,
            batch_size=self.batch)

        if not write:
            raise ValueError("influxdb client write returned False")
        return len(msgs)

    def run(self):
        """Poll and flush forever; on Ctrl-C try one last flush, then exit."""
        try:
            while True:
                self.handle_recv()
                if self.buffer:
                    self.handle_buffer()

        except KeyboardInterrupt:
            if self.buffer:
                logger.info(f"nonempty buffer {len(self.buffer)}, flushing")
                logger.debug(self.buffer[0])
                try:
                    self.send_buffer()
                except Exception as e:
                    # Best effort: we are exiting regardless.
                    logger.error(e)

            raise SystemExit

    def handle_recv(self):
        """Poll both sockets for up to ``poll_secs`` and handle what arrived.

        A metrics request is always answered. A data message is decoded from
        JSON and appended to the buffer; a malformed one is logged and dropped.
        """
        polled = dict(self.poller.poll(timeout=self.poll_secs * 1000))

        if polled.get(self.metrics) == zmq.POLLIN:
            request = self.metrics.recv()
            if request == b"ruok":
                reply = b"imok"
            elif request == b"metrics":
                metrics = {'buffer_size': len(self.buffer), 'influx_at': self.influx_at}
                reply = json.dumps(metrics).encode()
            else:
                # A REP socket must answer every request before it can
                # receive again; staying silent would wedge the endpoint.
                reply = b"error: unknown request"
            self.metrics.send(reply)

        if polled.get(self.socket) == zmq.POLLIN:
            frames = self.socket.recv_multipart()
            try:
                # topic is not used but very probably will be
                topic, msg = frames
                jmsg = json.loads(msg.decode())
            except ValueError as e:
                # Covers a wrong frame count, undecodable bytes and invalid
                # JSON; one bad message must not take the relay down.
                logger.warning(f"dropping malformed message: {exc_str(e)}")
                return

            self.buffer.append(jmsg)

    def handle_buffer(self):
        """Flush the buffer when it is due by age or holds a full batch."""
        now = time()

        if now <= self.influx_at and len(self.buffer) < self.batch:
            return

        until_next = int(self.influx_at) - int(now)

        # if self.influx_at is more than a max_age away, an error handler
        # is asking us to wait
        #
        # idea: keep self.buffer_size and check if it is growing?
        if until_next <= self.max_age:
            self.send_buffer()

    def send_buffer(self):
        """Write the buffer to InfluxDB in chunks of ``self.batch``.

        Points leave the buffer only after their chunk was written. A client
        error terminates the process (SystemExit(1)); a server, network or
        write failure keeps the remaining points and postpones the next
        attempt by ``3 * max_age`` seconds.
        """
        try:
            while self.buffer:
                thisbatch = list(islice(self.buffer, self.batch))
                self.influxdb_write(thisbatch)
                self.influx_at = time() + self.max_age

                for _ in thisbatch:
                    self.buffer.popleft()

        except InfluxDBClientError as e:
            # client errors are f.ex. invalid format or bad auth. those requests
            # will not succeed by trying again so we bail (maybe improve later?)
            logger.error(exc_str(e))
            raise SystemExit(1)

        except (InfluxDBServerError, gaierror, RequestException, ValueError) as e:
            wait = self.max_age * 3
            logger.warning(exc_str(e))
            logger.warning(f"buffer size: {len(self.buffer)}, wait: {wait}s")
            self.influx_at = time() + wait
|
|
|
|
|
|
|
|
def main():
    """Read the configuration, wire up a Zflux relay and run it."""
    import zflux.config

    settings = zflux.config.Config.read()

    # Named `relay` so the instance does not shadow the imported package.
    relay = Zflux(settings.zmq.topic)

    # A configured connect address takes precedence; otherwise bind.
    if not settings.zmq.connect:
        relay.bind(settings.zmq.bind)
    else:
        relay.connect(settings.zmq.connect)

    metrics_addr = settings.zmq.metrics
    if metrics_addr is not None:
        relay.listen_metrics(metrics_addr)

    influx_kwargs = vars(settings.influxdb)
    relay.influxdb_setup(**influx_kwargs)

    relay.run()
|