pyzmq/zmq/backend/cython/socket.pyx

849 lines
26 KiB
Cython

"""0MQ Socket class."""
#
# Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley
#
# This file is part of pyzmq.
#
# pyzmq is free software; you can redistribute it and/or modify it under
# the terms of the Lesser GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# pyzmq is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Lesser GNU General Public License for more details.
#
# You should have received a copy of the Lesser GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#-----------------------------------------------------------------------------
# Cython Imports
#-----------------------------------------------------------------------------
# get version-independent aliases:
cdef extern from "pyversion_compat.h":
pass
from cpython cimport (
PY_VERSION_HEX,
Py_DECREF,
Py_INCREF,
PyBytes_AsString,
PyBytes_FromStringAndSize,
PyBytes_Size,
)
from libc.errno cimport ENAMETOOLONG, ENOENT, ENOTSOCK
from libc.string cimport memcpy
from zmq.utils.buffers cimport asbuffer_r
from .context cimport Context
from .libzmq cimport (
ZMQ_ETERM,
ZMQ_EVENT_ALL,
ZMQ_IDENTITY,
ZMQ_LINGER,
ZMQ_TYPE,
fd_t,
int64_t,
zmq_bind,
zmq_close,
zmq_connect,
zmq_disconnect,
zmq_errno,
zmq_getsockopt,
zmq_join,
zmq_leave,
zmq_msg_close,
zmq_msg_data,
zmq_msg_init,
zmq_msg_init_size,
zmq_msg_recv,
zmq_msg_send,
zmq_msg_size,
zmq_msg_t,
zmq_setsockopt,
zmq_socket,
zmq_socket_monitor,
zmq_unbind,
)
from .message cimport Frame, copy_zmq_msg_bytes
cdef extern from "Python.h":
ctypedef int Py_ssize_t
cdef extern from "ipcmaxlen.h":
int get_ipc_path_max_len()
cdef extern from "getpid_compat.h":
int getpid()
#-----------------------------------------------------------------------------
# Python Imports
#-----------------------------------------------------------------------------
import codecs
import copy as copy_mod
import random
import struct
import sys
import time
try:
import cPickle
pickle = cPickle
except:
cPickle = None
import pickle
import zmq
from zmq.constants import SocketOption, _OptType
from .checkrc cimport _check_rc
from zmq.error import InterruptedSystemCall, ZMQBindError, ZMQError, _check_version
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------
IPC_PATH_MAX_LEN = get_ipc_path_max_len()
# inline some small socket submethods:
# true methods frequently cannot be inlined, acc. Cython docs
cdef inline Py_ssize_t nbytes(buf) except -1:
"""get n bytes"""
if PY_VERSION_HEX >= 0x03030000:
return buf.nbytes
cdef Py_ssize_t n = buf.itemsize
cdef Py_ssize_t ndim = buf.ndim
cdef Py_ssize_t i
for i in range(ndim):
n *= buf.shape[i]
return n
cdef inline _check_closed(Socket s):
"""raise ENOTSUP if socket is closed
Does not do a deep check
"""
if s._closed:
raise ZMQError(ENOTSOCK)
cdef inline _check_closed_deep(Socket s):
"""thorough check of whether the socket has been closed,
even if by another entity (e.g. ctx.destroy).
Only used by the `closed` property.
returns True if closed, False otherwise
"""
cdef int rc
cdef int errno
cdef int stype
cdef size_t sz=sizeof(int)
if s._closed:
return True
else:
rc = zmq_getsockopt(s.handle, ZMQ_TYPE, <void *>&stype, &sz)
if rc < 0:
errno = zmq_errno()
if errno == ENOTSOCK:
s._closed = True
return True
elif errno == ZMQ_ETERM:
# don't raise ETERM when checking if we're closed
return False
else:
_check_rc(rc)
return False
cdef inline Frame _recv_frame(void *handle, int flags=0, track=False):
"""Receive a message in a non-copying manner and return a Frame."""
cdef int rc
msg = zmq.Frame(track=track)
cdef Frame cmsg = msg
while True:
with nogil:
rc = zmq_msg_recv(&cmsg.zmq_msg, handle, flags)
try:
_check_rc(rc)
except InterruptedSystemCall:
continue
else:
break
return msg
cdef inline object _recv_copy(void *handle, int flags=0):
"""Receive a message and return a copy"""
cdef zmq_msg_t zmq_msg
rc = zmq_msg_init (&zmq_msg)
_check_rc(rc)
while True:
with nogil:
rc = zmq_msg_recv(&zmq_msg, handle, flags)
try:
_check_rc(rc)
except InterruptedSystemCall:
continue
except Exception:
zmq_msg_close(&zmq_msg) # ensure msg is closed on failure
raise
else:
break
msg_bytes = copy_zmq_msg_bytes(&zmq_msg)
zmq_msg_close(&zmq_msg)
return msg_bytes
cdef inline object _send_frame(void *handle, Frame msg, int flags=0):
"""Send a Frame on this socket in a non-copy manner."""
cdef int rc
cdef Frame msg_copy
# Always copy so the original message isn't garbage collected.
# This doesn't do a real copy, just a reference.
msg_copy = msg.fast_copy()
while True:
with nogil:
rc = zmq_msg_send(&msg_copy.zmq_msg, handle, flags)
try:
_check_rc(rc)
except InterruptedSystemCall:
continue
else:
break
return msg.tracker
cdef inline object _send_copy(void *handle, object msg, int flags=0):
"""Send a message on this socket by copying its content."""
cdef int rc
cdef zmq_msg_t data
cdef char *msg_c
cdef Py_ssize_t msg_c_len=0
# copy to c array:
asbuffer_r(msg, <void **>&msg_c, &msg_c_len)
# Copy the msg before sending. This avoids any complications with
# the GIL, etc.
# If zmq_msg_init_* fails we must not call zmq_msg_close (Bus Error)
rc = zmq_msg_init_size(&data, msg_c_len)
_check_rc(rc)
while True:
with nogil:
memcpy(zmq_msg_data(&data), msg_c, zmq_msg_size(&data))
rc = zmq_msg_send(&data, handle, flags)
try:
_check_rc(rc)
except InterruptedSystemCall:
continue
except Exception:
zmq_msg_close(&data) # close the unused msg
raise # raise original exception
else:
rc = zmq_msg_close(&data)
_check_rc(rc)
break
cdef inline object _getsockopt(void *handle, int option, void *optval, size_t *sz):
"""getsockopt, retrying interrupted calls
checks rc, raising ZMQError on failure.
"""
cdef int rc=0
while True:
rc = zmq_getsockopt(handle, option, optval, sz)
try:
_check_rc(rc)
except InterruptedSystemCall:
continue
else:
break
cdef inline object _setsockopt(void *handle, int option, void *optval, size_t sz):
"""setsockopt, retrying interrupted calls
checks rc, raising ZMQError on failure.
"""
cdef int rc=0
while True:
rc = zmq_setsockopt(handle, option, optval, sz)
try:
_check_rc(rc)
except InterruptedSystemCall:
continue
else:
break
cdef class Socket:
"""Socket(context, socket_type)
A 0MQ socket.
These objects will generally be constructed via the socket() method of a Context object.
Note: 0MQ Sockets are *not* threadsafe. **DO NOT** share them across threads.
Parameters
----------
context : Context
The 0MQ Context this Socket belongs to.
socket_type : int
The socket type, which can be any of the 0MQ socket types:
REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB.
See Also
--------
.Context.socket : method for creating a socket bound to a Context.
"""
def __init__(self, context=None, socket_type=-1, size_t shadow=0, copy_threshold=None):
if copy_threshold is None:
copy_threshold = zmq.COPY_THRESHOLD
self.copy_threshold = copy_threshold
self.handle = NULL
self.context = context
if shadow:
self._shadow = True
self.handle = <void *>shadow
else:
if context is None:
raise TypeError("context must be specified")
if socket_type < 0:
raise TypeError("socket_type must be specified")
self._shadow = False
self.handle = zmq_socket(self.context.handle, socket_type)
if self.handle == NULL:
raise ZMQError()
self._closed = False
self._pid = getpid()
def __cinit__(self, *args, **kwargs):
# basic init
self.handle = NULL
self._pid = 0
self._shadow = False
self.context = None
@property
def underlying(self):
"""The address of the underlying libzmq socket"""
return <size_t> self.handle
@property
def closed(self):
return _check_closed_deep(self)
def close(self, linger=None):
"""s.close(linger=None)
Close the socket.
If linger is specified, LINGER sockopt will be set prior to closing.
This can be called to close the socket by hand. If this is not
called, the socket will automatically be closed when it is
garbage collected.
"""
cdef int rc=0
cdef int linger_c
cdef bint setlinger=False
if linger is not None:
linger_c = linger
setlinger=True
if self.handle != NULL and not self._closed and getpid() == self._pid:
if setlinger:
zmq_setsockopt(self.handle, ZMQ_LINGER, &linger_c, sizeof(int))
self._c_close()
cdef void _c_close(self):
"""Close underlying socket and unregister with self.context"""
rc = zmq_close(self.handle)
if rc < 0 and zmq_errno() != ENOTSOCK:
# ignore ENOTSOCK (closed by Context)
_check_rc(rc)
self._closed = True
self.handle = NULL
def set(self, int option, optval):
"""s.set(option, optval)
Set socket options.
See the 0MQ API documentation for details on specific options.
Parameters
----------
option : int
The option to set. Available values will depend on your
version of libzmq. Examples include::
zmq.SUBSCRIBE, UNSUBSCRIBE, IDENTITY, HWM, LINGER, FD
optval : int or bytes
The value of the option to set.
Notes
-----
.. warning::
All options other than zmq.SUBSCRIBE, zmq.UNSUBSCRIBE and
zmq.LINGER only take effect for subsequent socket bind/connects.
"""
cdef int64_t optval_int64_c
cdef int optval_int_c
cdef char* optval_c
cdef Py_ssize_t sz
_check_closed(self)
if isinstance(optval, str):
raise TypeError("unicode not allowed, use setsockopt_string")
try:
sopt = SocketOption(option)
except ValueError:
# unrecognized option,
# assume from the future,
# let EINVAL raise
opt_type = _OptType.int
else:
opt_type = sopt._opt_type
if opt_type == _OptType.bytes:
if not isinstance(optval, bytes):
raise TypeError('expected bytes, got: %r' % optval)
optval_c = PyBytes_AsString(optval)
sz = PyBytes_Size(optval)
_setsockopt(self.handle, option, optval_c, sz)
elif opt_type == _OptType.int64:
if not isinstance(optval, int):
raise TypeError('expected int, got: %r' % optval)
optval_int64_c = optval
_setsockopt(self.handle, option, &optval_int64_c, sizeof(int64_t))
else:
# default is to assume int, which is what most new sockopts will be
# this lets pyzmq work with newer libzmq which may add constants
# pyzmq has not yet added, rather than artificially raising. Invalid
# sockopts will still raise just the same, but it will be libzmq doing
# the raising.
if not isinstance(optval, int):
raise TypeError('expected int, got: %r' % optval)
optval_int_c = optval
_setsockopt(self.handle, option, &optval_int_c, sizeof(int))
def get(self, int option):
"""s.get(option)
Get the value of a socket option.
See the 0MQ API documentation for details on specific options.
Parameters
----------
option : int
The option to get. Available values will depend on your
version of libzmq. Examples include::
zmq.IDENTITY, HWM, LINGER, FD, EVENTS
Returns
-------
optval : int or bytes
The value of the option as a bytestring or int.
"""
cdef int64_t optval_int64_c
cdef int optval_int_c
cdef fd_t optval_fd_c
cdef char identity_str_c [255]
cdef size_t sz
cdef int rc
_check_closed(self)
try:
sopt = SocketOption(option)
except ValueError:
# unrecognized option,
# assume from the future,
# let EINVAL raise
opt_type = _OptType.int
else:
opt_type = sopt._opt_type
if opt_type == _OptType.bytes:
sz = 255
_getsockopt(self.handle, option, <void *>identity_str_c, &sz)
# strip null-terminated strings *except* identity
if option != ZMQ_IDENTITY and sz > 0 and (<char *>identity_str_c)[sz-1] == b'\0':
sz -= 1
result = PyBytes_FromStringAndSize(<char *>identity_str_c, sz)
elif opt_type == _OptType.int64:
sz = sizeof(int64_t)
_getsockopt(self.handle, option, <void *>&optval_int64_c, &sz)
result = optval_int64_c
elif opt_type == _OptType.fd:
sz = sizeof(fd_t)
_getsockopt(self.handle, option, <void *>&optval_fd_c, &sz)
result = optval_fd_c
else:
# default is to assume int, which is what most new sockopts will be
# this lets pyzmq work with newer libzmq which may add constants
# pyzmq has not yet added, rather than artificially raising. Invalid
# sockopts will still raise just the same, but it will be libzmq doing
# the raising.
sz = sizeof(int)
_getsockopt(self.handle, option, <void *>&optval_int_c, &sz)
result = optval_int_c
return result
def bind(self, addr):
"""s.bind(addr)
Bind the socket to an address.
This causes the socket to listen on a network port. Sockets on the
other side of this connection will use ``Socket.connect(addr)`` to
connect to this socket.
Parameters
----------
addr : str
The address string. This has the form 'protocol://interface:port',
for example 'tcp://127.0.0.1:5555'. Protocols supported include
tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
encoded to utf-8 first.
"""
cdef int rc
cdef char* c_addr
_check_closed(self)
addr_b = addr
if isinstance(addr, str):
addr_b = addr.encode('utf-8')
elif isinstance(addr_b, bytes):
addr = addr_b.decode('utf-8')
if not isinstance(addr_b, bytes):
raise TypeError('expected str, got: %r' % addr)
c_addr = addr_b
rc = zmq_bind(self.handle, c_addr)
if rc != 0:
if IPC_PATH_MAX_LEN and zmq_errno() == ENAMETOOLONG:
path = addr.split('://', 1)[-1]
msg = ('ipc path "{0}" is longer than {1} '
'characters (sizeof(sockaddr_un.sun_path)). '
'zmq.IPC_PATH_MAX_LEN constant can be used '
'to check addr length (if it is defined).'
.format(path, IPC_PATH_MAX_LEN))
raise ZMQError(msg=msg)
elif zmq_errno() == ENOENT:
path = addr.split('://', 1)[-1]
msg = ('No such file or directory for ipc path "{0}".'.format(
path))
raise ZMQError(msg=msg)
while True:
try:
_check_rc(rc)
except InterruptedSystemCall:
rc = zmq_bind(self.handle, c_addr)
continue
else:
break
def connect(self, addr):
"""s.connect(addr)
Connect to a remote 0MQ socket.
Parameters
----------
addr : str
The address string. This has the form 'protocol://interface:port',
for example 'tcp://127.0.0.1:5555'. Protocols supported are
tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
encoded to utf-8 first.
"""
cdef int rc
cdef char* c_addr
_check_closed(self)
if isinstance(addr, str):
addr = addr.encode('utf-8')
if not isinstance(addr, bytes):
raise TypeError('expected str, got: %r' % addr)
c_addr = addr
while True:
try:
rc = zmq_connect(self.handle, c_addr)
_check_rc(rc)
except InterruptedSystemCall:
# retry syscall
continue
else:
break
def unbind(self, addr):
"""s.unbind(addr)
Unbind from an address (undoes a call to bind).
.. versionadded:: libzmq-3.2
.. versionadded:: 13.0
Parameters
----------
addr : str
The address string. This has the form 'protocol://interface:port',
for example 'tcp://127.0.0.1:5555'. Protocols supported are
tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
encoded to utf-8 first.
"""
cdef int rc
cdef char* c_addr
_check_version((3,2), "unbind")
_check_closed(self)
if isinstance(addr, str):
addr = addr.encode('utf-8')
if not isinstance(addr, bytes):
raise TypeError('expected str, got: %r' % addr)
c_addr = addr
rc = zmq_unbind(self.handle, c_addr)
if rc != 0:
raise ZMQError()
def disconnect(self, addr):
"""s.disconnect(addr)
Disconnect from a remote 0MQ socket (undoes a call to connect).
.. versionadded:: libzmq-3.2
.. versionadded:: 13.0
Parameters
----------
addr : str
The address string. This has the form 'protocol://interface:port',
for example 'tcp://127.0.0.1:5555'. Protocols supported are
tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
encoded to utf-8 first.
"""
cdef int rc
cdef char* c_addr
_check_version((3,2), "disconnect")
_check_closed(self)
if isinstance(addr, str):
addr = addr.encode('utf-8')
if not isinstance(addr, bytes):
raise TypeError('expected str, got: %r' % addr)
c_addr = addr
rc = zmq_disconnect(self.handle, c_addr)
if rc != 0:
raise ZMQError()
def monitor(self, addr, int events=ZMQ_EVENT_ALL):
"""s.monitor(addr, flags)
Start publishing socket events on inproc.
See libzmq docs for zmq_monitor for details.
While this function is available from libzmq 3.2,
pyzmq cannot parse monitor messages from libzmq prior to 4.0.
.. versionadded: libzmq-3.2
.. versionadded: 14.0
Parameters
----------
addr : str
The inproc url used for monitoring. Passing None as
the addr will cause an existing socket monitor to be
deregistered.
events : int [default: zmq.EVENT_ALL]
The zmq event bitmask for which events will be sent to the monitor.
"""
cdef int rc, c_flags
cdef char* c_addr = NULL
_check_version((3,2), "monitor")
if addr is not None:
if isinstance(addr, str):
addr = addr.encode('utf-8')
if not isinstance(addr, bytes):
raise TypeError('expected str, got: %r' % addr)
c_addr = addr
c_flags = events
rc = zmq_socket_monitor(self.handle, c_addr, c_flags)
_check_rc(rc)
def join(self, group):
"""join(group)
Join a RADIO-DISH group
Only for DISH sockets.
libzmq and pyzmq must have been built with ZMQ_BUILD_DRAFT_API
.. versionadded:: 17
"""
_check_version((4,2), "RADIO-DISH")
if not zmq.has('draft'):
raise RuntimeError("libzmq must be built with draft support")
if isinstance(group, str):
group = group.encode('utf8')
cdef int rc = zmq_join(self.handle, group)
_check_rc(rc)
def leave(self, group):
"""leave(group)
Leave a RADIO-DISH group
Only for DISH sockets.
libzmq and pyzmq must have been built with ZMQ_BUILD_DRAFT_API
.. versionadded:: 17
"""
_check_version((4,2), "RADIO-DISH")
if not zmq.has('draft'):
raise RuntimeError("libzmq must be built with draft support")
cdef int rc = zmq_leave(self.handle, group)
_check_rc(rc)
#-------------------------------------------------------------------------
# Sending and receiving messages
#-------------------------------------------------------------------------
cpdef send(self, object data, int flags=0, copy=True, track=False):
"""Send a single zmq message frame on this socket.
This queues the message to be sent by the IO thread at a later time.
With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
otherwise, this waits until space is available.
See :class:`Poller` for more general non-blocking I/O.
Parameters
----------
data : bytes, Frame, memoryview
The content of the message. This can be any object that provides
the Python buffer API (`memoryview(data)` can be called).
flags : int
0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
copy : bool
Should the message be sent in a copying or non-copying manner.
track : bool
Should the message be tracked for notification that ZMQ has
finished with it? (ignored if copy=True)
Returns
-------
None : if `copy` or not track
None if message was sent, raises an exception otherwise.
MessageTracker : if track and not copy
a MessageTracker object, whose `pending` property will
be True until the send is completed.
Raises
------
TypeError
If a unicode object is passed
ValueError
If `track=True`, but an untracked Frame is passed.
ZMQError
for any of the reasons zmq_msg_send might fail (including
if NOBLOCK is set and the outgoing queue is full).
"""
_check_closed(self)
if isinstance(data, str):
raise TypeError("unicode not allowed, use send_string")
if copy and not isinstance(data, Frame):
return _send_copy(self.handle, data, flags)
else:
if isinstance(data, Frame):
if track and not data.tracker:
raise ValueError('Not a tracked message')
msg = data
else:
if self.copy_threshold:
buf = memoryview(data)
# always copy messages smaller than copy_threshold
if nbytes(buf) < self.copy_threshold:
_send_copy(self.handle, buf, flags)
return zmq._FINISHED_TRACKER
msg = Frame(data, track=track, copy_threshold=self.copy_threshold)
return _send_frame(self.handle, msg, flags)
cpdef recv(self, int flags=0, copy=True, track=False):
"""s.recv(flags=0, copy=True, track=False)
Receive a message.
With flags=NOBLOCK, this raises :class:`ZMQError` if no messages have
arrived; otherwise, this waits until a message arrives.
See :class:`Poller` for more general non-blocking I/O.
Parameters
----------
flags : int
0 or NOBLOCK.
copy : bool
Should the message be received in a copying or non-copying manner?
If False a Frame object is returned, if True a string copy of
message is returned.
track : bool
Should the message be tracked for notification that ZMQ has
finished with it? (ignored if copy=True)
Returns
-------
msg : bytes or Frame
The received message frame. If `copy` is False, then it will be a Frame,
otherwise it will be bytes.
Raises
------
ZMQError
for any of the reasons zmq_msg_recv might fail (including if
NOBLOCK is set and no new messages have arrived).
"""
_check_closed(self)
if copy:
return _recv_copy(self.handle, flags)
else:
frame = _recv_frame(self.handle, flags, track)
frame.more = self.get(zmq.RCVMORE)
return frame
__all__ = ['Socket', 'IPC_PATH_MAX_LEN']