@@ -49,7 +49,7 @@ struct broker_module {
4949
5050 double lastseen ;
5151
52- flux_t * h_broker ; /* broker end of interthread channel */
52+ flux_t * h_broker_end ; /* broker end of interthread channel */
5353 char uri [128 ];
5454
5555 uuid_t uuid ; /* uuid for unique request sender identity */
@@ -80,7 +80,7 @@ struct broker_module {
8080 struct flux_msglist * insmod_requests ;
8181 struct flux_msglist * deferred_messages ;
8282
83- flux_t * h ; /* module's handle */
83+ flux_t * h_module_end ; /* module end of interthread_channel */
8484 struct subhash * sub ;
8585};
8686
@@ -150,14 +150,15 @@ static int module_finalizing (module_t *p)
150150{
151151 flux_future_t * f ;
152152
153- if (!(f = flux_rpc_pack (p -> h ,
153+ if (!(f = flux_rpc_pack (p -> h_module_end ,
154154 "broker.module-status" ,
155155 FLUX_NODEID_ANY ,
156156 0 ,
157157 "{s:i}" ,
158158 "status" , FLUX_MODSTATE_FINALIZING ))
159159 || flux_rpc_get (f , NULL )) {
160- flux_log_error (p -> h , "broker.module-status FINALIZING error" );
160+ flux_log_error (p -> h_module_end ,
161+ "broker.module-status FINALIZING error" );
161162 flux_future_destroy (f );
162163 return -1 ;
163164 }
@@ -181,21 +182,21 @@ static void *module_thread (void *arg)
181182
182183 /* Connect to broker socket, enable logging, register built-in services
183184 */
184- if (!(p -> h = flux_open (p -> uri , 0 ))) {
185+ if (!(p -> h_module_end = flux_open (p -> uri , 0 ))) {
185186 log_err ("flux_open %s" , uri );
186187 goto done ;
187188 }
188- if (attr_cache_from_json (p -> h , p -> attr_cache ) < 0 ) {
189+ if (attr_cache_from_json (p -> h_module_end , p -> attr_cache ) < 0 ) {
189190 log_err ("%s: error priming broker attribute cache" , p -> name );
190191 goto done ;
191192 }
192- flux_log_set_appname (p -> h , p -> name );
193- if (flux_set_conf (p -> h , p -> conf ) < 0 ) {
193+ flux_log_set_appname (p -> h_module_end , p -> name );
194+ if (flux_set_conf (p -> h_module_end , p -> conf ) < 0 ) {
194195 log_err ("%s: error setting config object" , p -> name );
195196 goto done ;
196197 }
197- p -> conf = NULL ; // flux_set_conf() transfers ownership to p->h
198- if (modservice_register (p -> h , p ) < 0 ) {
198+ p -> conf = NULL ; // flux_set_conf() transfers ownership to p->h_module_end
199+ if (modservice_register (p -> h_module_end , p ) < 0 ) {
199200 log_err ("%s: modservice_register" , p -> name );
200201 goto done ;
201202 }
@@ -219,11 +220,11 @@ static void *module_thread (void *arg)
219220 goto done ;
220221 }
221222 argz_extract (p -> argz , p -> argz_len , av );
222- if (p -> main (p -> h , ac , av ) < 0 ) {
223+ if (p -> main (p -> h_module_end , ac , av ) < 0 ) {
223224 mod_main_errno = errno ;
224225 if (mod_main_errno == 0 )
225226 mod_main_errno = ECONNRESET ;
226- flux_log (p -> h , LOG_CRIT , "module exiting abnormally" );
227+ flux_log (p -> h_module_end , LOG_CRIT , "module exiting abnormally" );
227228 }
228229
229230 /* Before processing unhandled requests, ensure that this module
@@ -232,34 +233,42 @@ static void *module_thread (void *arg)
232233 * which could cause the broker to block.
233234 */
234235 if (module_finalizing (p ) < 0 )
235- flux_log_error (p -> h , "failed to set module state to finalizing" );
236+ flux_log_error (p -> h_module_end ,
237+ "failed to set module state to finalizing" );
236238
237239 /* If any unhandled requests were received during shutdown,
238240 * respond to them now with ENOSYS.
239241 */
240- while ((msg = flux_recv (p -> h , FLUX_MATCH_REQUEST , FLUX_O_NONBLOCK ))) {
242+ while ((msg = flux_recv (p -> h_module_end ,
243+ FLUX_MATCH_REQUEST ,
244+ FLUX_O_NONBLOCK ))) {
241245 const char * topic = "unknown" ;
242246 (void )flux_msg_get_topic (msg , & topic );
243- flux_log (p -> h , LOG_DEBUG , "responding to post-shutdown %s" , topic );
244- if (flux_respond_error (p -> h , msg , ENOSYS , NULL ) < 0 )
245- flux_log_error (p -> h , "responding to post-shutdown %s" , topic );
247+ flux_log (p -> h_module_end ,
248+ LOG_DEBUG ,
249+ "responding to post-shutdown %s" ,
250+ topic );
251+ if (flux_respond_error (p -> h_module_end , msg , ENOSYS , NULL ) < 0 )
252+ flux_log_error (p -> h_module_end ,
253+ "responding to post-shutdown %s" ,
254+ topic );
246255 flux_msg_destroy (msg );
247256 }
248- if (!(f = flux_rpc_pack (p -> h ,
257+ if (!(f = flux_rpc_pack (p -> h_module_end ,
249258 "broker.module-status" ,
250259 FLUX_NODEID_ANY ,
251260 FLUX_RPC_NORESPONSE ,
252261 "{s:i s:i}" ,
253262 "status" , FLUX_MODSTATE_EXITED ,
254263 "errnum" , mod_main_errno ))) {
255- flux_log_error (p -> h , "broker.module-status EXITED error" );
264+ flux_log_error (p -> h_module_end , "broker.module-status EXITED error" );
256265 goto done ;
257266 }
258267 flux_future_destroy (f );
259268done :
260269 free (av );
261- flux_close (p -> h );
262- p -> h = NULL ;
270+ flux_close (p -> h_module_end );
271+ p -> h_module_end = NULL ;
263272 return NULL ;
264273}
265274
@@ -372,17 +381,17 @@ module_t *module_create (flux_t *h,
372381 */
373382 // copying 13 + 37 + 1 = 51 bytes into 128 byte buffer cannot fail
374383 (void )snprintf (p -> uri , sizeof (p -> uri ), "interthread://%s" , p -> uuid_str );
375- if (!(p -> h_broker = flux_open (p -> uri , FLUX_O_NOREQUEUE ))
376- || flux_opt_set (p -> h_broker ,
384+ if (!(p -> h_broker_end = flux_open (p -> uri , FLUX_O_NOREQUEUE ))
385+ || flux_opt_set (p -> h_broker_end ,
377386 FLUX_OPT_ROUTER_NAME ,
378387 parent_uuid ,
379388 strlen (parent_uuid ) + 1 ) < 0
380- || flux_set_reactor (p -> h_broker , r ) < 0 ) {
389+ || flux_set_reactor (p -> h_broker_end , r ) < 0 ) {
381390 errprintf (error , "could not create %s interthread handle" , p -> name );
382391 goto cleanup ;
383392 }
384393 if (!(p -> broker_w = flux_handle_watcher_create (r ,
385- p -> h_broker ,
394+ p -> h_broker_end ,
386395 FLUX_POLLIN ,
387396 module_cb ,
388397 p ))) {
@@ -431,7 +440,7 @@ int module_get_status (module_t *p)
431440
432441flux_msg_t * module_recvmsg (module_t * p )
433442{
434- return flux_recv (p -> h_broker , FLUX_MATCH_ANY , FLUX_O_NONBLOCK );
443+ return flux_recv (p -> h_broker_end , FLUX_MATCH_ANY , FLUX_O_NONBLOCK );
435444}
436445
437446int module_sendmsg_new (module_t * p , flux_msg_t * * msg )
@@ -460,7 +469,7 @@ int module_sendmsg_new (module_t *p, flux_msg_t **msg)
460469 * msg = NULL ;
461470 return 0 ;
462471 }
463- return flux_send_new (p -> h_broker , msg , 0 );
472+ return flux_send_new (p -> h_broker_end , msg , 0 );
464473}
465474
466475int module_disconnect_arm (module_t * p ,
@@ -506,7 +515,7 @@ void module_destroy (module_t *p)
506515
507516 flux_watcher_stop (p -> broker_w );
508517 flux_watcher_destroy (p -> broker_w );
509- flux_close (p -> h_broker );
518+ flux_close (p -> h_broker_end );
510519
511520#ifndef __SANITIZE_ADDRESS__
512521 dlclose (p -> dso );
@@ -562,7 +571,7 @@ int module_set_defer (module_t *p, bool flag)
562571 if (!flag && p -> deferred_messages ) {
563572 const flux_msg_t * msg ;
564573 while ((msg = flux_msglist_pop (p -> deferred_messages ))) {
565- if (flux_send_new (p -> h_broker , (flux_msg_t * * )& msg , 0 ) < 0 ) {
574+ if (flux_send_new (p -> h_broker_end , (flux_msg_t * * )& msg , 0 ) < 0 ) {
566575 flux_msg_decref (msg );
567576 return -1 ;
568577 }
@@ -688,7 +697,7 @@ int module_event_cast (module_t *p, const flux_msg_t *msg)
688697ssize_t module_get_send_queue_count (module_t * p )
689698{
690699 size_t count ;
691- if (flux_opt_get (p -> h_broker ,
700+ if (flux_opt_get (p -> h_broker_end ,
692701 FLUX_OPT_SEND_QUEUE_COUNT ,
693702 & count ,
694703 sizeof (count )) < 0 )
@@ -699,7 +708,7 @@ ssize_t module_get_send_queue_count (module_t *p)
699708ssize_t module_get_recv_queue_count (module_t * p )
700709{
701710 size_t count ;
702- if (flux_opt_get (p -> h_broker ,
711+ if (flux_opt_get (p -> h_broker_end ,
703712 FLUX_OPT_RECV_QUEUE_COUNT ,
704713 & count ,
705714 sizeof (count )) < 0 )
0 commit comments