@@ -110,6 +110,7 @@ struct fr_schedule_s {
110110
111111 fr_sem_t * worker_sem ; //!< for inter-thread signaling
112112 fr_sem_t * network_sem ; //!< for inter-thread signaling
113+ fr_sem_t * coord_sem ; //!< for inter-thread signaling
113114
114115 fr_schedule_thread_instantiate_t worker_thread_instantiate ; //!< thread instantiation callback
115116 fr_schedule_thread_detach_t worker_thread_detach ;
@@ -492,6 +493,14 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el,
492493 return NULL ;
493494 }
494495
496+ if (fr_coords_create (sc , el ) < 0 ) {
497+ PERROR ("Failed creating coordinators" );
498+ if (unlikely (fr_network_destroy (sc -> single_network ) < 0 )) {
499+ PERROR ("Failed destroying network" );
500+ }
501+ goto pre_instantiate_st_fail ;
502+ }
503+
495504 sc -> single_worker = fr_worker_alloc (sc , el , "Worker" , sc -> log , sc -> lvl , & sc -> config -> worker );
496505 if (!sc -> single_worker ) {
497506 PERROR ("Failed creating worker" );
@@ -545,6 +554,11 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el,
545554 goto st_fail ;
546555 }
547556
557+ if (fr_coord_pre_event_insert (el ) < 0 ) {
558+ fr_strerror_const ("Failed adding coordinator pre-check to event list" );
559+ goto st_fail ;
560+ }
561+
548562 /*
549563 * Add the event which processes request_t packets.
550564 */
@@ -553,6 +567,11 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el,
553567 goto st_fail ;
554568 }
555569
570+ if (fr_coord_post_event_insert (el ) < 0 ) {
571+ fr_strerror_const ("Failed adding coordinator post-processing to event list" );
572+ goto st_fail ;
573+ }
574+
556575 return sc ;
557576 }
558577
@@ -567,13 +586,17 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el,
567586 sem_fail :
568587 ERROR ("Failed creating semaphore: %s" , fr_syserror (errno ));
569588 fr_sem_free (sc -> network_sem );
589+ fr_sem_free (sc -> worker_sem );
570590 talloc_free (sc );
571591 return NULL ;
572592 }
573593
574594 sc -> worker_sem = fr_sem_alloc ();
575595 if (!sc -> worker_sem ) goto sem_fail ;
576596
597+ sc -> coord_sem = fr_sem_alloc ();
598+ if (!sc -> coord_sem ) goto sem_fail ;
599+
577600 /*
578601 * Create the network threads first.
579602 */
@@ -635,6 +658,14 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el,
635658 return NULL ;
636659 }
637660
661+ /*
662+ * Create the coordination threads
663+ */
664+ if (fr_coord_start (sc -> config -> max_workers , sc -> coord_sem ) < 0 ) {
665+ fr_schedule_destroy (& sc );
666+ return NULL ;
667+ };
668+
638669 /*
639670 * Create all of the workers.
640671 */
@@ -768,6 +799,7 @@ int fr_schedule_destroy(fr_schedule_t **sc_to_free)
768799 ERROR ("Failed destroying network" );
769800 }
770801 fr_worker_destroy (sc -> single_worker );
802+ fr_coords_destroy ();
771803 goto done ;
772804 }
773805
@@ -841,6 +873,7 @@ int fr_schedule_destroy(fr_schedule_t **sc_to_free)
841873 }
842874 }
843875
876+ fr_sem_free (sc -> coord_sem );
844877 fr_sem_free (sc -> network_sem );
845878 fr_sem_free (sc -> worker_sem );
846879done :
0 commit comments