rtl_433/src/output_influx.c

521 lines
16 KiB
C

/** @file
InfluxDB output for rtl_433 events.
Copyright (C) 2019 Daniel Krueger
based on output_mqtt.c
Copyright (C) 2019 Christian Zuckschwerdt
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
*/
// note: our unit header includes unistd.h for gethostname() via data.h
#include "output_influx.h"
#include "optparse.h"
#include "logger.h"
#include "fatal.h"
#include "r_util.h"
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <string.h>
#include "mongoose.h"
/* InfluxDB client abstraction / printer */
typedef struct {
struct data_output output;
struct mg_mgr *mgr;
struct mg_connection *conn;
int prev_status;
int prev_resp_code;
char hostname[64];
char url[400];
char extra_headers[150];
tls_opts_t tls_opts;
int databufidxfill;
struct mbuf databufs[2];
} influx_client_t;
static void influx_client_send(influx_client_t *ctx);
static void influx_client_event(struct mg_connection *nc, int ev, void *ev_data)
{
// note that while shutting down the ctx is NULL
influx_client_t *ctx = (influx_client_t *)nc->user_data;
struct http_message *hm = (struct http_message *)ev_data;
switch (ev) {
case MG_EV_CONNECT: {
int connect_status = *(int *)ev_data;
if (connect_status != 0) {
// Error, print only once
if (ctx) {
if (ctx->prev_status != connect_status)
print_logf(LOG_WARNING, "InfluxDB", "InfluxDB connect error: %s", strerror(connect_status));
ctx->conn = NULL;
}
}
if (ctx)
ctx->prev_status = connect_status;
break;
}
case MG_EV_HTTP_CHUNK: // response is normally empty (so mongoose thinks we received a chunk only)
case MG_EV_HTTP_REPLY:
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
if (hm->resp_code == 204) {
// mark influx data as sent
}
else {
if (ctx && ctx->prev_resp_code != hm->resp_code)
print_logf(LOG_WARNING, "InfluxDB", "InfluxDB replied HTTP code: %d with message:\n%s", hm->resp_code, hm->body.p);
}
if (ctx) {
ctx->prev_resp_code = hm->resp_code;
}
break;
case MG_EV_CLOSE:
if (ctx) {
ctx->conn = NULL;
influx_client_send(ctx);
}
break;
}
}
static influx_client_t *influx_client_init(influx_client_t *ctx, char const *url, char const *token)
{
snprintf(ctx->url, sizeof(ctx->url), "%s", url);
snprintf(ctx->extra_headers, sizeof (ctx->extra_headers), "Authorization: Token %s\r\n", token);
return ctx;
}
static void influx_client_send(influx_client_t *ctx)
{
struct mbuf *buf = &ctx->databufs[ctx->databufidxfill];
/*fprintf(stderr, "Influx %p msg: \"%s\" with %lu/%lu %s\n",
(void*)ctx, buf->buf, buf->len, buf->size,
ctx->conn ? "buffering" : "to be sent");*/
if (ctx->conn || !buf->len)
return;
char const *error_string = NULL;
struct mg_connect_opts opts = {.user_data = ctx, .error_string = &error_string};
if (ctx->tls_opts.tls_ca_cert) {
print_logf(LOG_INFO, "InfluxDB", "influxs (TLS) parameters are: "
"tls_cert=%s "
"tls_key=%s "
"tls_ca_cert=%s "
"tls_cipher_suites=%s "
"tls_server_name=%s "
"tls_psk_identity=%s "
"tls_psk_key=%s ",
ctx->tls_opts.tls_cert,
ctx->tls_opts.tls_key,
ctx->tls_opts.tls_ca_cert,
ctx->tls_opts.tls_cipher_suites,
ctx->tls_opts.tls_server_name,
ctx->tls_opts.tls_psk_identity,
ctx->tls_opts.tls_psk_key);
#if MG_ENABLE_SSL
opts.ssl_cert = ctx->tls_opts.tls_cert;
opts.ssl_key = ctx->tls_opts.tls_key;
opts.ssl_ca_cert = ctx->tls_opts.tls_ca_cert;
opts.ssl_cipher_suites = ctx->tls_opts.tls_cipher_suites;
opts.ssl_server_name = ctx->tls_opts.tls_server_name;
opts.ssl_psk_identity = ctx->tls_opts.tls_psk_identity;
opts.ssl_psk_key = ctx->tls_opts.tls_psk_key;
#else
print_log(LOG_FATAL, __func__, "influxs (TLS) not available");
exit(1);
#endif
}
if ((ctx->conn = mg_connect_http_opt(ctx->mgr, influx_client_event, opts, ctx->url, ctx->extra_headers, buf->buf)) == NULL) {
print_logf(LOG_WARNING, "InfluxDB", "Connect to InfluxDB (%s) failed (%s)", ctx->url, error_string);
}
else {
ctx->databufidxfill ^= 1;
buf->len = 0;
*buf->buf = '\0';
}
}
/* Helper */
/// clean the tag/identifier inplace to [-.A-Za-z0-9], esp. not whitespace, =, comma and replace any leading _ by x
static char *influx_sanitize_tag(char *tag, char *end)
{
for (char *p = tag; *p && p != end; ++p)
if (*p != '-' && *p != '.' && (*p < 'A' || *p > 'Z') && (*p < 'a' || *p > 'z') && (*p < '0' || *p > '9'))
*p = '_';
for (char *p = tag; *p && p != end; ++p)
if (*p == '_')
*p = 'x';
else
break;
return tag;
}
/// reserve additional space of size len at end of mbuf a; returns actual free space
static size_t mbuf_reserve(struct mbuf *a, size_t len)
{
// insert undefined values at end of current buffer (it will increase the buffer if necessary)
len = mbuf_insert(a, a->len, NULL, len);
// reduce the buffer length again by actual inserted number of bytes
a->len -= len;
len = a->size - a->len;
if (len)
a->buf[a->len] = '\0';
return len;
}
static char *mbuf_snprintf(struct mbuf *a, char const *format, ...)
#if defined(__GNUC__) || defined(__clang__)
__attribute__((format(printf, 2, 3)))
#endif
;
static char *mbuf_snprintf(struct mbuf *a, char const *format, ...)
{
char *str = &a->buf[a->len];
int size;
va_list ap;
va_start(ap, format);
size = vsnprintf(str, a->size - a->len, format, ap);
va_end(ap);
if (size > 0) {
// vsnprintf might return size larger than actually filled
size_t len = strlen(str);
a->len += len;
}
return str;
}
static void mbuf_remove_part(struct mbuf *a, char *pos, size_t len)
{
if (pos >= a->buf && pos < &a->buf[a->len] && &pos[len] <= &a->buf[a->len]) {
memmove(pos, &pos[len], a->len - (pos - a->buf) - len);
a->len -= len;
}
}
static void R_API_CALLCONV print_influx_array(data_output_t *output, data_array_t *array, char const *format)
{
UNUSED(array);
UNUSED(format);
influx_client_t *influx = (influx_client_t *)output;
struct mbuf *buf = &influx->databufs[influx->databufidxfill];
mbuf_snprintf(buf, "\"array\""); // TODO
}
static void R_API_CALLCONV print_influx_data_escaped(data_output_t *output, data_t *data, char const *format)
{
char str[1000];
data_print_jsons(data, str, sizeof (str));
output->print_string(output, str, format);
}
static void R_API_CALLCONV print_influx_string_escaped(data_output_t *output, char const *str, char const *format)
{
UNUSED(format);
influx_client_t *influx = (influx_client_t *)output;
struct mbuf *databuf = &influx->databufs[influx->databufidxfill];
size_t size = databuf->size - databuf->len;
char *buf = &databuf->buf[databuf->len];
if (size < strlen(str) + 3) {
return;
}
*buf++ = '"';
size--;
for (; *str && size >= 3; ++str) {
if (*str == '\r') {
*buf++ = '\\';
size--;
*buf++ = 'r';
size--;
continue;
}
if (*str == '\n') {
*buf++ = '\\';
size--;
*buf++ = 'n';
size--;
continue;
}
if (*str == '\t') {
*buf++ = '\\';
size--;
*buf++ = 't';
size--;
continue;
}
if (*str == '"' || *str == '\\') {
*buf++ = '\\';
size--;
}
*buf++ = *str;
size--;
}
if (size >= 2) {
*buf++ = '"';
size--;
}
*buf = '\0';
databuf->len = databuf->size - size;
}
static void R_API_CALLCONV print_influx_string(data_output_t *output, char const *str, char const *format)
{
UNUSED(format);
influx_client_t *influx = (influx_client_t *)output;
struct mbuf *buf = &influx->databufs[influx->databufidxfill];
mbuf_snprintf(buf, "%s", str);
}
// Generate InfluxDB line protocol
static void R_API_CALLCONV print_influx_data(data_output_t *output, data_t *data, char const *format)
{
UNUSED(format);
influx_client_t *influx = (influx_client_t *)output;
char *str;
char *end;
struct mbuf *buf = &influx->databufs[influx->databufidxfill];
bool comma = false;
data_t *data_org = data;
data_t *data_model = NULL;
data_t *data_time = NULL;
for (data_t *d = data; d; d = d->next) {
if (!strcmp(d->key, "model"))
data_model = d;
if (!strcmp(d->key, "time"))
data_time = d;
}
if (!data_model) {
// data isn't from device (maybe report for example)
// use hostname for measurement
mbuf_reserve(buf, 20000);
mbuf_snprintf(buf, "rtl_433_%s", influx->hostname);
}
else {
// use model for measurement
mbuf_reserve(buf, 1000);
str = &buf->buf[buf->len];
print_value(output, data_model->type, data_model->value, data_model->format);
influx_sanitize_tag(str, NULL);
}
// write tags
while (data) {
if (!strcmp(data->key, "model")
|| !strcmp(data->key, "time")) {
// skip
}
else if (!strcmp(data->key, "type")
|| !strcmp(data->key, "subtype")
|| !strcmp(data->key, "id")
|| !strcmp(data->key, "channel")
|| !strcmp(data->key, "mic")) {
str = mbuf_snprintf(buf, ",%s=", data->key);
str++;
end = &buf->buf[buf->len - 1];
influx_sanitize_tag(str, end);
str = end + 1;
print_value(output, data->type, data->value, data->format);
influx_sanitize_tag(str, NULL);
}
data = data->next;
}
mbuf_snprintf(buf, " ");
// activate escaped output functions
influx->output.print_data = print_influx_data_escaped;
influx->output.print_string = print_influx_string_escaped;
// write fields
data = data_org;
while (data) {
if (!strcmp(data->key, "model")
|| !strcmp(data->key, "time")) {
// skip
}
else if (!strcmp(data->key, "type")
|| !strcmp(data->key, "subtype")
|| !strcmp(data->key, "id")
|| !strcmp(data->key, "channel")
|| !strcmp(data->key, "mic")) {
// skip
}
else {
str = mbuf_snprintf(buf, comma ? ",%s=" : "%s=", data->key);
if (comma)
str++;
end = &buf->buf[buf->len - 1];
influx_sanitize_tag(str, end);
print_value(output, data->type, data->value, data->format);
comma = true;
}
data = data->next;
}
// restore original output functions
influx->output.print_data = print_influx_data;
influx->output.print_string = print_influx_string;
// write time if available
if (data_time) {
str = mbuf_snprintf(buf, " ");
print_value(output, data_time->type, data_time->value, data_time->format);
if (str[1] == '@' // relative time format configured
|| str[11] == ' ' // date time format configured
|| str[11] == 'T') { // ISO date time format configured
// -> bad, because InfluxDB doesn't under stand those formats -> remove timestamp
buf->len = str - buf->buf;
}
else if ((str = strchr(str, '.'))) {
// unix usec timestamp format configured
mbuf_remove_part(buf, str, 1);
mbuf_snprintf(buf, "000");
}
else {
// unix timestamp with seconds resolution configured
mbuf_snprintf(buf, "000000000");
}
}
mbuf_snprintf(buf, "\n");
influx_client_send(influx);
}
static void R_API_CALLCONV print_influx_double(data_output_t *output, double data, char const *format)
{
UNUSED(format);
influx_client_t *influx = (influx_client_t *)output;
struct mbuf *buf = &influx->databufs[influx->databufidxfill];
mbuf_snprintf(buf, "%f", data);
}
static void R_API_CALLCONV print_influx_int(data_output_t *output, int data, char const *format)
{
UNUSED(format);
influx_client_t *influx = (influx_client_t *)output;
struct mbuf *buf = &influx->databufs[influx->databufidxfill];
mbuf_snprintf(buf, "%d", data);
}
static void R_API_CALLCONV data_output_influx_free(data_output_t *output)
{
influx_client_t *influx = (influx_client_t *)output;
if (!influx)
return;
// remove ctx from our connections
if (influx->conn) {
influx->conn->user_data = NULL;
influx->conn->flags |= MG_F_CLOSE_IMMEDIATELY;
}
free(influx);
}
struct data_output *data_output_influx_create(struct mg_mgr *mgr, char *opts)
{
influx_client_t *influx = calloc(1, sizeof(influx_client_t));
if (!influx) {
FATAL_CALLOC("data_output_influx_create()");
}
gethostname(influx->hostname, sizeof(influx->hostname) - 1);
influx->hostname[sizeof(influx->hostname) - 1] = '\0';
// only use hostname, not domain part
char *dot = strchr(influx->hostname, '.');
if (dot)
*dot = '\0';
influx_sanitize_tag(influx->hostname, NULL);
char *token = NULL;
// param/opts starts with URL
if (!opts) {
opts = "";
}
char *url = opts;
opts = strchr(opts, ',');
if (opts) {
*opts = '\0';
opts++;
}
if (strncmp(url, "influx", 6) == 0) {
url += 2;
memcpy(url, "http", 4);
}
if (strncmp(url, "https", 5) == 0) {
influx->tls_opts.tls_ca_cert = "*"; // TLS is enabled but no cert verification is performed.
}
// check if valid URL has been provided
struct mg_str host, path, query;
if (mg_parse_uri(mg_mk_str(url), NULL, NULL, &host, NULL, &path,
&query, NULL) != 0
|| !host.len || !path.len || !query.len) {
print_logf(LOG_FATAL, __func__, "Invalid URL to InfluxDB specified.%s%s%s"
" Something like \"influx://<host>/write?org=<org>&bucket=<bucket>\" required at least.",
!host.len ? " No host specified." : "",
!path.len ? " No path component specified." : "",
!query.len ? " No query parameters specified." : "");
exit(1);
}
// parse auth and format options
char *key, *val;
while (getkwargs(&opts, &key, &val)) {
key = remove_ws(key);
val = trim_ws(val);
if (!key || !*key)
continue;
else if (!strcasecmp(key, "t") || !strcasecmp(key, "token"))
token = val;
else if (!tls_param(&influx->tls_opts, key, val)) {
// ok
}
else {
print_logf(LOG_FATAL, __func__, "Invalid key \"%s\" option.", key);
exit(1);
}
}
influx->output.print_data = print_influx_data;
influx->output.print_array = print_influx_array;
influx->output.print_string = print_influx_string;
influx->output.print_double = print_influx_double;
influx->output.print_int = print_influx_int;
influx->output.output_free = data_output_influx_free;
print_logf(LOG_CRITICAL, "InfluxDB", "Publishing data to InfluxDB (%s)", url);
influx->mgr = mgr;
influx_client_init(influx, url, token);
return (struct data_output *)influx;
}