Skip to content

Commit 808a7b3

Browse files
committed
[feature](info) Append RunningTasks into BE info (apache#56277)
To help determine whether any query or load tasks are running on a backend, this PR appends a `RunningTasks` metric to the BE info; users can retrieve it with the `show backends` command. <img width="3800" height="668" alt="image" src="https://github.com/user-attachments/assets/50124384-3427-4621-91b3-b165bfda8d93" />
1 parent 909711d commit 808a7b3

File tree

9 files changed

+40
-10
lines changed

9 files changed

+40
-10
lines changed

be/src/agent/task_worker_pool.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,6 +1061,7 @@ void report_task_callback(const ClusterInfo* cluster_info) {
10611061
}
10621062
}
10631063
request.__set_backend(BackendOptions::get_local_backend());
1064+
request.__set_running_tasks(ExecEnv::GetInstance()->fragment_mgr()->running_query_num());
10641065
bool succ = handle_report(request, cluster_info, "task");
10651066
report_task_total << 1;
10661067
if (!succ) [[unlikely]] {

be/src/runtime/fragment_mgr.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,12 @@ void FragmentMgr::remove_pipeline_context(
653653
_pipeline_map.erase({query_id, f_context->get_fragment_id()});
654654
}
655655

656+
void FragmentMgr::remove_query_context(const TUniqueId& key) {
657+
#ifndef BE_TEST
658+
_query_ctx_map.erase(key);
659+
#endif
660+
}
661+
656662
std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId& query_id) {
657663
auto val = _query_ctx_map.find(query_id);
658664
if (auto q_ctx = val.lock()) {
@@ -891,7 +897,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
891897
}
892898
}
893899
query_ctx->cancel(reason);
894-
_query_ctx_map.erase(query_id);
900+
remove_query_context(query_id);
895901
LOG(INFO) << "Query " << print_id(query_id)
896902
<< " is cancelled and removed. Reason: " << reason.to_string();
897903
}

be/src/runtime/fragment_mgr.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ class FragmentMgr : public RestMonitorIface {
134134
Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type,
135135
const FinishCallback& cb);
136136

137+
void remove_query_context(const TUniqueId& key);
137138
Status start_query_execution(const PExecPlanFragmentStartRequest* request);
138139

139140
Status trigger_pipeline_context_report(const ReportStatusRequest,

be/src/runtime/query_context.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ QueryContext::~QueryContext() {
196196

197197
_exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
198198
DorisMetrics::instance()->query_ctx_cnt->increment(-1);
199+
// TODO(gabriel): we need to clear outdated query contexts on time
200+
// ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id);
199201
// the only one msg shows query's end. any other msg should append to it if need.
200202
LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg);
201203
}

fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class BackendsProcDir implements ProcDirInterface {
5151
.add("DataUsedCapacity").add("TrashUsedCapacity").add("AvailCapacity").add("TotalCapacity").add("UsedPct")
5252
.add("MaxDiskUsedPct").add("RemoteUsedCapacity").add("Tag").add("ErrMsg").add("Version").add("Status")
5353
.add("HeartbeatFailureCounter").add("NodeRole").add("CpuCores").add("Memory")
54-
.build();
54+
.add("RunningTasks").build();
5555

5656
public static final ImmutableList<String> DISK_TITLE_NAMES = new ImmutableList.Builder<String>()
5757
.add("BackendId").add("Host").add("RootPath").add("DirType").add("DiskState")
@@ -173,6 +173,9 @@ public static List<List<String>> getBackendInfos() {
173173

174174
// memory
175175
backendInfo.add(RuntimeProfile.printCounter(backend.getBeMemory(), TUnit.BYTES));
176+
177+
// runningFragments
178+
backendInfo.add(String.valueOf(backend.getRunningTasks()));
176179
comparableBackendInfos.add(backendInfo);
177180
}
178181

fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ public TMasterResult handleReport(TReportRequest request) throws TException {
224224

225225
ReportTask reportTask = new ReportTask(beId, reportType, tasks, disks, tablets, partitionsVersion,
226226
reportVersion, request.getStoragePolicy(), request.getResource(), request.getNumCores(),
227-
request.getPipelineExecutorSize(), numTablets, request.getIndexPolicy());
227+
request.getPipelineExecutorSize(), numTablets, request.getIndexPolicy(),
228+
request.isSetRunningTasks() ? request.getRunningTasks() : -1);
228229
try {
229230
putToQueue(reportTask);
230231
} catch (Exception e) {
@@ -314,14 +315,15 @@ private class ReportTask extends MasterTask {
314315
private List<TStorageResource> storageResources;
315316
private int cpuCores;
316317
private int pipelineExecutorSize;
318+
private long runningTasks;
317319
private long numTablets;
318320
private List<TIndexPolicy> indexPolicys;
319321

320322
public ReportTask(long beId, ReportType reportType, Map<TTaskType, Set<Long>> tasks,
321323
Map<String, TDisk> disks, Map<Long, TTablet> tablets,
322324
Map<Long, Long> partitionsVersion, long reportVersion,
323325
List<TStoragePolicy> storagePolicies, List<TStorageResource> storageResources, int cpuCores,
324-
int pipelineExecutorSize, long numTablets, List<TIndexPolicy> indexPolicys) {
326+
int pipelineExecutorSize, long numTablets, List<TIndexPolicy> indexPolicys, long runningTasks) {
325327
this.beId = beId;
326328
this.reportType = reportType;
327329
this.tasks = tasks;
@@ -335,12 +337,13 @@ public ReportTask(long beId, ReportType reportType, Map<TTaskType, Set<Long>> ta
335337
this.pipelineExecutorSize = pipelineExecutorSize;
336338
this.numTablets = numTablets;
337339
this.indexPolicys = indexPolicys;
340+
this.runningTasks = runningTasks;
338341
}
339342

340343
@Override
341344
protected void exec() {
342345
if (tasks != null) {
343-
ReportHandler.taskReport(beId, tasks);
346+
ReportHandler.taskReport(beId, tasks, runningTasks);
344347
}
345348
if (disks != null) {
346349
ReportHandler.diskReport(beId, disks);
@@ -704,7 +707,8 @@ private static void debugBlock() {
704707
}
705708
}
706709

707-
private static void taskReport(long backendId, Map<TTaskType, Set<Long>> runningTasks) {
710+
private static void taskReport(long backendId, Map<TTaskType, Set<Long>> runningTasks,
711+
long numRunningTasks) {
708712
debugBlock();
709713
if (LOG.isDebugEnabled()) {
710714
LOG.debug("begin to handle task report from backend {}", backendId);
@@ -728,6 +732,7 @@ private static void taskReport(long backendId, Map<TTaskType, Set<Long>> running
728732
? runningTasks.get(TTaskType.PUBLISH_VERSION).size() : 0;
729733
if (be != null) {
730734
be.setPublishTaskLastTimeAccumulated((long) publishTaskSize);
735+
be.setRunningTasks(numRunningTasks);
731736
}
732737

733738
List<AgentTask> diffTasks = AgentTaskQueue.getDiffTasks(backendId, runningTasks);

fe/fe-core/src/main/java/org/apache/doris/system/Backend.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ public class Backend implements Writable {
106106

107107
private Long lastPublishTaskAccumulatedNum = 0L;
108108

109+
private Long runningTasks = 0L;
110+
109111
private String heartbeatErrMsg = "";
110112

111113
// This is used for the first time we init pathHashToDishInfo in SystemInfoService.
@@ -959,6 +961,14 @@ private int getDiskNum() {
959961
return disksRef.size();
960962
}
961963

964+
public Long getRunningTasks() {
965+
return runningTasks;
966+
}
967+
968+
public void setRunningTasks(Long runningTasks) {
969+
this.runningTasks = runningTasks;
970+
}
971+
962972
/**
963973
* Note: This class must be a POJO in order to display in JSON format
964974
* Add additional information in the class to show in `show backends`

fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,13 @@ public void testCreateDbAndTable() throws Exception {
200200
ProcResult result = dir.fetchResult();
201201
Assert.assertEquals(BackendsProcDir.TITLE_NAMES.size(), result.getColumnNames().size());
202202
Assert.assertEquals("{\"location\" : \"default\"}",
203-
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 8));
203+
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 9));
204204
Assert.assertEquals(
205205
"{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,"
206206
+ "\"isLoadDisabled\":false,\"isActive\":true,\"currentFragmentNum\":0,\"lastFragmentUpdateTime\":0}",
207-
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 5));
208-
Assert.assertEquals("0", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 4));
209-
Assert.assertEquals(Tag.VALUE_MIX, result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 3));
207+
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 6));
208+
Assert.assertEquals("0", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 5));
209+
Assert.assertEquals(Tag.VALUE_MIX, result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 4));
210210
}
211211

212212
private static void updateReplicaPathHash() {

gensrc/thrift/MasterService.thrift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ struct TReportRequest {
121121
// tablet num in be, in cloud num_tablets may not eq tablet_list.size()
122122
14: optional i64 num_tablets
123123
15: optional list<AgentService.TIndexPolicy> index_policy
124+
// Running query/loading tasks
125+
16: optional i64 running_tasks
124126
}
125127

126128
struct TMasterResult {

0 commit comments

Comments
 (0)