Skip to content

Commit 3f2a712

Browse files
authored
[chore](partition) Write versioned partition key when commit index (#59078)
1 parent 8e8556b commit 3f2a712

File tree

8 files changed

+119
-19
lines changed

8 files changed

+119
-19
lines changed

cloud/src/meta-service/meta_service_partition.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,37 @@ void MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
286286
}
287287
}
288288

289+
// Save the partition meta keys
290+
if (is_versioned_write) {
291+
for (auto partition_id : request->partition_ids()) {
292+
int64_t db_id = request->db_id();
293+
int64_t table_id = request->table_id();
294+
std::string part_meta_key = versioned::meta_partition_key({instance_id, partition_id});
295+
std::string part_index_key =
296+
versioned::partition_index_key({instance_id, partition_id});
297+
std::string part_inverted_index_key = versioned::partition_inverted_index_key(
298+
{instance_id, db_id, table_id, partition_id});
299+
PartitionIndexPB part_index_pb;
300+
part_index_pb.set_db_id(db_id);
301+
part_index_pb.set_table_id(table_id);
302+
std::string part_index_value;
303+
if (!part_index_pb.SerializeToString(&part_index_value)) {
304+
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
305+
msg = fmt::format("failed to serialize PartitionIndexPB");
306+
LOG_WARNING(msg).tag("part_id", partition_id);
307+
return;
308+
}
309+
versioned_put(txn.get(), part_meta_key, "");
310+
txn->put(part_inverted_index_key, "");
311+
txn->put(part_index_key, part_index_value);
312+
313+
LOG(INFO) << "xxx put versioned partition index key=" << hex(part_index_key)
314+
<< " partition_id=" << partition_id;
315+
316+
commit_index_log.add_partition_ids(partition_id);
317+
}
318+
}
319+
289320
if (request->has_is_new_table() && request->is_new_table()) {
290321
if (is_versioned_read) {
291322
// Read the table version, to build the operation log visible version range.

cloud/test/meta_service_operation_log_test.cpp

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ TEST(MetaServiceOperationLogTest, CommitIndexLog) {
425425
constexpr int64_t db_id = 123;
426426
constexpr int64_t table_id = 10001;
427427
constexpr int64_t index_id = 10002;
428+
constexpr int64_t part_id = 10003;
428429

429430
{
430431
// write instance
@@ -461,12 +462,17 @@ TEST(MetaServiceOperationLogTest, CommitIndexLog) {
461462
req.set_table_id(table_id);
462463
req.add_index_ids(index_id);
463464
req.set_is_new_table(true);
465+
for (size_t i = 0; i < 5; i++) {
466+
req.add_partition_ids(part_id + i);
467+
}
464468
meta_service->commit_index(&ctrl, &req, &res, nullptr);
465469
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString();
466470
}
467471

468472
auto txn_kv = meta_service->txn_kv();
469473
Versionstamp version1;
474+
Versionstamp version2;
475+
470476
{
471477
// Verify index meta/index/inverted indexes are exists
472478
std::string index_meta_key = versioned::meta_index_key({instance_id, index_id});
@@ -488,7 +494,35 @@ TEST(MetaServiceOperationLogTest, CommitIndexLog) {
488494
ASSERT_EQ(index_index.table_id(), table_id);
489495
}
490496

491-
Versionstamp version2;
497+
{
498+
// Verify table version exists
499+
MetaReader meta_reader(instance_id, txn_kv.get());
500+
ASSERT_EQ(meta_reader.get_table_version(table_id, &version2), TxnErrorCode::TXN_OK);
501+
}
502+
503+
{
504+
for (size_t i = 0; i < 5; i++) {
505+
std::string part_index_key = versioned::partition_index_key({instance_id, part_id + i});
506+
std::string part_meta_key = versioned::meta_partition_key({instance_id, part_id + i});
507+
std::string part_inverted_index_key = versioned::partition_inverted_index_key(
508+
{instance_id, db_id, table_id, part_id + i});
509+
std::unique_ptr<Transaction> txn;
510+
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
511+
std::string value;
512+
ASSERT_EQ(versioned_get(txn.get(), part_meta_key, &version1, &value),
513+
TxnErrorCode::TXN_OK);
514+
515+
ASSERT_EQ(txn->get(part_index_key, &value), TxnErrorCode::TXN_OK);
516+
517+
PartitionIndexPB part_index;
518+
ASSERT_TRUE(part_index.ParseFromString(value));
519+
ASSERT_EQ(part_index.db_id(), db_id);
520+
ASSERT_EQ(part_index.table_id(), table_id);
521+
522+
ASSERT_EQ(txn->get(part_inverted_index_key, &value), TxnErrorCode::TXN_OK);
523+
}
524+
}
525+
492526
{
493527
// Verify table version exists
494528
MetaReader meta_reader(instance_id, txn_kv.get());

fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ protected void onCreateRollupReplicaDone() throws AlterCancelException {
110110
rollupIndexList.add(rollupIndexId);
111111
try {
112112
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
113-
.commitMaterializedIndex(dbId, tableId, rollupIndexList, false);
113+
.commitMaterializedIndex(dbId, tableId, rollupIndexList, null, false);
114114
} catch (Exception e) {
115115
LOG.warn("commitMaterializedIndex Exception:{}", e);
116116
throw new AlterCancelException(e.getMessage());

fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ protected void commitShadowIndex() throws AlterCancelException {
8383
indexIdMap.keySet().stream().collect(Collectors.toList());
8484
try {
8585
((CloudInternalCatalog) Env.getCurrentInternalCatalog())
86-
.commitMaterializedIndex(dbId, tableId, shadowIdxList, false);
86+
.commitMaterializedIndex(dbId, tableId, shadowIdxList, null, false);
8787
} catch (Exception e) {
8888
LOG.warn("commitMaterializedIndex exception:", e);
8989
throw new AlterCancelException(e.getMessage());

fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -844,7 +844,7 @@ private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<Part
844844
throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition");
845845
}
846846
Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), olapTable.getId(),
847-
succeedPartitionIds, indexIds, true);
847+
succeedPartitionIds, indexIds, true /* isCreateTable */, false /* isBatchCommit */);
848848
LOG.info("begin write edit log to add partitions in batch, "
849849
+ "numPartitions: {}, db: {}, table: {}, tableId: {}",
850850
partsInfo.size(), db.getFullName(), tableName, olapTable.getId());

fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -451,24 +451,39 @@ public void beforeCreatePartitions(long dbId, long tableId, List<Long> partition
451451
}
452452
}
453453

454+
/**
455+
* Commit partition creation to MetaService.
456+
*
457+
* @param partitionIds Partition IDs to commit
458+
* @param indexIds Index IDs to commit
459+
* @param isCreateTable Whether this is part of table creation
460+
* @param isBatchCommit If true, use commitMaterializedIndex (commit_index RPC)
461+
* to batch commit all partitions
462+
* and indexes in one MetaService call for better
463+
* performance;
464+
* If false, use commitPartition (commit_partition RPC) to
465+
* commit partitions separately
466+
* @throws DdlException If commit to MetaService fails
467+
*/
454468
@Override
455469
public void afterCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
456-
boolean isCreateTable)
470+
boolean isCreateTable, boolean isBatchCommit)
457471
throws DdlException {
458-
if (partitionIds == null) {
459-
commitMaterializedIndex(dbId, tableId, indexIds, isCreateTable);
472+
if (isBatchCommit) {
473+
commitMaterializedIndex(dbId, tableId, indexIds, partitionIds, isCreateTable);
460474
} else {
461475
commitPartition(dbId, tableId, partitionIds, indexIds);
462476
}
463477
if (!Config.check_create_table_recycle_key_remained) {
464478
return;
465479
}
466-
checkCreatePartitions(dbId, tableId, partitionIds, indexIds);
480+
checkCreatePartitions(dbId, tableId, partitionIds, indexIds, isBatchCommit);
467481
}
468482

469-
private void checkCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds)
483+
private void checkCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
484+
boolean isBatchCommit)
470485
throws DdlException {
471-
if (partitionIds == null) {
486+
if (isBatchCommit) {
472487
checkMaterializedIndex(dbId, tableId, indexIds);
473488
} else {
474489
checkPartition(dbId, tableId, partitionIds);
@@ -530,10 +545,12 @@ public void commitPartition(long dbId, long tableId, List<Long> partitionIds, Li
530545

531546
Cloud.PartitionRequest.Builder partitionRequestBuilder = Cloud.PartitionRequest.newBuilder();
532547
partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
533-
partitionRequestBuilder.addAllPartitionIds(partitionIds);
534548
partitionRequestBuilder.addAllIndexIds(indexIds);
535549
partitionRequestBuilder.setDbId(dbId);
536550
partitionRequestBuilder.setTableId(tableId);
551+
if (partitionIds != null) {
552+
partitionRequestBuilder.addAllPartitionIds(partitionIds);
553+
}
537554
final Cloud.PartitionRequest partitionRequest = partitionRequestBuilder.build();
538555

539556
Cloud.PartitionResponse response = null;
@@ -596,7 +613,8 @@ public void prepareMaterializedIndex(Long tableId, List<Long> indexIds, long exp
596613
}
597614
}
598615

599-
public void commitMaterializedIndex(long dbId, long tableId, List<Long> indexIds, boolean isCreateTable)
616+
public void commitMaterializedIndex(long dbId, long tableId, List<Long> indexIds, List<Long> partitionIds,
617+
boolean isCreateTable)
600618
throws DdlException {
601619
if (Config.enable_check_compatibility_mode) {
602620
LOG.info("skip committing materialized index in checking compatibility mode");
@@ -609,6 +627,9 @@ public void commitMaterializedIndex(long dbId, long tableId, List<Long> indexIds
609627
indexRequestBuilder.setDbId(dbId);
610628
indexRequestBuilder.setTableId(tableId);
611629
indexRequestBuilder.setIsNewTable(isCreateTable);
630+
if (partitionIds != null) {
631+
indexRequestBuilder.addAllPartitionIds(partitionIds);
632+
}
612633
final Cloud.IndexRequest indexRequest = indexRequestBuilder.build();
613634

614635
Cloud.IndexResponse response = null;

fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1781,7 +1781,8 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
17811781
}
17821782

17831783
if (!isCreateTable) {
1784-
afterCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, isCreateTable);
1784+
afterCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, isCreateTable,
1785+
false /* isBatchCommit */);
17851786
}
17861787
if (writeEditLog) {
17871788
Env.getCurrentEnv().getEditLog().logAddPartition(info);
@@ -2205,8 +2206,18 @@ public void beforeCreatePartitions(long dbId, long tableId, List<Long> partition
22052206
throws DdlException {
22062207
}
22072208

2209+
/**
2210+
* Commit partition creation to MetaService (Cloud mode only).
2211+
*
2212+
* @param partitionIds Partition IDs to commit
2213+
* @param indexIds Index IDs to commit
2214+
* @param isCreateTable Whether this is part of table creation
2215+
* @param isBatchCommit If true, use commit_index RPC to batch commit all partitions and indexes in one call;
2216+
* If false, use commit_partition RPC to commit partitions individually
2217+
* @throws DdlException If commit fails
2218+
*/
22082219
public void afterCreatePartitions(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds,
2209-
boolean isCreateTable)
2220+
boolean isCreateTable, boolean isBatchCommit)
22102221
throws DdlException {
22112222
}
22122223

@@ -2953,8 +2964,8 @@ private boolean createOlapTable(Database db, CreateTableInfo createTableInfo) th
29532964
idGeneratorBuffer,
29542965
binlogConfigForTask,
29552966
partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
2956-
afterCreatePartitions(db.getId(), olapTable.getId(), null,
2957-
olapTable.getIndexIdList(), true);
2967+
afterCreatePartitions(db.getId(), olapTable.getId(), olapTable.getPartitionIds(),
2968+
olapTable.getIndexIdList(), true /* isCreateTable */, true /* isBatchCommit */);
29582969
olapTable.addPartition(partition);
29592970
} else if (partitionInfo.getType() == PartitionType.RANGE
29602971
|| partitionInfo.getType() == PartitionType.LIST) {
@@ -3047,8 +3058,8 @@ private boolean createOlapTable(Database db, CreateTableInfo createTableInfo) th
30473058
olapTable.getPartitionInfo().getDataProperty(partition.getId())
30483059
.setStoragePolicy(partionStoragePolicy);
30493060
}
3050-
afterCreatePartitions(db.getId(), olapTable.getId(), null,
3051-
olapTable.getIndexIdList(), true);
3061+
afterCreatePartitions(db.getId(), olapTable.getId(), olapTable.getPartitionIds(),
3062+
olapTable.getIndexIdList(), true /* isCreateTable */, true /* isBatchCommit */);
30523063
} else {
30533064
throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name());
30543065
}
@@ -3509,7 +3520,8 @@ public void truncateTable(String dbName, String tableName, PartitionNamesInfo pa
35093520
newPartitions.add(newPartition);
35103521
}
35113522

3512-
afterCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true);
3523+
afterCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true /* isCreateTable */,
3524+
false /* isBatchCommit */);
35133525

35143526
} catch (DdlException e) {
35153527
// create partition failed, remove all newly created tablets

gensrc/proto/cloud.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,6 +770,7 @@ message CommitIndexLogPB {
770770
optional int64 table_id = 2;
771771
repeated int64 index_ids = 3;
772772
optional bool update_table_version = 4;
773+
repeated int64 partition_ids = 5;
773774
}
774775

775776
message DropIndexLogPB {
@@ -1341,6 +1342,7 @@ message IndexRequest {
13411342
optional int64 db_id = 5;
13421343
optional bool is_new_table = 6; // For table version
13431344
optional string request_ip = 7;
1345+
repeated int64 partition_ids = 8;
13441346
}
13451347

13461348
message IndexResponse {

0 commit comments

Comments
 (0)