Skip to content

Commit 9af6a22

Browse files
authored
Add utility method to EmbeddedClusterApis to run tasks (#18349)
Description: Embedded tests currently run into serialization issues when submitting a `Task` or `CompactionSupervisorSpec` payload to the cluster using any `OverlordClient`. This can happen because the `ObjectMapper` instance used by the `OverlordClient` might not have the necessary Jackson modules loaded. The hack so far has been to ensure that the first server in an embedded cluster is always an Overlord. But this can quickly run into issues, e.g. when running a cluster with basic auth enabled where a Coordinator must be started before all other servers. Fix: - Fix up methods `EmbeddedClusterApis`: `leaderOverlord`, `leaderCoordinator` and `anyBroker` to use the appropriate servers. The request is still sent over the wire even if the client bound on a server is used to call itself. - Add utility methods in `EmbeddedClusterApis` to `submitTask`, `runTask`, `postSupervisor`
1 parent eb44f1e commit 9af6a22

File tree

15 files changed

+126
-105
lines changed

15 files changed

+126
-105
lines changed

embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.druid.testing.embedded.compact;
2121

22+
import org.apache.druid.common.utils.IdUtils;
2223
import org.apache.druid.indexing.common.task.TaskBuilder;
2324
import org.apache.druid.java.util.common.Pair;
2425
import org.apache.druid.testing.embedded.EmbeddedBroker;
@@ -68,12 +69,11 @@ protected Closeable unloader(String dataSource)
6869
*/
6970
protected String runTask(TaskBuilder<?, ?, ?> taskBuilder)
7071
{
71-
return cluster.callApi().runTask(
72-
(ds, taskId) -> taskBuilder.dataSource(ds).withId(taskId),
73-
dataSource,
74-
overlord,
75-
coordinator
76-
);
72+
final String taskId = IdUtils.getRandomId();
73+
cluster.callApi().runTask(taskBuilder.dataSource(dataSource).withId(taskId), overlord);
74+
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
75+
76+
return taskId;
7777
}
7878

7979
protected void verifySegmentIntervals(List<Interval> expectedIntervals)

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ConcurrentAppendReplaceTest.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,7 @@ public void test_concurrentAppend_toIntervalWithUnusedSegment_usesNewSegmentId()
6868
.appendToExisting(true)
6969
.context("useConcurrentLocks", true)
7070
.dimensions();
71-
cluster.callApi().onLeaderOverlord(
72-
o -> o.runTask(task1, taskBuilder.withId(task1))
73-
);
74-
cluster.callApi().waitForTaskToSucceed(task1, overlord);
71+
cluster.callApi().runTask(taskBuilder.withId(task1), overlord);
7572

7673
List<DataSegment> usedSegments = getAllUsedSegments();
7774
Assertions.assertEquals(1, usedSegments.size());
@@ -87,10 +84,7 @@ public void test_concurrentAppend_toIntervalWithUnusedSegment_usesNewSegmentId()
8784

8885
// Run the APPEND task again with a different taskId
8986
final String task2 = EmbeddedClusterApis.newTaskId(dataSource);
90-
cluster.callApi().onLeaderOverlord(
91-
o -> o.runTask(task2, taskBuilder.withId(task2))
92-
);
93-
cluster.callApi().waitForTaskToSucceed(task2, overlord);
87+
cluster.callApi().runTask(taskBuilder.withId(task2), overlord);
9488

9589
// Verify that the new segment gets appended with the same version but a different ID
9690
usedSegments = getAllUsedSegments();

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,7 @@ public void test_runIndexTask_andReindexIntoAnotherDatasource(PartitionsSpec par
214214
private String runTask(TaskBuilder.IndexParallel taskBuilder, String dataSource)
215215
{
216216
final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
217-
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, taskBuilder.withId(taskId)));
218-
cluster.callApi().waitForTaskToSucceed(taskId, overlord);
219-
217+
cluster.callApi().runTask(taskBuilder.withId(taskId), overlord);
220218
return taskId;
221219
}
222220

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexTaskTest.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ public EmbeddedDruidCluster createCluster()
6363
{
6464
return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
6565
.useLatchableEmitter()
66-
.addServer(overlord)
6766
.addServer(coordinator)
67+
.addServer(overlord)
6868
.addServer(indexer)
6969
.addServer(historical)
7070
.addServer(broker)
@@ -76,13 +76,12 @@ public EmbeddedDruidCluster createCluster()
7676
public void test_runIndexTask_forInlineDatasource()
7777
{
7878
final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
79-
final Object task = createIndexTaskForInlineData(
79+
final IndexTask task = createIndexTaskForInlineData(
8080
taskId,
8181
Resources.InlineData.CSV_10_DAYS
8282
);
8383

84-
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
85-
cluster.callApi().waitForTaskToSucceed(taskId, overlord);
84+
cluster.callApi().runTask(task, overlord);
8685

8786
// Verify that the task created 10 DAY-granularity segments
8887
final List<DataSegment> segments = new ArrayList<>(
@@ -142,7 +141,7 @@ public void test_run100Tasks_concurrently()
142141
runTasksConcurrently(100);
143142
}
144143

145-
private Object createIndexTaskForInlineData(String taskId, String inlineDataCsv)
144+
private IndexTask createIndexTaskForInlineData(String taskId, String inlineDataCsv)
146145
{
147146
return TaskBuilder.ofTypeIndex()
148147
.dataSource(dataSource)
@@ -169,16 +168,14 @@ private void runTasksConcurrently(int count)
169168
int index = 0;
170169
for (String taskId : taskIds) {
171170
index++;
172-
final Object task = createIndexTaskForInlineData(
171+
final IndexTask task = createIndexTaskForInlineData(
173172
taskId,
174173
StringUtils.format(
175174
"%s,%s,%d",
176175
jan1.plusDays(index), "item " + index, index
177176
)
178177
);
179-
cluster.callApi().onLeaderOverlord(
180-
o -> o.runTask(taskId, task)
181-
);
178+
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
182179
}
183180
for (String taskId : taskIds) {
184181
cluster.callApi().waitForTaskToSucceed(taskId, overlord);

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,10 @@ public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
156156
maxRowsPerSegment
157157
);
158158

159-
final Map<String, String> startSupervisorResult = cluster.callApi().onLeaderOverlord(
160-
o -> o.postSupervisor(kafkaSupervisorSpec)
159+
Assertions.assertEquals(
160+
supervisorId,
161+
cluster.callApi().postSupervisor(kafkaSupervisorSpec)
161162
);
162-
Assertions.assertEquals(Map.of("id", supervisorId), startSupervisorResult);
163163

164164
// Wait for segments to be handed off
165165
indexer.latchableEmitter().waitForEventAggregate(
@@ -183,9 +183,7 @@ public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
183183
verifyIngestedMetricCountMatchesEmittedCount("coordinator/time", coordinator);
184184

185185
// Suspend the supervisor
186-
cluster.callApi().onLeaderOverlord(
187-
o -> o.postSupervisor(kafkaSupervisorSpec.createSuspendedSpec())
188-
);
186+
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
189187
}
190188

191189
@Test
@@ -208,9 +206,7 @@ public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkip
208206
taskCompletionTimeoutMillis,
209207
maxRowsPerSegment
210208
);
211-
cluster.callApi().onLeaderOverlord(
212-
o -> o.postSupervisor(kafkaSupervisorSpec)
213-
);
209+
cluster.callApi().postSupervisor(kafkaSupervisorSpec);
214210

215211
// Wait for a task to succeed
216212
overlord.latchableEmitter().waitForEventAggregate(
@@ -248,9 +244,7 @@ public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkip
248244
false,
249245
null
250246
);
251-
cluster.callApi().onLeaderOverlord(
252-
o -> o.postSupervisor(compactionSupervisorSpec)
253-
);
247+
cluster.callApi().postSupervisor(compactionSupervisorSpec);
254248

255249
// Wait until some compaction tasks have finished
256250
overlord.latchableEmitter().waitForEventAggregate(
@@ -309,12 +303,8 @@ public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkip
309303
);
310304

311305
// Suspend the supervisors
312-
cluster.callApi().onLeaderOverlord(
313-
o -> o.postSupervisor(compactionSupervisorSpec.createSuspendedSpec())
314-
);
315-
cluster.callApi().onLeaderOverlord(
316-
o -> o.postSupervisor(kafkaSupervisorSpec.createSuspendedSpec())
317-
);
306+
cluster.callApi().postSupervisor(compactionSupervisorSpec.createSuspendedSpec());
307+
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
318308
}
319309

320310
@Test
@@ -337,9 +327,7 @@ public void test_ingestClusterMetrics_compactionSkipsLockedIntervals()
337327
taskCompletionTimeoutMillis,
338328
maxRowsPerSegment
339329
);
340-
cluster.callApi().onLeaderOverlord(
341-
o -> o.postSupervisor(kafkaSupervisorSpec)
342-
);
330+
cluster.callApi().postSupervisor(kafkaSupervisorSpec);
343331

344332
// Wait for some segments to be published
345333
overlord.latchableEmitter().waitForEvent(
@@ -371,9 +359,7 @@ public void test_ingestClusterMetrics_compactionSkipsLockedIntervals()
371359
false,
372360
null
373361
);
374-
cluster.callApi().onLeaderOverlord(
375-
o -> o.postSupervisor(compactionSupervisorSpec)
376-
);
362+
cluster.callApi().postSupervisor(compactionSupervisorSpec);
377363

378364
// Wait until some skipped metrics have been emitted
379365
overlord.latchableEmitter().waitForEventAggregate(
@@ -388,12 +374,8 @@ public void test_ingestClusterMetrics_compactionSkipsLockedIntervals()
388374
);
389375

390376
// Suspend the supervisors
391-
cluster.callApi().onLeaderOverlord(
392-
o -> o.postSupervisor(compactionSupervisorSpec.createSuspendedSpec())
393-
);
394-
cluster.callApi().onLeaderOverlord(
395-
o -> o.postSupervisor(kafkaSupervisorSpec.createSuspendedSpec())
396-
);
377+
cluster.callApi().postSupervisor(compactionSupervisorSpec.createSuspendedSpec());
378+
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
397379
}
398380

399381
/**

embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.druid.segment.indexing.DataSchema;
4141
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
4242
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
43+
import org.apache.druid.testing.embedded.EmbeddedOverlord;
4344
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
4445
import org.apache.kafka.clients.producer.ProducerRecord;
4546
import org.joda.time.Period;
@@ -84,13 +85,14 @@ void setupCreateKafkaTopic()
8485
/**
8586
* Submits a supervisor spec to the Overlord.
8687
*/
87-
protected void submitSupervisor()
88+
protected void submitSupervisor(EmbeddedOverlord overlord)
8889
{
8990
// Submit a supervisor.
9091
final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor();
91-
final Map<String, String> startSupervisorResult =
92-
cluster.callApi().onLeaderOverlord(o -> o.postSupervisor(kafkaSupervisorSpec));
93-
Assertions.assertEquals(Map.of("id", dataSource), startSupervisorResult);
92+
Assertions.assertEquals(
93+
dataSource,
94+
cluster.callApi().postSupervisor(kafkaSupervisorSpec)
95+
);
9496
}
9597

9698
/**

embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ protected void setupLookups() throws Exception
167167
void setUpEach()
168168
{
169169
msqApis = new EmbeddedMSQApis(cluster, overlord);
170-
submitSupervisor();
170+
submitSupervisor(overlord);
171171
publishToKafka(TestIndex.getMMappedWikipediaIndex());
172172

173173
// Wait for it to be loaded.

embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ void setUpEach()
113113

114114
QueryableIndex index = TestIndex.getMMappedTestIndex();
115115

116-
submitSupervisor();
116+
submitSupervisor(overlord);
117117
publishToKafka(index);
118118

119119
final int totalRows = index.getNumRows();

embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorClientTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,7 @@ private void runIndexTask()
192192
.dimensions()
193193
.withId(taskId);
194194

195-
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
196-
cluster.callApi().waitForTaskToSucceed(taskId, overlord);
195+
cluster.callApi().runTask(task, overlord);
197196
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
198197
}
199198
}

embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HighAvailabilityTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,7 @@ public void test_switchLeader_andVerifyUsingSysTables()
121121
.inlineInputSourceWithData(Resources.InlineData.CSV_10_DAYS)
122122
.dimensions()
123123
.withId(taskId);
124-
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
125-
cluster.callApi().waitForTaskToSucceed(taskId, overlord1);
124+
cluster.callApi().runTask(task, overlord1);
126125
coordinator1.latchableEmitter().waitForEvent(
127126
event -> event.hasMetricName("segment/metadataCache/used/count")
128127
.hasDimension(DruidMetrics.DATASOURCE, dataSource)

0 commit comments

Comments
 (0)