refactoring and cleaning up proxy code #6
|
@ -1,15 +1,15 @@
|
|||
#!/usr/bin/python3 -u
|
||||
#!/usr/bin/python3
|
||||
|
||||
from collections import deque, defaultdict
|
||||
import os
|
||||
import json
|
||||
import time
|
||||
import base64
|
||||
import sys
|
||||
|
||||
from loguru import logger
|
||||
import zmq
|
||||
|
||||
from sudoisbot.config import read_config
|
||||
|
||||
def dealer(dealer_addr, router_addr):
|
||||
print("dealer")
|
||||
|
@ -31,8 +31,8 @@ def dealer(dealer_addr, router_addr):
|
|||
context.close()
|
||||
|
||||
|
||||
|
||||
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/tmp/proxy_cache/"):
|
||||
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None,
|
||||
cache_dir="/tmp/proxy_cache"):
|
||||
context = zmq.Context()
|
||||
|
||||
disk_interval = 3
|
||||
|
@ -48,7 +48,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
for multipart_msg in list(cache[topic]):
|
||||
parts64 = [base64.b64encode(a) for a in multipart_msg]
|
||||
|
||||
#print(parts64)
|
||||
f.write(b"|".join(parts64))
|
||||
f.write(b"\n")
|
||||
|
||||
|
@ -68,8 +67,8 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
try:
|
||||
os.remove(fullpath)
|
||||
except FileNotFoundError:
|
||||
logger.warning(f"could not delete disk cache because {fullpath} does not exist")
|
||||
|
||||
logger.warning(
|
||||
f"could not delete disk cachem {fullpath} does not exist")
|
||||
|
||||
# facing publishers
|
||||
frontend = context.socket(zmq.SUB)
|
||||
|
@ -87,21 +86,16 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
capture = context.socket(zmq.PUB)
|
||||
capture.bind(capture_addr)
|
||||
logger.info(f"zmq capture: {capture_addr}")
|
||||
|
||||
|
||||
else:
|
||||
capture = None
|
||||
|
||||
|
||||
poller = zmq.Poller()
|
||||
poller.register(frontend, zmq.POLLIN)
|
||||
poller.register(backend, zmq.POLLIN)
|
||||
if capture:
|
||||
poller.register(backend, zmq.POLLIN)
|
||||
|
||||
|
||||
# send \x01 to all publishers when they connect
|
||||
|
||||
lvc = dict()
|
||||
cache = defaultdict(deque)
|
||||
cache_topics = set()
|
||||
|
@ -123,7 +117,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
logger.info("saved cache")
|
||||
break
|
||||
|
||||
|
||||
now = int(time.time())
|
||||
if now > disk_at:
|
||||
save_cache_to_disk()
|
||||
|
@ -147,7 +140,8 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
# frontend.send_multipart([b"\x00rain"])
|
||||
|
||||
if topic not in lvc:
|
||||
logger.info(f"caching topic {topic} that hasnt seen a listener yet")
|
||||
logger.info(
|
||||
f"caching topic {topic} that hasnt seen a listener yet")
|
||||
cache_topics.add(topic)
|
||||
lvc[topic] = msg
|
||||
|
||||
|
@ -160,9 +154,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
if capture:
|
||||
capture.send_multipart(msg)
|
||||
|
||||
|
||||
if backend in events:
|
||||
|
||||
msg = backend.recv_multipart()
|
||||
# logger.warning(f"[x] backend: {msg}")
|
||||
if msg[0][0] == 0:
|
||||
|
@ -173,7 +165,8 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
if msg[0][0] == 1: # '\x01'
|
||||
topic = msg[0][1:]
|
||||
if topic not in lvc:
|
||||
# the keys of the topic dir are also a list of "known topics"
|
||||
# the keys of the topic dir are also a list of
|
||||
# "known topics"
|
||||
logger.success(f"registered {topic}")
|
||||
lvc[topic] = None
|
||||
|
||||
|
@ -188,35 +181,28 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
|
||||
save_cache_to_disk()
|
||||
|
||||
|
||||
logger.success(f"stopped caching {topic}")
|
||||
cache_topics.discard(topic)
|
||||
|
||||
|
||||
elif topic in lvc and lvc[topic] is not None:
|
||||
cached = lvc[topic]
|
||||
backend.send_multipart(cached + [b"cached"])
|
||||
logger.success(f"[>] lvc sent for {topic}")
|
||||
|
||||
|
||||
# frontend.send(msg)
|
||||
# logger.success(f"[>] backend: {msg}")
|
||||
|
||||
|
||||
if capture in events:
|
||||
logger.warning(f"capture: {capture.recv_mutlipart(msg)}")
|
||||
|
||||
|
||||
# zmq.proxy(frontend, backend, capture)
|
||||
#while True:
|
||||
|
||||
|
||||
|
||||
# we never used to get here
|
||||
frontend.close()
|
||||
backend.close()
|
||||
context.close()
|
||||
|
||||
|
||||
def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
|
||||
context = zmq.Context()
|
||||
|
||||
|
@ -235,7 +221,6 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
|
|||
|
||||
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
|
||||
|
||||
|
||||
if capture_addr:
|
||||
capture = context.socket(zmq.PUB)
|
||||
capture.bind(capture_addr)
|
||||
|
@ -253,6 +238,7 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
|
|||
capture.close()
|
||||
context.close()
|
||||
|
||||
|
||||
def capture(capture_addr):
|
||||
capture_port = capture_addr.split(":")[-1]
|
||||
context = zmq.Context()
|
||||
|
@ -262,10 +248,7 @@ def capture(capture_addr):
|
|||
socket.connect(addr)
|
||||
logger.info("connecting to " + addr)
|
||||
|
||||
import pprint
|
||||
import sys
|
||||
while True:
|
||||
|
||||
r = socket.recv_multipart()
|
||||
# pprint.pprint(r[1].decode())
|
||||
# print(r)
|
||||
|
@ -277,20 +260,20 @@ def capture(capture_addr):
|
|||
# print("")
|
||||
|
||||
|
||||
|
||||
def main_forwarder(config):
|
||||
|
||||
# zmq_in_connect = config['zmq_in_connect']
|
||||
# zmq_frontend = config['zmq_frontend']
|
||||
# zmq_capture = config['zmq_capture']
|
||||
|
||||
zmq_in_connect = "tcp://192.168.1.2:5560"
|
||||
zmq_backend = "tcp://*:5560"
|
||||
zmq_capture = "tcp://127.0.0.1:5561"
|
||||
# zmq_in_connect = "tcp://192.168.1.2:5560"
|
||||
# zmq_backend = "tcp://*:5560"
|
||||
# zmq_capture = "tcp://127.0.0.1:5561"
|
||||
|
||||
|
||||
return forwarder(
|
||||
config['frontend_addr'], config['backend_addr'], config['capture_addr'])
|
||||
return proxy_forwarder(
|
||||
config['frontend_addr'],
|
||||
config['backend_addr'],
|
||||
config['capture_addr']
|
||||
)
|
||||
|
||||
|
||||
def main_buffering(args, config):
|
||||
|
@ -299,4 +282,8 @@ def main_buffering(args, config):
|
|||
return capture(capture_addr)
|
||||
|
||||
return proxy_buffering(
|
||||
config['frontend_addr'], config['backend_addr'], capture_addr, config['cache_dir'])
|
||||
config['frontend_addr'],
|
||||
config['backend_addr'],
|
||||
capture_addr,
|
||||
config['cache_dir']
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue