@@ -38,6 +38,7 @@ struct sdbus_ctx {
3838 flux_t * h ;
3939
4040 flux_future_t * f_subscribe ;
41+ uint32_t rank ;
4142};
4243
4344struct call_info {
@@ -53,6 +54,17 @@ static void sdbus_recover (struct sdbus_ctx *ctx, const char *reason);
5354static const double retry_min = 2 ;
5455static const double retry_max = 60 ;
5556
57+ static int authorize_request (const flux_msg_t * msg ,
58+ uint32_t rank ,
59+ flux_error_t * error )
60+ {
61+ if (rank != 0 || flux_msg_is_local (msg ))
62+ return 0 ;
63+ errprintf (error , "Remote sdbus requests are not allowed on rank 0" );
64+ errno = EPERM ;
65+ return -1 ;
66+ }
67+
5668static void bulk_respond_error (flux_t * h ,
5769 struct flux_msglist * msglist ,
5870 int errnum ,
@@ -346,6 +358,10 @@ static void call_cb (flux_t *h,
346358
347359 if (flux_request_decode (msg , NULL , NULL ) < 0 )
348360 goto error ;
361+ if (authorize_request (msg , ctx -> rank , & error ) < 0 ) {
362+ errmsg = error .text ;
363+ goto error ;
364+ }
349365 if (ctx -> bus ) { // defer request if bus is not yet connected
350366 if (handle_call_request (ctx , msg , & error ) < 0 ) {
351367 errmsg = error .text ;
@@ -368,9 +384,15 @@ static void subscribe_cb (flux_t *h,
368384 void * arg )
369385{
370386 struct sdbus_ctx * ctx = arg ;
387+ flux_error_t error ;
388+ const char * errmsg = NULL ;
371389
372390 if (flux_request_decode (msg , NULL , NULL ) < 0 )
373391 goto error ;
392+ if (authorize_request (msg , ctx -> rank , & error ) < 0 ) {
393+ errmsg = error .text ;
394+ goto error ;
395+ }
374396 if (!flux_msg_is_streaming (msg )) {
375397 errno = EPROTO ;
376398 goto error ;
@@ -379,7 +401,7 @@ static void subscribe_cb (flux_t *h,
379401 goto error ;
380402 return ;
381403error :
382- if (flux_respond_error (h , msg , errno , NULL ) < 0 )
404+ if (flux_respond_error (h , msg , errno , errmsg ) < 0 )
383405 flux_log_error (h , "error responding to sdbus.subscribe request" );
384406}
385407
@@ -391,7 +413,9 @@ static void subscribe_cancel_cb (flux_t *h,
391413 void * arg )
392414{
393415 struct sdbus_ctx * ctx = arg ;
394- flux_msglist_cancel (h , ctx -> subscribers , msg );
416+
417+ if (authorize_request (msg , ctx -> rank , NULL ) == 0 )
418+ flux_msglist_cancel (h , ctx -> subscribers , msg );
395419}
396420
397421/* Handle disconnection of a client as described in RFC 6.
@@ -403,8 +427,10 @@ static void disconnect_cb (flux_t *h,
403427{
404428 struct sdbus_ctx * ctx = arg ;
405429
406- (void )flux_msglist_disconnect (ctx -> requests , msg );
407- (void )flux_msglist_disconnect (ctx -> subscribers , msg );
430+ if (authorize_request (msg , ctx -> rank , NULL ) == 0 ) {
431+ (void )flux_msglist_disconnect (ctx -> requests , msg );
432+ (void )flux_msglist_disconnect (ctx -> subscribers , msg );
433+ }
408434}
409435
410436/* Handle a request to force bus disconnection and recovery for testing.
@@ -415,10 +441,15 @@ static void reconnect_cb (flux_t *h,
415441 void * arg )
416442{
417443 struct sdbus_ctx * ctx = arg ;
444+ flux_error_t error ;
418445 const char * errmsg = NULL ;
419446
420447 if (flux_request_decode (msg , NULL , NULL ) < 0 )
421448 goto error ;
449+ if (authorize_request (msg , ctx -> rank , & error ) < 0 ) {
450+ errmsg = error .text ;
451+ goto error ;
452+ }
422453 if (!ctx -> bus ) {
423454 errmsg = "bus is not connected" ;
424455 errno = EINVAL ;
@@ -634,7 +665,8 @@ struct sdbus_ctx *sdbus_ctx_create (flux_t *h, flux_error_t *error)
634665 || flux_future_then (ctx -> f_conn , -1 , connect_continuation , ctx ) < 0
635666 || flux_msg_handler_addvec (h , htab , ctx , & ctx -> handlers ) < 0
636667 || !(ctx -> requests = flux_msglist_create ())
637- || !(ctx -> subscribers = flux_msglist_create ()))
668+ || !(ctx -> subscribers = flux_msglist_create ())
669+ || flux_get_rank (h , & ctx -> rank ) < 0 )
638670 goto error ;
639671 ctx -> h = h ;
640672 return ctx ;
0 commit comments