@@ -461,8 +461,11 @@ def __init__(
461
461
self .has_coordinator = addresses .coordinator_output is not None
462
462
self .frontend_stats_publish_address = (
463
463
addresses .frontend_stats_publish_address )
464
+ logger .debug ("Has DP Coordinator: %s, stats publish address: %s" ,
465
+ self .has_coordinator ,
466
+ self .frontend_stats_publish_address )
464
467
# Only publish request queue stats to coordinator for "internal"
465
- # LB mode .
468
+ # and "hybrid" LB modes .
466
469
self .publish_dp_lb_stats = (
467
470
self .has_coordinator
468
471
and not vllm_config .parallel_config .data_parallel_external_lb )
@@ -472,25 +475,38 @@ def __init__(
472
475
super ().__init__ (vllm_config , executor_class , log_stats ,
473
476
executor_fail_callback )
474
477
478
+ # Background Threads and Queues for IO. These enable us to
479
+ # overlap ZMQ socket IO with GPU since they release the GIL,
480
+ # and to overlap some serialization/deserialization with the
481
+ # model forward pass.
482
+ # Threads handle Socket <-> Queues and core_busy_loop uses Queue.
483
+ ready_event = threading .Event ()
484
+ input_thread = threading .Thread (target = self .process_input_sockets ,
485
+ args = (addresses .inputs ,
486
+ addresses .coordinator_input ,
487
+ identity , ready_event ),
488
+ daemon = True )
489
+ input_thread .start ()
490
+
491
+ self .output_thread = threading .Thread (
492
+ target = self .process_output_sockets ,
493
+ args = (addresses .outputs , addresses .coordinator_output ,
494
+ self .engine_index ),
495
+ daemon = True )
496
+ self .output_thread .start ()
497
+
498
+ # Don't complete handshake until DP coordinator ready message is
499
+ # received.
500
+ while not ready_event .wait (timeout = 10 ):
501
+ if not input_thread .is_alive ():
502
+ raise RuntimeError (
503
+ "Input socket thread died during startup" )
504
+ assert addresses .coordinator_input is not None
505
+ logger .info ("Waiting for READY message from DP Coordinator..." )
506
+
475
507
self .step_fn = (self .step if self .batch_queue is None else
476
508
self .step_with_batch_queue )
477
509
478
- # Background Threads and Queues for IO. These enable us to
479
- # overlap ZMQ socket IO with GPU since they release the GIL,
480
- # and to overlap some serialization/deserialization with the
481
- # model forward pass.
482
- # Threads handle Socket <-> Queues and core_busy_loop uses Queue.
483
- threading .Thread (target = self .process_input_sockets ,
484
- args = (addresses .inputs , addresses .coordinator_input ,
485
- identity ),
486
- daemon = True ).start ()
487
- self .output_thread = threading .Thread (
488
- target = self .process_output_sockets ,
489
- args = (addresses .outputs , addresses .coordinator_output ,
490
- self .engine_index ),
491
- daemon = True )
492
- self .output_thread .start ()
493
-
494
510
@contextmanager
495
511
def _perform_handshakes (
496
512
self ,
@@ -505,10 +521,10 @@ def _perform_handshakes(
505
521
506
522
For DP=1 or offline mode, this is with the colocated front-end process.
507
523
508
- For DP>1 with internal loadbalancing this is with the shared front-end
524
+ For DP>1 with internal load-balancing this is with the shared front-end
509
525
process which may reside on a different node.
510
526
511
- For DP>1 with external or hybrid loadbalancing , two handshakes are
527
+ For DP>1 with external or hybrid load-balancing , two handshakes are
512
528
performed:
513
529
- With the rank 0 front-end process which retrieves the
514
530
DP Coordinator ZMQ addresses and DP process group address.
@@ -772,7 +788,7 @@ def _send_engine_dead(self):
772
788
773
789
def process_input_sockets (self , input_addresses : list [str ],
774
790
coord_input_address : Optional [str ],
775
- identity : bytes ):
791
+ identity : bytes , ready_event : threading . Event ):
776
792
"""Input socket IO thread."""
777
793
778
794
# Msgpack serialization decoding.
@@ -809,9 +825,14 @@ def process_input_sockets(self, input_addresses: list[str],
809
825
# back to us.
810
826
input_socket .send (b'' )
811
827
poller .register (input_socket , zmq .POLLIN )
828
+
812
829
if coord_socket is not None :
830
+ # Wait for ready message from coordinator.
831
+ assert coord_socket .recv () == b"READY"
813
832
poller .register (coord_socket , zmq .POLLIN )
814
833
834
+ ready_event .set ()
835
+ del ready_event
815
836
while True :
816
837
for input_socket , _ in poller .poll ():
817
838
# (RequestType, RequestData)
0 commit comments