refactoring and cleaning up proxy code #6

Merged
ben merged 2 commits from proxy-lint into main 2022-09-19 06:12:30 +00:00
1 changed files with 41 additions and 54 deletions

View File

@ -1,15 +1,15 @@
#!/usr/bin/python3 -u
#!/usr/bin/python3
from collections import deque, defaultdict
import os
import json
import time
import base64
import sys
from loguru import logger
import zmq
from sudoisbot.config import read_config
def dealer(dealer_addr, router_addr):
print("dealer")
@ -31,8 +31,8 @@ def dealer(dealer_addr, router_addr):
context.close()
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/tmp/proxy_cache/"):
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None,
cache_dir="/tmp/proxy_cache"):
context = zmq.Context()
disk_interval = 3
@ -48,7 +48,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
for multipart_msg in list(cache[topic]):
parts64 = [base64.b64encode(a) for a in multipart_msg]
#print(parts64)
f.write(b"|".join(parts64))
f.write(b"\n")
@ -68,8 +67,8 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
try:
os.remove(fullpath)
except FileNotFoundError:
logger.warning(f"could not delete disk cache because {fullpath} does not exist")
logger.warning(
f"could not delete disk cachem {fullpath} does not exist")
# facing publishers
frontend = context.socket(zmq.SUB)
@ -87,21 +86,16 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
capture = context.socket(zmq.PUB)
capture.bind(capture_addr)
logger.info(f"zmq capture: {capture_addr}")
else:
capture = None
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)
if capture:
poller.register(backend, zmq.POLLIN)
# send \x01 to all publishers when they connect
lvc = dict()
cache = defaultdict(deque)
cache_topics = set()
@ -123,7 +117,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
logger.info("saved cache")
break
now = int(time.time())
if now > disk_at:
save_cache_to_disk()
@ -147,7 +140,8 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
# frontend.send_multipart([b"\x00rain"])
if topic not in lvc:
logger.info(f"caching topic {topic} that hasnt seen a listener yet")
logger.info(
f"caching topic {topic} that hasnt seen a listener yet")
cache_topics.add(topic)
lvc[topic] = msg
@ -160,9 +154,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
if capture:
capture.send_multipart(msg)
if backend in events:
msg = backend.recv_multipart()
# logger.warning(f"[x] backend: {msg}")
if msg[0][0] == 0:
@ -173,7 +165,8 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
if msg[0][0] == 1: # '\x01'
topic = msg[0][1:]
if topic not in lvc:
# the keys of the topic dir are also a list of "known topics"
# the keys of the topic dir are also a list of
# "known topics"
logger.success(f"registered {topic}")
lvc[topic] = None
@ -188,35 +181,28 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
save_cache_to_disk()
logger.success(f"stopped caching {topic}")
cache_topics.discard(topic)
elif topic in lvc and lvc[topic] is not None:
cached = lvc[topic]
backend.send_multipart(cached + [b"cached"])
logger.success(f"[>] lvc sent for {topic}")
# frontend.send(msg)
# logger.success(f"[>] backend: {msg}")
if capture in events:
logger.warning(f"capture: {capture.recv_mutlipart(msg)}")
# zmq.proxy(frontend, backend, capture)
#while True:
# we never used to get here
frontend.close()
backend.close()
context.close()
def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
context = zmq.Context()
@ -235,7 +221,6 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
if capture_addr:
capture = context.socket(zmq.PUB)
capture.bind(capture_addr)
@ -253,6 +238,7 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
capture.close()
context.close()
def capture(capture_addr):
capture_port = capture_addr.split(":")[-1]
context = zmq.Context()
@ -262,10 +248,7 @@ def capture(capture_addr):
socket.connect(addr)
logger.info("connecting to " + addr)
import pprint
import sys
while True:
r = socket.recv_multipart()
# pprint.pprint(r[1].decode())
# print(r)
@ -277,20 +260,20 @@ def capture(capture_addr):
# print("")
def main_forwarder(config):
# zmq_in_connect = config['zmq_in_connect']
# zmq_frontend = config['zmq_frontend']
# zmq_capture = config['zmq_capture']
zmq_in_connect = "tcp://192.168.1.2:5560"
zmq_backend = "tcp://*:5560"
zmq_capture = "tcp://127.0.0.1:5561"
# zmq_in_connect = "tcp://192.168.1.2:5560"
# zmq_backend = "tcp://*:5560"
# zmq_capture = "tcp://127.0.0.1:5561"
return forwarder(
config['frontend_addr'], config['backend_addr'], config['capture_addr'])
return proxy_forwarder(
config['frontend_addr'],
config['backend_addr'],
config['capture_addr']
)
def main_buffering(args, config):
@ -299,4 +282,8 @@ def main_buffering(args, config):
return capture(capture_addr)
return proxy_buffering(
config['frontend_addr'], config['backend_addr'], capture_addr, config['cache_dir'])
config['frontend_addr'],
config['backend_addr'],
capture_addr,
config['cache_dir']
)