Skip to content

Commit 2a56038

Browse files
authored
Fix serde of UpdateLocationAction used by K8s-based tasks (#18853)
* Fix serde of UpdateLocationAction used by K8s-based tasks * Add null check and throw exception
1 parent ab969cb commit 2a56038

File tree

2 files changed

+57
-6
lines changed

2 files changed

+57
-6
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,34 @@
2323
import com.fasterxml.jackson.annotation.JsonProperty;
2424
import com.fasterxml.jackson.core.type.TypeReference;
2525
import com.google.common.base.Optional;
26+
import org.apache.druid.error.InvalidInput;
2627
import org.apache.druid.indexer.TaskLocation;
2728
import org.apache.druid.indexing.common.task.Task;
2829
import org.apache.druid.indexing.overlord.TaskRunner;
2930

31+
import java.util.Objects;
32+
33+
/**
34+
* Notifies the Overlord that the location of a task is now updated.
35+
* Used when running K8s-based tasks in encapsulated mode.
36+
*/
3037
public class UpdateLocationAction implements TaskAction<Void>
3138
{
32-
private final TaskLocation taskLocation;
39+
private final TaskLocation location;
3340

3441
@JsonCreator
3542
public UpdateLocationAction(
3643
@JsonProperty("location") TaskLocation location
3744
)
3845
{
39-
this.taskLocation = location;
46+
InvalidInput.conditionalException(location != null, "No task location specified");
47+
this.location = location;
4048
}
4149

4250
@JsonProperty
43-
public TaskLocation getTaskLocation()
51+
public TaskLocation getLocation()
4452
{
45-
return taskLocation;
53+
return location;
4654
}
4755

4856
@Override
@@ -56,7 +64,7 @@ public Void perform(Task task, TaskActionToolbox toolbox)
5664
{
5765
Optional<TaskRunner> taskRunner = toolbox.getTaskRunner();
5866
if (taskRunner.isPresent()) {
59-
taskRunner.get().updateLocation(task, taskLocation);
67+
taskRunner.get().updateLocation(task, location);
6068
}
6169
return null;
6270
}
@@ -65,7 +73,26 @@ public Void perform(Task task, TaskActionToolbox toolbox)
6573
public String toString()
6674
{
6775
return "UpdateLocationAction{" +
68-
"taskLocation=" + taskLocation +
76+
"taskLocation=" + location +
6977
'}';
7078
}
79+
80+
@Override
81+
public boolean equals(Object object)
82+
{
83+
if (this == object) {
84+
return true;
85+
}
86+
if (object == null || getClass() != object.getClass()) {
87+
return false;
88+
}
89+
UpdateLocationAction that = (UpdateLocationAction) object;
90+
return Objects.equals(location, that.location);
91+
}
92+
93+
@Override
94+
public int hashCode()
95+
{
96+
return Objects.hashCode(location);
97+
}
7198
}

indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919

2020
package org.apache.druid.indexing.common.actions;
2121

22+
import com.fasterxml.jackson.core.JsonProcessingException;
2223
import com.google.common.base.Optional;
24+
import org.apache.druid.error.DruidException;
2325
import org.apache.druid.indexer.TaskLocation;
2426
import org.apache.druid.indexing.common.task.NoopTask;
2527
import org.apache.druid.indexing.common.task.Task;
2628
import org.apache.druid.indexing.overlord.TaskRunner;
29+
import org.apache.druid.segment.TestHelper;
30+
import org.junit.Assert;
2731
import org.junit.Test;
2832

2933
import java.net.InetAddress;
@@ -68,4 +72,24 @@ public void testWithNoTaskRunner() throws UnknownHostException
6872
action.perform(task, toolbox);
6973
verify(runner, never()).updateStatus(any(), any());
7074
}
75+
76+
@Test
77+
public void testSerde() throws JsonProcessingException
78+
{
79+
final UpdateLocationAction original = new UpdateLocationAction(
80+
TaskLocation.create("host", 1000, -1, false, "pod")
81+
);
82+
final String json = TestHelper.JSON_MAPPER.writeValueAsString(original);
83+
final TaskAction<?> deserialized = TestHelper.JSON_MAPPER.readValue(json, TaskAction.class);
84+
Assert.assertEquals(original, deserialized);
85+
}
86+
87+
@Test
88+
public void test_actionWithNullLocation_throwsException()
89+
{
90+
Assert.assertThrows(
91+
DruidException.class,
92+
() -> new UpdateLocationAction(null)
93+
);
94+
}
7195
}

0 commit comments

Comments
 (0)