Skip to content

Commit bdb8f22

Browse files
authored
Migrate Kafka ITs to embedded tests (#18870)
Changes: - Add `KafkaResource.produceRecordsWithoutTransaction()` - Add `KafkaFaultToleranceTest` - Add `FaultyClusterTest` to replace `ITFaultyClusterTest` - Fix up `FaultyTaskLockbox`, `FaultyTaskActionClientFactory` on Indexers - Allow deserialization of `KafkaSupervisorReportPayload` - Add `KafkaMultiSupervisorTest` - Add new methods to `EmbeddedKafkaSupervisorTest` - Remove the following IT groups from GHA workflow - `kafka-index` - `kafka-index-slow` - `kafka-transactional-index` - `kafka-transactional-index-slow`
1 parent 761c2bc commit bdb8f22

File tree

31 files changed

+844
-1263
lines changed

31 files changed

+844
-1263
lines changed

.github/workflows/cron-job-its.yml

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -56,34 +56,6 @@ jobs:
5656
run: |
5757
./it.sh ci
5858
59-
integration-index-tests-middleManager:
60-
strategy:
61-
fail-fast: false
62-
matrix:
63-
testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, realtime-index]
64-
uses: ./.github/workflows/reusable-standard-its.yml
65-
needs: build
66-
with:
67-
build_jdk: 17
68-
runtime_jdk: 17
69-
testing_groups: -Dgroups=${{ matrix.testing_group }}
70-
use_indexer: middleManager
71-
group: ${{ matrix.testing_group }}
72-
73-
integration-index-tests-indexer:
74-
strategy:
75-
fail-fast: false
76-
matrix:
77-
testing_group: [ kafka-index, kafka-transactional-index, kafka-index-slow, kafka-transactional-index-slow, kafka-data-format ]
78-
uses: ./.github/workflows/reusable-standard-its.yml
79-
needs: build
80-
with:
81-
build_jdk: 17
82-
runtime_jdk: 17
83-
testing_groups: -Dgroups=${{ matrix.testing_group }}
84-
use_indexer: indexer
85-
group: ${{ matrix.testing_group }}
86-
8759
integration-query-tests-middleManager:
8860
strategy:
8961
fail-fast: false

.github/workflows/standard-its.yml

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -42,37 +42,6 @@ jobs:
4242
core:
4343
- '!extension*/**'
4444
45-
integration-index-tests-middleManager:
46-
needs: changes
47-
strategy:
48-
fail-fast: false
49-
matrix:
50-
testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, realtime-index]
51-
uses: ./.github/workflows/reusable-standard-its.yml
52-
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
53-
with:
54-
build_jdk: 17
55-
runtime_jdk: 17
56-
testing_groups: -Dgroups=${{ matrix.testing_group }}
57-
override_config_path: ./environment-configs/test-groups/prepopulated-data
58-
use_indexer: middleManager
59-
group: ${{ matrix.testing_group }}
60-
61-
integration-index-tests-indexer:
62-
needs: changes
63-
strategy:
64-
fail-fast: false
65-
matrix:
66-
testing_group: [kafka-index]
67-
uses: ./.github/workflows/reusable-standard-its.yml
68-
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
69-
with:
70-
build_jdk: 17
71-
runtime_jdk: 17
72-
testing_groups: -Dgroups=${{ matrix.testing_group }}
73-
use_indexer: indexer
74-
group: ${{ matrix.testing_group }}
75-
7645
integration-query-tests-middleManager:
7746
needs: changes
7847
strategy:

docs/api-reference/supervisor-api.md

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3598,12 +3598,33 @@ Host: http://ROUTER_IP:ROUTER_PORT
35983598

35993599
### Handoff task groups for a supervisor early
36003600

3601-
Trigger handoff for specified task groups of a supervisor early. This is a best effort API and makes no guarantees of handoff execution
3601+
Trigger handoff for specified task groups of a supervisor early. This is a best effort API and makes no guarantees of handoff execution.
36023602

