33
33
"""
34
34
raise ImportError (msg )
35
35
36
- import time
37
36
import warnings
38
37
from threading import Event
38
+ from time import monotonic
39
39
from weakref import ref
40
40
41
41
import cython as C
143
143
144
144
import zmq
145
145
from zmq .constants import SocketOption , _OptType
146
- from zmq .error import InterruptedSystemCall , ZMQError , _check_version
146
+ from zmq .error import (
147
+ Again ,
148
+ ContextTerminated ,
149
+ InterruptedSystemCall ,
150
+ ZMQError ,
151
+ _check_version ,
152
+ )
147
153
148
154
IPC_PATH_MAX_LEN = get_ipc_path_max_len ()
149
155
@@ -162,20 +168,12 @@ def _check_rc(rc: C.int, error_without_errno: bint = False) -> C.int:
162
168
return 0
163
169
if rc == - 1 : # if rc < -1, it's a bug in libzmq. Should we warn?
164
170
if errno == EINTR :
165
- from zmq .error import InterruptedSystemCall
166
-
167
171
raise InterruptedSystemCall (errno )
168
172
elif errno == EAGAIN :
169
- from zmq .error import Again
170
-
171
173
raise Again (errno )
172
174
elif errno == ZMQ_ETERM :
173
- from zmq .error import ContextTerminated
174
-
175
175
raise ContextTerminated (errno )
176
176
else :
177
- from zmq .error import ZMQError
178
-
179
177
raise ZMQError (errno )
180
178
return 0
181
179
@@ -272,6 +270,10 @@ def __init__(
272
270
if copy_threshold is None :
273
271
copy_threshold = zmq .COPY_THRESHOLD
274
272
273
+ c_copy_threshold : C .size_t = 0
274
+ if copy_threshold is not None :
275
+ c_copy_threshold = copy_threshold
276
+
275
277
zmq_msg_ptr : pointer (zmq_msg_t ) = address (self .zmq_msg )
276
278
# init more as False
277
279
self .more = False
@@ -301,13 +303,16 @@ def __init__(
301
303
data_len_c = _asbuffer (data , cast (pointer (p_void ), address (data_c )))
302
304
303
305
# copy unspecified, apply copy_threshold
306
+ c_copy : bint = True
304
307
if copy is None :
305
- if copy_threshold and data_len_c < copy_threshold :
306
- copy = True
308
+ if c_copy_threshold and data_len_c < c_copy_threshold :
309
+ c_copy = True
307
310
else :
308
- copy = False
311
+ c_copy = False
312
+ else :
313
+ c_copy = copy
309
314
310
- if copy :
315
+ if c_copy :
311
316
# copy message data instead of sharing memory
312
317
rc = zmq_msg_init_size (zmq_msg_ptr , data_len_c )
313
318
_check_rc (rc )
@@ -710,7 +715,7 @@ def closed(self):
710
715
"""Whether the socket is closed"""
711
716
return _check_closed_deep (self )
712
717
713
- def close (self , linger = None ):
718
+ def close (self , linger : int | None = None ):
714
719
"""
715
720
Close the socket.
716
721
@@ -732,7 +737,7 @@ def close(self, linger=None):
732
737
if setlinger :
733
738
zmq_setsockopt (self .handle , ZMQ_LINGER , address (linger_c ), sizeof (int ))
734
739
rc = zmq_close (self .handle )
735
- if rc < 0 and zmq_errno () != ENOTSOCK :
740
+ if rc < 0 and _zmq_errno () != ENOTSOCK :
736
741
# ignore ENOTSOCK (closed by Context)
737
742
_check_rc (rc )
738
743
self ._closed = True
@@ -877,7 +882,7 @@ def get(self, option: C.int):
877
882
878
883
return result
879
884
880
- def bind (self , addr ):
885
+ def bind (self , addr : str ):
881
886
"""
882
887
Bind the socket to an address.
883
888
@@ -894,21 +899,13 @@ def bind(self, addr):
894
899
encoded to utf-8 first.
895
900
"""
896
901
rc : C .int
897
- c_addr : p_char
902
+ b_addr : bytes = addr .encode ('utf-8' )
903
+ c_addr : p_char = b_addr
898
904
899
905
_check_closed (self )
900
- addr_b = addr
901
- if isinstance (addr , str ):
902
- addr_b = addr .encode ('utf-8' )
903
- elif isinstance (addr_b , bytes ):
904
- addr = addr_b .decode ('utf-8' )
905
-
906
- if not isinstance (addr_b , bytes ):
907
- raise TypeError (f'expected str, got: { addr !r} ' )
908
- c_addr = addr_b
909
906
rc = zmq_bind (self .handle , c_addr )
910
907
if rc != 0 :
911
- if IPC_PATH_MAX_LEN and zmq_errno () == ENAMETOOLONG :
908
+ if IPC_PATH_MAX_LEN and _zmq_errno () == ENAMETOOLONG :
912
909
path = addr .split ('://' , 1 )[- 1 ]
913
910
msg = (
914
911
f'ipc path "{ path } " is longer than { IPC_PATH_MAX_LEN } '
@@ -917,7 +914,7 @@ def bind(self, addr):
917
914
'to check addr length (if it is defined).'
918
915
)
919
916
raise ZMQError (msg = msg )
920
- elif zmq_errno () == ENOENT :
917
+ elif _zmq_errno () == ENOENT :
921
918
path = addr .split ('://' , 1 )[- 1 ]
922
919
msg = f'No such file or directory for ipc path "{ path } ".'
923
920
raise ZMQError (msg = msg )
@@ -930,7 +927,7 @@ def bind(self, addr):
930
927
else :
931
928
break
932
929
933
- def connect (self , addr ) :
930
+ def connect (self , addr : str ) -> None :
934
931
"""
935
932
Connect to a remote 0MQ socket.
936
933
@@ -943,14 +940,10 @@ def connect(self, addr):
943
940
encoded to utf-8 first.
944
941
"""
945
942
rc : C .int
946
- c_addr : p_char
943
+ b_addr : bytes = addr .encode ('utf-8' )
944
+ c_addr : p_char = b_addr
947
945
948
946
_check_closed (self )
949
- if isinstance (addr , str ):
950
- addr = addr .encode ('utf-8' )
951
- if not isinstance (addr , bytes ):
952
- raise TypeError (f'expected str, got: { addr !r} ' )
953
- c_addr = addr
954
947
955
948
while True :
956
949
try :
@@ -1061,7 +1054,7 @@ def monitor(self, addr, events: C.int = ZMQ_EVENT_ALL):
1061
1054
1062
1055
_check_rc (zmq_socket_monitor (self .handle , c_addr , events ))
1063
1056
1064
- def join (self , group ):
1057
+ def join (self , group : str | bytes ):
1065
1058
"""
1066
1059
Join a RADIO-DISH group
1067
1060
@@ -1076,7 +1069,8 @@ def join(self, group):
1076
1069
raise RuntimeError ("libzmq must be built with draft support" )
1077
1070
if isinstance (group , str ):
1078
1071
group = group .encode ('utf8' )
1079
- rc : C .int = zmq_join (self .handle , group )
1072
+ c_group : bytes = group
1073
+ rc : C .int = zmq_join (self .handle , c_group )
1080
1074
_check_rc (rc )
1081
1075
1082
1076
def leave (self , group ):
@@ -1152,8 +1146,10 @@ def send(self, data, flags=0, copy: bint = True, track: bint = False):
1152
1146
else :
1153
1147
if self .copy_threshold :
1154
1148
buf = memoryview (data )
1149
+ nbytes : C .int = buf .nbytes
1150
+ copy_threshold : C .int = self .copy_threshold
1155
1151
# always copy messages smaller than copy_threshold
1156
- if buf . nbytes < self . copy_threshold :
1152
+ if nbytes < copy_threshold :
1157
1153
_send_copy (self .handle , buf , flags )
1158
1154
return zmq ._FINISHED_TRACKER
1159
1155
msg = Frame (data , track = track , copy_threshold = self .copy_threshold )
@@ -1310,7 +1306,7 @@ def _check_closed_deep(s: Socket) -> bint:
1310
1306
s .handle , ZMQ_TYPE , cast (p_void , address (stype )), address (sz )
1311
1307
)
1312
1308
if rc < 0 :
1313
- errno = zmq_errno ()
1309
+ errno = _zmq_errno ()
1314
1310
if errno == ENOTSOCK :
1315
1311
s ._closed = True
1316
1312
return True
@@ -1465,7 +1461,7 @@ def _setsockopt(handle: p_void, option: C.int, optval: p_void, sz: size_t):
1465
1461
# General utility functions
1466
1462
1467
1463
1468
- def zmq_errno ():
1464
+ def zmq_errno () -> C . int :
1469
1465
"""Return the integer errno of the most recent zmq error."""
1470
1466
return _zmq_errno ()
1471
1467
@@ -1487,17 +1483,14 @@ def zmq_version_info() -> tuple[int, int, int]:
1487
1483
return (major , minor , patch )
1488
1484
1489
1485
1490
- def has (capability ) -> bool :
1486
+ def has (capability : str ) -> bool :
1491
1487
"""Check for zmq capability by name (e.g. 'ipc', 'curve')
1492
1488
1493
1489
.. versionadded:: libzmq-4.1
1494
1490
.. versionadded:: 14.1
1495
1491
"""
1496
1492
_check_version ((4 , 1 ), 'zmq.has' )
1497
- ccap : bytes
1498
- if isinstance (capability , str ):
1499
- capability = capability .encode ('utf8' )
1500
- ccap = capability
1493
+ ccap : bytes = capability .encode ('utf8' )
1501
1494
return bool (zmq_has (ccap ))
1502
1495
1503
1496
@@ -1578,6 +1571,8 @@ def zmq_poll(sockets, timeout: C.int = -1):
1578
1571
"""
1579
1572
rc : C .int
1580
1573
i : C .int
1574
+ fileno : fd_t
1575
+ events : C .int
1581
1576
pollitems : pointer (zmq_pollitem_t ) = NULL
1582
1577
nsockets : C .int = len (sockets )
1583
1578
@@ -1596,8 +1591,9 @@ def zmq_poll(sockets, timeout: C.int = -1):
1596
1591
pollitems [i ].events = events
1597
1592
pollitems [i ].revents = 0
1598
1593
elif isinstance (s , int ):
1594
+ fileno = s
1599
1595
pollitems [i ].socket = NULL
1600
- pollitems [i ].fd = s
1596
+ pollitems [i ].fd = fileno
1601
1597
pollitems [i ].events = events
1602
1598
pollitems [i ].revents = 0
1603
1599
elif hasattr (s , 'fileno' ):
@@ -1618,17 +1614,19 @@ def zmq_poll(sockets, timeout: C.int = -1):
1618
1614
f"a fileno() method: { s !r} "
1619
1615
)
1620
1616
1621
- ms_passed : int = 0
1617
+ ms_passed : C .int = 0
1618
+ tic : C .int
1622
1619
try :
1623
1620
while True :
1624
- start = time . monotonic ()
1621
+ start : C . int = monotonic ()
1625
1622
with nogil :
1626
1623
rc = zmq_poll_c (pollitems , nsockets , timeout )
1627
1624
try :
1628
1625
_check_rc (rc )
1629
1626
except InterruptedSystemCall :
1630
1627
if timeout > 0 :
1631
- ms_passed = int (1000 * (time .monotonic () - start ))
1628
+ tic = monotonic ()
1629
+ ms_passed = int (1000 * (tic - start ))
1632
1630
if ms_passed < 0 :
1633
1631
# don't allow negative ms_passed,
1634
1632
# which can happen on old Python versions without time.monotonic.
0 commit comments