Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
_rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
}
_rowset_meta->set_tablet_schema(_context.tablet_schema);
_rowset_meta->set_job_id(_context.job_id);
_context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
if (_context.mow_context != nullptr) {
Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
context.tablet_schema = _new_tablet->tablet_schema();
context.newest_write_timestamp = rs_reader->newest_write_timestamp();
context.storage_resource = _cloud_storage_engine.get_storage_resource(sc_params.vault_id);
context.job_id = _job_id;
context.write_file_cache = sc_params.output_to_file_cache;
context.tablet = _new_tablet;
if (!context.storage_resource) {
Expand Down
24 changes: 24 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
auto* slice_locations = out->mutable_packed_slice_locations();
slice_locations->clear();
slice_locations->insert(in.packed_slice_locations().begin(), in.packed_slice_locations().end());
if (in.has_is_recycled()) {
out->set_is_recycled(in.is_recycled());
}
if (in.has_job_id()) {
out->set_job_id(in.job_id());
}
}

void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
Expand Down Expand Up @@ -177,6 +183,12 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
auto* slice_locations = out->mutable_packed_slice_locations();
slice_locations->clear();
slice_locations->insert(in.packed_slice_locations().begin(), in.packed_slice_locations().end());
if (in.has_is_recycled()) {
out->set_is_recycled(in.is_recycled());
}
if (in.has_job_id()) {
out->set_job_id(in.job_id());
}
}

RowsetMetaPB cloud_rowset_meta_to_doris(const RowsetMetaCloudPB& in) {
Expand Down Expand Up @@ -259,6 +271,12 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
auto* slice_locations = out->mutable_packed_slice_locations();
slice_locations->clear();
slice_locations->insert(in.packed_slice_locations().begin(), in.packed_slice_locations().end());
if (in.has_is_recycled()) {
out->set_is_recycled(in.is_recycled());
}
if (in.has_job_id()) {
out->set_job_id(in.job_id());
}
}

void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
Expand Down Expand Up @@ -330,6 +348,12 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
auto* slice_locations = out->mutable_packed_slice_locations();
slice_locations->clear();
slice_locations->insert(in.packed_slice_locations().begin(), in.packed_slice_locations().end());
if (in.has_is_recycled()) {
out->set_is_recycled(in.is_recycled());
}
if (in.has_job_id()) {
out->set_job_id(in.job_id());
}
}

TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB& in) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1720,6 +1720,7 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
ctx.file_cache_ttl_sec = _tablet->ttl_seconds();
ctx.approximate_bytes_to_write = _input_rowsets_total_size;
ctx.tablet = _tablet;
ctx.job_id = _uuid;

_output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical));
RETURN_IF_ERROR(
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
new_load_id->set_lo(load_id.lo());
}

void set_job_id(const std::string& job_id) { _rowset_meta_pb.set_job_id(job_id); }

const std::string& job_id() const { return _rowset_meta_pb.job_id(); }

bool delete_flag() const { return _rowset_meta_pb.delete_flag(); }

int64_t creation_time() const { return _rowset_meta_pb.creation_time(); }
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ struct RowsetWriterContext {

std::optional<EncryptionAlgorithmPB> encrypt_algorithm;

std::string job_id;

bool is_local_rowset() const { return !storage_resource; }

std::string segment_path(int seg_id) const {
Expand Down
4 changes: 4 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ CONF_mBool(enable_load_txn_status_check, "true");

CONF_mBool(enable_tablet_job_check, "true");

CONF_mBool(enable_recycle_delete_rowset_key_check, "true");
CONF_mBool(enable_mark_delete_rowset_before_recycle, "true");
CONF_mBool(enable_abort_txn_and_job_for_delete_rowset_before_recycle, "true");

// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
// this is a list in semicolon-delimited format, in CIDR notation,
Expand Down
94 changes: 73 additions & 21 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2589,6 +2589,75 @@ void MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
}
}

