Skip to content

Commit 57f248f

Browse files
committed
Fix serde of UpdateLocationAction used by K8s-based tasks
1 parent ab969cb commit 57f248f

File tree

2 files changed

+45
-6
lines changed

2 files changed

+45
-6
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,28 @@
2727
import org.apache.druid.indexing.common.task.Task;
2828
import org.apache.druid.indexing.overlord.TaskRunner;
2929

30+
import java.util.Objects;
31+
32+
/**
33+
* Notifies the Overlord that the location of a task is now updated.
34+
* Used when running K8s-based tasks in encapsulated mode.
35+
*/
3036
public class UpdateLocationAction implements TaskAction<Void>
3137
{
32-
private final TaskLocation taskLocation;
38+
private final TaskLocation location;
3339

3440
@JsonCreator
3541
public UpdateLocationAction(
3642
@JsonProperty("location") TaskLocation location
3743
)
3844
{
39-
this.taskLocation = location;
45+
this.location = location;
4046
}
4147

4248
@JsonProperty
43-
public TaskLocation getTaskLocation()
49+
public TaskLocation getLocation()
4450
{
45-
return taskLocation;
51+
return location;
4652
}
4753

4854
@Override
@@ -56,7 +62,7 @@ public Void perform(Task task, TaskActionToolbox toolbox)
5662
{
5763
Optional<TaskRunner> taskRunner = toolbox.getTaskRunner();
5864
if (taskRunner.isPresent()) {
59-
taskRunner.get().updateLocation(task, taskLocation);
65+
taskRunner.get().updateLocation(task, location);
6066
}
6167
return null;
6268
}
@@ -65,7 +71,26 @@ public Void perform(Task task, TaskActionToolbox toolbox)
6571
public String toString()
6672
{
6773
return "UpdateLocationAction{" +
68-
"taskLocation=" + taskLocation +
74+
"taskLocation=" + location +
6975
'}';
7076
}
77+
78+
@Override
79+
public boolean equals(Object object)
80+
{
81+
if (this == object) {
82+
return true;
83+
}
84+
if (object == null || getClass() != object.getClass()) {
85+
return false;
86+
}
87+
UpdateLocationAction that = (UpdateLocationAction) object;
88+
return Objects.equals(location, that.location);
89+
}
90+
91+
@Override
92+
public int hashCode()
93+
{
94+
return Objects.hashCode(location);
95+
}
7196
}

indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919

2020
package org.apache.druid.indexing.common.actions;
2121

22+
import com.fasterxml.jackson.core.JsonProcessingException;
2223
import com.google.common.base.Optional;
2324
import org.apache.druid.indexer.TaskLocation;
2425
import org.apache.druid.indexing.common.task.NoopTask;
2526
import org.apache.druid.indexing.common.task.Task;
2627
import org.apache.druid.indexing.overlord.TaskRunner;
28+
import org.apache.druid.segment.TestHelper;
29+
import org.junit.Assert;
2730
import org.junit.Test;
2831

2932
import java.net.InetAddress;
@@ -68,4 +71,15 @@ public void testWithNoTaskRunner() throws UnknownHostException
6871
action.perform(task, toolbox);
6972
verify(runner, never()).updateStatus(any(), any());
7073
}
74+
75+
@Test
76+
public void testSerde() throws JsonProcessingException
77+
{
78+
final UpdateLocationAction original = new UpdateLocationAction(
79+
TaskLocation.create("host", 1000, -1, false, "pod")
80+
);
81+
final String json = TestHelper.JSON_MAPPER.writeValueAsString(original);
82+
final TaskAction<?> deserialized = TestHelper.JSON_MAPPER.readValue(json, TaskAction.class);
83+
Assert.assertEquals(original, deserialized);
84+
}
7185
}

0 commit comments

Comments
 (0)