Skip to content

Commit c23c0e3

Browse files
George Shiqi WuFrankChen021
authored andcommitted
Fix bug with cancelling pending tasks when running kubernetes ingestion. (#16036)
* Fix bug * Add new test
1 parent 4b313f2 commit c23c0e3

File tree

4 files changed

+62
-9
lines changed

4 files changed

+62
-9
lines changed

extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,11 +263,14 @@ public void shutdown(String taskid, String reason)
263263
return;
264264
}
265265

266+
workItem.shutdown();
267+
if (!workItem.getResult().isDone()) {
268+
return;
269+
}
266270
synchronized (tasks) {
267271
tasks.remove(taskid);
268272
}
269-
270-
workItem.shutdown();
273+
271274
}
272275

273276
@Override

extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public KubernetesPeonClient(
6161
this.emitter = emitter;
6262
}
6363

64-
public Pod launchPeonJobAndWaitForStart(Job job, Task task, long howLong, TimeUnit timeUnit)
64+
public Pod launchPeonJobAndWaitForStart(Job job, Task task, long howLong, TimeUnit timeUnit) throws IllegalStateException
6565
{
6666
long start = System.currentTimeMillis();
6767
// launch job
@@ -74,12 +74,15 @@ public Pod launchPeonJobAndWaitForStart(Job job, Task task, long howLong, TimeUn
7474
Pod result = client.pods().inNamespace(namespace).withName(mainPod.getMetadata().getName())
7575
.waitUntilCondition(pod -> {
7676
if (pod == null) {
77-
return false;
77+
return true;
7878
}
7979
return pod.getStatus() != null && pod.getStatus().getPodIP() != null;
8080
}, howLong, timeUnit);
81+
82+
if (result == null) {
83+
throw new IllegalStateException("K8s pod for the task [%s] appeared and disappeared. It can happen if the task was canceled");
84+
}
8185
long duration = System.currentTimeMillis() - start;
82-
log.info("Took task %s %d ms for pod to startup", jobName, duration);
8386
emitK8sPodMetrics(task, "k8s/peon/startup/time", duration);
8487
return result;
8588
});

extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
7474
@Mock private KubernetesPeonClient peonClient;
7575
@Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle;
7676
@Mock private ServiceEmitter emitter;
77+
@Mock private ListenableFuture<TaskStatus> statusFuture;
7778

7879
private KubernetesTaskRunnerConfig config;
7980
private KubernetesTaskRunner runner;
@@ -329,16 +330,35 @@ public void test_doTask_whenShutdownRequested_throwsRuntimeException()
329330
@Test
330331
public void test_shutdown_withExistingTask_removesTaskFromMap()
331332
{
332-
KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
333+
KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) {
333334
@Override
334335
protected synchronized void shutdown()
335336
{
336337
}
337338
};
338-
339+
EasyMock.expect(statusFuture.isDone()).andReturn(true).anyTimes();
340+
replayAll();
339341
runner.tasks.put(task.getId(), workItem);
340342
runner.shutdown(task.getId(), "");
341343
Assert.assertTrue(runner.tasks.isEmpty());
344+
verifyAll();
345+
}
346+
347+
@Test
348+
public void test_shutdown_withExistingTask_futureIncomplete_removesTaskFromMap()
349+
{
350+
KubernetesWorkItem workItem = new KubernetesWorkItem(task, statusFuture) {
351+
@Override
352+
protected synchronized void shutdown()
353+
{
354+
}
355+
};
356+
EasyMock.expect(statusFuture.isDone()).andReturn(false).anyTimes();
357+
replayAll();
358+
runner.tasks.put(task.getId(), workItem);
359+
runner.shutdown(task.getId(), "");
360+
Assert.assertEquals(1, runner.tasks.size());
361+
verifyAll();
342362
}
343363

344364
@Test

extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ void test_launchPeonJobAndWaitForStart()
106106
}
107107

108108
@Test
109-
void test_launchPeonJobAndWaitForStart_withDisappearingPod_throwsKubernetesClientTimeoutException()
109+
void test_launchPeonJobAndWaitForStart_withDisappearingPod_throwIllegalStateExceptionn()
110110
{
111111
Job job = new JobBuilder()
112112
.withNewMetadata()
@@ -127,11 +127,38 @@ void test_launchPeonJobAndWaitForStart_withDisappearingPod_throwsKubernetesClien
127127
).once();
128128

129129
Assertions.assertThrows(
130-
KubernetesClientTimeoutException.class,
130+
IllegalStateException.class,
131131
() -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS)
132132
);
133133
}
134134

135+
@Test
136+
void test_launchPeonJobAndWaitForStart_withPendingPod_throwIllegalStateExceptionn()
137+
{
138+
Job job = new JobBuilder()
139+
.withNewMetadata()
140+
.withName(KUBERNETES_JOB_NAME)
141+
.endMetadata()
142+
.build();
143+
144+
Pod pod = new PodBuilder()
145+
.withNewMetadata()
146+
.withName(POD_NAME)
147+
.addToLabels("job-name", KUBERNETES_JOB_NAME)
148+
.endMetadata()
149+
.withNewStatus()
150+
.withPodIP(null)
151+
.endStatus()
152+
.build();
153+
154+
client.pods().inNamespace(NAMESPACE).resource(pod).create();
155+
156+
Assertions.assertThrows(
157+
KubernetesClientTimeoutException.class,
158+
() -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS)
159+
);
160+
}
161+
135162
@Test
136163
void test_waitForPeonJobCompletion_withSuccessfulJob_returnsJobResponseWithJobAndSucceededPeonPhase()
137164
{

0 commit comments

Comments
 (0)