@@ -64,6 +64,10 @@ const double max_lastuse_age = 10.;
6464 */
6565const double max_namespace_age = 3600. ;
6666
67+ /* Transaction max ops
68+ */
69+ const uint64_t kvs_transaction_max_ops = 65536 ;
70+
6771struct kvs_ctx {
6872 struct cache * cache ; /* blobref => cache_entry */
6973 kvsroot_mgr_t * krm ;
@@ -74,6 +78,7 @@ struct kvs_ctx {
7478 flux_watcher_t * idle_w ;
7579 flux_watcher_t * check_w ;
7680 int transaction_merge ;
81+ uint64_t transaction_max_ops ;
7782 bool events_init ; /* flag */
7883 char * hash_name ;
7984 unsigned int seq ; /* for commit transactions */
@@ -103,6 +108,9 @@ static void start_root_remove (struct kvs_ctx *ctx, const char *ns);
103108static void work_queue_check_append (struct kvs_ctx * ctx ,
104109 struct kvsroot * root );
105110static void kvstxn_apply (kvstxn_t * kt );
111+ static int max_ops_parse (struct kvs_ctx * ctx ,
112+ const flux_conf_t * conf ,
113+ flux_error_t * errp );
106114
107115/*
108116 * kvs_ctx functions
@@ -182,6 +190,7 @@ static struct kvs_ctx *kvs_ctx_create (flux_t *h)
182190 goto error ;
183191 }
184192 ctx -> transaction_merge = 1 ;
193+ ctx -> transaction_max_ops = kvs_transaction_max_ops ;
185194 if (!(ctx -> requests = msg_hash_create (MSG_HASH_TYPE_UUID_MATCHTAG )))
186195 goto error ;
187196 list_head_init (& ctx -> work_queue );
@@ -1663,6 +1672,11 @@ static void commit_request_cb (flux_t *h,
16631672 goto error ;
16641673 }
16651674
1675+ if (json_array_size (ops ) > ctx -> transaction_max_ops ) {
1676+ errno = E2BIG ;
1677+ goto error ;
1678+ }
1679+
16661680 if (!(root = getroot (ctx , ns , mh , msg , & stall ))) {
16671681 if (stall ) {
16681682 request_tracking_add (ctx , msg );
@@ -2496,6 +2510,10 @@ static void config_reload_cb (flux_t *h,
24962510
24972511 if (flux_conf_reload_decode (msg , & conf ) < 0 )
24982512 goto error ;
2513+ if (max_ops_parse (ctx , conf , & error ) < 0 ) {
2514+ errstr = error .text ;
2515+ goto error ;
2516+ }
24992517 if (kvs_checkpoint_reload (ctx -> kcp , conf , & error ) < 0 ) {
25002518 errstr = error .text ;
25012519 goto error ;
@@ -2616,11 +2634,39 @@ static const struct flux_msg_handler_spec htab[] = {
26162634 FLUX_MSGHANDLER_TABLE_END ,
26172635};
26182636
2637+ static int max_ops_parse (struct kvs_ctx * ctx ,
2638+ const flux_conf_t * conf ,
2639+ flux_error_t * errp )
2640+ {
2641+ uint64_t t_max_ops = kvs_transaction_max_ops ;
2642+ flux_error_t error ;
2643+ if (flux_conf_unpack (conf ,
2644+ & error ,
2645+ "{s?{s?I}}" ,
2646+ "kvs" ,
2647+ "transaction-max-ops" , & t_max_ops ) < 0 ) {
2648+ errprintf (errp , "error reading config for kvs: %s" , error .text );
2649+ return -1 ;
2650+ }
2651+ if (t_max_ops <= 0 ) {
2652+ errprintf (errp , "kvs transaction-max-ops invalid" );
2653+ return -1 ;
2654+ }
2655+ ctx -> transaction_max_ops = t_max_ops ;
2656+ return 0 ;
2657+ }
2658+
26192659static int process_config (struct kvs_ctx * ctx )
26202660{
26212661 flux_error_t error ;
2662+ const flux_conf_t * conf = flux_get_conf (ctx -> h );
2663+
2664+ if (max_ops_parse (ctx , conf , & error ) < 0 ) {
2665+ flux_log (ctx -> h , LOG_ERR , "%s" , error .text );
2666+ return -1 ;
2667+ }
26222668 if (kvs_checkpoint_config_parse (ctx -> kcp ,
2623- flux_get_conf ( ctx -> h ) ,
2669+ conf ,
26242670 & error ) < 0 ) {
26252671 flux_log (ctx -> h , LOG_ERR , "%s" , error .text );
26262672 return -1 ;
0 commit comments