3636#include "src/common/libkvs/kvs_util_private.h"
3737#include "src/common/libcontent/content.h"
3838#include "src/common/libutil/fsd.h"
39+ #include "src/common/librouter/msg_hash.h"
3940
4041#include "waitqueue.h"
4142#include "cache.h"
@@ -77,6 +78,7 @@ struct kvs_ctx {
7778 char * hash_name ;
7879 unsigned int seq ; /* for commit transactions */
7980 kvs_checkpoint_t * kcp ;
81+ zhashx_t * requests ; /* track unfinished requests */
8082 struct list_head work_queue ;
8183};
8284
@@ -111,11 +113,23 @@ static void kvs_ctx_destroy (struct kvs_ctx *ctx)
111113 flux_watcher_destroy (ctx -> idle_w );
112114 kvs_checkpoint_destroy (ctx -> kcp );
113115 free (ctx -> hash_name );
116+ zhashx_destroy (& ctx -> requests );
114117 free (ctx );
115118 errno = saved_errno ;
116119 }
117120}
118121
122+ static void request_tracking_add (struct kvs_ctx * ctx , const flux_msg_t * msg )
123+ {
124+ /* ignore if item already tracked */
125+ zhashx_insert (ctx -> requests , msg , (flux_msg_t * )msg );
126+ }
127+
128+ static void request_tracking_remove (struct kvs_ctx * ctx , const flux_msg_t * msg )
129+ {
130+ zhashx_delete (ctx -> requests , msg );
131+ }
132+
119133static void work_queue_check_append_wrapper (struct kvsroot * root ,
120134 void * arg )
121135{
@@ -164,6 +178,8 @@ static struct kvs_ctx *kvs_ctx_create (flux_t *h)
164178 goto error ;
165179 }
166180 ctx -> transaction_merge = 1 ;
181+ if (!(ctx -> requests = msg_hash_create (MSG_HASH_TYPE_UUID_MATCHTAG )))
182+ goto error ;
167183 list_head_init (& ctx -> work_queue );
168184 return ctx ;
169185error :
@@ -364,6 +380,13 @@ static void getroot_completion (flux_future_t *f, void *arg)
364380error :
365381 if (flux_respond_error (ctx -> h , msg , errno , NULL ) < 0 )
366382 flux_log_error (ctx -> h , "%s: flux_respond_error" , __FUNCTION__ );
383+ /* N.B. getroot request from other requests (e.g. lookup, commit)
384+ * may stall and be tracked. So we need to remove tracking of the
385+ * request if there is an error. We do not remove tracking on
386+ * getroot success, as the original request (e.g. lookup, commit)
387+ * will deal with the success case.
388+ */
389+ request_tracking_remove (ctx , msg );
367390 flux_msg_destroy (msg );
368391 flux_future_destroy (f );
369392}
@@ -1437,8 +1460,10 @@ static void lookup_request_cb (flux_t *h, flux_msg_handler_t *mh,
14371460
14381461 if (!(lh = lookup_common (h , mh , msg , ctx , lookup_request_cb ,
14391462 & stall ))) {
1440- if (stall )
1463+ if (stall ) {
1464+ request_tracking_add (ctx , msg );
14411465 return ;
1466+ }
14421467 goto error ;
14431468 }
14441469
@@ -1450,10 +1475,12 @@ static void lookup_request_cb (flux_t *h, flux_msg_handler_t *mh,
14501475 flux_log_error (h , "%s: flux_respond_pack" , __FUNCTION__ );
14511476 lookup_destroy (lh );
14521477 json_decref (val );
1478+ request_tracking_remove (ctx , msg );
14531479 return ;
14541480error :
14551481 if (flux_respond_error (h , msg , errno , NULL ) < 0 )
14561482 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
1483+ request_tracking_remove (ctx , msg );
14571484 lookup_destroy (lh );
14581485}
14591486
@@ -1476,8 +1503,10 @@ static void lookup_plus_request_cb (flux_t *h, flux_msg_handler_t *mh,
14761503
14771504 if (!(lh = lookup_common (h , mh , msg , ctx , lookup_plus_request_cb ,
14781505 & stall ))) {
1479- if (stall )
1506+ if (stall ) {
1507+ request_tracking_add (ctx , msg );
14801508 return ;
1509+ }
14811510 goto error ;
14821511 }
14831512
@@ -1502,10 +1531,12 @@ static void lookup_plus_request_cb (flux_t *h, flux_msg_handler_t *mh,
15021531 }
15031532 lookup_destroy (lh );
15041533 json_decref (val );
1534+ request_tracking_remove (ctx , msg );
15051535 return ;
15061536error :
15071537 if (flux_respond_error (h , msg , errno , NULL ) < 0 )
15081538 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
1539+ request_tracking_remove (ctx , msg );
15091540}
15101541
15111542
@@ -1526,6 +1557,7 @@ static int finalize_transaction_req (treq_t *tr,
15261557 flux_log_error (cbd -> ctx -> h , "%s: flux_respond_pack" , __FUNCTION__ );
15271558 }
15281559
1560+ request_tracking_remove (cbd -> ctx , req );
15291561 return 0 ;
15301562}
15311563
@@ -1624,6 +1656,9 @@ static void relaycommit_request_cb (flux_t *h, flux_msg_handler_t *mh,
16241656 goto error ;
16251657 }
16261658
1659+ /* N.B. no request tracking for relay. The relay does not get a
1660+ * response, only the original via finalize_transaction_bynames().
1661+ */
16271662 work_queue_check_append (ctx , root );
16281663 return ;
16291664
@@ -1666,8 +1701,10 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
16661701 }
16671702
16681703 if (!(root = getroot (ctx , ns , mh , msg , NULL , & stall ))) {
1669- if (stall )
1704+ if (stall ) {
1705+ request_tracking_add (ctx , msg );
16701706 return ;
1707+ }
16711708 goto error ;
16721709 }
16731710
@@ -1724,11 +1761,13 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
17241761 }
17251762 flux_future_destroy (f );
17261763 }
1764+ request_tracking_add (ctx , msg );
17271765 return ;
17281766
17291767error :
17301768 if (flux_respond_error (h , msg , errno , errmsg ) < 0 )
17311769 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
1770+ request_tracking_remove (ctx , msg );
17321771}
17331772
17341773
@@ -1809,6 +1848,9 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *mh,
18091848 work_queue_check_append (ctx , root );
18101849 }
18111850
1851+ /* N.B. no request tracking for relay. The relay does not get a
1852+ * response, only the original via finalize_transaction_bynames().
1853+ */
18121854 return ;
18131855
18141856error :
@@ -1854,8 +1896,10 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh,
18541896 }
18551897
18561898 if (!(root = getroot (ctx , ns , mh , msg , NULL , & stall ))) {
1857- if (stall )
1899+ if (stall ) {
1900+ request_tracking_add (ctx , msg );
18581901 goto stall ;
1902+ }
18591903 goto error ;
18601904 }
18611905
@@ -1935,11 +1979,13 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh,
19351979 }
19361980 flux_future_destroy (f );
19371981 }
1982+ request_tracking_add (ctx , msg );
19381983 return ;
19391984
19401985error :
19411986 if (flux_respond_error (h , msg , errno , errmsg ) < 0 )
19421987 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
1988+ request_tracking_remove (ctx , msg );
19431989stall :
19441990 return ;
19451991}
@@ -1961,8 +2007,10 @@ static void wait_version_request_cb (flux_t *h, flux_msg_handler_t *mh,
19612007 }
19622008
19632009 if (!(root = getroot (ctx , ns , mh , msg , NULL , & stall ))) {
1964- if (stall )
2010+ if (stall ) {
2011+ request_tracking_add (ctx , msg );
19652012 return ;
2013+ }
19662014 goto error ;
19672015 }
19682016
@@ -1977,6 +2025,7 @@ static void wait_version_request_cb (flux_t *h, flux_msg_handler_t *mh,
19772025 flux_log_error (h , "%s: kvs_wait_version_add" , __FUNCTION__ );
19782026 goto error ;
19792027 }
2028+ request_tracking_add (ctx , msg );
19802029 return ; /* stall */
19812030 }
19822031
@@ -1985,11 +2034,13 @@ static void wait_version_request_cb (flux_t *h, flux_msg_handler_t *mh,
19852034 "rootref" , root -> ref ) < 0 )
19862035 flux_log_error (h , "%s: flux_respond_pack" , __FUNCTION__ );
19872036
2037+ request_tracking_remove (ctx , msg );
19882038 return ;
19892039
19902040error :
19912041 if (flux_respond_error (h , msg , errno , NULL ) < 0 )
19922042 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
2043+ request_tracking_remove (ctx , msg );
19932044}
19942045
19952046static void getroot_request_cb (flux_t * h , flux_msg_handler_t * mh ,
@@ -2021,8 +2072,10 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *mh,
20212072 */
20222073 bool stall = false;
20232074 if (!(root = getroot (ctx , ns , mh , msg , NULL , & stall ))) {
2024- if (stall )
2075+ if (stall ) {
2076+ request_tracking_add (ctx , msg );
20252077 return ;
2078+ }
20262079 goto error ;
20272080 }
20282081 }
@@ -2035,10 +2088,12 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *mh,
20352088 "flags" , root -> flags ,
20362089 "namespace" , root -> ns_name ) < 0 )
20372090 flux_log_error (h , "%s: flux_respond_pack" , __FUNCTION__ );
2091+ request_tracking_remove (ctx , msg );
20382092 return ;
20392093error :
20402094 if (flux_respond_error (h , msg , errno , NULL ) < 0 )
20412095 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
2096+ request_tracking_remove (ctx , msg );
20422097}
20432098
20442099static void error_event_cb (flux_t * h , flux_msg_handler_t * mh ,
@@ -3026,6 +3081,19 @@ int mod_main (flux_t *h, int argc, char **argv)
30263081 flux_log_error (h , "flux_reactor_run" );
30273082 goto done ;
30283083 }
3084+ if (zhashx_size (ctx -> requests ) > 0 ) {
3085+ /* anything that has not yet completed gets an ENOSYS */
3086+ const flux_msg_t * msg = zhashx_first (ctx -> requests );
3087+ while (msg ) {
3088+ const char * topic = "unknown" ;
3089+ if (flux_msg_get_topic (msg , & topic ) < 0 )
3090+ flux_log_error (ctx -> h , "%s: flux_msg_get_topic" , __FUNCTION__ );
3091+ if (flux_respond_error (ctx -> h , msg , ENOSYS , NULL ) < 0 )
3092+ flux_log_error (ctx -> h , "%s: flux_respond_error" , __FUNCTION__ );
3093+ flux_log (ctx -> h , LOG_ERR , "failing pending '%s' request" , topic );
3094+ msg = zhashx_next (ctx -> requests );
3095+ }
3096+ }
30293097 /* Checkpoint the KVS root to the content backing store.
30303098 * If backing store is not loaded, silently proceed without checkpoint.
30313099 */
0 commit comments