diff --git a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp index 11f5fb376bc1b0..c965338d46a5b5 100644 --- a/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp @@ -56,6 +56,7 @@ std::vector SchemaRoutineLoadJobScanner::_s_tbls_colu {"USER_NAME", TYPE_STRING, sizeof(StringRef), true}, {"CURRENT_ABORT_TASK_NUM", TYPE_INT, sizeof(int32_t), true}, {"IS_ABNORMAL_PAUSE", TYPE_BOOLEAN, sizeof(int8_t), true}, + {"COMPUTE_GROUP", TYPE_STRING, sizeof(StringRef), true}, }; SchemaRoutineLoadJobScanner::SchemaRoutineLoadJobScanner() @@ -173,6 +174,9 @@ Status SchemaRoutineLoadJobScanner::_fill_block_impl(vectorized::Block* block) { case 17: // USER_NAME column_value = job_info.__isset.user_name ? job_info.user_name : ""; break; + case 20: // COMPUTE_GROUP + column_value = job_info.__isset.compute_group ? job_info.compute_group : ""; + break; } str_refs[row_idx] = diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 92356740c50ea0..9fd564ab40b408 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -684,6 +684,7 @@ public class SchemaTable extends Table { .column("USER_NAME", ScalarType.createStringType()) .column("CURRENT_ABORT_TASK_NUM", ScalarType.createType(PrimitiveType.INT)) .column("IS_ABNORMAL_PAUSE", ScalarType.createType(PrimitiveType.BOOLEAN)) + .column("COMPUTE_GROUP", ScalarType.createStringType()) .build()) ) .put("load_jobs", diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 41b476ddc6f776..825cc2176dd079 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1638,6 +1638,10 @@ public void setCloudCluster(String cloudCluster) { this.cloudCluster = cloudCluster; } + public String getClusterInfo() { + return Strings.nullToEmpty(cloudCluster); + } + // check the correctness of commit info protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, TransactionState txnState, @@ -1691,6 +1695,7 @@ public List getShowInfo() { row.add(otherMsg); row.add(userIdentity.getQualifiedUser()); row.add(comment); + row.add(getClusterInfo()); return row; } finally { readUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java index 7ffaa02b195ae1..fe5c1441ebc6d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java @@ -109,6 +109,7 @@ public class ShowRoutineLoadCommand extends ShowCommand { .add("OtherMsg") .add("User") .add("Comment") + .add("ComputeGroup") .build(); private final LabelNameInfo labelNameInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 3a1f56c7672984..dfb65acf4ab78e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -4364,6 +4364,7 @@ public TFetchRoutineLoadJobResult fetchRoutineLoadJob(TFetchRoutineLoadJobReques jobInfo.setUserName(job.getUserIdentity().getQualifiedUser()); jobInfo.setCurrentAbortTaskNum(job.getJobStatistic().currentAbortedTaskNum); jobInfo.setIsAbnormalPause(job.isAbnormalPause()); + jobInfo.setComputeGroup(job.getClusterInfo()); jobInfos.add(jobInfo); } if (LOG.isDebugEnabled()) { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 863a38610f90a8..b00fc5ccbb4569 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1550,6 +1550,7 @@ struct TRoutineLoadJob { 18: optional string user_name 19: optional i32 current_abort_task_num 20: optional bool is_abnormal_pause + 21: optional string compute_group } struct TFetchRoutineLoadJobResult { diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy index be95cc3240e9e4..edef339ce0d12b 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy @@ -134,6 +134,11 @@ suite("test_routine_load_job_info_system_table","p0") { def res = sql "SELECT JOB_NAME FROM information_schema.routine_load_jobs WHERE CURRENT_ABORT_TASK_NUM > 0 OR IS_ABNORMAL_PAUSE = TRUE" log.info("res: ${res}".toString()) assertTrue(res.toString().contains("${jobName}")) + + def computeGroupRes = sql "SELECT JOB_NAME, COMPUTE_GROUP FROM information_schema.routine_load_jobs WHERE JOB_NAME = '${jobName}'" + log.info("compute group res: ${computeGroupRes}".toString()) + assertTrue(computeGroupRes.size() > 0) + assertNotNull(computeGroupRes[0][1]) } finally { sql "stop routine load for ${jobName}" sql "DROP TABLE IF EXISTS ${tableName}" diff --git a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy index ac3d14ffe370df..9b01e226df7627 100644 --- a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy +++ b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy @@ -191,5 +191,26 @@ suite("test_show_routine_load","p0") { } finally { sql "stop routine load for testShow" } + + // test show routine load computegroup + try { + sql """ + CREATE ROUTINE LOAD testShowComputeGroup ON ${tableName} + COLUMNS TERMINATED BY "," + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "${kafkaCsvTpoics[0]}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + def res = sql "show routine load for testShowComputeGroup" + // ComputeGroup is the last column (index 22) + def computeGroupStr = res[0][22] + log.info("routine load computegroup: ${computeGroupStr.toString()}".toString()) + assertNotNull(computeGroupStr) + } finally { + sql "stop routine load for testShowComputeGroup" + } } }