// Check recycle_rowset_key to ensure idempotency for commit_rowset operation.
// The precondition for commit_rowset is that prepare_rowset has been successfully executed,
// which creates the recycle_rowset_key. Therefore, we only need to check if the
// recycle_rowset_key exists to determine if this is a duplicate request:
// - If key not found: commit_rowset has already been executed and remove the key,
// this is a duplicate request and should be rejected.
// - If key exists but marked as recycled: the rowset data has been recycled by recycler,
// this request should be rejected to prevent data inconsistency.
// - If key exists and not marked: this is a valid commit_rowset request, proceed normally.
// Note: We don't need to check txn/job status separately because prepare_rowset has already
// validated them, and the existence of recycle_rowset_key is sufficient to guarantee idempotency.
int check_idempotent_for_txn_or_job(Transaction* txn, const std::string& recycle_rs_key,
doris::RowsetMetaCloudPB& rowset_meta,
const std::string& instance_id, int64_t tablet_id,
const std::string& rowset_id, const std::string& tablet_job_id,
bool is_versioned_read, ResourceManager* resource_mgr,
MetaServiceCode& code, std::string& msg) {
if (!rowset_meta.has_delete_predicate() && config::enable_recycle_delete_rowset_key_check) {
std::string recycle_rs_val;
TxnErrorCode err = txn->get(recycle_rs_key, &recycle_rs_val);
if (err == TxnErrorCode::TXN_OK) {
RecycleRowsetPB recycle_rowset;
if (!recycle_rowset.ParseFromString(recycle_rs_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("malformed recycle rowset value. key={}", hex(recycle_rs_key));
return 1;
}
auto rs_meta = recycle_rowset.rowset_meta();
if (rs_meta.has_is_recycled() && rs_meta.is_recycled()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("rowset has already been marked as recycled, key={}, rs_meta={}",
hex(recycle_rs_key), rs_meta.ShortDebugString());
return 1;
}
} else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("recycle rowset key not found, key={}", hex(recycle_rs_key));
return 1;
} else {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get recycle rowset, err={}, key={}", err,
hex(recycle_rs_key));
return -1;
}
} else if (!config::enable_recycle_delete_rowset_key_check) {
if (config::enable_tablet_job_check && tablet_job_id.empty() && !tablet_job_id.empty()) {
if (!check_job_existed(txn, code, msg, instance_id, tablet_id, rowset_id, tablet_job_id,
is_versioned_read, resource_mgr)) {
return 1;
}
}

// Check if the prepare rowset request is invalid.
// If the transaction has been finished, it means this prepare rowset is a timeout retry request.
// In this case, do not write the recycle key again, otherwise it may cause data loss.
// If the rowset had load id, it means it is a load request, otherwise it is a
// compaction/sc request.
if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
!check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn, instance_id,
rowset_meta.txn_id(), code, msg)) {
LOG(WARNING) << "prepare rowset failed, txn_id=" << rowset_meta.txn_id()
<< ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id
<< ", rowset_state=" << rowset_meta.rowset_state() << ", msg=" << msg;
return 1;
}
}
return 0;
}

