refactoring and cleaning up proxy code #6
|
@ -1,15 +1,15 @@
|
||||||
#!/usr/bin/python3 -u
|
#!/usr/bin/python3
|
||||||
|
|
||||||
from collections import deque, defaultdict
|
from collections import deque, defaultdict
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import base64
|
import base64
|
||||||
|
import sys
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
import zmq
|
import zmq
|
||||||
|
|
||||||
from sudoisbot.config import read_config
|
|
||||||
|
|
||||||
def dealer(dealer_addr, router_addr):
|
def dealer(dealer_addr, router_addr):
|
||||||
print("dealer")
|
print("dealer")
|
||||||
|
@ -31,8 +31,8 @@ def dealer(dealer_addr, router_addr):
|
||||||
context.close()
|
context.close()
|
||||||
|
|
||||||
|
|
||||||
|
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None,
|
||||||
def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/tmp/proxy_cache/"):
|
cache_dir="/tmp/proxy_cache"):
|
||||||
context = zmq.Context()
|
context = zmq.Context()
|
||||||
|
|
||||||
disk_interval = 3
|
disk_interval = 3
|
||||||
|
@ -48,7 +48,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
||||||
for multipart_msg in list(cache[topic]):
|
for multipart_msg in list(cache[topic]):
|
||||||
parts64 = [base64.b64encode(a) for a in multipart_msg]
|
parts64 = [base64.b64encode(a) for a in multipart_msg]
|
||||||
|
|
||||||
#print(parts64)
|
|
||||||
f.write(b"|".join(parts64))
|
f.write(b"|".join(parts64))
|
||||||
f.write(b"\n")
|
f.write(b"\n")
|
||||||
|
|
||||||
|
@ -60,7 +59,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
||||||
for line in f.readlines():
|
for line in f.readlines():
|
||||||
parts64 = line.split(b"|")
|
parts64 = line.split(b"|")
|
||||||
yield [base64.b64decode(a) for a in parts64]
|
yield [base64.b64decode(a) for a in parts64]
|
||||||
#os.remove(fullpath)
|
# os.remove(fullpath)
|
||||||
|
|
||||||
def delete_cache_on_disk(topic, target_dir=cache_dir):
|
def delete_cache_on_disk(topic, target_dir=cache_dir):
|
||||||
filename = topic.decode() + ".cache"
|
filename = topic.decode() + ".cache"
|
||||||
|
@ -68,8 +67,8 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
||||||
try:
|
try:
|
||||||
os.remove(fullpath)
|
os.remove(fullpath)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
logger.warning(f"could not delete disk cache because {fullpath} does not exist")
|
logger.warning(
|
||||||
|
f"could not delete disk cachem {fullpath} does not exist")
|
||||||
|
|
||||||
# facing publishers
|
# facing publishers
|
||||||
frontend = context.socket(zmq.SUB)
|
frontend = context.socket(zmq.SUB)
|
||||||
|
@ -80,28 +79,23 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
||||||
backend = context.socket(zmq.XPUB)
|
backend = context.socket(zmq.XPUB)
|
||||||
backend.bind(backend_addr)
|
backend.bind(backend_addr)
|
||||||
# infrom publishers of a new sink
|
# infrom publishers of a new sink
|
||||||
#backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
|
# backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
|
||||||
|
|
||||||
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
|
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
|
||||||
if capture_addr:
|
if capture_addr:
|
||||||
capture = context.socket(zmq.PUB)
|
capture = context.socket(zmq.PUB)
|
||||||
capture.bind(capture_addr)
|
capture.bind(capture_addr)
|
||||||
logger.info(f"zmq capture: {capture_addr}")
|
logger.info(f"zmq capture: {capture_addr}")
|
||||||
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
capture = None
|
capture = None
|
||||||
|
|
||||||
|
|
||||||
poller = zmq.Poller()
|
poller = zmq.Poller()
|
||||||
poller.register(frontend, zmq.POLLIN)
|
poller.register(frontend, zmq.POLLIN)
|
||||||
poller.register(backend, zmq.POLLIN)
|
poller.register(backend, zmq.POLLIN)
|
||||||
if capture:
|
if capture:
|
||||||
poller.register(backend, zmq.POLLIN)
|
poller.register(backend, zmq.POLLIN)
|
||||||
|
|
||||||
|
|
||||||
# send \x01 to all publishers when they connect
|
# send \x01 to all publishers when they connect
|
||||||
|
|
||||||
lvc = dict()
|
lvc = dict()
|
||||||
cache = defaultdict(deque)
|
cache = defaultdict(deque)
|
||||||
cache_topics = set()
|
cache_topics = set()
|
||||||
|
@ -110,7 +104,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
||||||
cache[item[0]].append(item)
|
cache[item[0]].append(item)
|
||||||
|
|
||||||
for topic in cache.keys():
|
for topic in cache.keys():
|
||||||
csize = len(cache[topic])
|
csize = len(cache[topic])
|
||||||
if csize > 0:
|
if csize > 0:
|
||||||
logger.warning(f"{topic} - {csize} cached items loaded")
|
logger.warning(f"{topic} - {csize} cached items loaded")
|
||||||
|
|
||||||
|
@ -123,7 +117,6 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
||||||
logger.info("saved cache")
|
logger.info("saved cache")
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
now = int(time.time())
|
now = int(time.time())
|
||||||
if now > disk_at:
|
if now > disk_at:
|
||||||
save_cache_to_disk()
|
save_cache_to_disk()
|
||||||
|
@ -135,7 +128,7 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
||||||
k.decode(): len(v) for (k, v) in cache.items()
|
k.decode(): len(v) for (k, v) in cache.items()
|
||||||
},
|
},
|
||||||
'topics': [a.decode() for a in lvc.keys()],
|
'topics': [a.decode() for a in lvc.keys()],
|
||||||
'cache_topics': [a.decode() for a in cache_topics],
|
'cache_topics': [a.decode() for a in cache_topics],
|
||||||
'disk_at': disk_at
|
'disk_at': disk_at
|
||||||
}
|
}
|
||||||
capture.send_multipart([b"meta:stats", json.dumps(stats).encode()])
|
capture.send_multipart([b"meta:stats", json.dumps(stats).encode()])
|
||||||
|
@ -144,15 +137,16 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
||||||
msg = frontend.recv_multipart()
|
msg = frontend.recv_multipart()
|
||||||
topic = msg[0]
|
topic = msg[0]
|
||||||
|
|
||||||
#frontend.send_multipart([b"\x00rain"])
|
# frontend.send_multipart([b"\x00rain"])
|
||||||
|
|
||||||
if topic not in lvc:
|
if topic not in lvc:
|
||||||
logger.info(f"caching topic {topic} that hasnt seen a listener yet")
|
logger.info(
|
||||||
|
f"caching topic {topic} that hasnt seen a listener yet")
|
||||||
cache_topics.add(topic)
|
cache_topics.add(topic)
|
||||||
lvc[topic] = msg
|
lvc[topic] = msg
|
||||||
|
|
||||||
if topic in cache_topics:
|
if topic in cache_topics:
|
||||||
#logger.debug(f"[o] cached {msg}")
|
# logger.debug(f"[o] cached {msg}")
|
||||||
cache[topic].append(msg)
|
cache[topic].append(msg)
|
||||||
else:
|
else:
|
||||||
backend.send_multipart(msg)
|
backend.send_multipart(msg)
|
||||||
|
@ -160,20 +154,19 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
||||||
if capture:
|
if capture:
|
||||||
capture.send_multipart(msg)
|
capture.send_multipart(msg)
|
||||||
|
|
||||||
|
|
||||||
if backend in events:
|
if backend in events:
|
||||||
|
|
||||||
msg = backend.recv_multipart()
|
msg = backend.recv_multipart()
|
||||||
#logger.warning(f"[x] backend: {msg}")
|
# logger.warning(f"[x] backend: {msg}")
|
||||||
if msg[0][0] == 0:
|
if msg[0][0] == 0:
|
||||||
topic = msg[0][1:]
|
topic = msg[0][1:]
|
||||||
cache_topics.add(topic)
|
cache_topics.add(topic)
|
||||||
logger.info(f"[o] now caching {topic}")
|
logger.info(f"[o] now caching {topic}")
|
||||||
|
|
||||||
if msg[0][0] == 1: #'\x01'
|
if msg[0][0] == 1: # '\x01'
|
||||||
topic = msg[0][1:]
|
topic = msg[0][1:]
|
||||||
if topic not in lvc:
|
if topic not in lvc:
|
||||||
# the keys of the topic dir are also a list of "known topics"
|
# the keys of the topic dir are also a list of
|
||||||
|
# "known topics"
|
||||||
logger.success(f"registered {topic}")
|
logger.success(f"registered {topic}")
|
||||||
lvc[topic] = None
|
lvc[topic] = None
|
||||||
|
|
||||||
|
@ -188,40 +181,33 @@ def proxy_buffering(frontend_addr, backend_addr, capture_addr=None, cache_dir="/
|
||||||
|
|
||||||
save_cache_to_disk()
|
save_cache_to_disk()
|
||||||
|
|
||||||
|
|
||||||
logger.success(f"stopped caching {topic}")
|
logger.success(f"stopped caching {topic}")
|
||||||
cache_topics.discard(topic)
|
cache_topics.discard(topic)
|
||||||
|
|
||||||
|
|
||||||
elif topic in lvc and lvc[topic] is not None:
|
elif topic in lvc and lvc[topic] is not None:
|
||||||
cached = lvc[topic]
|
cached = lvc[topic]
|
||||||
backend.send_multipart(cached + [b"cached"])
|
backend.send_multipart(cached + [b"cached"])
|
||||||
logger.success(f"[>] lvc sent for {topic}")
|
logger.success(f"[>] lvc sent for {topic}")
|
||||||
|
|
||||||
|
# frontend.send(msg)
|
||||||
#frontend.send(msg)
|
# logger.success(f"[>] backend: {msg}")
|
||||||
#logger.success(f"[>] backend: {msg}")
|
|
||||||
|
|
||||||
|
|
||||||
if capture in events:
|
if capture in events:
|
||||||
logger.warning(f"capture: {capture.recv_mutlipart(msg)}")
|
logger.warning(f"capture: {capture.recv_mutlipart(msg)}")
|
||||||
|
|
||||||
|
# zmq.proxy(frontend, backend, capture)
|
||||||
#zmq.proxy(frontend, backend, capture)
|
|
||||||
#while True:
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# we never used to get here
|
# we never used to get here
|
||||||
frontend.close()
|
frontend.close()
|
||||||
backend.close()
|
backend.close()
|
||||||
context.close()
|
context.close()
|
||||||
|
|
||||||
|
|
||||||
def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
|
def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
|
||||||
context = zmq.Context()
|
context = zmq.Context()
|
||||||
|
|
||||||
# facing publishers
|
# facing publishers
|
||||||
#frontend = context.socket(zmq.XSUB)
|
# frontend = context.socket(zmq.XSUB)
|
||||||
|
|
||||||
frontend = context.socket(zmq.SUB)
|
frontend = context.socket(zmq.SUB)
|
||||||
frontend.setsockopt(zmq.SUBSCRIBE, b'')
|
frontend.setsockopt(zmq.SUBSCRIBE, b'')
|
||||||
|
@ -231,11 +217,10 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
|
||||||
backend = context.socket(zmq.XPUB)
|
backend = context.socket(zmq.XPUB)
|
||||||
backend.bind(backend_addr)
|
backend.bind(backend_addr)
|
||||||
# infrom publishers of a new sink
|
# infrom publishers of a new sink
|
||||||
#backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
|
# backend.setsockopt(ZMQ_XPUB_VERBOSE, 1)
|
||||||
|
|
||||||
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
|
logger.info(f"zmq pubsub proxy: {frontend_addr} -> {backend_addr}")
|
||||||
|
|
||||||
|
|
||||||
if capture_addr:
|
if capture_addr:
|
||||||
capture = context.socket(zmq.PUB)
|
capture = context.socket(zmq.PUB)
|
||||||
capture.bind(capture_addr)
|
capture.bind(capture_addr)
|
||||||
|
@ -253,6 +238,7 @@ def proxy_forwarder(frontend_addr, backend_addr, capture_addr):
|
||||||
capture.close()
|
capture.close()
|
||||||
context.close()
|
context.close()
|
||||||
|
|
||||||
|
|
||||||
def capture(capture_addr):
|
def capture(capture_addr):
|
||||||
capture_port = capture_addr.split(":")[-1]
|
capture_port = capture_addr.split(":")[-1]
|
||||||
context = zmq.Context()
|
context = zmq.Context()
|
||||||
|
@ -262,35 +248,32 @@ def capture(capture_addr):
|
||||||
socket.connect(addr)
|
socket.connect(addr)
|
||||||
logger.info("connecting to " + addr)
|
logger.info("connecting to " + addr)
|
||||||
|
|
||||||
import pprint
|
|
||||||
import sys
|
|
||||||
while True:
|
while True:
|
||||||
|
|
||||||
r = socket.recv_multipart()
|
r = socket.recv_multipart()
|
||||||
#pprint.pprint(r[1].decode())
|
# pprint.pprint(r[1].decode())
|
||||||
#print(r)
|
# print(r)
|
||||||
jdata = json.loads(r[1].decode())
|
jdata = json.loads(r[1].decode())
|
||||||
|
|
||||||
if "cache_size" in jdata:
|
if "cache_size" in jdata:
|
||||||
print(r[1].decode(), end="\n")
|
print(r[1].decode(), end="\n")
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
#print("")
|
# print("")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def main_forwarder(config):
|
def main_forwarder(config):
|
||||||
|
|
||||||
# zmq_in_connect = config['zmq_in_connect']
|
# zmq_in_connect = config['zmq_in_connect']
|
||||||
# zmq_frontend = config['zmq_frontend']
|
# zmq_frontend = config['zmq_frontend']
|
||||||
# zmq_capture = config['zmq_capture']
|
# zmq_capture = config['zmq_capture']
|
||||||
|
|
||||||
zmq_in_connect = "tcp://192.168.1.2:5560"
|
# zmq_in_connect = "tcp://192.168.1.2:5560"
|
||||||
zmq_backend = "tcp://*:5560"
|
# zmq_backend = "tcp://*:5560"
|
||||||
zmq_capture = "tcp://127.0.0.1:5561"
|
# zmq_capture = "tcp://127.0.0.1:5561"
|
||||||
|
|
||||||
|
return proxy_forwarder(
|
||||||
return forwarder(
|
config['frontend_addr'],
|
||||||
config['frontend_addr'], config['backend_addr'], config['capture_addr'])
|
config['backend_addr'],
|
||||||
|
config['capture_addr']
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def main_buffering(args, config):
|
def main_buffering(args, config):
|
||||||
|
@ -299,4 +282,8 @@ def main_buffering(args, config):
|
||||||
return capture(capture_addr)
|
return capture(capture_addr)
|
||||||
|
|
||||||
return proxy_buffering(
|
return proxy_buffering(
|
||||||
config['frontend_addr'], config['backend_addr'], capture_addr, config['cache_dir'])
|
config['frontend_addr'],
|
||||||
|
config['backend_addr'],
|
||||||
|
capture_addr,
|
||||||
|
config['cache_dir']
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue