22
22
from multiprocessing .managers import SyncManager
23
23
from multiprocessing .synchronize import Event as ProcessingEvent
24
24
from threading import Event as ThreadingEvent
25
- from typing import Any , Callable , cast , Generic , List , Protocol , TypeVar
25
+ from typing import Any , Callable , Generic , Protocol , TypeVar , cast
26
26
27
27
import culsans
28
28
from pydantic import BaseModel
50
50
51
51
CheckStopCallableT = Callable [[bool , int ], bool ]
52
52
53
+
53
54
class MessagingStopCallback (Protocol ):
54
55
"""Protocol for evaluating stop conditions in messaging operations."""
55
56
@@ -248,14 +249,17 @@ async def stop(self):
248
249
if self .shutdown_event is not None :
249
250
self .shutdown_event .set ()
250
251
else :
251
- raise RuntimeError ("shutdown_event is not set; was start() not called or is this a redundant stop() call?" )
252
+ raise RuntimeError (
253
+ "shutdown_event is not set; was start() not called or "
254
+ "is this a redundant stop() call?"
255
+ )
252
256
tasks = [self .send_task , self .receive_task ]
253
- tasks_to_run : List [asyncio .Task [Any ]] = [task for task in tasks if task is not None ]
257
+ tasks_to_run : list [asyncio .Task [Any ]] = [
258
+ task for task in tasks if task is not None
259
+ ]
254
260
if len (tasks_to_run ) > 0 :
255
261
with contextlib .suppress (asyncio .CancelledError ):
256
- await asyncio .gather (
257
- * tasks_to_run , return_exceptions = True
258
- )
262
+ await asyncio .gather (* tasks_to_run , return_exceptions = True )
259
263
self .send_task = None
260
264
self .receive_task = None
261
265
if self .worker_index is None :
@@ -354,7 +358,9 @@ async def get(self, timeout: float | None = None) -> ReceiveMessageT:
354
358
:return: Decoded message from the receive buffer
355
359
"""
356
360
if self .buffer_receive_queue is None :
357
- raise RuntimeError ("buffer receive queue is None; check start()/stop() calls" )
361
+ raise RuntimeError (
362
+ "buffer receive queue is None; check start()/stop() calls"
363
+ )
358
364
return await asyncio .wait_for (
359
365
self .buffer_receive_queue .async_get (), timeout = timeout
360
366
)
@@ -367,7 +373,9 @@ def get_sync(self, timeout: float | None = None) -> ReceiveMessageT:
367
373
:return: Decoded message from the receive buffer
368
374
"""
369
375
if self .buffer_receive_queue is None :
370
- raise RuntimeError ("buffer receive queue is None; check start()/stop() calls" )
376
+ raise RuntimeError (
377
+ "buffer receive queue is None; check start()/stop() calls"
378
+ )
371
379
if timeout is not None and timeout <= 0 :
372
380
return self .buffer_receive_queue .get_nowait ()
373
381
else :
@@ -381,7 +389,9 @@ async def put(self, item: SendMessageT, timeout: float | None = None):
381
389
:param timeout: Maximum time to wait for buffer space
382
390
"""
383
391
if self .buffer_send_queue is None :
384
- raise RuntimeError ("buffer receive queue is None; check start()/stop() calls" )
392
+ raise RuntimeError (
393
+ "buffer receive queue is None; check start()/stop() calls"
394
+ )
385
395
await asyncio .wait_for (self .buffer_send_queue .async_put (item ), timeout = timeout )
386
396
387
397
def put_sync (self , item : SendMessageT , timeout : float | None = None ):
@@ -392,7 +402,9 @@ def put_sync(self, item: SendMessageT, timeout: float | None = None):
392
402
:param timeout: Maximum time to wait for buffer space, if <=0 uses put_nowait
393
403
"""
394
404
if self .buffer_send_queue is None :
395
- raise RuntimeError ("buffer receive queue is None; check start()/stop() calls" )
405
+ raise RuntimeError (
406
+ "buffer receive queue is None; check start()/stop() calls"
407
+ )
396
408
if timeout is not None and timeout <= 0 :
397
409
self .buffer_send_queue .put_nowait (item )
398
410
else :
@@ -457,6 +469,7 @@ class InterProcessMessagingQueue(InterProcessMessaging[SendMessageT, ReceiveMess
457
469
# Create worker copy for distributed processing
458
470
worker_messaging = messaging.create_worker_copy(worker_index=0)
459
471
"""
472
+
460
473
pending_queue : multiprocessing .Queue | queue .Queue [Any ] | None
461
474
done_queue : multiprocessing .Queue | queue .Queue [Any ] | None
462
475
@@ -545,15 +558,15 @@ async def stop(self):
545
558
with contextlib .suppress (queue .Empty ):
546
559
while True :
547
560
self .pending_queue .get_nowait ()
548
- if hasattr (self .pending_queue , ' close' ):
561
+ if hasattr (self .pending_queue , " close" ):
549
562
self .pending_queue .close ()
550
563
551
564
if self .done_queue is None :
552
565
raise RuntimeError ("done_queue is None; was stop() already called?" )
553
566
with contextlib .suppress (queue .Empty ):
554
567
while True :
555
568
self .done_queue .get_nowait ()
556
- if hasattr (self .done_queue , ' close' ):
569
+ if hasattr (self .done_queue , " close" ):
557
570
self .done_queue .close ()
558
571
559
572
self .pending_queue = None
@@ -618,7 +631,9 @@ def _send_messages_task_thread( # noqa: C901, PLR0912
618
631
item = next (send_items_iter )
619
632
else :
620
633
if self .buffer_send_queue is None :
621
- raise RuntimeError ("buffer_send_queue is None; was stop() already called?" )
634
+ raise RuntimeError (
635
+ "buffer_send_queue is None; was stop() already called?"
636
+ )
622
637
item = self .buffer_send_queue .sync_get (
623
638
timeout = self .poll_interval
624
639
)
@@ -632,16 +647,22 @@ def _send_messages_task_thread( # noqa: C901, PLR0912
632
647
if self .worker_index is None :
633
648
# Main publisher
634
649
if self .pending_queue is None :
635
- raise RuntimeError ("pending_queue is None; was stop() already called?" )
650
+ raise RuntimeError (
651
+ "pending_queue is None; was stop() already called?"
652
+ )
636
653
self .pending_queue .put (pending_item , timeout = self .poll_interval )
637
654
else :
638
655
# Worker
639
656
if self .done_queue is None :
640
- raise RuntimeError ("done_queue is None; was stop() already called?" )
657
+ raise RuntimeError (
658
+ "done_queue is None; was stop() already called?"
659
+ )
641
660
self .done_queue .put (pending_item , timeout = self .poll_interval )
642
661
if send_items_iter is None :
643
662
if self .buffer_send_queue is None :
644
- raise RuntimeError ("buffer_send_queue is None; was stop() already called?" )
663
+ raise RuntimeError (
664
+ "buffer_send_queue is None; was stop() already called?"
665
+ )
645
666
self .buffer_send_queue .task_done ()
646
667
pending_item = None
647
668
except (culsans .QueueFull , queue .Full ):
@@ -663,12 +684,16 @@ def _receive_messages_task_thread( # noqa: C901
663
684
if self .worker_index is None :
664
685
# Main publisher
665
686
if self .done_queue is None :
666
- raise RuntimeError ("done_queue is None; check start()/stop() calls" )
687
+ raise RuntimeError (
688
+ "done_queue is None; check start()/stop() calls"
689
+ )
667
690
item = self .done_queue .get (timeout = self .poll_interval )
668
691
else :
669
692
# Worker
670
693
if self .pending_queue is None :
671
- raise RuntimeError ("pending_queue is None; check start()/stop() calls" )
694
+ raise RuntimeError (
695
+ "pending_queue is None; check start()/stop() calls"
696
+ )
672
697
item = self .pending_queue .get (timeout = self .poll_interval )
673
698
pending_item = message_encoding .decode (item )
674
699
queue_empty_count = 0
@@ -685,8 +710,12 @@ def _receive_messages_task_thread( # noqa: C901
685
710
)
686
711
687
712
if self .buffer_receive_queue is None :
688
- raise RuntimeError ("buffer_receive_queue is None; check start()/stop() calls" )
689
- self .buffer_receive_queue .sync_put (cast (ReceiveMessageT , received_item ))
713
+ raise RuntimeError (
714
+ "buffer_receive_queue is None; check start()/stop() calls"
715
+ )
716
+ self .buffer_receive_queue .sync_put (
717
+ cast ("ReceiveMessageT" , received_item )
718
+ )
690
719
pending_item = None
691
720
received_item = None
692
721
except (culsans .QueueFull , queue .Full ):
@@ -863,9 +892,7 @@ def __init__(
863
892
864
893
self .pipes : list [tuple [Connection , Connection ]]
865
894
if pipe is None :
866
- self .pipes = [
867
- self .mp_context .Pipe (duplex = True ) for _ in range (num_workers )
868
- ]
895
+ self .pipes = [self .mp_context .Pipe (duplex = True ) for _ in range (num_workers )]
869
896
else :
870
897
self .pipes = [pipe ]
871
898
@@ -969,7 +996,7 @@ def create_receive_messages_threads(
969
996
)
970
997
]
971
998
972
- def _send_messages_task_thread ( # noqa: C901, PLR0912
999
+ def _send_messages_task_thread ( # noqa: C901, PLR0912, PLR0915
973
1000
self ,
974
1001
pipe : tuple [Connection , Connection ],
975
1002
send_items : Iterable [Any ] | None ,
@@ -1009,7 +1036,9 @@ def _background_pipe_recv():
1009
1036
item = next (send_items_iter )
1010
1037
else :
1011
1038
if self .buffer_send_queue is None :
1012
- raise RuntimeError ("buffer_send_queue is None; check start()/stop() calls" )
1039
+ raise RuntimeError (
1040
+ "buffer_send_queue is None; check start()/stop() calls" # noqa: E501
1041
+ )
1013
1042
item = self .buffer_send_queue .sync_get (
1014
1043
timeout = self .poll_interval
1015
1044
)
@@ -1028,7 +1057,9 @@ def _background_pipe_recv():
1028
1057
pipe_item = pending_item
1029
1058
if send_items_iter is None :
1030
1059
if self .buffer_send_queue is None :
1031
- raise RuntimeError ("buffer_send_queue is None; check start()/stop() calls" )
1060
+ raise RuntimeError (
1061
+ "buffer_send_queue is None; check start()/stop() calls" # noqa: E501
1062
+ )
1032
1063
self .buffer_send_queue .task_done ()
1033
1064
pending_item = None
1034
1065
except (culsans .QueueFull , queue .Full ):
@@ -1071,8 +1102,12 @@ def _receive_messages_task_thread( # noqa: C901
1071
1102
else receive_callback (pending_item )
1072
1103
)
1073
1104
if self .buffer_receive_queue is None :
1074
- raise RuntimeError ("buffer receive queue is None; check start()/stop() calls" )
1075
- self .buffer_receive_queue .sync_put (cast (ReceiveMessageT , received_item ))
1105
+ raise RuntimeError (
1106
+ "buffer receive queue is None; check start()/stop() calls"
1107
+ )
1108
+ self .buffer_receive_queue .sync_put (
1109
+ cast ("ReceiveMessageT" , received_item )
1110
+ )
1076
1111
pending_item = None
1077
1112
received_item = None
1078
1113
except (culsans .QueueFull , queue .Full ):
0 commit comments