mirror of https://github.com/zeromq/pyzmq.git
99 lines
2.2 KiB
Python
99 lines
2.2 KiB
Python
#!/usr/bin/env python
|
|
|
|
'''
|
|
Allow or deny clients based on IP address.
|
|
|
|
Strawhouse, which is plain text with filtering on IP addresses. It still
|
|
uses the NULL mechanism, but we install an authentication hook that checks
|
|
the IP address against a list and allows or denies it
|
|
accordingly.
|
|
|
|
Author: Chris Laws
|
|
'''
|
|
|
|
import logging
|
|
import sys
|
|
|
|
import zmq
|
|
import zmq.auth
|
|
from zmq.auth.thread import ThreadAuthenticator
|
|
|
|
|
|
def run() -> None:
|
|
'''Run strawhouse client'''
|
|
|
|
allow_test_pass = False
|
|
deny_test_pass = False
|
|
|
|
ctx = zmq.Context.instance()
|
|
|
|
# Start an authenticator for this context.
|
|
auth = ThreadAuthenticator(ctx)
|
|
auth.start()
|
|
|
|
# Part 1 - demonstrate allowing clients based on IP address
|
|
auth.allow('127.0.0.1')
|
|
|
|
server = ctx.socket(zmq.PUSH)
|
|
server.zap_domain = b'global' # must come before bind
|
|
server.bind('tcp://*:9000')
|
|
|
|
client_allow = ctx.socket(zmq.PULL)
|
|
client_allow.connect('tcp://127.0.0.1:9000')
|
|
|
|
server.send(b"Hello")
|
|
|
|
msg = client_allow.recv()
|
|
if msg == b"Hello":
|
|
allow_test_pass = True
|
|
|
|
client_allow.close()
|
|
|
|
# Part 2 - demonstrate denying clients based on IP address
|
|
auth.stop()
|
|
|
|
auth = ThreadAuthenticator(ctx)
|
|
auth.start()
|
|
|
|
auth.deny('127.0.0.1')
|
|
|
|
client_deny = ctx.socket(zmq.PULL)
|
|
client_deny.connect('tcp://127.0.0.1:9000')
|
|
|
|
if server.poll(50, zmq.POLLOUT):
|
|
server.send(b"Hello")
|
|
|
|
if client_deny.poll(50):
|
|
msg = client_deny.recv()
|
|
else:
|
|
deny_test_pass = True
|
|
else:
|
|
deny_test_pass = True
|
|
|
|
client_deny.close()
|
|
|
|
auth.stop() # stop auth thread
|
|
|
|
if allow_test_pass and deny_test_pass:
|
|
logging.info("Strawhouse test OK")
|
|
else:
|
|
logging.error("Strawhouse test FAIL")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if zmq.zmq_version_info() < (4, 0):
|
|
raise RuntimeError(
|
|
"Security is not supported in libzmq version < 4.0. libzmq version {}".format(
|
|
zmq.zmq_version()
|
|
)
|
|
)
|
|
|
|
if '-v' in sys.argv:
|
|
level = logging.DEBUG
|
|
else:
|
|
level = logging.INFO
|
|
|
|
logging.basicConfig(level=level, format="[%(levelname)s] %(message)s")
|
|
|
|
run()
|