Commit c5a85b4
Migrate ITAppendBatchIndexTest, ITCustomCoordinatorDutiesTest to embedded tests (#18867)
Changes:
- Replace `ITAppendBatchIndexTest` with a new method in `IndexParallelTaskTest`
- Migrate `ITCustomCoordinatorDutiesTest` to embedded tests as `KillSupervisorsCustomDutyTest`
- Remove the following test groups, since all tests in each group have already been migrated:
  - `input-source` (#18805, #18752)
  - `cds-task-schema-publish-disabled`
  - `cds-coordinator-metadata-query-disabled`
1 parent 47d6f34 commit c5a85b4

20 files changed: +193 −1028 lines

.github/workflows/cron-job-its.yml

Lines changed: 2 additions & 2 deletions
@@ -60,7 +60,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, realtime-index, append-ingestion]
+        testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, realtime-index]
     uses: ./.github/workflows/reusable-standard-its.yml
     needs: build
     with:
@@ -74,7 +74,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        testing_group: [ input-source, kafka-index, kafka-transactional-index, kafka-index-slow, kafka-transactional-index-slow, kafka-data-format, append-ingestion ]
+        testing_group: [ kafka-index, kafka-transactional-index, kafka-index-slow, kafka-transactional-index-slow, kafka-data-format ]
     uses: ./.github/workflows/reusable-standard-its.yml
     needs: build
     with:

.github/workflows/standard-its.yml

Lines changed: 2 additions & 14 deletions
@@ -47,7 +47,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, realtime-index, append-ingestion, cds-task-schema-publish-disabled, cds-coordinator-metadata-query-disabled]
+        testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, realtime-index]
     uses: ./.github/workflows/reusable-standard-its.yml
     if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
     with:
@@ -63,7 +63,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        testing_group: [input-source, kafka-index, append-ingestion]
+        testing_group: [kafka-index]
     uses: ./.github/workflows/reusable-standard-its.yml
     if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
     with:
@@ -101,15 +101,3 @@ jobs:
       mysql_driver: org.mariadb.jdbc.Driver
       override_config_path: ./environment-configs/test-groups/prepopulated-data
       group: query
-
-  integration-custom-coordinator-duties-tests:
-    needs: changes
-    uses: ./.github/workflows/reusable-standard-its.yml
-    if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
-    with:
-      build_jdk: 17
-      runtime_jdk: 17
-      testing_groups: -Dgroups=custom-coordinator-duties
-      use_indexer: middleManager
-      override_config_path: ./environment-configs/test-groups/custom-coordinator-duties
-      group: custom coordinator duties

docs/development/modules.md

Lines changed: 2 additions & 13 deletions
@@ -326,19 +326,8 @@ The duties will be grouped into multiple groups as per the elements in list `dru
 All duties in the same group will have the same run period configured by `druid.coordinator.<GROUP_NAME>.period`.
 Currently, there is a single thread running the duties sequentially for each group.
 
-For example, see `KillSupervisorsCustomDuty` for a custom coordinator duty implementation and the `custom-coordinator-duties`
-integration test group which loads `KillSupervisorsCustomDuty` using the configs set in `integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties`.
-This config file adds the configs below to enable a custom coordinator duty.
-
-```properties
-druid.coordinator.dutyGroups=["cleanupMetadata"]
-druid.coordinator.cleanupMetadata.duties=["killSupervisors"]
-druid.coordinator.cleanupMetadata.duty.killSupervisors.durationToRetain=PT0M
-druid.coordinator.cleanupMetadata.period=PT10S
-```
-
-These configurations create a custom coordinator duty group called `cleanupMetadata` which runs a custom coordinator duty called `killSupervisors` every 10 seconds.
-The custom coordinator duty `killSupervisors` also has a config called `durationToRetain` which is set to 0 minute.
+For example, see `KillSupervisorsCustomDuty` for a custom coordinator duty implementation and the `KillSupervisorsCustomDutyTest`
+for sample properties that may be used to configure the `KillSupervisorsCustomDuty`.
 
 ### Routing data through a HTTP proxy for your extension
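The new docs text points readers at `KillSupervisorsCustomDutyTest` for sample properties. For convenience, this is the equivalent `runtime.properties` form of the settings that test applies via `addProperty` (see the new file later in this commit); the 0.1-second period is an embedded-test speedup, not a production recommendation:

```properties
druid.coordinator.kill.supervisor.on=false
druid.coordinator.dutyGroups=["cleanupMetadata"]
druid.coordinator.cleanupMetadata.duties=["killSupervisors"]
druid.coordinator.cleanupMetadata.duty.killSupervisors.durationToRetain=PT0M
druid.coordinator.cleanupMetadata.period=PT0.1S
```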

embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java

Lines changed: 1 addition & 14 deletions
@@ -29,11 +29,6 @@
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.msq.guice.IndexerMemoryManagementModule;
-import org.apache.druid.msq.guice.MSQDurableStorageModule;
-import org.apache.druid.msq.guice.MSQIndexingModule;
-import org.apache.druid.msq.guice.MSQSqlModule;
-import org.apache.druid.msq.guice.SqlTaskModule;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.rpc.UpdateResponse;
 import org.apache.druid.server.coordinator.ClusterCompactionConfig;
@@ -90,15 +85,7 @@ public EmbeddedDruidCluster createCluster()
             "[\"org.apache.druid.query.policy.NoRestrictionPolicy\"]"
         )
         .addCommonProperty("druid.policy.enforcer.type", "restrictAllTables")
-        .addExtensions(
-            CatalogClientModule.class,
-            CatalogCoordinatorModule.class,
-            IndexerMemoryManagementModule.class,
-            MSQDurableStorageModule.class,
-            MSQIndexingModule.class,
-            MSQSqlModule.class,
-            SqlTaskModule.class
-        )
+        .addExtensions(CatalogClientModule.class, CatalogCoordinatorModule.class)
         .addServer(coordinator)
         .addServer(overlord)
        .addServer(indexer)

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java

Lines changed: 61 additions & 25 deletions
@@ -129,30 +129,7 @@ public void test_runIndexTask_andReindexIntoAnotherDatasource(PartitionsSpec par
 {
     final boolean isRollup = partitionsSpec.isForceGuaranteedRollupCompatible();
 
-    final TaskBuilder.IndexParallel indexTask =
-        TaskBuilder.ofTypeIndexParallel()
-                   .dataSource(dataSource)
-                   .timestampColumn("timestamp")
-                   .jsonInputFormat()
-                   .localInputSourceWithFiles(
-                       Resources.DataFile.tinyWiki1Json(),
-                       Resources.DataFile.tinyWiki2Json(),
-                       Resources.DataFile.tinyWiki3Json()
-                   )
-                   .segmentGranularity("DAY")
-                   .dimensions("namespace", "page", "language")
-                   .metricAggregates(
-                       new DoubleSumAggregatorFactory("added", "added"),
-                       new DoubleSumAggregatorFactory("deleted", "deleted"),
-                       new DoubleSumAggregatorFactory("delta", "delta"),
-                       new CountAggregatorFactory("count")
-                   )
-                   .tuningConfig(
-                       t -> t.withPartitionsSpec(partitionsSpec)
-                             .withForceGuaranteedRollup(isRollup)
-                             .withMaxNumConcurrentSubTasks(10)
-                             .withSplitHintSpec(new MaxSizeSplitHintSpec(1, null))
-                   );
+    final TaskBuilder.IndexParallel indexTask = buildIndexParallelTask(partitionsSpec, false);
 
     runTask(indexTask, dataSource);
     cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
@@ -211,6 +188,60 @@ public void test_runIndexTask_andReindexIntoAnotherDatasource(PartitionsSpec par
     runQueries(dataSource3);
   }
 
+  @MethodSource("getTestParamPartitionsSpec")
+  @ParameterizedTest(name = "partitionsSpec={0}")
+  public void test_runIndexTask_andAppendData(PartitionsSpec partitionsSpec)
+  {
+    final TaskBuilder.IndexParallel initialTask = buildIndexParallelTask(partitionsSpec, false);
+    runTask(initialTask, dataSource);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+    cluster.callApi().verifySqlQuery("SELECT COUNT(*) FROM %s", dataSource, "10");
+    runGroupByQuery("Crimson Typhoon,1,905.0,9050.0");
+
+    final TaskBuilder.IndexParallel appendTask
+        = buildIndexParallelTask(new DynamicPartitionsSpec(null, null), true);
+    runTask(appendTask, dataSource);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+    cluster.callApi().verifySqlQuery("SELECT COUNT(*) FROM %s", dataSource, "20");
+    runGroupByQuery("Crimson Typhoon,2,1810.0,18100.0");
+  }
+
+  /**
+   * Creates a builder for an "index_parallel" task to ingest into {@link #dataSource}.
+   */
+  private TaskBuilder.IndexParallel buildIndexParallelTask(
+      PartitionsSpec partitionsSpec,
+      boolean appendToExisting
+  )
+  {
+    final boolean isRollup = partitionsSpec.isForceGuaranteedRollupCompatible();
+
+    return TaskBuilder.ofTypeIndexParallel()
+                      .dataSource(dataSource)
+                      .timestampColumn("timestamp")
+                      .jsonInputFormat()
+                      .localInputSourceWithFiles(
+                          Resources.DataFile.tinyWiki1Json(),
+                          Resources.DataFile.tinyWiki2Json(),
+                          Resources.DataFile.tinyWiki3Json()
+                      )
+                      .segmentGranularity("DAY")
+                      .dimensions("namespace", "page", "language")
+                      .metricAggregates(
+                          new DoubleSumAggregatorFactory("added", "added"),
+                          new DoubleSumAggregatorFactory("deleted", "deleted"),
+                          new DoubleSumAggregatorFactory("delta", "delta"),
+                          new CountAggregatorFactory("count")
+                      )
+                      .appendToExisting(appendToExisting)
+                      .tuningConfig(
+                          t -> t.withPartitionsSpec(partitionsSpec)
+                                .withForceGuaranteedRollup(isRollup)
+                                .withMaxNumConcurrentSubTasks(10)
+                                .withSplitHintSpec(new MaxSizeSplitHintSpec(1, null))
+                      );
+  }
+
   private String runTask(TaskBuilder.IndexParallel taskBuilder, String dataSource)
   {
     final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
@@ -224,8 +255,13 @@ private void runQueries(String dataSource)
         "10,2013-09-01T12:41:27.000Z,2013-08-31T01:02:33.000Z",
         cluster.runSql("SELECT COUNT(*), MAX(__time), MIN(__time) FROM %s", dataSource)
     );
+    runGroupByQuery("Crimson Typhoon,1,905.0,9050.0");
+  }
+
+  private void runGroupByQuery(String expectedResult)
+  {
     Assertions.assertEquals(
-        "Crimson Typhoon,1,905.0,9050.0",
+        expectedResult,
         cluster.runSql(
             "SELECT \"page\", COUNT(*) AS \"rows\", SUM(\"added\"), 10 * SUM(\"added\") AS added_times_ten"
             + " FROM %s"
embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/KillSupervisorsCustomDutyTest.java

Lines changed: 124 additions & 0 deletions

@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.error.ExceptionMatcher;
+import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.VersionedSupervisorSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class KillSupervisorsCustomDutyTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator()
+      .addProperty("druid.coordinator.kill.supervisor.on", "false")
+      .addProperty("druid.coordinator.dutyGroups", "[\"cleanupMetadata\"]")
+      .addProperty("druid.coordinator.cleanupMetadata.duties", "[\"killSupervisors\"]")
+      .addProperty("druid.coordinator.cleanupMetadata.duty.killSupervisors.durationToRetain", "PT0M")
+      .addProperty("druid.coordinator.cleanupMetadata.period", "PT0.1S");
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    return EmbeddedDruidCluster
+        .withEmbeddedDerbyAndZookeeper()
+        .useLatchableEmitter()
+        .addServer(coordinator)
+        .addServer(new EmbeddedOverlord())
+        .addServer(new EmbeddedBroker());
+  }
+
+  @Test
+  public void test_customDuty_removesHistoryOfTerminatedSupervisor()
+  {
+    // Create a compaction supervisor
+    final CompactionSupervisorSpec supervisor = new CompactionSupervisorSpec(
+        InlineSchemaDataSourceCompactionConfig.builder().forDataSource(dataSource).build(),
+        false,
+        null
+    );
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Verify that the history of the supervisor has 1 entry
+    final List<VersionedSupervisorSpec> history = getSupervisorHistory(supervisor.getId());
+    Assertions.assertEquals(1, history.size());
+
+    final SupervisorSpec supervisorEntry = history.get(0).getSpec();
+    Assertions.assertNotNull(supervisorEntry);
+    Assertions.assertEquals(List.of(dataSource), supervisorEntry.getDataSources());
+    Assertions.assertEquals(supervisor.getId(), supervisorEntry.getId());
+
+    // Terminate the supervisor
+    cluster.callApi().onLeaderOverlord(o -> o.terminateSupervisor(supervisor.getId()));
+
+    // Verify that the history now has 2 entries and the latest entry is a tombstone
+    final List<VersionedSupervisorSpec> historyAfterTermination = getSupervisorHistory(supervisor.getId());
+    Assertions.assertEquals(2, historyAfterTermination.size());
+    Assertions.assertInstanceOf(NoopSupervisorSpec.class, historyAfterTermination.get(0).getSpec());
+
+    // Wait until the cleanup metric has been emitted
+    coordinator.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("metadata/kill/supervisor/count")
+                      .hasValueMatching(Matchers.greaterThanOrEqualTo(1L))
+    );
+
+    // Verify that the history now returns 404 Not Found
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () -> getSupervisorHistory(supervisor.getId())
+        ),
+        ExceptionMatcher.of(RuntimeException.class).expectRootCause(
+            ExceptionMatcher.of(HttpResponseException.class)
+                .expectMessageContains("404 Not Found")
+                .expectMessageContains(StringUtils.format("No history for [%s]", supervisor.getId()))
+        )
+    );
+  }
+
+  private List<VersionedSupervisorSpec> getSupervisorHistory(String supervisorId)
+  {
+    final String url = StringUtils.format(
+        "/druid/indexer/v1/supervisor/%s/history",
+        StringUtils.urlEncode(supervisorId)
+    );
+    return cluster.callApi().serviceClient().onLeaderOverlord(
+        mapper -> new RequestBuilder(HttpMethod.GET, url),
+        new TypeReference<>() {}
+    );
+  }
+}
