refactoring and cleaning up proxy code #6

Merged
ben merged 2 commits from proxy-lint into main 2022-09-19 06:12:30 +00:00
1 changed file with 41 additions and 54 deletions

View File

@ -1,15 +1,15 @@
#!/usr/bin/python3 -u #!/usr/bin/python3
from collections import deque, defaultdict from collections import deque, defaultdict
import os import os
import json import json
import time import time
import base64 import base64
import sys
from loguru import logger from loguru import logger
import zmq import zmq
from sudoisbot.config import read_config
def dealer(dealer_addr, router_addr): def dealer(dealer_addr, router_addr):
print("dealer") print("dealer")
@ -31,8 +31,8 @@ def dealer(dealer_addr, router_addr):
context.close() context.close()
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None,
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/tmp/proxy_cache/"): cache_dir="/tmp/proxy_cache"):
context = zmq.Context() context = zmq.Context()
disk_interval = 3 disk_interval = 3
@ -48,7 +48,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
for multipart_msg in list(cache[topic]): for multipart_msg in list(cache[topic]):
parts64 = [base64.b64encode(a) for a in multipart_msg] parts64 = [base64.b64encode(a) for a in multipart_msg]
#print(parts64)
f.write(b"|".join(parts64)) f.write(b"|".join(parts64))
f.write(b"\n") f.write(b"\n")
@ -60,7 +59,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
for line in f.readlines(): for line in f.readlines():
parts64 = line.split(b"|") parts64 = line.split(b"|")
yield [base64.b64decode(a) for a in parts64] yield [base64.b64decode(a) for a in parts64]
#os.remove(fullpath) # os.remove(fullpath)
def delete_cache_on_disk(topic, target_dir=cache_dir): def delete_cache_on_disk(topic, target_dir=cache_dir):
filename = topic.decode() + ".cache" filename = topic.decode() + ".cache"
@ -68,8 +67,8 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
try: try:
os.remove(fullpath) os.remove(fullpath)
except FileNotFoundError: except FileNotFoundError:
logger.warning(f"could not delete disk cache because {fullpath} does not exist") logger.warning(
f"could not delete disk cachem {fullpath} does not exist")
# facing publishers # facing publishers
frontend = context.socket(zmq.SUB) frontend = context.socket(zmq.SUB)
@ -80,28 +79,23 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
backend = context.socket(zmq.XPUB) backend = context.socket(zmq.XPUB)
backend.bind(backend_addr) backend.bind(backend_addr)
# infrom publishers of a new sink # infrom publishers of a new sink
#backend.setsockopt(ZMQ_XPUB_VERBOSE, 1) # backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}") logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
if capture_addr: if capture_addr:
capture = context.socket(zmq.PUB) capture = context.socket(zmq.PUB)
capture.bind(capture_addr) capture.bind(capture_addr)
logger.info(f"zmq capture: {capture_addr}") logger.info(f"zmq capture: {capture_addr}")
else: else:
capture = None capture = None
poller = zmq.Poller() poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN) poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN) poller.register(backend, zmq.POLLIN)
if capture: if capture:
poller.register(backend, zmq.POLLIN) poller.register(backend, zmq.POLLIN)
# send \x01 to all publishers when they connect # send \x01 to all publishers when they connect
lvc = dict() lvc = dict()
cache = defaultdict(deque) cache = defaultdict(deque)
cache_topics = set() cache_topics = set()
@ -123,7 +117,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
logger.info("saved cache") logger.info("saved cache")
break break
now = int(time.time()) now = int(time.time())
if now > disk_at: if now > disk_at:
save_cache_to_disk() save_cache_to_disk()
@ -144,15 +137,16 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
msg = frontend.recv_multipart() msg = frontend.recv_multipart()
topic = msg[0] topic = msg[0]
#frontend.send_multipart([b"\x00rain"]) # frontend.send_multipart([b"\x00rain"])
if topic not in lvc: if topic not in lvc:
logger.info(f"caching topic {topic} that hasnt seen a listener yet") logger.info(
f"caching topic {topic} that hasnt seen a listener yet")
cache_topics.add(topic) cache_topics.add(topic)
lvc[topic] = msg lvc[topic] = msg
if topic in cache_topics: if topic in cache_topics:
#logger.debug(f"[o] cached {msg}") # logger.debug(f"[o] cached {msg}")
cache[topic].append(msg) cache[topic].append(msg)
else: else:
backend.send_multipart(msg) backend.send_multipart(msg)
@ -160,20 +154,19 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
if capture: if capture:
capture.send_multipart(msg) capture.send_multipart(msg)
if backend in events: if backend in events:
msg = backend.recv_multipart() msg = backend.recv_multipart()
#logger.warning(f"[x] backend: {msg}") # logger.warning(f"[x] backend: {msg}")
if msg[0][0] == 0: if msg[0][0] == 0:
topic = msg[0][1:] topic = msg[0][1:]
cache_topics.add(topic) cache_topics.add(topic)
logger.info(f"[o] now caching {topic}") logger.info(f"[o] now caching {topic}")
if msg[0][0] == 1: #'\x01' if msg[0][0] == 1: # '\x01'
topic = msg[0][1:] topic = msg[0][1:]
if topic not in lvc: if topic not in lvc:
# the keys of the topic dir are also a list of "known topics" # the keys of the topic dir are also a list of
# "known topics"
logger.success(f"registered {topic}") logger.success(f"registered {topic}")
lvc[topic] = None lvc[topic] = None
@ -188,40 +181,33 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
save_cache_to_disk() save_cache_to_disk()
logger.success(f"stopped caching {topic}") logger.success(f"stopped caching {topic}")
cache_topics.discard(topic) cache_topics.discard(topic)
elif topic in lvc and lvc[topic] is not None: elif topic in lvc and lvc[topic] is not None:
cached = lvc[topic] cached = lvc[topic]
backend.send_multipart(cached + [b"cached"]) backend.send_multipart(cached + [b"cached"])
logger.success(f"[>] lvc sent for {topic}") logger.success(f"[>] lvc sent for {topic}")
# frontend.send(msg)
#frontend.send(msg) # logger.success(f"[>] backend: {msg}")
#logger.success(f"[>] backend: {msg}")
if capture in events: if capture in events:
logger.warning(f"capture: {capture.recv_mutlipart(msg)}") logger.warning(f"capture: {capture.recv_mutlipart(msg)}")
# zmq.proxy(frontend, backend, capture)
#zmq.proxy(frontend, backend, capture)
#while True:
# we never used to get here # we never used to get here
frontend.close() frontend.close()
backend.close() backend.close()
context.close() context.close()
def proxy_forwarder(frontend_addr, backend_addr, capture_addr): def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
context = zmq.Context() context = zmq.Context()
# facing publishers # facing publishers
#frontend = context.socket(zmq.XSUB) # frontend = context.socket(zmq.XSUB)
frontend = context.socket(zmq.SUB) frontend = context.socket(zmq.SUB)
frontend.setsockopt(zmq.SUBSCRIBE, b'') frontend.setsockopt(zmq.SUBSCRIBE, b'')
@ -231,11 +217,10 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
backend = context.socket(zmq.XPUB) backend = context.socket(zmq.XPUB)
backend.bind(backend_addr) backend.bind(backend_addr)
# infrom publishers of a new sink # infrom publishers of a new sink
#backend.setsockopt(ZMQ_XPUB_VERBOSE, 1) # backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}") logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
if capture_addr: if capture_addr:
capture = context.socket(zmq.PUB) capture = context.socket(zmq.PUB)
capture.bind(capture_addr) capture.bind(capture_addr)
@ -253,6 +238,7 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
capture.close() capture.close()
context.close() context.close()
def capture(capture_addr): def capture(capture_addr):
capture_port = capture_addr.split(":")[-1] capture_port = capture_addr.split(":")[-1]
context = zmq.Context() context = zmq.Context()
@ -262,35 +248,32 @@ def capture(capture_addr):
socket.connect(addr) socket.connect(addr)
logger.info("connecting to " + addr) logger.info("connecting to " + addr)
import pprint
import sys
while True: while True:
r = socket.recv_multipart() r = socket.recv_multipart()
#pprint.pprint(r[1].decode()) # pprint.pprint(r[1].decode())
#print(r) # print(r)
jdata = json.loads(r[1].decode()) jdata = json.loads(r[1].decode())
if "cache_size" in jdata: if "cache_size" in jdata:
print(r[1].decode(), end="\n") print(r[1].decode(), end="\n")
sys.stdout.flush() sys.stdout.flush()
#print("") # print("")
def main_forwarder(config): def main_forwarder(config):
# zmq_in_connect = config['zmq_in_connect'] # zmq_in_connect = config['zmq_in_connect']
# zmq_frontend = config['zmq_frontend'] # zmq_frontend = config['zmq_frontend']
# zmq_capture = config['zmq_capture'] # zmq_capture = config['zmq_capture']
zmq_in_connect = "tcp://192.168.1.2:5560" # zmq_in_connect = "tcp://192.168.1.2:5560"
zmq_backend = "tcp://*:5560" # zmq_backend = "tcp://*:5560"
zmq_capture = "tcp://127.0.0.1:5561" # zmq_capture = "tcp://127.0.0.1:5561"
return proxy_forwarder(
return forwarder( config['frontend_addr'],
config['frontend_addr'], config['backend_addr'], config['capture_addr']) config['backend_addr'],
config['capture_addr']
)
def main_buffering(args, config): def main_buffering(args, config):
@ -299,4 +282,8 @@ def main_buffering(args, config):
return capture(capture_addr) return capture(capture_addr)
return proxy_buffering( return proxy_buffering(
config['frontend_addr'], config['backend_addr'], capture_addr, config['cache_dir']) config['frontend_addr'],
config['backend_addr'],
capture_addr,
config['cache_dir']
)