Browse Source

better error handling and improved tests

pull/1/head
parent
commit
22f27abf05
  1. 1
      .gitignore
  2. 2
      Dockerfile
  3. 88
      tests/test_zflux.py
  4. 11
      zflux/config.py
  5. 38
      zflux/zflux.py

1
.gitignore

@ -4,3 +4,4 @@ nunittest.log
tests/__pycache__
zflux/__pycache__
*.log
zflux.egg-info/

2
Dockerfile

@ -11,7 +11,7 @@ RUN pip install /zflux
# idea is to override with bind mounts
# since config.py doesnt do env vars as-is
ENV ZFLUX_CONF "/zflux.yml"
ENV ZFLUX_CONF "/etc/zflux.yml"
EXPOSE 5559
ENTRYPOINT ["zflux"]

88
tests/test_zflux.py

@ -1,10 +1,12 @@
import threading
from time import sleep, time
from socket import gaierror
import random
import zmq.error
from requests.exceptions import RequestException
from influxdb.exceptions import InfluxDBServerError, InfluxDBClientError
from loguru import logger
from zflux.zflux import Zflux
@ -14,8 +16,9 @@ from tests.stresstester import StressTester
logger.add("unittest.log")
logger.info("-----")
class StopTestingMe(Exception): pass
EXCEPTIONS = [InfluxDBServerError, gaierror, RequestException, ValueError]
class StopTestingMe(Exception): pass
class Zflux2(Zflux):
def __init__(self, topic, count, *args, **kwargs):
@ -49,55 +52,58 @@ class Zflux2(Zflux):
temp_last = val
if self.fake_influxdb_errors:
if random.randint(0, 1000) < 2:
if random.randint(0, 500) == 2:
err = f"simulating errors {self.last_i}/{self.count}"
raise RequestException(err)
exc = random.choice(EXCEPTIONS)
raise exc(err)
self.last_i = temp_last
if self.last_i >= self.count:
logger.success(f"counted {self.last_i} messages, stopping")
raise StopTestingMe
def influxdb_write2(self, msgs):
if len(msgs) > self.batch or len(msgs) == 0:
raise ValueError("just send one chunk")
last_val = self.last_i
if last_val is None:
next_ = msgs[0]['fields']['value']
if next_ != 0:
self.missed = next_
logger.warning(f"start: {self.missed}")
last_val = next_-1
# def influxdb_write2(self, msgs):
# if len(msgs) > self.batch or len(msgs) == 0:
# raise ValueError("just send one chunk")
for msg in msgs:
val = msg['fields']['value']
diff = val - last_val
# last_val = self.last_i
# if last_val is None:
# next_ = msgs[0]['fields']['value']
# if next_ != 0:
# self.missed = next_
# logger.warning(f"start: {self.missed}")
# last_val = next_-1
assert val > last_val, f"new value sould be larger, {val}>{last_val}"
assert diff == 1, f"{diff} == 1, val: {val}, last_val: {last_val}"
# for msg in msgs:
# val = msg['fields']['value']
# diff = val - last_val
# if diff == 0 and last_val == 0:
# # starting from 0 is a speial case
# pass
if self.fake_influxdb_errors:
if random.randint(0, 100) < random.randint(0, 10):
raise RequestException("simulating errors")
# assert val > last_val, f"new value sould be larger, {val}>{last_val}"
# assert diff == 1, f"{diff} == 1, val: {val}, last_val: {last_val}"
# if val % 1000 == 0:
# logger.info(f"{val}/{self.count}")
last_val = val
# # if diff == 0 and last_val == 0:
# # # starting from 0 is a speial case
# # pass
# if self.fake_influxdb_errors:
# if random.randint(0, 100) < random.randint(0, 10):
# raise RequestException("simulating errors")
if last_val >= self.count:
logger.info(f"got {val}/{self.count} messags in order")
raise StopTestingMe
# # if val % 1000 == 0:
# # logger.info(f"{val}/{self.count}")
# last_val = val
self.last_i = last_val
# if last_val >= self.count:
# logger.info(f"got {val}/{self.count} messags in order")
# raise StopTestingMe
return True
# self.last_i = last_val
# return True
class Job(threading.Thread):
def __init__(self, *args, **kwargs):
@ -144,9 +150,9 @@ class ZfluxJob(Job):
try:
self.zflux.run()
except StopTestingMe:
return True
except Exception:
# dont delete this
raise
class StressJob(Job):
@ -195,21 +201,20 @@ class StressJob(Job):
# run_test_threads(conf, count, poll_secs=1.0)
def test_counting_local():
conf = Config.read('test-zflux-local.yml')
count = random.randint(3000, 5000)
run_test_threads(conf, count)
run_test_threads(count)
def test_counting_local_noerrors():
conf = Config.read('test-zflux-local.yml')
count = random.randint(3000, 5000)
run_test_threads(conf, count, fake_influxdb_errors=False)
run_test_threads(count, fake_influxdb_errors=False)
def run_test_threads(conf, count, **kwargs):
zflux_thread = ZfluxJob(conf, count, **kwargs)
def run_test_threads(count, **kwargs):
conf = Config.read('test-zflux-local.yml')
zflux_thread = ZfluxJob(conf, count, max_age=0, **kwargs)
zflux_thread.start()
sleep(2)
@ -232,10 +237,7 @@ def test_zflux_version():
assert __version__ == '0.1.0'
if __name__ == "__main__":
from zflux.config import argparser
args = argparser().parse_args()
conf = Config.read(args.config)
count = random.randint(3000, 5000)
run_test_threads(conf, count)
run_test_threads(count)