/**
* 1. Check and confirm tmp rowset kv does not exist
* a. if exist
Expand Down Expand Up @@ -2635,27 +2704,12 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
return;
}

// Check if the compaction/sc tablet job has finished
bool is_versioned_read = is_version_read_enabled(instance_id);
if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
!request->tablet_job_id().empty()) {
if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, rowset_id,
request->tablet_job_id(), is_versioned_read, resource_mgr_.get())) {
return;
}
}
auto recycle_rs_key = recycle_rowset_key({instance_id, tablet_id, rowset_id});

// Check if the commit rowset request is invalid.
// If the transaction has been finished, it means this commit rowset is a timeout retry request.
// In this case, do not write the recycle key again, otherwise it may cause data loss.
// If the rowset has load id, it means it is a load request, otherwise it is a
// compaction/sc request.
if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
!check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(), instance_id,
rowset_meta.txn_id(), code, msg)) {
LOG(WARNING) << "commit rowset failed, txn_id=" << rowset_meta.txn_id()
<< ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id
<< ", rowset_state=" << rowset_meta.rowset_state() << ", msg=" << msg;
if (check_idempotent_for_txn_or_job(txn.get(), recycle_rs_key, rowset_meta, instance_id,
tablet_id, rowset_id, request->tablet_job_id(),
is_versioned_read, resource_mgr_.get(), code, msg) != 0) {
return;
}

Expand Down Expand Up @@ -2765,9 +2819,7 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
rowset_meta.set_allocated_tablet_schema(nullptr);
}

auto recycle_rs_key = recycle_rowset_key({instance_id, tablet_id, rowset_id});
txn->remove(recycle_rs_key);

DCHECK_GT(rowset_meta.txn_expiration(), 0);
auto tmp_rs_val = rowset_meta.SerializeAsString();
txn->put(tmp_rs_key, tmp_rs_val);
Expand Down
12 changes: 12 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ static void* run_bthread_work(void* arg) {
int64_t partition_count, int64_t tablet_count,
int64_t txn_id);

[[maybe_unused]] void _abort_txn(const std::string& instance_id, const AbortTxnRequest* request,
Transaction* txn, TxnInfoPB& return_txn_info,
std::stringstream& ss, MetaServiceCode& code, std::string& msg);

[[maybe_unused]] void _finish_tablet_job(const FinishTabletJobRequest* request,
FinishTabletJobResponse* response,
std::string& instance_id,
std::unique_ptr<Transaction>& txn, TxnKv* txn_kv,
DeleteBitmapLockWhiteList* delete_bitmap_lock_white_list,
ResourceManager* resource_mgr, MetaServiceCode& code,
std::string& msg, std::stringstream& ss);

class MetaServiceImpl : public cloud::MetaService {
public:
MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv, std::shared_ptr<ResourceManager> resource_mgr,
Expand Down
87 changes: 56 additions & 31 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,14 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
return;
}

if (rs_meta.has_is_recycled() && rs_meta.is_recycled()) {
SS << "rowset has already been marked as recycled, tablet_id=" << tablet_id
<< " txn_id=" << rs_meta.txn_id() << " rowset_id=" << rs_meta.rowset_id_v2();
msg = ss.str();
code = MetaServiceCode::TXN_ALREADY_ABORTED;
return;
}

txn->remove(tmp_rowset_key);
INSTANCE_LOG(INFO) << "remove tmp rowset meta, tablet_id=" << tablet_id
<< " tmp_rowset_key=" << hex(tmp_rowset_key);
Expand Down Expand Up @@ -1804,6 +1812,16 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
msg = ss.str();
return;
}

if (tmp_rowset_meta.has_is_recycled() && tmp_rowset_meta.is_recycled()) {
SS << "rowset has already been marked as recycled, tablet_id=" << new_tablet_id
<< " txn_id=" << tmp_rowset_meta.txn_id()
<< " rowset_id=" << tmp_rowset_meta.rowset_id_v2();
msg = ss.str();
code = MetaServiceCode::TXN_ALREADY_ABORTED;
return;
}

using namespace std::chrono;
auto rowset_visible_time =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
Expand Down Expand Up @@ -1896,34 +1914,16 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
}
}

void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* controller,
const FinishTabletJobRequest* request,
FinishTabletJobResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(finish_tablet_job, get, put, del);
std::string cloud_unique_id = request->cloud_unique_id();
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
SS << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
LOG(INFO) << msg;
return;
}
RPC_RATE_LIMIT(finish_tablet_job)
if (!request->has_job() ||
(request->job().compaction().empty() && !request->job().has_schema_change())) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid job specified";
return;
}

bool is_versioned_read = is_version_read_enabled(instance_id);
bool is_versioned_write = is_version_write_enabled(instance_id);
void _finish_tablet_job(const FinishTabletJobRequest* request, FinishTabletJobResponse* response,
std::string& instance_id, std::unique_ptr<Transaction>& txn, TxnKv* txn_kv,
DeleteBitmapLockWhiteList* delete_bitmap_lock_white_list,
ResourceManager* resource_mgr, MetaServiceCode& code, std::string& msg,
std::stringstream& ss) {
bool is_versioned_read = resource_mgr->is_version_read_enabled(instance_id);
bool is_versioned_write = resource_mgr->is_version_write_enabled(instance_id);
for (int retry = 0; retry <= 1; retry++) {
bool need_commit = false;
TxnErrorCode err = txn_kv_->create_txn(&txn);
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
Expand All @@ -1943,7 +1943,7 @@ void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, tablet_idx);
if (code != MetaServiceCode::OK) return;
} else {
CloneChainReader reader(instance_id, resource_mgr_.get());
CloneChainReader reader(instance_id, resource_mgr);
err = reader.get_tablet_index(txn.get(), tablet_id, &tablet_idx);
if (err != TxnErrorCode::TXN_OK) {
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND
Expand Down Expand Up @@ -1987,20 +1987,19 @@ void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
FinishTabletJobRequest_Action action = request->action();

std::string use_version =
delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id);
delete_bitmap_lock_white_list->get_delete_bitmap_lock_version(instance_id);
LOG(INFO) << "finish_tablet_job instance_id=" << instance_id
<< " use_version=" << use_version;
if (!request->job().compaction().empty()) {
// Process compaction commit
process_compaction_job(code, msg, ss, txn, request, response, recorded_job, instance_id,
job_key, need_commit, use_version, is_versioned_read,
is_versioned_write, txn_kv_.get(), resource_mgr_.get());
is_versioned_write, txn_kv, resource_mgr);
} else if (request->job().has_schema_change()) {
// Process schema change commit
process_schema_change_job(code, msg, ss, txn, request, response, recorded_job,
instance_id, job_key, need_commit, use_version,
is_versioned_read, is_versioned_write, txn_kv_.get(),
resource_mgr_.get());
is_versioned_read, is_versioned_write, txn_kv, resource_mgr);
}

if (!need_commit) return;
Expand Down Expand Up @@ -2037,6 +2036,32 @@ void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
}
}

void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* controller,
const FinishTabletJobRequest* request,
FinishTabletJobResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(finish_tablet_job, get, put, del);
std::string cloud_unique_id = request->cloud_unique_id();
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
SS << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
LOG(INFO) << msg;
return;
}
RPC_RATE_LIMIT(finish_tablet_job)
if (!request->has_job() ||
(request->job().compaction().empty() && !request->job().has_schema_change())) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid job specified";
return;
}
_finish_tablet_job(request, response, instance_id, txn, txn_kv_.get(),
delete_bitmap_lock_white_list_.get(), resource_mgr_.get(), code, msg, ss);
}

#undef SS
#undef INSTANCE_LOG
} // namespace doris::cloud
Loading
Loading