From 6cfb7fa738eb7d540c3272be430639626e1f5221 Mon Sep 17 00:00:00 2001
From: Ben Kristinsson
Date: Sun, 18 Sep 2022 16:44:31 +0200
Subject: [PATCH 1/2] flake8 fixes

---
 sudoisbot/network/proxy.py | 93 ++++++++++++++++----------------------
 1 file changed, 40 insertions(+), 53 deletions(-)

diff --git a/sudoisbot/network/proxy.py b/sudoisbot/network/proxy.py
index 0c7338f..dfa3e5a 100644
--- a/sudoisbot/network/proxy.py
+++ b/sudoisbot/network/proxy.py
@@ -5,11 +5,11 @@ import os
 import json
 import time
 import base64
+import sys

 from loguru import logger
 import zmq
-from sudoisbot.config import read_config


 def dealer(dealer_addr, router_addr):
     print("dealer")
@@ -31,8 +31,8 @@ def dealer(dealer_addr, router_addr):
     context.close()


-
-def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/tmp/proxy_cache/"):
+def proxy_buffering(frontend_addr, backend_addr, capture_addr=None,
+                    cache_dir="/tmp/proxy_cache"):
     context = zmq.Context()

     disk_interval = 3
@@ -48,7 +48,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/

             for multipart_msg in list(cache[topic]):
                 parts64 = [base64.b64encode(a) for a in multipart_msg]
-                #print(parts64)
                 f.write(b"|".join(parts64))
                 f.write(b"\n")

@@ -60,7 +59,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
             for line in f.readlines():
                 parts64 = line.split(b"|")
                 yield [base64.b64decode(a) for a in parts64]
-        #os.remove(fullpath)
+        # os.remove(fullpath)

     def delete_cache_on_disk(topic, target_dir=cache_dir):
         filename = topic.decode() + ".cache"
@@ -68,8 +67,8 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
         try:
             os.remove(fullpath)
         except FileNotFoundError:
-            logger.warning(f"could not delete disk cache because {fullpath} does not exist")
-
+            logger.warning(
+                f"could not delete disk cachem {fullpath} does not exist")

     # facing publishers
     frontend = context.socket(zmq.SUB)
@@ -80,28 +79,23 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
     backend = context.socket(zmq.XPUB)
     backend.bind(backend_addr)
     # infrom publishers of a new sink
-    #backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
+    # backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)

     logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")

     if capture_addr:
         capture = context.socket(zmq.PUB)
         capture.bind(capture_addr)
         logger.info(f"zmq capture: {capture_addr}")
-
-
     else:
         capture = None

-
     poller = zmq.Poller()
     poller.register(frontend, zmq.POLLIN)
     poller.register(backend, zmq.POLLIN)
     if capture:
         poller.register(backend, zmq.POLLIN)

-    # send \x01 to all publishers when they connect
-
     lvc = dict()
     cache = defaultdict(deque)
     cache_topics = set()
@@ -110,7 +104,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
         cache[item[0]].append(item)

     for topic in cache.keys():
-        csize = len(cache[topic]) 
+        csize = len(cache[topic])
         if csize > 0:
             logger.warning(f"{topic} - {csize} cached items loaded")

@@ -123,7 +117,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
             logger.info("saved cache")
             break

-
         now = int(time.time())
         if now > disk_at:
             save_cache_to_disk()
@@ -135,7 +128,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
                     k.decode(): len(v) for (k, v) in cache.items()
                 },
                 'topics': [a.decode() for a in lvc.keys()],
-                'cache_topics': [a.decode() for a in cache_topics], 
+                'cache_topics': [a.decode() for a in cache_topics],
                 'disk_at': disk_at
             }
             capture.send_multipart([b"meta:stats",
                                     json.dumps(stats).encode()])
@@ -144,15 +137,16 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
             msg = frontend.recv_multipart()
             topic = msg[0]

-            #frontend.send_multipart([b"\x00rain"])
+            # frontend.send_multipart([b"\x00rain"])

             if topic not in lvc:
-                logger.info(f"caching topic {topic} that hasnt seen a listener yet")
+                logger.info(
+                    f"caching topic {topic} that hasnt seen a listener yet")
                 cache_topics.add(topic)
                 lvc[topic] = msg

             if topic in cache_topics:
-                #logger.debug(f"[o] cached {msg}")
+                # logger.debug(f"[o] cached {msg}")
                 cache[topic].append(msg)
             else:
                 backend.send_multipart(msg)
@@ -160,20 +154,19 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
             if capture:
                 capture.send_multipart(msg)

-
         if backend in events:
-
             msg = backend.recv_multipart()
-            #logger.warning(f"[x] backend: {msg}")
+            # logger.warning(f"[x] backend: {msg}")

             if msg[0][0] == 0:
                 topic = msg[0][1:]
                 cache_topics.add(topic)
                 logger.info(f"[o] now caching {topic}")

-            if msg[0][0] == 1: #'\x01'
+            if msg[0][0] == 1:  # '\x01'
                 topic = msg[0][1:]
                 if topic not in lvc:
-                    # the keys of the topic dir are also a list of "known topics"
+                    # the keys of the topic dir are also a list of
+                    # "known topics"
                     logger.success(f"registered {topic}")
                     lvc[topic] = None
@@ -188,40 +181,33 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/

                     save_cache_to_disk()

-
                     logger.success(f"stopped caching {topic}")
                     cache_topics.discard(topic)
-
                 elif topic in lvc and lvc[topic] is not None:
                     cached = lvc[topic]
                     backend.send_multipart(cached + [b"cached"])
                     logger.success(f"[>] lvc sent for {topic}")

-
-            #frontend.send(msg)
-            #logger.success(f"[>] backend: {msg}")
+            # frontend.send(msg)
+            # logger.success(f"[>] backend: {msg}")

         if capture in events:
             logger.warning(f"capture: {capture.recv_mutlipart(msg)}")

-
-    #zmq.proxy(frontend, backend, capture)
-    #while True:
-
-
-
+    # zmq.proxy(frontend, backend, capture)
     # we never used to get here
     frontend.close()
     backend.close()
     context.close()

+
 def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
     context = zmq.Context()

     # facing publishers
-    #frontend = context.socket(zmq.XSUB)
+    # frontend = context.socket(zmq.XSUB)
     frontend = context.socket(zmq.SUB)
     frontend.setsockopt(zmq.SUBSCRIBE, b'')
@@ -231,11 +217,10 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
     backend = context.socket(zmq.XPUB)
     backend.bind(backend_addr)
     # infrom publishers of a new sink
-    #backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
+    # backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)

     logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")

-
     if capture_addr:
         capture = context.socket(zmq.PUB)
         capture.bind(capture_addr)
@@ -253,6 +238,7 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
         capture.close()
     context.close()

+
 def capture(capture_addr):
     capture_port = capture_addr.split(":")[-1]
     context = zmq.Context()
@@ -262,35 +248,32 @@ def capture(capture_addr):
     socket.connect(addr)
     logger.info("connecting to " + addr)


-    import pprint
-    import sys
     while True:
-
         r = socket.recv_multipart()
-        #pprint.pprint(r[1].decode())
-        #print(r)
+        # pprint.pprint(r[1].decode())
+        # print(r)
         jdata = json.loads(r[1].decode())
         if "cache_size" in jdata:
             print(r[1].decode(), end="\n")
             sys.stdout.flush()
-        #print("")
-
+        # print("")


 def main_forwarder(config):
-
     # zmq_in_connect = config['zmq_in_connect']
     # zmq_frontend = config['zmq_frontend']
     # zmq_capture = config['zmq_capture']

-    zmq_in_connect = "tcp://192.168.1.2:5560"
-    zmq_backend = "tcp://*:5560"
-    zmq_capture = "tcp://127.0.0.1:5561"
+    # zmq_in_connect = "tcp://192.168.1.2:5560"
+    # zmq_backend = "tcp://*:5560"
+    # zmq_capture = "tcp://127.0.0.1:5561"

-
-    return forwarder(
-        config['frontend_addr'], config['backend_addr'], config['capture_addr'])
+    return proxy_forwarder(
+        config['frontend_addr'],
+        config['backend_addr'],
+        config['capture_addr']
+    )


 def main_buffering(args, config):
@@ -299,4 +282,8 @@ def main_buffering(args, config):
         return capture(capture_addr)

     return proxy_buffering(
-        config['frontend_addr'], config['backend_addr'], capture_addr, config['cache_dir'])
+        config['frontend_addr'],
+        config['backend_addr'],
+        capture_addr,
+        config['cache_dir']
+    )
--
2.40.1

From 382739c0c749788c77cb77f45afa7da635ea09d2 Mon Sep 17 00:00:00 2001
From: Ben Kristinsson
Date: Sun, 18 Sep 2022 16:46:31 +0200
Subject: [PATCH 2/2] remove old hashbang that forced python to run with unbuffered

---
 sudoisbot/network/proxy.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sudoisbot/network/proxy.py b/sudoisbot/network/proxy.py
index dfa3e5a..f4e9897 100644
--- a/sudoisbot/network/proxy.py
+++ b/sudoisbot/network/proxy.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python3 -u
+#!/usr/bin/python3

 from collections import deque, defaultdict
 import os
--
2.40.1
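
A note on the cache file format that proxy_buffering() writes in save_cache_to_disk() and reads back on startup: every buffered ZMQ multipart message becomes one line in a "<topic>.cache" file, with each frame base64-encoded and the frames joined by "|". A minimal sketch of that round-trip (the helper names, topic and payload below are illustrative, not part of the module):

    import base64

    def encode_line(multipart_msg):
        # one line per multipart message: base64-encoded frames joined by b"|"
        return b"|".join(base64.b64encode(frame) for frame in multipart_msg) + b"\n"

    def decode_line(line):
        # inverse of encode_line(): split on b"|" and decode each frame
        return [base64.b64decode(part) for part in line.rstrip(b"\n").split(b"|")]

    msg = [b"temp:office", b'{"temp": 21.5}']
    assert decode_line(encode_line(msg)) == msg

Base64 keeps the "|" separator and the newline terminator unambiguous even when the raw frames contain those bytes.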
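
For context on the msg[0][0] == 0 and msg[0][0] == 1 branches: an XPUB socket (the proxy's backend) receives subscriptions from downstream subscribers as a single frame whose first byte is 0x01 for subscribe or 0x00 for unsubscribe, followed by the topic bytes. That is what lets proxy_buffering() start caching a topic when a subscriber drops off and flush the cache when one shows up again. A rough sketch for exercising the proxy by hand, assuming it was started with frontend_addr="tcp://127.0.0.1:5559" and backend_addr="tcp://127.0.0.1:5560" (example addresses, not values from any config):

    import time
    import zmq

    ctx = zmq.Context()

    # publisher side: connects to the proxy frontend (a SUB socket subscribed to b"")
    pub = ctx.socket(zmq.PUB)
    pub.connect("tcp://127.0.0.1:5559")

    # subscriber side: connects to the XPUB backend; this subscription reaches
    # the proxy as a b"\x01temp" frame, an unsubscribe as b"\x00temp"
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://127.0.0.1:5560")
    sub.setsockopt(zmq.SUBSCRIBE, b"temp")

    time.sleep(0.5)  # give the slow-joiner handshakes time to settle
    pub.send_multipart([b"temp", b'{"temp": 21.5}'])
    print(sub.recv_multipart())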
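
On the second patch: dropping -u from the hashbang means stdout is block-buffered again when it is piped, instead of every write being flushed immediately. That is harmless here because capture() already calls sys.stdout.flush() after each stats line it prints. For reference, two plain-Python ways to keep output flowing promptly, nothing specific to this repo:

    import json
    import sys

    data = {"cache_size": 0}

    # what capture() effectively does: print, then flush explicitly
    print(json.dumps(data))
    sys.stdout.flush()

    # equivalent without the separate flush call
    print(json.dumps(data), flush=True)

Setting PYTHONUNBUFFERED=1 (or running with python3 -u) restores the old process-wide behaviour if it is ever needed again.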