@@ -64,6 +64,10 @@ const double max_lastuse_age = 10.;
6464 */
6565const double max_namespace_age = 3600. ;
6666
67+ /* Transaction max ops
68+ */
69+ const uint64_t kvs_transaction_max_ops = 65536 ;
70+
6771struct kvs_ctx {
6872 struct cache * cache ; /* blobref => cache_entry */
6973 kvsroot_mgr_t * krm ;
@@ -74,6 +78,7 @@ struct kvs_ctx {
7478 flux_watcher_t * idle_w ;
7579 flux_watcher_t * check_w ;
7680 int transaction_merge ;
81+ uint64_t transaction_max_ops ;
7782 bool events_init ; /* flag */
7883 char * hash_name ;
7984 unsigned int seq ; /* for commit transactions */
@@ -104,6 +109,9 @@ static void start_root_remove (struct kvs_ctx *ctx, const char *ns);
104109static void work_queue_check_append (struct kvs_ctx * ctx ,
105110 struct kvsroot * root );
106111static void kvstxn_apply (kvstxn_t * kt );
112+ static int max_ops_parse (struct kvs_ctx * ctx ,
113+ const flux_conf_t * conf ,
114+ flux_error_t * errp );
107115
108116/*
109117 * kvs_ctx functions
@@ -183,6 +191,7 @@ static struct kvs_ctx *kvs_ctx_create (flux_t *h)
183191 goto error ;
184192 }
185193 ctx -> transaction_merge = 1 ;
194+ ctx -> transaction_max_ops = kvs_transaction_max_ops ;
186195 if (!(ctx -> requests = msg_hash_create (MSG_HASH_TYPE_UUID_MATCHTAG )))
187196 goto error ;
188197 list_head_init (& ctx -> work_queue );
@@ -1773,6 +1782,11 @@ static void commit_request_cb (flux_t *h,
17731782 goto error ;
17741783 }
17751784
1785+ if (json_array_size (ops ) > ctx -> transaction_max_ops ) {
1786+ errno = E2BIG ;
1787+ goto error ;
1788+ }
1789+
17761790 if (!(root = getroot (ctx , ns , mh , msg , NULL , & stall ))) {
17771791 if (stall ) {
17781792 request_tracking_add (ctx , msg );
@@ -1983,6 +1997,11 @@ static void fence_request_cb (flux_t *h,
19831997 goto error ;
19841998 }
19851999
2000+ if (json_array_size (ops ) > ctx -> transaction_max_ops ) {
2001+ errno = E2BIG ;
2002+ goto error ;
2003+ }
2004+
19862005 if (!(root = getroot (ctx , ns , mh , msg , NULL , & stall ))) {
19872006 if (stall ) {
19882007 request_tracking_add (ctx , msg );
@@ -2956,6 +2975,10 @@ static void config_reload_cb (flux_t *h,
29562975
29572976 if (flux_conf_reload_decode (msg , & conf ) < 0 )
29582977 goto error ;
2978+ if (max_ops_parse (ctx , conf , & error ) < 0 ) {
2979+ errstr = error .text ;
2980+ goto error ;
2981+ }
29592982 if (kvs_checkpoint_reload (ctx -> kcp , conf , & error ) < 0 ) {
29602983 errstr = error .text ;
29612984 goto error ;
@@ -3112,11 +3135,39 @@ static const struct flux_msg_handler_spec htab[] = {
31123135 FLUX_MSGHANDLER_TABLE_END ,
31133136};
31143137
3138+ static int max_ops_parse (struct kvs_ctx * ctx ,
3139+ const flux_conf_t * conf ,
3140+ flux_error_t * errp )
3141+ {
3142+ uint64_t t_max_ops = kvs_transaction_max_ops ;
3143+ flux_error_t error ;
3144+ if (flux_conf_unpack (conf ,
3145+ & error ,
3146+ "{s?{s?I}}" ,
3147+ "kvs" ,
3148+ "transaction-max-ops" , & t_max_ops ) < 0 ) {
3149+ errprintf (errp , "error reading config for kvs: %s" , error .text );
3150+ return -1 ;
3151+ }
3152+ if (t_max_ops <= 0 ) {
3153+ errprintf (errp , "kvs transaction-max-ops invalid" );
3154+ return -1 ;
3155+ }
3156+ ctx -> transaction_max_ops = t_max_ops ;
3157+ return 0 ;
3158+ }
3159+
31153160static int process_config (struct kvs_ctx * ctx )
31163161{
31173162 flux_error_t error ;
3163+ const flux_conf_t * conf = flux_get_conf (ctx -> h );
3164+
3165+ if (max_ops_parse (ctx , conf , & error ) < 0 ) {
3166+ flux_log (ctx -> h , LOG_ERR , "%s" , error .text );
3167+ return -1 ;
3168+ }
31183169 if (kvs_checkpoint_config_parse (ctx -> kcp ,
3119- flux_get_conf ( ctx -> h ) ,
3170+ conf ,
31203171 & error ) < 0 ) {
31213172 flux_log (ctx -> h , LOG_ERR , "%s" , error .text );
31223173 return -1 ;
0 commit comments