refactoring and cleaning up proxy code #6
|
@ -1,15 +1,15 @@
|
|||
#!/usr/bin/python3 -u
|
||||
#!/usr/bin/python3
|
||||
|
||||
from collections import deque, defaultdict
|
||||
import os
|
||||
import json
|
||||
import time
|
||||
import base64
|
||||
import sys
|
||||
|
||||
from loguru import logger
|
||||
import zmq
|
||||
|
||||
from sudoisbot.config import read_config
|
||||
|
||||
def dealer(dealer_addr, router_addr):
|
||||
print("dealer")
|
||||
|
@ -31,8 +31,8 @@ def dealer(dealer_addr, router_addr):
|
|||
context.close()
|
||||
|
||||
|
||||
|
||||
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/tmp/proxy_cache/"):
|
||||
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None,
|
||||
cache_dir="/tmp/proxy_cache"):
|
||||
context = zmq.Context()
|
||||
|
||||
disk_interval = 3
|
||||
|
@ -48,7 +48,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
for multipart_msg in list(cache[topic]):
|
||||
parts64 = [base64.b64encode(a) for a in multipart_msg]
|
||||
|
||||
#print(parts64)
|
||||
f.write(b"|".join(parts64))
|
||||
f.write(b"\n")
|
||||
|
||||
|
@ -60,7 +59,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
for line in f.readlines():
|
||||
parts64 = line.split(b"|")
|
||||
yield [base64.b64decode(a) for a in parts64]
|
||||
#os.remove(fullpath)
|
||||
# os.remove(fullpath)
|
||||
|
||||
def delete_cache_on_disk(topic, target_dir=cache_dir):
|
||||
filename = topic.decode() + ".cache"
|
||||
|
@ -68,8 +67,8 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
try:
|
||||
os.remove(fullpath)
|
||||
except FileNotFoundError:
|
||||
logger.warning(f"could not delete disk cache because {fullpath} does not exist")
|
||||
|
||||
logger.warning(
|
||||
f"could not delete disk cachem {fullpath} does not exist")
|
||||
|
||||
# facing publishers
|
||||
frontend = context.socket(zmq.SUB)
|
||||
|
@ -80,28 +79,23 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
backend = context.socket(zmq.XPUB)
|
||||
backend.bind(backend_addr)
|
||||
# infrom publishers of a new sink
|
||||
#backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
|
||||
# backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
|
||||
|
||||
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
|
||||
if capture_addr:
|
||||
capture = context.socket(zmq.PUB)
|
||||
capture.bind(capture_addr)
|
||||
logger.info(f"zmq capture: {capture_addr}")
|
||||
|
||||
|
||||
else:
|
||||
capture = None
|
||||
|
||||
|
||||
poller = zmq.Poller()
|
||||
poller.register(frontend, zmq.POLLIN)
|
||||
poller.register(backend, zmq.POLLIN)
|
||||
if capture:
|
||||
poller.register(backend, zmq.POLLIN)
|
||||
|
||||
|
||||
# send \x01 to all publishers when they connect
|
||||
|
||||
lvc = dict()
|
||||
cache = defaultdict(deque)
|
||||
cache_topics = set()
|
||||
|
@ -123,7 +117,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
logger.info("saved cache")
|
||||
break
|
||||
|
||||
|
||||
now = int(time.time())
|
||||
if now > disk_at:
|
||||
save_cache_to_disk()
|
||||
|
@ -144,15 +137,16 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
msg = frontend.recv_multipart()
|
||||
topic = msg[0]
|
||||
|
||||
#frontend.send_multipart([b"\x00rain"])
|
||||
# frontend.send_multipart([b"\x00rain"])
|
||||
|
||||
if topic not in lvc:
|
||||
logger.info(f"caching topic {topic} that hasnt seen a listener yet")
|
||||
logger.info(
|
||||
f"caching topic {topic} that hasnt seen a listener yet")
|
||||
cache_topics.add(topic)
|
||||
lvc[topic] = msg
|
||||
|
||||
if topic in cache_topics:
|
||||
#logger.debug(f"[o] cached {msg}")
|
||||
# logger.debug(f"[o] cached {msg}")
|
||||
cache[topic].append(msg)
|
||||
else:
|
||||
backend.send_multipart(msg)
|
||||
|
@ -160,20 +154,19 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
if capture:
|
||||
capture.send_multipart(msg)
|
||||
|
||||
|
||||
if backend in events:
|
||||
|
||||
msg = backend.recv_multipart()
|
||||
#logger.warning(f"[x] backend: {msg}")
|
||||
# logger.warning(f"[x] backend: {msg}")
|
||||
if msg[0][0] == 0:
|
||||
topic = msg[0][1:]
|
||||
cache_topics.add(topic)
|
||||
logger.info(f"[o] now caching {topic}")
|
||||
|
||||
if msg[0][0] == 1: #'\x01'
|
||||
if msg[0][0] == 1: # '\x01'
|
||||
topic = msg[0][1:]
|
||||
if topic not in lvc:
|
||||
# the keys of the topic dir are also a list of "known topics"
|
||||
# the keys of the topic dir are also a list of
|
||||
# "known topics"
|
||||
logger.success(f"registered {topic}")
|
||||
lvc[topic] = None
|
||||
|
||||
|
@ -188,40 +181,33 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
|||
|
||||
save_cache_to_disk()
|
||||
|
||||
|
||||
logger.success(f"stopped caching {topic}")
|
||||
cache_topics.discard(topic)
|
||||
|
||||
|
||||
elif topic in lvc and lvc[topic] is not None:
|
||||
cached = lvc[topic]
|
||||
backend.send_multipart(cached + [b"cached"])
|
||||
logger.success(f"[>] lvc sent for {topic}")
|
||||
|
||||
|
||||
#frontend.send(msg)
|
||||
#logger.success(f"[>] backend: {msg}")
|
||||
|
||||
# frontend.send(msg)
|
||||
# logger.success(f"[>] backend: {msg}")
|
||||
|
||||
if capture in events:
|
||||
logger.warning(f"capture: {capture.recv_mutlipart(msg)}")
|
||||
|
||||
|
||||
#zmq.proxy(frontend, backend, capture)
|
||||
#while True:
|
||||
|
||||
|
||||
# zmq.proxy(frontend, backend, capture)
|
||||
|
||||
# we never used to get here
|
||||
frontend.close()
|
||||
backend.close()
|
||||
context.close()
|
||||
|
||||
|
||||
def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
|
||||
context = zmq.Context()
|
||||
|
||||
# facing publishers
|
||||
#frontend = context.socket(zmq.XSUB)
|
||||
# frontend = context.socket(zmq.XSUB)
|
||||
|
||||
frontend = context.socket(zmq.SUB)
|
||||
frontend.setsockopt(zmq.SUBSCRIBE, b'')
|
||||
|
@ -231,11 +217,10 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
|
|||
backend = context.socket(zmq.XPUB)
|
||||
backend.bind(backend_addr)
|
||||
# infrom publishers of a new sink
|
||||
#backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
|
||||
# backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
|
||||
|
||||
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
|
||||
|
||||
|
||||
if capture_addr:
|
||||
capture = context.socket(zmq.PUB)
|
||||
capture.bind(capture_addr)
|
||||
|
@ -253,6 +238,7 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
|
|||
capture.close()
|
||||
context.close()
|
||||
|
||||
|
||||
def capture(capture_addr):
|
||||
capture_port = capture_addr.split(":")[-1]
|
||||
context = zmq.Context()
|
||||
|
@ -262,35 +248,32 @@ def capture(capture_addr):
|
|||
socket.connect(addr)
|
||||
logger.info("connecting to " + addr)
|
||||
|
||||
import pprint
|
||||
import sys
|
||||
while True:
|
||||
|
||||
r = socket.recv_multipart()
|
||||
#pprint.pprint(r[1].decode())
|
||||
#print(r)
|
||||
# pprint.pprint(r[1].decode())
|
||||
# print(r)
|
||||
jdata = json.loads(r[1].decode())
|
||||
|
||||
if "cache_size" in jdata:
|
||||
print(r[1].decode(), end="\n")
|
||||
sys.stdout.flush()
|
||||
#print("")
|
||||
|
||||
# print("")
|
||||
|
||||
|
||||
def main_forwarder(config):
|
||||
|
||||
# zmq_in_connect = config['zmq_in_connect']
|
||||
# zmq_frontend = config['zmq_frontend']
|
||||
# zmq_capture = config['zmq_capture']
|
||||
|
||||
zmq_in_connect = "tcp://192.168.1.2:5560"
|
||||
zmq_backend = "tcp://*:5560"
|
||||
zmq_capture = "tcp://127.0.0.1:5561"
|
||||
# zmq_in_connect = "tcp://192.168.1.2:5560"
|
||||
# zmq_backend = "tcp://*:5560"
|
||||
# zmq_capture = "tcp://127.0.0.1:5561"
|
||||
|
||||
|
||||
return forwarder(
|
||||
config['frontend_addr'], config['backend_addr'], config['capture_addr'])
|
||||
return proxy_forwarder(
|
||||
config['frontend_addr'],
|
||||
config['backend_addr'],
|
||||
config['capture_addr']
|
||||
)
|
||||
|
||||
|
||||
def main_buffering(args, config):
|
||||
|
@ -299,4 +282,8 @@ def main_buffering(args, config):
|
|||
return capture(capture_addr)
|
||||
|
||||
return proxy_buffering(
|
||||
config['frontend_addr'], config['backend_addr'], capture_addr, config['cache_dir'])
|
||||
config['frontend_addr'],
|
||||
config['backend_addr'],
|
||||
capture_addr,
|
||||
config['cache_dir']
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue