@@ -63,6 +63,10 @@ const double max_lastuse_age = 10.;
6363 */
6464const double max_namespace_age = 3600. ;
6565
66+ /* Transaction max ops
67+ */
68+ const uint64_t kvs_transaction_max_ops = 131072 ;
69+
6670struct kvs_ctx {
6771 struct cache * cache ; /* blobref => cache_entry */
6872 kvsroot_mgr_t * krm ;
@@ -73,6 +77,7 @@ struct kvs_ctx {
7377 flux_watcher_t * idle_w ;
7478 flux_watcher_t * check_w ;
7579 int transaction_merge ;
80+ uint64_t transaction_max_ops ;
7681 bool events_init ; /* flag */
7782 char * hash_name ;
7883 unsigned int seq ; /* for commit transactions */
@@ -102,6 +107,9 @@ static void start_root_remove (struct kvs_ctx *ctx, const char *ns);
102107static void work_queue_check_append (struct kvs_ctx * ctx ,
103108 struct kvsroot * root );
104109static void kvstxn_apply (kvstxn_t * kt );
110+ static int max_ops_parse (struct kvs_ctx * ctx ,
111+ const flux_conf_t * conf ,
112+ flux_error_t * errp );
105113
106114/*
107115 * kvs_ctx functions
@@ -181,6 +189,7 @@ static struct kvs_ctx *kvs_ctx_create (flux_t *h)
181189 goto error ;
182190 }
183191 ctx -> transaction_merge = 1 ;
192+ ctx -> transaction_max_ops = kvs_transaction_max_ops ;
184193 if (!(ctx -> requests = msg_hash_create (MSG_HASH_TYPE_UUID_MATCHTAG )))
185194 goto error ;
186195 list_head_init (& ctx -> work_queue );
@@ -1662,6 +1671,11 @@ static void commit_request_cb (flux_t *h,
16621671 goto error ;
16631672 }
16641673
1674+ if (json_array_size (ops ) > ctx -> transaction_max_ops ) {
1675+ errno = E2BIG ;
1676+ goto error ;
1677+ }
1678+
16651679 if (!(root = getroot (ctx , ns , mh , msg , & stall ))) {
16661680 if (stall ) {
16671681 request_tracking_add (ctx , msg );
@@ -2495,6 +2509,10 @@ static void config_reload_cb (flux_t *h,
24952509
24962510 if (flux_conf_reload_decode (msg , & conf ) < 0 )
24972511 goto error ;
2512+ if (max_ops_parse (ctx , conf , & error ) < 0 ) {
2513+ errstr = error .text ;
2514+ goto error ;
2515+ }
24982516 if (kvs_checkpoint_reload (ctx -> kcp , conf , & error ) < 0 ) {
24992517 errstr = error .text ;
25002518 goto error ;
@@ -2615,11 +2633,39 @@ static const struct flux_msg_handler_spec htab[] = {
26152633 FLUX_MSGHANDLER_TABLE_END ,
26162634};
26172635
2636+ static int max_ops_parse (struct kvs_ctx * ctx ,
2637+ const flux_conf_t * conf ,
2638+ flux_error_t * errp )
2639+ {
2640+ uint64_t t_max_ops = kvs_transaction_max_ops ;
2641+ flux_error_t error ;
2642+ if (flux_conf_unpack (conf ,
2643+ & error ,
2644+ "{s?{s?I}}" ,
2645+ "kvs" ,
2646+ "transaction-max-ops" , & t_max_ops ) < 0 ) {
2647+ errprintf (errp , "error reading config for kvs: %s" , error .text );
2648+ return -1 ;
2649+ }
2650+ if (t_max_ops <= 0 ) {
2651+ errprintf (errp , "kvs transaction-max-ops invalid" );
2652+ return -1 ;
2653+ }
2654+ ctx -> transaction_max_ops = t_max_ops ;
2655+ return 0 ;
2656+ }
2657+
26182658static int process_config (struct kvs_ctx * ctx )
26192659{
26202660 flux_error_t error ;
2661+ const flux_conf_t * conf = flux_get_conf (ctx -> h );
2662+
2663+ if (max_ops_parse (ctx , conf , & error ) < 0 ) {
2664+ flux_log (ctx -> h , LOG_ERR , "%s" , error .text );
2665+ return -1 ;
2666+ }
26212667 if (kvs_checkpoint_config_parse (ctx -> kcp ,
2622- flux_get_conf ( ctx -> h ) ,
2668+ conf ,
26232669 & error ) < 0 ) {
26242670 flux_log (ctx -> h , LOG_ERR , "%s" , error .text );
26252671 return -1 ;
0 commit comments