# pyzmq/zmq/backend/__init__.pyi
#
# 123 lines
# 3.3 KiB
# Python

from typing import Any, Callable, List, Optional, Set, Tuple, TypeVar, Union, overload
from typing_extensions import Literal
import zmq
from .select import select_backend
# Alias for the builtin ``bytes`` type.  Inside ``Frame`` the name ``bytes`` is
# also declared as an attribute, so the method annotations there use this
# alias to avoid the collision.
_bytestr = bytes
# Generic type variable; used by ``Frame.copy_fast`` to type its result as the
# same type as the receiver.
T = TypeVar("T")
# Type stub for the backend ``Frame`` class.  Every body is ``...`` because a
# stub only declares signatures; the implementation is provided elsewhere.
class Frame:
    # Declared attributes.  NOTE(review): their runtime semantics are not
    # visible from this stub — consult the backend implementation.
    buffer: Any
    bytes: bytes
    more: bool
    tracker: Any

    # All parameters are optional.  The return type is spelled out as ``None``
    # so this constructor is annotated the same way as ``Socket.__init__``.
    def __init__(
        self,
        data: Any = None,
        track: bool = False,
        copy: bool | None = None,
        copy_threshold: int | None = None,
    ) -> None: ...

    # ``self: T`` / ``-> T``: the result is typed as the receiver's own type,
    # so subclasses get their own type back.
    def copy_fast(self: T) -> T: ...

    # Option accessors.  ``_bytestr`` is the module-level alias for the builtin
    # ``bytes``; the bare name ``bytes`` is declared above as an attribute of
    # this class, so the alias keeps these annotations unambiguous.
    def get(self, option: int) -> int | _bytestr | str: ...
    def set(self, option: int, value: int | _bytestr | str) -> None: ...
# Type stub for the backend ``Socket`` class.  Bodies are ``...`` throughout:
# only the signatures are declared here.
class Socket:
    underlying: int
    context: zmq.Context
    copy_threshold: int

    # specific option types
    FD: int

    # ``Context`` here is the backend ``Context`` declared in this same stub;
    # the ``context`` attribute above is typed as ``zmq.Context`` instead.
    def __init__(
        self,
        context: Context | None = None,
        socket_type: int = 0,
        shadow: int = 0,
        copy_threshold: int | None = zmq.COPY_THRESHOLD,
    ) -> None: ...

    def close(self, linger: int | None = ...) -> None: ...

    # Option accessors: values are typed as int, bytes or str.
    def get(self, option: int) -> int | bytes | str: ...
    def set(self, option: int, value: int | bytes | str) -> None: ...

    # NOTE(review): ``connect`` and ``bind`` declare no return type (so they
    # are implicitly ``Any``), unlike ``disconnect`` / ``unbind`` which return
    # ``None``.  Confirm this is intentional before tightening them.
    def connect(self, url: str): ...
    def disconnect(self, url: str) -> None: ...
    def bind(self, url: str): ...
    def unbind(self, url: str) -> None: ...

    def send(
        self,
        data: Any,
        flags: int = ...,
        copy: bool = ...,
        track: bool = ...,
    ) -> zmq.MessageTracker | None: ...

    # ``recv`` overloads, selected by how ``copy`` is passed:
    #   1. keyword ``copy=False``  -> zmq.Frame
    #   2. keyword ``copy=True``   -> bytes
    #   3. ``copy`` not passed     -> bytes
    #   4. any other ``bool``      -> zmq.Frame | bytes
    @overload
    def recv(
        self,
        flags: int = ...,
        *,
        copy: Literal[False],
        track: bool = ...,
    ) -> zmq.Frame: ...
    @overload
    def recv(
        self,
        flags: int = ...,
        *,
        copy: Literal[True],
        track: bool = ...,
    ) -> bytes: ...
    @overload
    def recv(self, flags: int = ..., track: bool = False) -> bytes: ...
    @overload
    def recv(
        self,
        flags: int | None = ...,
        copy: bool = ...,
        track: bool | None = False,
    ) -> zmq.Frame | bytes: ...

    def monitor(self, addr: str | None, events: int) -> None: ...

    # draft methods
    def join(self, group: str) -> None: ...
    def leave(self, group: str) -> None: ...
# Type stub for the backend ``Context`` class.  Bodies are ``...`` because a
# stub only declares signatures.
class Context:
    underlying: int

    # The return type is spelled out as ``None`` so this constructor is
    # annotated the same way as ``Socket.__init__``.
    def __init__(self, io_threads: int = 1, shadow: int = 0) -> None: ...

    # Option accessors: values are typed as int, bytes or str.
    def get(self, option: int) -> int | bytes | str: ...
    def set(self, option: int, value: int | bytes | str) -> None: ...

    # Typed as returning the backend ``Socket`` declared in this same stub.
    def socket(self, socket_type: int) -> Socket: ...
    def term(self) -> None: ...
# Integer constant declared without a value (stub).
# NOTE(review): presumably the maximum length of an ipc:// socket path, and
# presumably module-level rather than a ``Context`` attribute — the pasted
# source has lost its indentation, so confirm both against the backend.
IPC_PATH_MAX_LEN: int
# Takes a capability name and returns a bool.
# NOTE(review): presumably reports whether the underlying libzmq build
# supports that capability — confirm against the backend.
def has(capability: str) -> bool: ...
# Takes no arguments and returns a pair of ``bytes`` objects.
# NOTE(review): presumably (public key, secret key) — confirm the order
# against the backend before relying on it.
def curve_keypair() -> tuple[bytes, bytes]: ...
# Takes a secret key as ``bytes`` and returns ``bytes``.
# NOTE(review): presumably the public key derived from it — confirm.
def curve_public(secret_key: bytes) -> bytes: ...
# Returns a string for an error number.  ``errno`` is optional and may be
# ``None``; its default is elided (``...``) in this stub.
def strerror(errno: int | None = ...) -> str: ...
# Takes no arguments and returns an int error number.
def zmq_errno() -> int: ...
# Takes no arguments and returns the version as a string.
def zmq_version() -> str: ...
# Takes no arguments and returns the version as a tuple of three ints.
# NOTE(review): presumably (major, minor, patch) — confirm.
def zmq_version_info() -> tuple[int, int, int]: ...
# Takes a list of items to poll and an optional timeout (may be ``None``;
# default elided in this stub) and returns a list of (Socket, int) pairs.
# NOTE(review): the element type of ``sockets`` is left as ``Any`` here, and
# the meaning of the int in each result pair is not visible from this stub.
def zmq_poll(
    sockets: list[Any],
    timeout: int | None = ...,
) -> list[tuple[Socket, int]]: ...
# Takes a device type and a frontend socket; ``backend`` is optional and may
# be ``None`` (default elided in this stub).  Returns an int.
def device(device_type: int, frontend: Socket, backend: Socket | None = ...) -> int: ...
# Takes a frontend and a backend socket; ``capture`` is optional and defaults
# to ``None``.  Returns an int.
def proxy(frontend: Socket, backend: Socket, capture: Socket | None = None) -> int: ...
# Takes a frontend and a backend socket plus two optional sockets, ``capture``
# and ``control``; both may be ``None`` and their defaults are elided in this
# stub.  Returns an int.
def proxy_steerable(
    frontend: Socket,
    backend: Socket,
    capture: Socket | None = ...,
    control: Socket | None = ...,
) -> int: ...
monitored_queue = Callable | None