Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2500,10 +2500,33 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
skip = Boolean.parseBoolean(skipWriteIndexOnLoad) ? 1 : 0;
}

for (Partition partition : partitions) {
updatePartitionProperties(db, olapTable.getName(), partition.getName(), storagePolicyId, isInMemory,
null, compactionPolicy, timeSeriesCompactionConfig, enableSingleCompaction, skip,
disableAutoCompaction);
// Only iterate partitions when there are properties that actually need to be
// dispatched to each partition's tablets. Pure catalog-level metadata properties
// such as partition.retention_count do not require per-partition updates, and
// iterating over a stale partition snapshot can race with concurrent partition
// drops (e.g., by DynamicPartitionScheduler when retention_count or dynamic_partition
// is enabled) and fail with "Partition does not exist".
boolean needPerPartitionUpdate = isInMemory >= 0 || storagePolicyId >= 0
|| compactionPolicy != null || !timeSeriesCompactionConfig.isEmpty()
|| enableSingleCompaction >= 0 || skip >= 0 || disableAutoCompaction >= 0;
if (needPerPartitionUpdate) {
for (Partition partition : partitions) {
try {
updatePartitionProperties(db, olapTable.getName(), partition.getName(),
storagePolicyId, isInMemory, null, compactionPolicy, timeSeriesCompactionConfig,
enableSingleCompaction, skip, disableAutoCompaction);
} catch (DdlException e) {
// The partition may have been dropped concurrently (e.g., by
// DynamicPartitionScheduler). It is safe to skip the meta dispatch
// for a partition that no longer exists.
if (olapTable.getPartition(partition.getName()) == null) {
LOG.info("partition {} of table {} was dropped concurrently, "
+ "skip updating its properties", partition.getName(), olapTable.getName());
continue;
}
throw e;
}
}
}

olapTable.writeLockOrDdlException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,25 @@ public void recordFinishedLoadJob(String label, long transactionId, String dbNam
LoadJob loadJob;
if (idToLoadJob.containsKey(jobId)) {
loadJob = idToLoadJob.get(jobId);
if (LOG.isDebugEnabled()) {
LOG.debug("recordFinishedLoadJob: reuse existing load job, jobId={}, label={}, dbId={}, jobType={}",
jobId, label, db.getId(), jobType);
}
if (loadJob instanceof InsertLoadJob) {
((InsertLoadJob) loadJob).setJobProperties(transactionId, tableId, createTimestamp,
failMsg, trackingUrl, firstErrorMsg, userInfo);
}
} else {
// The jobId received here does not exist in idToLoadJob. This means the InsertLoadJob
// that was registered during executor construction (and that accumulated BE-reported
// load statistics via updateJobProgress) is NOT the one we are about to snapshot here.
// A brand-new InsertLoadJob will be created below with an empty LoadStatistic, so
// SHOW LOAD's JobDetails (ScannedRows / LoadBytes / All backends) will be all zero.
// Logging this at WARN so CI failures of the form
// "test_insert_statistic: expected:<N> but was:<0>" can be diagnosed directly.
LOG.warn("recordFinishedLoadJob: jobId={} not found in idToLoadJob, creating a new "
+ "{} load job for label={}, dbId={}. JobDetails statistics will be empty.",
jobId, jobType, label, db.getId());
switch (jobType) {
case INSERT:
loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg,
Expand Down Expand Up @@ -813,6 +827,12 @@ public void initJobProgress(Long jobId, TUniqueId loadId, Set<TUniqueId> fragmen
public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
long scannedBytes, boolean isDone) {
LoadJob job = idToLoadJob.get(jobId);
if (LOG.isDebugEnabled()) {
LOG.debug("updateJobProgress: jobId={}, beId={}, scannedRows={}, scannedBytes={}, isDone={}, "
+ "found={}, jobIdMatched={}",
jobId, beId, scannedRows, scannedBytes, isDone,
idToLoadJob.containsKey(jobId), job == null ? -1L : job.getId());
}
if (job != null) {
job.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
@Override
protected void doProcessReportExecStatus(TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask) {
if (params.isSetLoadedRows() && jobId != -1) {
if (LOG.isDebugEnabled()) {
LOG.debug("doProcessReportExecStatus: forwarding load progress to LoadManager, "
+ "jobId={}, beId={}, queryId={}, loadedRows={}, loadedBytes={}, isDone={}",
jobId, params.getBackendId(), DebugUtil.printId(params.getQueryId()),
params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
}
if (params.isSetFragmentInstanceReports()) {
for (TFragmentInstanceReport report : params.getFragmentInstanceReports()) {
Env.getCurrentEnv().getLoadManager().updateJobProgress(
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !show --

-- !show_1be --
show_create_table_and_views_nereids_table CREATE TABLE `show_create_table_and_views_nereids_table` (\n `user_id` largeint NOT NULL,\n `good_id` largeint NOT NULL,\n `cost` bigint SUM NULL DEFAULT "0",\n INDEX index_user_id (`user_id`) USING INVERTED COMMENT "test index comment",\n INDEX index_good_id (`good_id`) USING INVERTED COMMENT "test index\\" comment"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"inverted_index_storage_format" = "V3",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "86400",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n);

-- !show_multi_be --
show_create_table_and_views_nereids_table CREATE TABLE `show_create_table_and_views_nereids_table` (\n `user_id` largeint NOT NULL,\n `good_id` largeint NOT NULL,\n `cost` bigint SUM NULL DEFAULT "0",\n INDEX index_user_id (`user_id`) USING INVERTED COMMENT "test index comment",\n INDEX index_good_id (`good_id`) USING INVERTED COMMENT "test index\\" comment"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 3",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"inverted_index_storage_format" = "V3",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "86400",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n);

-- !select --
1 1 30
1 2 5
Expand Down Expand Up @@ -35,12 +39,20 @@ show_create_table_and_views_nereids_view CREATE VIEW `show_create_table_and_view
200 1
300 1

-- !show --
-- !show_1be --
show_create_table_and_views_nereids_table CREATE TABLE `show_create_table_and_views_nereids_table` (\n `user_id` largeint NOT NULL,\n `good_id` largeint NOT NULL,\n `cost` bigint SUM NULL DEFAULT "0",\n INDEX index_user_id (`user_id`) USING INVERTED COMMENT "test index comment",\n INDEX index_good_id (`good_id`) USING INVERTED COMMENT "test index\\" comment"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"inverted_index_storage_format" = "V3",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "86400",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n);

-- !show --
-- !show_multi_be --
show_create_table_and_views_nereids_table CREATE TABLE `show_create_table_and_views_nereids_table` (\n `user_id` largeint NOT NULL,\n `good_id` largeint NOT NULL,\n `cost` bigint SUM NULL DEFAULT "0",\n INDEX index_user_id (`user_id`) USING INVERTED COMMENT "test index comment",\n INDEX index_good_id (`good_id`) USING INVERTED COMMENT "test index\\" comment"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 3",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"inverted_index_storage_format" = "V3",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "86400",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n);

-- !show_1be --
show_create_table_and_views_nereids_like CREATE TABLE `show_create_table_and_views_nereids_like` (\n `user_id` largeint NOT NULL,\n `good_id` largeint NOT NULL,\n `cost` bigint SUM NULL DEFAULT "0",\n INDEX index_user_id (`user_id`) USING INVERTED COMMENT "test index comment",\n INDEX index_good_id (`good_id`) USING INVERTED COMMENT "test index\\" comment"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"inverted_index_storage_format" = "V3",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "86400",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n);

-- !show --
-- !show_multi_be --
show_create_table_and_views_nereids_like CREATE TABLE `show_create_table_and_views_nereids_like` (\n `user_id` largeint NOT NULL,\n `good_id` largeint NOT NULL,\n `cost` bigint SUM NULL DEFAULT "0",\n INDEX index_user_id (`user_id`) USING INVERTED COMMENT "test index comment",\n INDEX index_good_id (`good_id`) USING INVERTED COMMENT "test index\\" comment"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 3",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"inverted_index_storage_format" = "V3",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "86400",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n);

-- !show_1be --
show_create_table_and_views_nereids_like_with_rollup CREATE TABLE `show_create_table_and_views_nereids_like_with_rollup` (\n `user_id` largeint NOT NULL,\n `good_id` largeint NOT NULL,\n `cost` bigint SUM NULL DEFAULT "0",\n INDEX index_user_id (`user_id`) USING INVERTED COMMENT "test index comment",\n INDEX index_good_id (`good_id`) USING INVERTED COMMENT "test index\\" comment"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 1",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"inverted_index_storage_format" = "V3",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "86400",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n);

-- !show_multi_be --
show_create_table_and_views_nereids_like_with_rollup CREATE TABLE `show_create_table_and_views_nereids_like_with_rollup` (\n `user_id` largeint NOT NULL,\n `good_id` largeint NOT NULL,\n `cost` bigint SUM NULL DEFAULT "0",\n INDEX index_user_id (`user_id`) USING INVERTED COMMENT "test index comment",\n INDEX index_good_id (`good_id`) USING INVERTED COMMENT "test index\\" comment"\n) ENGINE=OLAP\nAGGREGATE KEY(`user_id`, `good_id`)\nPARTITION BY RANGE(`good_id`)\n(PARTITION p1 VALUES [("-170141183460469231731687303715884105728"), ("100")),\nPARTITION p2 VALUES [("100"), ("200")),\nPARTITION p3 VALUES [("200"), ("300")),\nPARTITION p4 VALUES [("300"), ("400")),\nPARTITION p5 VALUES [("400"), ("500")),\nPARTITION p6 VALUES [("500"), ("600")),\nPARTITION p7 VALUES [("600"), (MAXVALUE)))\nDISTRIBUTED BY HASH(`user_id`) BUCKETS 2\nPROPERTIES (\n"replication_allocation" = "tag.location.default: 3",\n"min_load_replica_num" = "-1",\n"is_being_synced" = "false",\n"storage_medium" = "hdd",\n"storage_format" = "V2",\n"inverted_index_storage_format" = "V3",\n"light_schema_change" = "true",\n"disable_auto_compaction" = "false",\n"binlog.enable" = "false",\n"binlog.ttl_seconds" = "86400",\n"binlog.max_bytes" = "9223372036854775807",\n"binlog.max_history_nums" = "9223372036854775807",\n"enable_single_replica_compaction" = "false",\n"group_commit_interval_ms" = "10000",\n"group_commit_data_bytes" = "134217728"\n);
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@
4 \N 987 77777 1234 \N 100

-- !inspect --
1 \N 987 77777 1234 \N 100 5 0
1 100 1 1 1 1 100 2 0
1 100 1 1 1 1 100 3 1
1 100 987 77777 1 1 100 5 0
1 \N 987 77777 1234 \N 100 5 0
2 \N 987 77777 1234 \N 200 5 0
2 100 2 2 2 2 100 2 0
2 200 2 2 2 2 200 3 1
2 100 987 77777 2 2 100 5 0
2 \N 987 77777 1234 \N 200 5 0
3 100 3 3 3 3 100 2 0
2 200 2 2 2 2 200 3 1
3 50 \N 9876 1234 \N 50 3 1
3 100 3 3 3 3 100 2 0
3 100 987 77777 3 3 100 5 0
4 \N 987 77777 1234 \N 100 5 0
4 100 4 4 4 4 100 2 0
4 100 4 4 4 4 100 4 1
4 100 987 77777 4 4 100 5 0
4 \N 987 77777 1234 \N 100 5 0

1 change: 1 addition & 0 deletions regression-test/pipeline/p0/conf/fe.conf
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,4 @@ max_query_profile_num = 2000
max_spilled_profile_num = 2000

check_table_lock_leaky=true
max_remote_file_system_cache_num=1000
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,26 @@ suite("test_backup_restore_colocate_with_partition", "backup_restore") {
assertTrue(result.ColocateMismatchNum as int == 0)
}

// Wait until the colocate group of `db_name`.`group_name` becomes stable.
// After RESTORE creates a brand-new colocate group in a new db, the group
// is unstable until ColocateTableCheckerAndBalancer scans it (default every
// tablet_checker_interval_ms = 20s). Nereids skips colocate join while the
// group is unstable, so EXPLAIN right after RESTORE FINISHED can miss COLOCATE.
def waitColocateGroupStable = { db_name, group_name ->
def fullName = "${db_name}.${group_name}".toString()
def deadline = System.currentTimeMillis() + 60_000
while (System.currentTimeMillis() < deadline) {
def groups = sql_return_maparray("SHOW PROC '/colocation_group'")
def g = groups.find { it.GroupName == fullName }
if (g != null && g.IsStable == "true") {
log.info("colocate group ${fullName} is stable")
return
}
sleep(1000)
}
log.warn("colocate group ${fullName} did not become stable within 60s")
}

def syncer = getSyncer()
syncer.createS3Repository(repoName)

Expand Down Expand Up @@ -624,6 +644,11 @@ suite("test_backup_restore_colocate_with_partition", "backup_restore") {

query = "select * from ${newDbName}.${tableName1} as t1, ${newDbName}.${tableName2} as t2 where t1.id=t2.id;"

// RESTORE to a brand-new db creates a new colocate group that is initially
// unstable; wait for ColocateTableCheckerAndBalancer to mark it stable, otherwise
// EXPLAIN below may fall back to BROADCAST/SHUFFLE.
waitColocateGroupStable(newDbName, groupName)

explain {
sql("${query}")
contains("COLOCATE")
Expand Down
Loading
Loading