Skip to content

Commit d9747df

Browse files
sollhuizzzxl1993
authored andcommitted
[feat](job) support show routine load job compute group (apache#59540)
support show routine load job compute group(both `show routine load` command and routine load system table): ``` ComputeGroup: cluster_name0 ```
1 parent 124017a commit d9747df

File tree

8 files changed

+39
-0
lines changed

8 files changed

+39
-0
lines changed

be/src/exec/schema_scanner/schema_routine_load_job_scanner.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaRoutineLoadJobScanner::_s_tbls_colu
5656
{"USER_NAME", TYPE_STRING, sizeof(StringRef), true},
5757
{"CURRENT_ABORT_TASK_NUM", TYPE_INT, sizeof(int32_t), true},
5858
{"IS_ABNORMAL_PAUSE", TYPE_BOOLEAN, sizeof(int8_t), true},
59+
{"COMPUTE_GROUP", TYPE_STRING, sizeof(StringRef), true},
5960
};
6061

6162
SchemaRoutineLoadJobScanner::SchemaRoutineLoadJobScanner()
@@ -173,6 +174,9 @@ Status SchemaRoutineLoadJobScanner::_fill_block_impl(vectorized::Block* block) {
173174
case 17: // USER_NAME
174175
column_value = job_info.__isset.user_name ? job_info.user_name : "";
175176
break;
177+
case 20: // COMPUTE_GROUP
178+
column_value = job_info.__isset.compute_group ? job_info.compute_group : "";
179+
break;
176180
}
177181

178182
str_refs[row_idx] =

fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,7 @@ public class SchemaTable extends Table {
684684
.column("USER_NAME", ScalarType.createStringType())
685685
.column("CURRENT_ABORT_TASK_NUM", ScalarType.createType(PrimitiveType.INT))
686686
.column("IS_ABNORMAL_PAUSE", ScalarType.createType(PrimitiveType.BOOLEAN))
687+
.column("COMPUTE_GROUP", ScalarType.createStringType())
687688
.build())
688689
)
689690
.put("load_jobs",

fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1638,6 +1638,10 @@ public void setCloudCluster(String cloudCluster) {
16381638
this.cloudCluster = cloudCluster;
16391639
}
16401640

1641+
public String getClusterInfo() {
1642+
return Strings.nullToEmpty(cloudCluster);
1643+
}
1644+
16411645
// check the correctness of commit info
16421646
protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
16431647
TransactionState txnState,
@@ -1691,6 +1695,7 @@ public List<String> getShowInfo() {
16911695
row.add(otherMsg);
16921696
row.add(userIdentity.getQualifiedUser());
16931697
row.add(comment);
1698+
row.add(getClusterInfo());
16941699
return row;
16951700
} finally {
16961701
readUnlock();

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowRoutineLoadCommand.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ public class ShowRoutineLoadCommand extends ShowCommand {
109109
.add("OtherMsg")
110110
.add("User")
111111
.add("Comment")
112+
.add("ComputeGroup")
112113
.build();
113114

114115
private final LabelNameInfo labelNameInfo;

fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4364,6 +4364,7 @@ public TFetchRoutineLoadJobResult fetchRoutineLoadJob(TFetchRoutineLoadJobReques
43644364
jobInfo.setUserName(job.getUserIdentity().getQualifiedUser());
43654365
jobInfo.setCurrentAbortTaskNum(job.getJobStatistic().currentAbortedTaskNum);
43664366
jobInfo.setIsAbnormalPause(job.isAbnormalPause());
4367+
jobInfo.setComputeGroup(job.getClusterInfo());
43674368
jobInfos.add(jobInfo);
43684369
}
43694370
if (LOG.isDebugEnabled()) {

gensrc/thrift/FrontendService.thrift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1550,6 +1550,7 @@ struct TRoutineLoadJob {
15501550
18: optional string user_name
15511551
19: optional i32 current_abort_task_num
15521552
20: optional bool is_abnormal_pause
1553+
21: optional string compute_group
15531554
}
15541555

15551556
struct TFetchRoutineLoadJobResult {

regression-test/suites/load_p0/routine_load/test_routine_load_job_info_system_table.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ suite("test_routine_load_job_info_system_table","p0") {
134134
def res = sql "SELECT JOB_NAME FROM information_schema.routine_load_jobs WHERE CURRENT_ABORT_TASK_NUM > 0 OR IS_ABNORMAL_PAUSE = TRUE"
135135
log.info("res: ${res}".toString())
136136
assertTrue(res.toString().contains("${jobName}"))
137+
138+
def computeGroupRes = sql "SELECT JOB_NAME, COMPUTE_GROUP FROM information_schema.routine_load_jobs WHERE JOB_NAME = '${jobName}'"
139+
log.info("compute group res: ${computeGroupRes}".toString())
140+
assertTrue(computeGroupRes.size() > 0)
141+
assertNotNull(computeGroupRes[0][1])
137142
} finally {
138143
sql "stop routine load for ${jobName}"
139144
sql "DROP TABLE IF EXISTS ${tableName}"

regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,5 +191,26 @@ suite("test_show_routine_load","p0") {
191191
} finally {
192192
sql "stop routine load for testShow"
193193
}
194+
195+
// test show routine load computegroup
196+
try {
197+
sql """
198+
CREATE ROUTINE LOAD testShowComputeGroup ON ${tableName}
199+
COLUMNS TERMINATED BY ","
200+
FROM KAFKA
201+
(
202+
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
203+
"kafka_topic" = "${kafkaCsvTpoics[0]}",
204+
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
205+
);
206+
"""
207+
def res = sql "show routine load for testShowComputeGroup"
208+
// ComputeGroup is the last column (index 22)
209+
def computeGroupStr = res[0][22]
210+
log.info("routine load computegroup: ${computeGroupStr.toString()}".toString())
211+
assertNotNull(computeGroupStr)
212+
} finally {
213+
sql "stop routine load for testShowComputeGroup"
214+
}
194215
}
195216
}

0 commit comments

Comments
 (0)