Skip to content

Commit 284d048

Browse files
authored
Fix TaskQueryTool.getAllActiveTasks() (#18854)
1 parent 2a56038 commit 284d048

File tree

6 files changed

+33
-23
lines changed

6 files changed

+33
-23
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,12 @@
2929
import org.apache.druid.indexer.TaskStatusPlus;
3030
import org.apache.druid.indexing.common.TaskLock;
3131
import org.apache.druid.indexing.common.task.Task;
32-
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
3332
import org.apache.druid.indexing.overlord.http.TaskStateLookup;
3433
import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
3534
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
3635
import org.apache.druid.java.util.common.IAE;
3736
import org.apache.druid.java.util.common.Intervals;
3837
import org.apache.druid.java.util.common.StringUtils;
39-
import org.apache.druid.java.util.common.logger.Logger;
4038
import org.apache.druid.metadata.LockFilterPolicy;
4139
import org.apache.druid.metadata.TaskLookup;
4240
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@@ -60,28 +58,23 @@
6058
*/
6159
public class TaskQueryTool
6260
{
63-
private static final Logger log = new Logger(TaskQueryTool.class);
64-
6561
private final TaskStorage storage;
6662
private final GlobalTaskLockbox taskLockbox;
6763
private final TaskMaster taskMaster;
6864
private final Supplier<WorkerBehaviorConfig> workerBehaviorConfigSupplier;
69-
private final ProvisioningStrategy provisioningStrategy;
7065

7166
@Inject
7267
public TaskQueryTool(
7368
TaskStorage storage,
7469
GlobalTaskLockbox taskLockbox,
7570
TaskMaster taskMaster,
76-
ProvisioningStrategy provisioningStrategy,
7771
Supplier<WorkerBehaviorConfig> workerBehaviorConfigSupplier
7872
)
7973
{
8074
this.storage = storage;
8175
this.taskLockbox = taskLockbox;
8276
this.taskMaster = taskMaster;
8377
this.workerBehaviorConfigSupplier = workerBehaviorConfigSupplier;
84-
this.provisioningStrategy = provisioningStrategy;
8578
}
8679

8780
/**
@@ -155,19 +148,22 @@ public TaskInfo getTaskInfo(String taskId)
155148
return storage.getTaskInfo(taskId);
156149
}
157150

151+
/**
152+
* Retrieves all active tasks from the {@link TaskQueue} if available,
153+
* otherwise from the metadata store.
154+
*/
158155
public List<TaskStatusPlus> getAllActiveTasks()
159156
{
160157
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
161158
if (taskQueue.isPresent()) {
162-
// Serve active task statuses from memory
159+
// Serve active task statuses from the TaskQueue memory
163160
final List<TaskStatusPlus> taskStatusPlusList = new ArrayList<>();
164161
final List<TaskInfo> activeTasks = taskQueue.get().getTaskInfos();
165162

166163
for (TaskInfo taskInfo : activeTasks) {
167164
final Task task = taskInfo.getTask();
168-
final Optional<TaskStatus> statusOptional = taskQueue.get().getTaskStatus(task.getId());
169-
if (statusOptional.isPresent()) {
170-
final TaskStatus status = statusOptional.get();
165+
final TaskStatus status = taskInfo.getStatus();
166+
if (status.isRunnable()) {
171167
taskStatusPlusList.add(
172168
new TaskStatusPlus(
173169
task.getId(),
@@ -220,7 +216,7 @@ public List<TaskStatusPlus> getTaskStatusPlusList(
220216
// This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process
221217
// and use the snapshot from taskRunner as a reference for potential task state updates happened
222218
// after the first snapshotting.
223-
Stream<TaskStatusPlus> taskStatusPlusStream = getTaskStatusPlusStream(
219+
Stream<TaskStatusPlus> taskStatusPlusStream = retrieveTaskStatusesFromMetadataStore(
224220
state,
225221
dataSource,
226222
createdTimeDuration,
@@ -286,7 +282,7 @@ public List<TaskStatusPlus> getTaskStatusPlusList(
286282
return taskStatuses;
287283
}
288284

289-
private Stream<TaskStatusPlus> getTaskStatusPlusStream(
285+
private Stream<TaskStatusPlus> retrieveTaskStatusesFromMetadataStore(
290286
TaskStateLookup state,
291287
@Nullable String dataSource,
292288
Duration createdTimeDuration,

indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1044,7 +1044,6 @@ public List<Task> getActiveTasks()
10441044
}
10451045

10461046
/**
1047-
* Returns the list of currently active tasks for the given datasource.
10481047
* List of all active and completed task infos currently being managed by this TaskQueue.
10491048
*/
10501049
public List<TaskInfo> getTaskInfos()

indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ private void initScheduler()
215215
scheduler = new OverlordCompactionScheduler(
216216
taskMaster,
217217
taskLockbox,
218-
new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, () -> defaultWorkerConfig),
218+
new TaskQueryTool(taskStorage, taskLockbox, taskMaster, () -> defaultWorkerConfig),
219219
segmentsMetadataManager,
220220
new SegmentsMetadataManagerConfig(null, null, null),
221221
() -> DruidCompactionConfig.empty().withClusterConfig(compactionConfig.get()),

indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ private TaskStorage setUpTaskStorage()
437437
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
438438
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes();
439439
EasyMock.replay(taskMaster);
440-
tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null);
440+
tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null);
441441
return taskStorage;
442442
}
443443

indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
5353
import org.apache.druid.indexing.overlord.TaskStorage;
5454
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
55-
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
5655
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
5756
import org.apache.druid.java.util.common.DateTimes;
5857
import org.apache.druid.java.util.common.Intervals;
@@ -109,7 +108,6 @@ public class OverlordResourceTest
109108
private TaskStorage taskStorage;
110109
private GlobalTaskLockbox taskLockbox;
111110
private JacksonConfigManager configManager;
112-
private ProvisioningStrategy provisioningStrategy;
113111
private AuthConfig authConfig;
114112
private TaskQueryTool taskQueryTool;
115113
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
@@ -128,7 +126,6 @@ public void setUp()
128126
taskRunner = EasyMock.createMock(TaskRunner.class);
129127
taskQueue = EasyMock.createStrictMock(TaskQueue.class);
130128
configManager = EasyMock.createMock(JacksonConfigManager.class);
131-
provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
132129
authConfig = EasyMock.createMock(AuthConfig.class);
133130
overlord = EasyMock.createStrictMock(DruidOverlord.class);
134131
taskMaster = EasyMock.createStrictMock(TaskMaster.class);
@@ -138,7 +135,6 @@ public void setUp()
138135
taskStorage,
139136
taskLockbox,
140137
taskMaster,
141-
provisioningStrategy,
142138
() -> configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class).get()
143139
);
144140
indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
@@ -226,8 +222,7 @@ private void replayAll()
226222
workerTaskRunnerQueryAdapter,
227223
authConfig,
228224
configManager,
229-
auditManager,
230-
provisioningStrategy
225+
auditManager
231226
);
232227
}
233228

@@ -353,6 +348,26 @@ public void testSecuredGetRunningTasks()
353348
Assert.assertEquals(tasksIds.get(1), responseObjects.get(0).getId());
354349
}
355350

351+
@Test
352+
public void test_getAllActiveTasks_withTaskQueryTool_returnsRunningTasksOnly()
353+
{
354+
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue));
355+
EasyMock.expect(taskQueue.getTaskInfos()).andReturn(
356+
List.of(
357+
new TaskInfo(DateTimes.nowUtc(), TaskStatus.success("s"), new NoopTask("s", null, null, 1L, 0L, null)),
358+
new TaskInfo(DateTimes.nowUtc(), TaskStatus.failure("f", ""), new NoopTask("f", null, null, 1L, 0L, null)),
359+
new TaskInfo(DateTimes.nowUtc(), TaskStatus.running("r1"), new NoopTask("r1", null, null, 1L, 0L, null)),
360+
new TaskInfo(DateTimes.nowUtc(), TaskStatus.running("r2"), new NoopTask("r2", null, null, 1L, 0L, null))
361+
)
362+
);
363+
364+
replayAll();
365+
366+
final List<TaskStatusPlus> activeTasks = taskQueryTool.getAllActiveTasks();
367+
Assert.assertEquals(2, activeTasks.size());
368+
Assert.assertTrue(activeTasks.stream().allMatch(status -> status.getStatusCode().equals(TaskState.RUNNING)));
369+
}
370+
356371
@Test
357372
public void testGetTasks()
358373
{

indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ public void testOverlordRun() throws Exception
283283
Assert.assertEquals(Optional.absent(), overlord.getRedirectLocation());
284284

285285
final TaskQueryTool taskQueryTool
286-
= new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null);
286+
= new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null);
287287
final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter
288288
= new WorkerTaskRunnerQueryAdapter(taskMaster, null);
289289
// Test Overlord resource stuff

0 commit comments

Comments
 (0)