@@ -97,9 +97,11 @@ Replica::Replica(string host, uint16_t port, Service* se) : service_(*se) {
9797 master_context_.port = port;
9898}
9999
100- Replica::Replica (const MasterContext& context, uint32_t dfly_flow_id, Service* service)
100+ Replica::Replica (const MasterContext& context, uint32_t dfly_flow_id, Service* service,
101+ std::shared_ptr<Replica::MultiShardExecution> shared_exe_data)
101102 : service_(*service), master_context_(context) {
102103 master_context_.dfly_flow_id = dfly_flow_id;
104+ multi_shard_exe_ = shared_exe_data;
103105}
104106
105107Replica::~Replica () {
@@ -427,13 +429,13 @@ error_code Replica::InitiatePSync() {
427429// Initialize and start sub-replica for each flow.
428430error_code Replica::InitiateDflySync () {
429431 DCHECK_GT (num_df_flows_, 0u );
430-
432+ multi_shard_exe_. reset ( new MultiShardExecution ());
431433 shard_flows_.resize (num_df_flows_);
432434 for (unsigned i = 0 ; i < num_df_flows_; ++i) {
433- shard_flows_[i].reset (new Replica (master_context_, i, &service_));
435+ shard_flows_[i].reset (new Replica (master_context_, i, &service_, multi_shard_exe_ ));
434436 }
435437
436- // Blocked on untill all flows got full sync cut.
438+ // Blocked on until all flows got full sync cut.
437439 fibers_ext::BlockingCounter sync_block{num_df_flows_};
438440
439441 auto err_handler = [this , sync_block](const auto & ge) mutable {
@@ -705,14 +707,63 @@ void Replica::StableSyncDflyFb(Context* cntx) {
705707 cntx->Error (res.error (), " Journal format error" );
706708 return ;
707709 }
708-
709- executor.Execute (std::move (res.value ()));
710-
710+ ExecuteEntry (&executor, res.value ());
711711 last_io_time_ = sock_->proactor ()->GetMonotonicTimeNs ();
712712 }
713713 return ;
714714}
715715
716+ void Replica::ExecuteEntry (JournalExecutor* executor, journal::ParsedEntry& entry) {
717+ if (entry.shard_cnt <= 1 ) { // not multi shard cmd
718+ executor->Execute (entry);
719+ return ;
720+ }
721+
722+ // Multi shard command flow:
723+ // step 1: Fiber wait until all the fibers that should execute this tranaction got
724+ // to the journal entry of the transaction.
725+ // step 2: execute the command (All fibers)
726+ // step 3: Fiber wait until all fibers finished the execution
727+ // By step 1 we enforce that replica will execute multi shard commands that finished on master
728+ // By step 3 we ensures the correctness of flushall/flushdb commands
729+
730+ // TODO: this implemantaion does not support atomicity in replica
731+ // Although multi shard transaction happen in step 2 very close to each other,
732+ // user can query replica between executions.
733+ // To support atomicity we should have one fiber in step 2 which will excute all the entries of
734+ // the transaction together. In case of global comand such as flushdb the command can be executed
735+ // by only one fiber.
736+
737+ // TODO: support error handler in this flow
738+
739+ // Only the first fiber to reach the transaction will create data for transaction in map
740+ multi_shard_exe_->map_mu .lock ();
741+ auto [it, was_insert] = multi_shard_exe_->tx_sync_execution .emplace (entry.txid , entry.shard_cnt );
742+
743+ // Note: we must release the mutex befor calling wait on barrier
744+ multi_shard_exe_->map_mu .unlock ();
745+
746+ VLOG (2 ) << " txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt
747+ << " was_insert: " << was_insert;
748+
749+ // step 1
750+ it->second .barrier .wait ();
751+ // step 2
752+ executor->Execute (entry);
753+ // step 3
754+ it->second .barrier .wait ();
755+
756+ // Note: erase from map can be done only after all fibers returned from wait.
757+ // The last fiber which will decrease the counter to 0 will be the one to erase the data from map
758+ auto val = it->second .counter .fetch_sub (1 , std::memory_order_relaxed);
759+ VLOG (2 ) << " txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt
760+ << " counter: " << val;
761+ if (val == 1 ) {
762+ std::lock_guard lg{multi_shard_exe_->map_mu };
763+ multi_shard_exe_->tx_sync_execution .erase (entry.txid );
764+ }
765+ }
766+
716767error_code Replica::ReadRespReply (base::IoBuf* io_buf, uint32_t * consumed) {
717768 DCHECK (parser_);
718769
0 commit comments