11
zflux/config.py

@ -40,7 +40,6 @@ class Config:
name: str
zmq: ZmqConfig
influxdb: InfluxDBConfig
test: bool = False
@classmethod
def read(cls, path=None):
@ -48,10 +47,10 @@ class Config:
if not path:
locations = [
os.environ.get("ZFLUX_CONF", ""),
os.path.join(os.curdir, "zflux.yml"),
os.path.join(os.path.expanduser("~"), ".zflux.yml"),
"/usr/local/etc/zflux.yml",
'/etc/zflux.yml',
os.path.join(os.curdir, "zflux.yml")
]
else:
if path.startswith("/"):
@ -67,18 +66,14 @@ class Config:
yconfig = yaml.safe_load(cf)
zconf = yconfig['zmq']
inflconf = yconfig['influxdb']
testconf = yconfig.get('test', False)
logger.debug(f"using confg file {conffile}")
try:
if not testconf:
import sys
logger.remove()
logger.add(sys.stderr, backtrace=False, diagnose=False)
return cls(
name=conffile,
zmq=cls.ZmqConfig(**zconf),
influxdb=cls.InfluxDBConfig(**inflconf),
test=False
)
except TypeError as e:
logger.exception(e)

38
zflux/zflux.py

@ -15,6 +15,9 @@ from socket import gaierror
from requests.exceptions import RequestException
from influxdb.exceptions import InfluxDBServerError, InfluxDBClientError
def exc_str(exception):
return f"{type(exception).__name__}: {exception}"
class Zflux(object):
def __init__(self, topic, batch=4, max_age=5, poll_secs=10):
@ -34,7 +37,7 @@ class Zflux(object):
self.batch = batch
self.max_age=max_age
self.influx_at = time()
self.influx_at = time() + self.max_age
self.buffer = deque()
# PUSH/PULL is round-robin
@ -80,6 +83,7 @@ class Zflux(object):
msgs,
time_precision=self.precision,
batch_size=self.batch)
if not write:
raise ValueError("influxdb client write returned False")
return len(msgs)
@ -92,6 +96,7 @@ class Zflux(object):
if len(self.buffer) > 0:
self.handle_buffer()
except KeyboardInterrupt:
if len(self.buffer) > 0:
logger.info(f"nonempty buffer {len(self.buffer)}, flushing")
@ -111,6 +116,7 @@ class Zflux(object):
topic, msg = self.socket.recv_multipart()
# topic is not used but very probably will be
jmsg = json.loads(msg.decode())
#logger.debug(jmsg)
self.buffer.append(jmsg)
@ -118,8 +124,15 @@ class Zflux(object):
now = time()
count = len(self.buffer)
if (now > self.influx_at + self.max_age) or (count > self.batch):
self.send_buffer()
if (now > self.influx_at ) or (count > self.batch):
until_next = int(self.influx_at) - int(now)
if until_next <= self.max_age:
# if self.influx_at is more than a max_age away, an error handler
# is asking us to wait
self.send_buffer()
def send_buffer(self):
try:
@ -127,21 +140,18 @@ class Zflux(object):
thisbatch = list(islice(self.buffer, self.batch))
#thisbatch = self.buffer[:1]
self.influxdb_write(thisbatch)
self.influx_at = time()
self.influx_at = time() + self.max_age
for _ in range(len(thisbatch)):
self.buffer.popleft()
except (gaierror, RequestException, ValueError) as e:
logger.error(e)
logger.info(f"messages in buffer: {len(self.buffer)}")
except InfluxDBServerError as e:
if e.args[0]['error'] == 'timeout':
# influxdb.exceptions.InfluxDBServerError: {"error":"timeout"}
logger.error(e)
else:
raise
except (gaierror, RequestException, ValueError, InfluxDBServerError) as e:
#if isinstance(e, InfluxDBServerError) and e.args[0]['error'] == 'timeout':
# # influxdb.exceptions.InfluxDBServerError: {"error":"timeout"}
logger.warning(exc_str(e))
logger.warning(f"buffer size: {len(self.buffer)}, wait: {self.max_age*3}s")
self.influx_at = time() + self.max_age*3
except InfluxDBClientError as e:
logger.error(e)
logger.error(exc_str(e))
# exiting since we won't try to recover from influxdb client
# errors
raise SystemExit(e)

Loading…
Cancel
Save