3636#include "src/common/libkvs/kvs_util_private.h"
3737#include "src/common/libcontent/content.h"
3838#include "src/common/libutil/fsd.h"
39+ #include "src/common/librouter/msg_hash.h"
3940
4041#include "waitqueue.h"
4142#include "cache.h"
@@ -77,6 +78,7 @@ struct kvs_ctx {
7778 char * hash_name ;
7879 unsigned int seq ; /* for commit transactions */
7980 kvs_checkpoint_t * kcp ;
81+ zhashx_t * requests ; /* track unfinished requests */
8082 struct list_head work_queue ;
8183};
8284
@@ -111,11 +113,23 @@ static void kvs_ctx_destroy (struct kvs_ctx *ctx)
111113 flux_watcher_destroy (ctx -> idle_w );
112114 kvs_checkpoint_destroy (ctx -> kcp );
113115 free (ctx -> hash_name );
116+ zhashx_destroy (& ctx -> requests );
114117 free (ctx );
115118 errno = saved_errno ;
116119 }
117120}
118121
122+ static void request_tracking_add (struct kvs_ctx * ctx , const flux_msg_t * msg )
123+ {
124+ /* ignore if item already tracked */
125+ zhashx_insert (ctx -> requests , msg , (flux_msg_t * )msg );
126+ }
127+
128+ static void request_tracking_remove (struct kvs_ctx * ctx , const flux_msg_t * msg )
129+ {
130+ zhashx_delete (ctx -> requests , msg );
131+ }
132+
119133static void work_queue_check_append_wrapper (struct kvsroot * root ,
120134 void * arg )
121135{
@@ -164,6 +178,8 @@ static struct kvs_ctx *kvs_ctx_create (flux_t *h)
164178 goto error ;
165179 }
166180 ctx -> transaction_merge = 1 ;
181+ if (!(ctx -> requests = msg_hash_create (MSG_HASH_TYPE_UUID_MATCHTAG )))
182+ goto error ;
167183 list_head_init (& ctx -> work_queue );
168184 return ctx ;
169185error :
@@ -364,6 +380,13 @@ static void getroot_completion (flux_future_t *f, void *arg)
364380error :
365381 if (flux_respond_error (ctx -> h , msg , errno , NULL ) < 0 )
366382 flux_log_error (ctx -> h , "%s: flux_respond_error" , __FUNCTION__ );
383+ /* N.B. getroot request from other requests (e.g. lookup, commit)
384+ * may stall and be tracked. So we need to remove tracking of the
385+ * request if there is an error. We do not remove tracking on
386+ * getroot success, as the original request (e.g. lookup, commit)
387+ * will deal with the success case.
388+ */
389+ request_tracking_remove (ctx , msg );
367390 flux_msg_destroy (msg );
368391 flux_future_destroy (f );
369392}
@@ -1273,11 +1296,10 @@ static void lookup_wait_error_cb (wait_t *w, int errnum, void *arg)
12731296}
12741297
12751298static lookup_t * lookup_common (flux_t * h , flux_msg_handler_t * mh ,
1276- const flux_msg_t * msg , void * arg ,
1299+ const flux_msg_t * msg , struct kvs_ctx * ctx ,
12771300 flux_msg_handler_f replay_cb ,
12781301 bool * stall )
12791302{
1280- struct kvs_ctx * ctx = arg ;
12811303 int flags ;
12821304 const char * ns = NULL ;
12831305 const char * key ;
@@ -1431,14 +1453,17 @@ static lookup_t *lookup_common (flux_t *h, flux_msg_handler_t *mh,
14311453static void lookup_request_cb (flux_t * h , flux_msg_handler_t * mh ,
14321454 const flux_msg_t * msg , void * arg )
14331455{
1456+ struct kvs_ctx * ctx = arg ;
14341457 lookup_t * lh ;
14351458 json_t * val ;
14361459 bool stall = false;
14371460
1438- if (!(lh = lookup_common (h , mh , msg , arg , lookup_request_cb ,
1461+ if (!(lh = lookup_common (h , mh , msg , ctx , lookup_request_cb ,
14391462 & stall ))) {
1440- if (stall )
1463+ if (stall ) {
1464+ request_tracking_add (ctx , msg );
14411465 return ;
1466+ }
14421467 goto error ;
14431468 }
14441469
@@ -1450,10 +1475,12 @@ static void lookup_request_cb (flux_t *h, flux_msg_handler_t *mh,
14501475 flux_log_error (h , "%s: flux_respond_pack" , __FUNCTION__ );
14511476 lookup_destroy (lh );
14521477 json_decref (val );
1478+ request_tracking_remove (ctx , msg );
14531479 return ;
14541480error :
14551481 if (flux_respond_error (h , msg , errno , NULL ) < 0 )
14561482 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
1483+ request_tracking_remove (ctx , msg );
14571484 lookup_destroy (lh );
14581485}
14591486
@@ -1467,16 +1494,19 @@ static void lookup_request_cb (flux_t *h, flux_msg_handler_t *mh,
14671494static void lookup_plus_request_cb (flux_t * h , flux_msg_handler_t * mh ,
14681495 const flux_msg_t * msg , void * arg )
14691496{
1497+ struct kvs_ctx * ctx = arg ;
14701498 lookup_t * lh ;
14711499 json_t * val = NULL ;
14721500 const char * root_ref ;
14731501 int root_seq ;
14741502 bool stall = false;
14751503
1476- if (!(lh = lookup_common (h , mh , msg , arg , lookup_plus_request_cb ,
1504+ if (!(lh = lookup_common (h , mh , msg , ctx , lookup_plus_request_cb ,
14771505 & stall ))) {
1478- if (stall )
1506+ if (stall ) {
1507+ request_tracking_add (ctx , msg );
14791508 return ;
1509+ }
14801510 goto error ;
14811511 }
14821512
@@ -1501,10 +1531,12 @@ static void lookup_plus_request_cb (flux_t *h, flux_msg_handler_t *mh,
15011531 }
15021532 lookup_destroy (lh );
15031533 json_decref (val );
1534+ request_tracking_remove (ctx , msg );
15041535 return ;
15051536error :
15061537 if (flux_respond_error (h , msg , errno , NULL ) < 0 )
15071538 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
1539+ request_tracking_remove (ctx , msg );
15081540}
15091541
15101542
@@ -1525,6 +1557,7 @@ static int finalize_transaction_req (treq_t *tr,
15251557 flux_log_error (cbd -> ctx -> h , "%s: flux_respond_pack" , __FUNCTION__ );
15261558 }
15271559
1560+ request_tracking_remove (cbd -> ctx , req );
15281561 return 0 ;
15291562}
15301563
@@ -1623,6 +1656,9 @@ static void relaycommit_request_cb (flux_t *h, flux_msg_handler_t *mh,
16231656 goto error ;
16241657 }
16251658
1659+ /* N.B. no request tracking for relay. The relay does not get a
1660+ * response, only the original via finalize_transaction_bynames().
1661+ */
16261662 work_queue_check_append (ctx , root );
16271663 return ;
16281664
@@ -1665,8 +1701,10 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
16651701 }
16661702
16671703 if (!(root = getroot (ctx , ns , mh , msg , NULL , & stall ))) {
1668- if (stall )
1704+ if (stall ) {
1705+ request_tracking_add (ctx , msg );
16691706 return ;
1707+ }
16701708 goto error ;
16711709 }
16721710
@@ -1723,11 +1761,13 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
17231761 }
17241762 flux_future_destroy (f );
17251763 }
1764+ request_tracking_add (ctx , msg );
17261765 return ;
17271766
17281767error :
17291768 if (flux_respond_error (h , msg , errno , errmsg ) < 0 )
17301769 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
1770+ request_tracking_remove (ctx , msg );
17311771}
17321772
17331773
@@ -1808,6 +1848,9 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *mh,
18081848 work_queue_check_append (ctx , root );
18091849 }
18101850
1851+ /* N.B. no request tracking for relay. The relay does not get a
1852+ * response, only the original via finalize_transaction_bynames().
1853+ */
18111854 return ;
18121855
18131856error :
@@ -1853,8 +1896,10 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh,
18531896 }
18541897
18551898 if (!(root = getroot (ctx , ns , mh , msg , NULL , & stall ))) {
1856- if (stall )
1899+ if (stall ) {
1900+ request_tracking_add (ctx , msg );
18571901 goto stall ;
1902+ }
18581903 goto error ;
18591904 }
18601905
@@ -1934,11 +1979,13 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh,
19341979 }
19351980 flux_future_destroy (f );
19361981 }
1982+ request_tracking_add (ctx , msg );
19371983 return ;
19381984
19391985error :
19401986 if (flux_respond_error (h , msg , errno , errmsg ) < 0 )
19411987 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
1988+ request_tracking_remove (ctx , msg );
19421989stall :
19431990 return ;
19441991}
@@ -1960,8 +2007,10 @@ static void wait_version_request_cb (flux_t *h, flux_msg_handler_t *mh,
19602007 }
19612008
19622009 if (!(root = getroot (ctx , ns , mh , msg , NULL , & stall ))) {
1963- if (stall )
2010+ if (stall ) {
2011+ request_tracking_add (ctx , msg );
19642012 return ;
2013+ }
19652014 goto error ;
19662015 }
19672016
@@ -1976,6 +2025,7 @@ static void wait_version_request_cb (flux_t *h, flux_msg_handler_t *mh,
19762025 flux_log_error (h , "%s: kvs_wait_version_add" , __FUNCTION__ );
19772026 goto error ;
19782027 }
2028+ request_tracking_add (ctx , msg );
19792029 return ; /* stall */
19802030 }
19812031
@@ -1984,11 +2034,13 @@ static void wait_version_request_cb (flux_t *h, flux_msg_handler_t *mh,
19842034 "rootref" , root -> ref ) < 0 )
19852035 flux_log_error (h , "%s: flux_respond_pack" , __FUNCTION__ );
19862036
2037+ request_tracking_remove (ctx , msg );
19872038 return ;
19882039
19892040error :
19902041 if (flux_respond_error (h , msg , errno , NULL ) < 0 )
19912042 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
2043+ request_tracking_remove (ctx , msg );
19922044}
19932045
19942046static void getroot_request_cb (flux_t * h , flux_msg_handler_t * mh ,
@@ -2020,8 +2072,10 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *mh,
20202072 */
20212073 bool stall = false;
20222074 if (!(root = getroot (ctx , ns , mh , msg , NULL , & stall ))) {
2023- if (stall )
2075+ if (stall ) {
2076+ request_tracking_add (ctx , msg );
20242077 return ;
2078+ }
20252079 goto error ;
20262080 }
20272081 }
@@ -2034,10 +2088,12 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *mh,
20342088 "flags" , root -> flags ,
20352089 "namespace" , root -> ns_name ) < 0 )
20362090 flux_log_error (h , "%s: flux_respond_pack" , __FUNCTION__ );
2091+ request_tracking_remove (ctx , msg );
20372092 return ;
20382093error :
20392094 if (flux_respond_error (h , msg , errno , NULL ) < 0 )
20402095 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
2096+ request_tracking_remove (ctx , msg );
20412097}
20422098
20432099static void error_event_cb (flux_t * h , flux_msg_handler_t * mh ,
@@ -2257,9 +2313,10 @@ static void stats_get_cb (flux_t *h, flux_msg_handler_t *mh,
22572313 }
22582314
22592315 if (flux_respond_pack (h , msg ,
2260- "{ s:O s:O }" ,
2316+ "{ s:O s:O s:i }" ,
22612317 "cache" , cstats ,
2262- "namespace" , nsstats ) < 0 )
2318+ "namespace" , nsstats ,
2319+ "pending_requests" , zhashx_size (ctx -> requests )) < 0 )
22632320 flux_log_error (h , "%s: flux_respond_pack" , __FUNCTION__ );
22642321 json_decref (tstats );
22652322 json_decref (cstats );
@@ -3025,6 +3082,19 @@ int mod_main (flux_t *h, int argc, char **argv)
30253082 flux_log_error (h , "flux_reactor_run" );
30263083 goto done ;
30273084 }
3085+ if (zhashx_size (ctx -> requests ) > 0 ) {
3086+ /* anything that has not yet completed gets an ENOSYS */
3087+ const flux_msg_t * msg = zhashx_first (ctx -> requests );
3088+ while (msg ) {
3089+ const char * topic = "unknown" ;
3090+ if (flux_msg_get_topic (msg , & topic ) < 0 )
3091+ flux_log_error (ctx -> h , "%s: flux_msg_get_topic" , __FUNCTION__ );
3092+ if (flux_respond_error (ctx -> h , msg , ENOSYS , NULL ) < 0 )
3093+ flux_log_error (ctx -> h , "%s: flux_respond_error" , __FUNCTION__ );
3094+ flux_log (ctx -> h , LOG_ERR , "failing pending '%s' request" , topic );
3095+ msg = zhashx_next (ctx -> requests );
3096+ }
3097+ }
30283098 /* Checkpoint the KVS root to the content backing store.
30293099 * If backing store is not loaded, silently proceed without checkpoint.
30303100 */
0 commit comments