refactoring and cleaning up proxy code #6

Merged
ben merged 2 commits from proxy-lint into main 2022-09-19 06:12:30 +00:00
1 changed file with 41 additions and 54 deletions


@ -1,15 +1,15 @@
#!/usr/bin/python3 -u
#!/usr/bin/python3
from collections import deque, defaultdict
import os
import json
import time
import base64
import sys
from loguru import logger
import zmq
from sudoisbot.config import read_config
def dealer(dealer_addr, router_addr):
print("dealer")
@ -31,8 +31,8 @@ def dealer(dealer_addr, router_addr):
context.close()
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/tmp/proxy_cache/"):
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None,
cache_dir="/tmp/proxy_cache"):
context = zmq.Context()
disk_interval = 3
@ -48,7 +48,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
for multipart_msg in list(cache[topic]):
parts64 = [base64.b64encode(a) for a in multipart_msg]
#print(parts64)
f.write(b"|".join(parts64))
f.write(b"\n")
@ -60,7 +59,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
for line in f.readlines():
parts64 = line.split(b"|")
yield [base64.b64decode(a) for a in parts64]
#os.remove(fullpath)
# os.remove(fullpath)
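For reference, the disk-cache framing used here round-trips like this (a minimal sketch; the message contents are made up, only the base64/`|` framing comes from the code above):

import base64

def encode_line(multipart_msg):
    # one cached multipart message per line: base64 parts joined by b"|"
    return b"|".join(base64.b64encode(p) for p in multipart_msg) + b"\n"

def decode_line(line):
    # b64decode silently skips the trailing newline, matching the reader above
    return [base64.b64decode(p) for p in line.split(b"|")]

msg = [b"temp:office", b'{"value": 21.5}']
assert decode_line(encode_line(msg)) == msg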
def delete_cache_on_disk(topic, target_dir=cache_dir):
filename = topic.decode() + ".cache"
@ -68,8 +67,8 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
try:
os.remove(fullpath)
except FileNotFoundError:
logger.warning(f"could not delete disk cache because {fullpath} does not exist")
logger.warning(
f"could not delete disk cachem {fullpath} does not exist")
# facing publishers
frontend = context.socket(zmq.SUB)
@ -80,28 +79,23 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
backend = context.socket(zmq.XPUB)
backend.bind(backend_addr)
# inform publishers of a new sink
#backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
# backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
if capture_addr:
capture = context.socket(zmq.PUB)
capture.bind(capture_addr)
logger.info(f"zmq capture: {capture_addr}")
else:
capture = None
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)
if capture:
poller.register(capture, zmq.POLLIN)
# send \x01 to all publishers when they connect
lvc = dict()
cache = defaultdict(deque)
cache_topics = set()
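For context, the registrations above feed a poll loop shaped roughly like this (a sketch; the timeout is an assumption, not the real value):

while True:
    # poll() returns (socket, event) pairs; dict() makes the
    # "if frontend in events" checks below work
    events = dict(poller.poll(timeout=1000))
    if frontend in events:
        msg = frontend.recv_multipart()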
@ -110,7 +104,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
cache[item[0]].append(item)
for topic in cache.keys():
csize = len(cache[topic])
csize = len(cache[topic])
if csize > 0:
logger.warning(f"{topic} - {csize} cached items loaded")
@ -123,7 +117,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
logger.info("saved cache")
break
now = int(time.time())
if now > disk_at:
save_cache_to_disk()
@ -135,7 +128,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
k.decode(): len(v) for (k, v) in cache.items()
},
'topics': [a.decode() for a in lvc.keys()],
'cache_topics': [a.decode() for a in cache_topics],
'cache_topics': [a.decode() for a in cache_topics],
'disk_at': disk_at
}
capture.send_multipart([b"meta:stats", json.dumps(stats).encode()])
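Any SUB socket connected to the capture endpoint can read these stats; a minimal sketch (the address is a placeholder for whatever capture_addr was bound to):

import json
import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:5561")  # placeholder capture_addr
sub.setsockopt(zmq.SUBSCRIBE, b"meta:stats")
topic, payload = sub.recv_multipart()
print(json.loads(payload.decode()))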
@ -144,15 +137,16 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
msg = frontend.recv_multipart()
topic = msg[0]
#frontend.send_multipart([b"\x00rain"])
# frontend.send_multipart([b"\x00rain"])
if topic not in lvc:
logger.info(f"caching topic {topic} that hasnt seen a listener yet")
logger.info(
f"caching topic {topic} that hasn't seen a listener yet")
cache_topics.add(topic)
lvc[topic] = msg
if topic in cache_topics:
#logger.debug(f"[o] cached {msg}")
# logger.debug(f"[o] cached {msg}")
cache[topic].append(msg)
else:
backend.send_multipart(msg)
@ -160,20 +154,19 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
if capture:
capture.send_multipart(msg)
if backend in events:
msg = backend.recv_multipart()
#logger.warning(f"[x] backend: {msg}")
# logger.warning(f"[x] backend: {msg}")
if msg[0][0] == 0:
topic = msg[0][1:]
cache_topics.add(topic)
logger.info(f"[o] now caching {topic}")
if msg[0][0] == 1: #'\x01'
if msg[0][0] == 1: # '\x01'
topic = msg[0][1:]
if topic not in lvc:
# the keys of the topic dir are also a list of "known topics"
# the keys of the topic dir are also a list of
# "known topics"
logger.success(f"registered {topic}")
lvc[topic] = None
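The msg[0][0] checks lean on XPUB semantics: each downstream (un)subscription arrives as a single frame whose first byte is \x01 (subscribe) or \x00 (unsubscribe), followed by the topic. A self-contained sketch:

import zmq

ctx = zmq.Context()
xpub = ctx.socket(zmq.XPUB)
xpub.bind("inproc://demo")
sub = ctx.socket(zmq.SUB)
sub.connect("inproc://demo")

sub.setsockopt(zmq.SUBSCRIBE, b"temp")
frame = xpub.recv()   # b"\x01temp" -> first byte 1 means subscribe
assert frame[0] == 1 and frame[1:] == b"temp"

sub.setsockopt(zmq.UNSUBSCRIBE, b"temp")
frame = xpub.recv()   # b"\x00temp" -> first byte 0 means unsubscribe
assert frame[0] == 0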
@ -188,40 +181,33 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
save_cache_to_disk()
logger.success(f"stopped caching {topic}")
cache_topics.discard(topic)
elif topic in lvc and lvc[topic] is not None:
cached = lvc[topic]
backend.send_multipart(cached + [b"cached"])
logger.success(f"[>] lvc sent for {topic}")
#frontend.send(msg)
#logger.success(f"[>] backend: {msg}")
# frontend.send(msg)
# logger.success(f"[>] backend: {msg}")
if capture in events:
logger.warning(f"capture: {capture.recv_mutlipart(msg)}")
#zmq.proxy(frontend, backend, capture)
#while True:
# zmq.proxy(frontend, backend, capture)
# we never used to get here
frontend.close()
backend.close()
context.close()
def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
context = zmq.Context()
# facing publishers
#frontend = context.socket(zmq.XSUB)
# frontend = context.socket(zmq.XSUB)
frontend = context.socket(zmq.SUB)
frontend.setsockopt(zmq.SUBSCRIBE, b'')
@ -231,11 +217,10 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
backend = context.socket(zmq.XPUB)
backend.bind(backend_addr)
# inform publishers of a new sink
#backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
# backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
if capture_addr:
capture = context.socket(zmq.PUB)
capture.bind(capture_addr)
@ -253,6 +238,7 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
capture.close()
context.close()
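Without the buffering logic, the forwarding case collapses into pyzmq's built-in proxy, as the commented-out zmq.proxy line above hints; a sketch with placeholder addresses:

import zmq

ctx = zmq.Context()
frontend = ctx.socket(zmq.SUB)
frontend.setsockopt(zmq.SUBSCRIBE, b"")   # b"" matches every topic
frontend.connect("tcp://127.0.0.1:5559")  # placeholder frontend_addr
backend = ctx.socket(zmq.XPUB)
backend.bind("tcp://*:5560")              # placeholder backend_addr

# blocks forever, shuttling frontend -> backend
zmq.proxy(frontend, backend)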
def capture(capture_addr):
capture_port = capture_addr.split(":")[-1]
context = zmq.Context()
@ -262,35 +248,32 @@ def capture(capture_addr):
socket.connect(addr)
logger.info("connecting to " + addr)
import pprint
import sys
while True:
r = socket.recv_multipart()
#pprint.pprint(r[1].decode())
#print(r)
# pprint.pprint(r[1].decode())
# print(r)
jdata = json.loads(r[1].decode())
if "cache_size" in jdata:
print(r[1].decode(), end="\n")
sys.stdout.flush()
#print("")
# print("")
def main_forwarder(config):
# zmq_in_connect = config['zmq_in_connect']
# zmq_frontend = config['zmq_frontend']
# zmq_capture = config['zmq_capture']
zmq_in_connect = "tcp://192.168.1.2:5560"
zmq_backend = "tcp://*:5560"
zmq_capture = "tcp://127.0.0.1:5561"
# zmq_in_connect = "tcp://192.168.1.2:5560"
# zmq_backend = "tcp://*:5560"
# zmq_capture = "tcp://127.0.0.1:5561"
return forwarder(
config['frontend_addr'], config['backend_addr'], config['capture_addr'])
return proxy_forwarder(
config['frontend_addr'],
config['backend_addr'],
config['capture_addr']
)
def main_buffering(args, config):
@ -299,4 +282,8 @@ def main_buffering(args, config):
return capture(capture_addr)
return proxy_buffering(
config['frontend_addr'], config['backend_addr'], capture_addr, config['cache_dir'])
config['frontend_addr'],
config['backend_addr'],
capture_addr,
config['cache_dir']
)
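For reference, these entry points only pull four keys out of the config; a hypothetical example (values are placeholders, only the key names come from the code):

config = {
    "frontend_addr": "tcp://0.0.0.0:5559",   # placeholder
    "backend_addr": "tcp://0.0.0.0:5560",    # placeholder
    "capture_addr": "tcp://127.0.0.1:5561",  # placeholder
    "cache_dir": "/tmp/proxy_cache",
}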