36033603
#### URL
36043604

36053605
`POST` `/druid/indexer/v1/supervisor/{supervisorId}/taskGroups/handoff`
36063606

3607+
#### Responses
3608+
3609+
<Tabs>
3610+
3611+
<TabItem value="1" label="202 ACCEPTED">
3612+
3613+
*Request has been accepted and handoff will be initiated in the background.*
3614+
3615+
</TabItem>
3616+
<TabItem value="2" label="404 NOT FOUND">
3617+
3618+
*Invalid supervisor ID or the supervisor is not running.*
3619+
3620+
</TabItem>
3621+
<TabItem value="3" label="400 BAD REQUEST">
3622+
3623+
*Supervisor does not support early handoff.*
3624+
3625+
</TabItem>
3626+
</Tabs>
3627+
36073628
#### Sample request
36083629

36093630
The following example shows how to handoff task groups for a supervisor with the name `social_media` and has the task groups: `1,2,3`.
@@ -3639,8 +3660,11 @@ Content-Type: application/json
36393660
#### Sample response
36403661

36413662
<details>
3642-
<summary>View the response</summary>
3643-
(empty response)
3663+
<summary>202 Accepted</summary>
3664+
3665+
```json
3666+
{}
3667+
```
36443668
</details>
36453669

36463670
### Shut down a supervisor
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.testing.embedded.indexing;
21+
22+
import com.fasterxml.jackson.core.type.TypeReference;
23+
import org.apache.druid.indexer.TaskStatusPlus;
24+
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
25+
import org.apache.druid.java.util.common.StringUtils;
26+
import org.apache.druid.query.DruidMetrics;
27+
import org.apache.druid.rpc.RequestBuilder;
28+
import org.jboss.netty.handler.codec.http.HttpMethod;
29+
import org.junit.jupiter.api.AfterEach;
30+
import org.junit.jupiter.api.Assertions;
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.params.ParameterizedTest;
34+
import org.junit.jupiter.params.provider.ValueSource;
35+
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.Set;
39+
import java.util.stream.Collectors;
40+
41+
public class KafkaFaultToleranceTest extends KafkaTestBase
42+
{
43+
private SupervisorSpec supervisorSpec = null;
44+
private String topic = null;
45+
private int totalRecords = 0;
46+
47+
@BeforeEach
48+
public void setupTopicAndSupervisor()
49+
{
50+
totalRecords = 0;
51+
topic = "topic_" + dataSource;
52+
kafkaServer.createTopicWithPartitions(topic, 2);
53+
54+
supervisorSpec = createSupervisor().withId("supe_" + dataSource).build(dataSource, topic);
55+
cluster.callApi().postSupervisor(supervisorSpec);
56+
}
57+
58+
@AfterEach
59+
public void verifyAndTearDown()
60+
{
61+
waitUntilPublishedRecordsAreIngested(totalRecords);
62+
verifySupervisorIsRunningHealthy(supervisorSpec.getId());
63+
cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
64+
kafkaServer.deleteTopic(topic);
65+
verifyRowCount(totalRecords);
66+
}
67+
68+
@ParameterizedTest
69+
@ValueSource(booleans = {true, false})
70+
public void test_supervisorRecovers_afterOverlordRestart(boolean useTransactions) throws Exception
71+
{
72+
totalRecords = publish1kRecords(topic, useTransactions);
73+
waitUntilPublishedRecordsAreIngested(totalRecords);
74+
75+
overlord.stop();
76+
totalRecords += publish1kRecords(topic, useTransactions);
77+
78+
overlord.start();
79+
totalRecords += publish1kRecords(topic, useTransactions);
80+
}
81+
82+
@Test
83+
public void test_supervisorRecovers_afterCoordinatorRestart() throws Exception
84+
{
85+
final boolean useTransactions = true;
86+
totalRecords = publish1kRecords(topic, useTransactions);
87+
waitUntilPublishedRecordsAreIngested(totalRecords);
88+
89+
coordinator.stop();
90+
totalRecords += publish1kRecords(topic, useTransactions);
91+
92+
coordinator.start();
93+
totalRecords += publish1kRecords(topic, useTransactions);
94+
}
95+
96+
@Test
97+
public void test_supervisorRecovers_afterHistoricalRestart() throws Exception
98+
{
99+
final boolean useTransactions = false;
100+
totalRecords = publish1kRecords(topic, useTransactions);
101+
waitUntilPublishedRecordsAreIngested(totalRecords);
102+
103+
historical.stop();
104+
totalRecords += publish1kRecords(topic, useTransactions);
105+
106+
historical.start();
107+
totalRecords += publish1kRecords(topic, useTransactions);
108+
}
109+
110+
@ParameterizedTest
111+
@ValueSource(booleans = {true, false})
112+
public void test_supervisorRecovers_afterSuspendResume(boolean useTransactions)
113+
{
114+
totalRecords = publish1kRecords(topic, useTransactions);
115+
waitUntilPublishedRecordsAreIngested(totalRecords);
116+
117+
cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
118+
totalRecords += publish1kRecords(topic, useTransactions);
119+
120+
cluster.callApi().postSupervisor(supervisorSpec.createRunningSpec());
121+
totalRecords += publish1kRecords(topic, useTransactions);
122+
}
123+
124+
@ParameterizedTest
125+
@ValueSource(booleans = {true, false})
126+
public void test_supervisorRecovers_afterChangeInTopicPartitions(boolean useTransactions)
127+
{
128+
totalRecords = publish1kRecords(topic, useTransactions);
129+
130+
kafkaServer.increasePartitionsInTopic(topic, 4);
131+
totalRecords += publish1kRecords(topic, useTransactions);
132+
}
133+
134+
@Test
135+
public void test_supervisorLaunchesNewTask_ifEarlyHandoff()
136+
{
137+
final boolean useTransactions = true;
138+
totalRecords = publish1kRecords(topic, useTransactions);
139+
140+
waitUntilPublishedRecordsAreIngested(totalRecords);
141+
142+
final Set<String> taskIdsBeforeHandoff = getRunningTaskIds(dataSource);
143+
Assertions.assertFalse(taskIdsBeforeHandoff.isEmpty());
144+
145+
final String path = StringUtils.format(
146+
"/druid/indexer/v1/supervisor/%s/taskGroups/handoff",
147+
supervisorSpec.getId()
148+
);
149+
cluster.callApi().serviceClient().onLeaderOverlord(
150+
mapper -> new RequestBuilder(HttpMethod.POST, path)
151+
.jsonContent(mapper, Map.of("taskGroupIds", List.of(0, 1))),
152+
new TypeReference<>() {}
153+
);
154+
155+
// Wait for the handoff notice to be processed
156+
overlord.latchableEmitter().waitForEvent(
157+
event -> event.hasMetricName("ingest/notices/time")
158+
.hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorSpec.getId())
159+
.hasDimension("noticeType", "handoff_task_group_notice")
160+
);
161+
162+
totalRecords += publish1kRecords(topic, useTransactions);
163+
waitUntilPublishedRecordsAreIngested(totalRecords);
164+
165+
// Verify that the running task IDs have changed
166+
final Set<String> taskIdsAfterHandoff = getRunningTaskIds(dataSource);
167+
Assertions.assertFalse(taskIdsAfterHandoff.isEmpty());
168+
Assertions.assertFalse(taskIdsBeforeHandoff.stream().anyMatch(taskIdsAfterHandoff::contains));
169+
}
170+
171+
private Set<String> getRunningTaskIds(String dataSource)
172+
{
173+
return cluster.callApi()
174+
.getTasks(dataSource, "running")
175+
.stream()
176+
.map(TaskStatusPlus::getId)
177+
.collect(Collectors.toSet());
178+
}
179+
}

0 commit comments

Comments
 (0)