@@ -176,7 +176,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
176176 {
177177 // Get current raw value for the provided key from the RocksDB instance.
178178 db_get_context get_ctx;
179- const int err = _rocksdb_wrapper->get (req.key . to_string_view () , &get_ctx);
179+ const int err = _rocksdb_wrapper->get (req.key , &get_ctx);
180180 if (dsn_unlikely (err != rocksdb::Status::kOk )) {
181181 // Failed to read current raw value.
182182 LOG_ERROR_PREFIX (" failed to get current raw value for incr while making "
@@ -239,13 +239,9 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
239239 const dsn::apps::update_request &update,
240240 dsn::apps::incr_response &resp)
241241 {
242- const auto pid = get_gpid ();
243- resp.app_id = pid.get_app_id ();
244- resp.partition_index = pid.get_partition_index ();
245- resp.decree = ctx.decree ;
246- resp.server = _primary_host_port;
242+ make_basic_response (ctx.decree , resp);
247243
248- auto cleanup = dsn::defer ([this ]() { _rocksdb_wrapper->clear_up_write_batch (); });
244+ const auto cleanup = dsn::defer ([this ]() { _rocksdb_wrapper->clear_up_write_batch (); });
249245
250246 resp.error = _rocksdb_wrapper->write_batch_put_ctx (
251247 ctx, update.key , update.value , update.expire_ts_seconds );
@@ -369,7 +365,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
369365
370366 // Get the check value.
371367 db_get_context get_ctx;
372- const int err = _rocksdb_wrapper->get (check_key. to_string_view () , &get_ctx);
368+ const int err = _rocksdb_wrapper->get (check_key, &get_ctx);
373369 if (dsn_unlikely (err != rocksdb::Status::kOk )) {
374370 // Failed to read the check value.
375371 LOG_ERROR_PREFIX (" failed to get the check value for check_and_set while making "
@@ -422,17 +418,13 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
422418 const dsn::apps::update_request &update,
423419 dsn::apps::check_and_set_response &resp)
424420 {
425- const auto pid = get_gpid ();
426- resp.app_id = pid.get_app_id ();
427- resp.partition_index = pid.get_partition_index ();
428- resp.decree = ctx.decree ;
429- resp.server = _primary_host_port;
421+ make_basic_response (ctx.decree , resp);
430422
431423 // Copy check_value's fields from the single-put request to the check_and_set
432424 // response to reply to the client.
433425 copy_check_value (update, resp);
434426
435- auto cleanup = dsn::defer ([this ]() { _rocksdb_wrapper->clear_up_write_batch (); });
427+ const auto cleanup = dsn::defer ([this ]() { _rocksdb_wrapper->clear_up_write_batch (); });
436428
437429 resp.error = _rocksdb_wrapper->write_batch_put_ctx (
438430 ctx, update.key , update.value , update.expire_ts_seconds );
@@ -540,6 +532,165 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
540532 return rocksdb::Status::kOk ;
541533 }
542534
535+ // Used to call make_idempotent() for incr and check_and_set to get the idempotent single-put
536+ // request which is stored as the unique element of `updates`.
537+ //
538+ // This interface is provided to ensure consistency between the make_idempotent() interfaces
539+ // of incr/check_and_set operations and that of check_and_mutate (both using std::vector for
540+ // `updates`), thereby facilitating uniform templated function invocation.
541+ template <typename TRequest, typename TResponse>
542+ inline int make_idempotent (const TRequest &req,
543+ TResponse &err_resp,
544+ std::vector<dsn::apps::update_request> &updates)
545+ {
546+ updates.clear ();
547+ updates.emplace_back ();
548+ return make_idempotent (req, err_resp, updates.front ());
549+ }
550+
551+ // Tranlate a check_and_mutate request into multiple single-put and single-remove requests
552+ // which are certainly idempotent. Return current status for RocksDB. Only called by primary
553+ // replicas.
554+ int make_idempotent (const dsn::apps::check_and_mutate_request &req,
555+ dsn::apps::check_and_mutate_response &err_resp,
556+ std::vector<dsn::apps::update_request> &updates)
557+ {
558+ if (dsn_unlikely (req.mutate_list .empty ())) {
559+ LOG_ERROR_PREFIX (" mutate_list is empty for check_and_mutate while making idempotent" );
560+
561+ return make_error_response (rocksdb::Status::kInvalidArgument , err_resp);
562+ }
563+
564+ // Verify operation type for each mutate.
565+ for (size_t i = 0 ; i < req.mutate_list .size (); ++i) {
566+ const auto &mu = req.mutate_list [i];
567+ if (dsn_likely (mu.operation == dsn::apps::mutate_operation::MO_PUT ||
568+ mu.operation == dsn::apps::mutate_operation::MO_DELETE)) {
569+ continue ;
570+ }
571+
572+ LOG_ERROR_PREFIX (" mutate_list[{}]'s operation {} is invalid for check_and_mutate "
573+ " while making idempotent" ,
574+ i,
575+ mu.operation );
576+
577+ return make_error_response (rocksdb::Status::kInvalidArgument , err_resp);
578+ }
579+
580+ if (dsn_unlikely (!is_check_type_supported (req.check_type ))) {
581+ LOG_ERROR_PREFIX (" check type {} is not supported for check_and_mutate " ,
582+ " while making idempotent" ,
583+ cas_check_type_to_string (req.check_type ));
584+
585+ return make_error_response (rocksdb::Status::kInvalidArgument , err_resp);
586+ }
587+
588+ dsn::blob check_key;
589+ pegasus_generate_key (check_key, req.hash_key , req.check_sort_key );
590+
591+ // Get the check value.
592+ db_get_context get_ctx;
593+ const int err = _rocksdb_wrapper->get (check_key, &get_ctx);
594+ if (dsn_unlikely (err != rocksdb::Status::kOk )) {
595+ // Failed to read the check value.
596+ LOG_ERROR_PREFIX (" failed to get the check value for check_and_mutate while making "
597+ " idempotent: rocksdb_status = {}, hash_key = {}, "
598+ " check_sort_key = {}" ,
599+ err,
600+ utils::c_escape_sensitive_string (req.hash_key ),
601+ utils::c_escape_sensitive_string (req.check_sort_key ));
602+
603+ return make_error_response (err, err_resp);
604+ }
605+
606+ dsn::blob check_value;
607+ const bool value_exist = !get_ctx.expired && get_ctx.found ;
608+ if (value_exist) {
609+ pegasus_extract_user_data (
610+ _pegasus_data_version, std::move (get_ctx.raw_value ), check_value);
611+ }
612+
613+ bool invalid_argument = false ;
614+ const bool passed = validate_check (
615+ req.check_type , req.check_operand , value_exist, check_value, invalid_argument);
616+ if (!passed) {
617+ make_check_value (req, value_exist, check_value, err_resp);
618+ return make_error_response (invalid_argument ? rocksdb::Status::kInvalidArgument
619+ : rocksdb::Status::kTryAgain ,
620+ err_resp);
621+ }
622+
623+ // Check passed.
624+ updates.clear ();
625+ for (const auto &mu : req.mutate_list ) {
626+ // Generate new RocksDB key.
627+ dsn::blob set_key;
628+ pegasus_generate_key (set_key, req.hash_key , mu.sort_key );
629+
630+ // Add a new put request.
631+ updates.emplace_back ();
632+
633+ if (mu.operation == dsn::apps::mutate_operation::MO_PUT) {
634+ make_idempotent_request_for_check_and_mutate_put (
635+ set_key, mu.value , mu.set_expire_ts_seconds , updates.back ());
636+ continue ;
637+ }
638+
639+ if (mu.operation == dsn::apps::mutate_operation::MO_DELETE) {
640+ make_idempotent_request_for_check_and_mutate_remove (set_key, updates.back ());
641+ continue ;
642+ }
643+
644+ // It must have returned and replied to the client once there is some invalid
645+ // mutate_operation. Here is just a defensive assertion.
646+ LOG_FATAL (" invalid mutate_operation {} for check_and_mutate while making idempotent" ,
647+ mu.operation );
648+ __builtin_unreachable ();
649+ }
650+
651+ // Add check value to the first generated idempotent request, for the future response to
652+ // the client.
653+ make_check_value (req, value_exist, check_value, updates.front ());
654+
655+ return rocksdb::Status::kOk ;
656+ }
657+
658+ // Apply the single-put and single-remove requests translated from a check_and_mutate request
659+ // into RocksDB, and build response for the check_and_mutate request. Return current status
660+ // for RocksDB. Only called by primary replicas.
661+ int put (const db_write_context &ctx,
662+ const std::vector<dsn::apps::update_request> &updates,
663+ dsn::apps::check_and_mutate_response &resp)
664+ {
665+ make_basic_response (ctx.decree , resp);
666+
667+ // Copy check_value's fields from the first idempotent request to the check_and_mutate
668+ // response to reply to the client.
669+ copy_check_value (updates.front (), resp);
670+
671+ const auto cleanup = dsn::defer ([this ]() { _rocksdb_wrapper->clear_up_write_batch (); });
672+
673+ for (const auto &update : updates) {
674+ if (update.type == dsn::apps::update_type::UT_CHECK_AND_MUTATE_PUT) {
675+ resp.error = _rocksdb_wrapper->write_batch_put_ctx (
676+ ctx, update.key , update.value , update.expire_ts_seconds );
677+ } else if (update.type == dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE) {
678+ resp.error = _rocksdb_wrapper->write_batch_delete (ctx.decree , update.key );
679+ } else {
680+ LOG_FATAL (" invalid update_type for check_and_mutate {} while making idempotent" ,
681+ update.type );
682+ __builtin_unreachable ();
683+ }
684+
685+ if (dsn_unlikely (resp.error != rocksdb::Status::kOk )) {
686+ return resp.error ;
687+ }
688+ }
689+
690+ resp.error = _rocksdb_wrapper->write (ctx.decree );
691+ return resp.error ;
692+ }
693+
543694 int check_and_mutate (int64_t decree,
544695 const dsn::apps::check_and_mutate_request &update,
545696 dsn::apps::check_and_mutate_response &resp)
@@ -890,6 +1041,38 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
8901041 return rocksdb::Status::kOk ;
8911042 }
8921043
1044+ // Build a single-put `update` for a mutate of MO_PUT in a check_and_mutate request based
1045+ // on `key`, `value` and `expire_ts_seconds`.
1046+ static inline void
1047+ make_idempotent_request_for_check_and_mutate_put (const dsn::blob &key,
1048+ const dsn::blob &value,
1049+ int32_t expire_ts_seconds,
1050+ dsn::apps::update_request &update)
1051+ {
1052+ make_idempotent_request (
1053+ key, value, expire_ts_seconds, dsn::apps::update_type::UT_CHECK_AND_MUTATE_PUT, update);
1054+ }
1055+
1056+ // Build a single-remove `update` for a mutate of MO_DELETE in a check_and_mutate request
1057+ // based on `key`.
1058+ static inline void
1059+ make_idempotent_request_for_check_and_mutate_remove (const dsn::blob &key,
1060+ dsn::apps::update_request &update)
1061+ {
1062+ make_idempotent_request (key, dsn::apps::update_type::UT_CHECK_AND_MUTATE_REMOVE, update);
1063+ }
1064+
1065+ // Build response `resp` with basic info.
1066+ template <typename TResponse>
1067+ inline void make_basic_response (int64_t decree, TResponse &resp)
1068+ {
1069+ const auto pid = get_gpid ();
1070+ resp.app_id = pid.get_app_id ();
1071+ resp.partition_index = pid.get_partition_index ();
1072+ resp.decree = decree;
1073+ resp.server = _primary_host_port;
1074+ }
1075+
8931076 // Build response `resp` based on `err` only for the error case (i.e. the current status
8941077 // `err` for RocksDB is not rocksdb::Status::kOk). Return `err`.
8951078 //
0 commit comments