3232#include " meta-service/meta_service_helper.h"
3333#include " meta-store/clone_chain_reader.h"
3434#include " meta-store/keys.h"
35+ #include " meta-store/meta_reader.h"
3536#include " meta-store/txn_kv.h"
3637#include " meta-store/txn_kv_error.h"
38+ #include " meta-store/versioned_value.h"
3739
3840namespace doris ::cloud {
3941
@@ -302,7 +304,8 @@ void internal_get_load_tablet_stats_batch(MetaServiceCode& code, std::string& ms
302304
303305MetaServiceResponseStatus parse_fix_tablet_stats_param (
304306 std::shared_ptr<ResourceManager> resource_mgr, const std::string& table_id_str,
305- const std::string& cloud_unique_id_str, int64_t & table_id, std::string& instance_id) {
307+ const std::string& cloud_unique_id_str, const std::string& tablet_id_str, int64_t & table_id,
308+ std::string& instance_id, int64_t & tablet_id) {
306309 MetaServiceCode code = MetaServiceCode::OK;
307310 std::string msg;
308311 MetaServiceResponseStatus st;
@@ -317,6 +320,16 @@ MetaServiceResponseStatus parse_fix_tablet_stats_param(
317320 return st;
318321 }
319322
323+ if (!tablet_id_str.empty ()) {
324+ try {
325+ tablet_id = std::stoll (tablet_id_str);
326+ } catch (...) {
327+ st.set_code (MetaServiceCode::INVALID_ARGUMENT);
328+ st.set_msg (" Invalid tablet_id, tablet_id: " + tablet_id_str);
329+ return st;
330+ }
331+ }
332+
320333 instance_id = get_instance_id (resource_mgr, cloud_unique_id_str);
321334 if (instance_id.empty ()) {
322335 code = MetaServiceCode::INVALID_ARGUMENT;
@@ -636,4 +649,163 @@ MetaServiceResponseStatus check_new_tablet_stats(
636649 return st;
637650}
638651
652+ std::pair<MetaServiceCode, std::string> fix_versioned_tablet_stats_internal (
653+ TxnKv* txn_kv, const std::string& instance_id, const TabletIndexPB& tablet_idx,
654+ bool is_versioned_read, bool is_versioned_write, ResourceManager* resource_mgr) {
655+ int64_t tablet_id = tablet_idx.tablet_id ();
656+ std::unique_ptr<Transaction> txn;
657+ MetaServiceCode code = MetaServiceCode::OK;
658+ std::string msg;
659+
660+ TxnErrorCode err = txn_kv->create_txn (&txn);
661+ if (err != TxnErrorCode::TXN_OK) {
662+ code = cast_as<ErrCategory::CREATE>(err);
663+ msg = " failed to create txn" ;
664+ return {code, msg};
665+ }
666+
667+ TabletStatsPB original_tablet_stat;
668+ TabletStatsPB existing_compact_stats;
669+ TabletStatsPB existing_load_stats;
670+ Versionstamp compact_versionstamp;
671+ Versionstamp load_versionstamp;
672+ GetRowsetResponse resp;
673+
674+ CloneChainReader meta_reader (instance_id, resource_mgr);
675+ if (is_versioned_read) {
676+ // Get existing compact stats
677+ err = meta_reader.get_tablet_compact_stats (txn.get (), tablet_id, &existing_compact_stats,
678+ &compact_versionstamp, true );
679+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
680+ code = cast_as<ErrCategory::READ>(err);
681+ msg = fmt::format (" failed to get versioned compact stats, tablet_id={}, err={}" ,
682+ tablet_id, err);
683+ return {code, msg};
684+ }
685+
686+ // Get existing load stats
687+ err = meta_reader.get_tablet_load_stats (txn.get (), tablet_id, &existing_load_stats,
688+ &load_versionstamp, true );
689+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
690+ code = cast_as<ErrCategory::READ>(err);
691+ msg = fmt::format (" failed to get versioned load stats, tablet_id={}, err={}" , tablet_id,
692+ err);
693+ return {code, msg};
694+ }
695+ MetaReader::merge_tablet_stats (existing_compact_stats, existing_load_stats,
696+ &original_tablet_stat);
697+
698+ std::vector<RowsetMetaCloudPB> rowset_metas;
699+ int64_t start = 0 , end = std::numeric_limits<int64_t >::max () - 1 ;
700+ err = meta_reader.get_rowset_metas (txn.get (), tablet_id, start, end, &rowset_metas);
701+ if (err != TxnErrorCode::TXN_OK) {
702+ code = cast_as<ErrCategory::READ>(err);
703+ msg = fmt::format (" failed to get versioned rowset, err={}, tablet_id={}" , err,
704+ tablet_id);
705+ return {code, msg};
706+ }
707+
708+ std::move (rowset_metas.begin (), rowset_metas.end (),
709+ google::protobuf::RepeatedPtrFieldBackInserter (resp.mutable_rowset_meta ()));
710+ } else {
711+ internal_get_tablet_stats (code, msg, txn.get (), instance_id, tablet_idx,
712+ original_tablet_stat, true );
713+ if (code != MetaServiceCode::OK) {
714+ return {code, msg};
715+ }
716+ // get rowsets in tablet and accumulate disk size
717+ internal_get_rowset (txn.get (), 0 , std::numeric_limits<int64_t >::max () - 1 , instance_id,
718+ tablet_id, code, msg, &resp);
719+ if (code != MetaServiceCode::OK) {
720+ return {code, msg};
721+ }
722+ }
723+
724+ int64_t table_id = original_tablet_stat.idx ().table_id ();
725+ int64_t index_id = original_tablet_stat.idx ().index_id ();
726+ int64_t partition_id = original_tablet_stat.idx ().partition_id ();
727+
728+ int64_t total_disk_size = 0 ;
729+ int64_t index_disk_size = 0 ;
730+ int64_t data_disk_size = 0 ;
731+ for (const auto & rs_meta : resp.rowset_meta ()) {
732+ total_disk_size += rs_meta.total_disk_size ();
733+ index_disk_size += rs_meta.index_disk_size ();
734+ data_disk_size += rs_meta.data_disk_size ();
735+ }
736+
737+ // set new disk size to tabletPB and write it back
738+ TabletStatsPB tablet_stat;
739+ tablet_stat.CopyFrom (original_tablet_stat);
740+ tablet_stat.set_data_size (total_disk_size);
741+ tablet_stat.set_index_size (index_disk_size);
742+ tablet_stat.set_segment_size (data_disk_size);
743+
744+ // Write single version stats
745+ std::string tablet_stat_key;
746+ std::string tablet_stat_value;
747+ tablet_stat_key = stats_tablet_key ({instance_id, table_id, index_id, partition_id, tablet_id});
748+ if (!tablet_stat.SerializeToString (&tablet_stat_value)) {
749+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
750+ msg = " failed to serialize tablet stat" ;
751+ return {code, msg};
752+ }
753+ txn->put (tablet_stat_key, tablet_stat_value);
754+
755+ std::string num_segs_key =
756+ stats_tablet_num_segs_key ({instance_id, table_id, index_id, partition_id, tablet_id});
757+ std::string num_rows_key =
758+ stats_tablet_num_rows_key ({instance_id, table_id, index_id, partition_id, tablet_id});
759+ std::string num_rowsets_key = stats_tablet_num_rowsets_key (
760+ {instance_id, table_id, index_id, partition_id, tablet_id});
761+ std::string data_size_key =
762+ stats_tablet_data_size_key ({instance_id, table_id, index_id, partition_id, tablet_id});
763+ std::string index_size_key =
764+ stats_tablet_index_size_key ({instance_id, table_id, index_id, partition_id, tablet_id});
765+ std::string segment_size_key = stats_tablet_segment_size_key (
766+ {instance_id, table_id, index_id, partition_id, tablet_id});
767+ txn->remove (num_segs_key);
768+ txn->remove (num_rows_key);
769+ txn->remove (num_rowsets_key);
770+ txn->remove (data_size_key);
771+ txn->remove (index_size_key);
772+ txn->remove (segment_size_key);
773+
774+ if (is_versioned_write) {
775+ // Write compact stats (aggregate stats with accurate disk sizes)
776+ std::string compact_stats_key =
777+ versioned::tablet_compact_stats_key ({instance_id, tablet_id});
778+ TabletStatsPB compact_stats = tablet_stat; // Use the fixed stats with accurate disk sizes
779+ versioned_put (txn.get (), compact_stats_key, compact_versionstamp, tablet_stat_value);
780+ LOG (INFO) << " put versioned tablet compact stats key=" << hex (compact_stats_key)
781+ << " tablet_id=" << tablet_id << " with existing versionstamp" ;
782+
783+ // Write load stats (detached stats, set to 0 since we recalculated from rowsets)
784+ std::string load_stats_key = versioned::tablet_load_stats_key ({instance_id, tablet_id});
785+ TabletStatsPB load_stats;
786+ load_stats.mutable_idx ()->CopyFrom (tablet_stat.idx ());
787+
788+ std::string load_stats_value;
789+ if (!load_stats.SerializeToString (&load_stats_value)) {
790+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
791+ msg = " failed to serialize load stats" ;
792+ return {code, msg};
793+ }
794+
795+ // Overwrite with existing versionstamp
796+ versioned_put (txn.get (), load_stats_key, load_versionstamp, load_stats_value);
797+ LOG (INFO) << " put versioned tablet load stats key=" << hex (load_stats_key)
798+ << " tablet_id=" << tablet_id << " with existing versionstamp" ;
799+ }
800+
801+ err = txn->commit ();
802+ if (err != TxnErrorCode::TXN_OK) {
803+ code = cast_as<ErrCategory::COMMIT>(err);
804+ msg = " failed to commit txn" ;
805+ return {code, msg};
806+ }
807+
808+ return {MetaServiceCode::OK, " " };
809+ }
810+
639811} // namespace doris::cloud
0 commit comments