Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions .github/workflows/cron-job-its.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,34 +56,6 @@ jobs:
run: |
./it.sh ci

integration-index-tests-middleManager:
strategy:
fail-fast: false
matrix:
testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, realtime-index]
uses: ./.github/workflows/reusable-standard-its.yml
needs: build
with:
build_jdk: 17
runtime_jdk: 17
testing_groups: -Dgroups=${{ matrix.testing_group }}
use_indexer: middleManager
group: ${{ matrix.testing_group }}

integration-index-tests-indexer:
strategy:
fail-fast: false
matrix:
testing_group: [ kafka-index, kafka-transactional-index, kafka-index-slow, kafka-transactional-index-slow, kafka-data-format ]
uses: ./.github/workflows/reusable-standard-its.yml
needs: build
with:
build_jdk: 17
runtime_jdk: 17
testing_groups: -Dgroups=${{ matrix.testing_group }}
use_indexer: indexer
group: ${{ matrix.testing_group }}

integration-query-tests-middleManager:
strategy:
fail-fast: false
Expand Down
31 changes: 0 additions & 31 deletions .github/workflows/standard-its.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,37 +42,6 @@ jobs:
core:
- '!extension*/**'

integration-index-tests-middleManager:
needs: changes
strategy:
fail-fast: false
matrix:
testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, realtime-index]
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
with:
build_jdk: 17
runtime_jdk: 17
testing_groups: -Dgroups=${{ matrix.testing_group }}
override_config_path: ./environment-configs/test-groups/prepopulated-data
use_indexer: middleManager
group: ${{ matrix.testing_group }}

integration-index-tests-indexer:
needs: changes
strategy:
fail-fast: false
matrix:
testing_group: [kafka-index]
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
with:
build_jdk: 17
runtime_jdk: 17
testing_groups: -Dgroups=${{ matrix.testing_group }}
use_indexer: indexer
group: ${{ matrix.testing_group }}

integration-query-tests-middleManager:
needs: changes
strategy:
Expand Down
30 changes: 27 additions & 3 deletions docs/api-reference/supervisor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3598,12 +3598,33 @@ Host: http://ROUTER_IP:ROUTER_PORT

### Handoff task groups for a supervisor early

Trigger handoff for specified task groups of a supervisor early. This is a best effort API and makes no guarantees of handoff execution
Trigger handoff for specified task groups of a supervisor early. This is a best-effort API and makes no guarantees of handoff execution.

#### URL

`POST` `/druid/indexer/v1/supervisor/{supervisorId}/taskGroups/handoff`

#### Responses

<Tabs>

<TabItem value="1" label="202 ACCEPTED">

*Request has been accepted and handoff will be initiated in the background.*

</TabItem>
<TabItem value="2" label="404 NOT FOUND">

*Invalid supervisor ID or the supervisor is not running.*

</TabItem>
<TabItem value="3" label="400 BAD REQUEST">

*Supervisor does not support early handoff.*

</TabItem>
</Tabs>

#### Sample request

The following example shows how to hand off task groups for a supervisor named `social_media` that has the task groups `1`, `2`, and `3`.
Expand Down Expand Up @@ -3639,8 +3660,11 @@ Content-Type: application/json
#### Sample response

<details>
<summary>View the response</summary>
(empty response)
<summary>202 Accepted</summary>

```json
{}
```
</details>

### Shut down a supervisor
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.embedded.indexing;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.rpc.RequestBuilder;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class KafkaFaultToleranceTest extends KafkaTestBase
{
// Spec of the supervisor launched for the current test; re-created in @BeforeEach
// and suspended again in @AfterEach.
private SupervisorSpec supervisorSpec = null;
// Kafka topic dedicated to the current test; created in @BeforeEach, deleted in @AfterEach.
private String topic = null;
// Running count of records published to the topic so far; @AfterEach verifies that
// exactly this many rows were ingested.
private int totalRecords = 0;

@BeforeEach
public void setupTopicAndSupervisor()
{
  // Give each test a fresh record counter, a dedicated two-partition topic,
  // and a running supervisor that reads from that topic.
  totalRecords = 0;
  topic = "topic_" + dataSource;
  kafkaServer.createTopicWithPartitions(topic, 2);

  final String supervisorId = "supe_" + dataSource;
  supervisorSpec = createSupervisor().withId(supervisorId).build(dataSource, topic);
  cluster.callApi().postSupervisor(supervisorSpec);
}

@AfterEach
public void verifyAndTearDown()
{
// First verify the test's outcome: everything published so far must have been
// ingested and the supervisor must still be healthy.
waitUntilPublishedRecordsAreIngested(totalRecords);
verifySupervisorIsRunningHealthy(supervisorSpec.getId());
// Then tear down: suspend the supervisor, drop the topic, and confirm the
// datasource row count matches the number of published records.
cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
kafkaServer.deleteTopic(topic);
verifyRowCount(totalRecords);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void test_supervisorRecovers_afterOverlordRestart(boolean useTransactions) throws Exception
{
// Ingest an initial batch (with and without Kafka transactions) and wait until
// it is fully ingested before disrupting the cluster.
totalRecords = publish1kRecords(topic, useTransactions);
waitUntilPublishedRecordsAreIngested(totalRecords);

// Publish while the Overlord is down; these records should be picked up once it recovers.
overlord.stop();
totalRecords += publish1kRecords(topic, useTransactions);

// Restart and publish again; @AfterEach verifies that all published records were ingested.
overlord.start();
totalRecords += publish1kRecords(topic, useTransactions);
}

@Test
public void test_supervisorRecovers_afterCoordinatorRestart() throws Exception
{
// Transactional publishing only; the non-transactional path is covered by the
// Overlord and Historical restart tests.
final boolean useTransactions = true;
totalRecords = publish1kRecords(topic, useTransactions);
waitUntilPublishedRecordsAreIngested(totalRecords);

// Publish while the Coordinator is down; ingestion should continue regardless.
coordinator.stop();
totalRecords += publish1kRecords(topic, useTransactions);

// Restart and publish again; @AfterEach verifies that all published records were ingested.
coordinator.start();
totalRecords += publish1kRecords(topic, useTransactions);
}

@Test
public void test_supervisorRecovers_afterHistoricalRestart() throws Exception
{
// Non-transactional publishing only; the transactional path is covered by the
// Coordinator restart test.
final boolean useTransactions = false;
totalRecords = publish1kRecords(topic, useTransactions);
waitUntilPublishedRecordsAreIngested(totalRecords);

// Publish while the Historical is down; streaming ingestion should continue.
historical.stop();
totalRecords += publish1kRecords(topic, useTransactions);

// Restart and publish again; @AfterEach verifies that all published records were ingested.
historical.start();
totalRecords += publish1kRecords(topic, useTransactions);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void test_supervisorRecovers_afterSuspendResume(boolean useTransactions)
{
// Baseline ingestion before suspending the supervisor.
totalRecords = publish1kRecords(topic, useTransactions);
waitUntilPublishedRecordsAreIngested(totalRecords);

// Publish while the supervisor is suspended; the records accumulate in Kafka.
cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
totalRecords += publish1kRecords(topic, useTransactions);

// Resume and publish more; @AfterEach verifies that the full record count was ingested.
cluster.callApi().postSupervisor(supervisorSpec.createRunningSpec());
totalRecords += publish1kRecords(topic, useTransactions);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void test_supervisorRecovers_afterChangeInTopicPartitions(boolean useTransactions)
{
totalRecords = publish1kRecords(topic, useTransactions);

kafkaServer.increasePartitionsInTopic(topic, 4);
totalRecords += publish1kRecords(topic, useTransactions);
Comment on lines +126 to +131
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test failed locally for me with transactions disabled: org.apache.druid.java.util.common.ISE: Timed out waiting for event after [60,000]ms

It seems like it's stuck in some loop from these logs:

2026-01-02T20:18:32,119 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:32,225 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:32,331 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:32,387 WARN [KafkaSupervisor-supe_datasource_nhjeoeib] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Configured task count[2] for supervisor[supe_datasource_nhjeoeib] is greater than the number of partitions[1].
2026-01-02T20:18:32,388 INFO [KafkaSupervisor-supe_datasource_nhjeoeib] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Early stop requested for supervisor[supe_datasource_nhjeoeib], signalling tasks to complete.
2026-01-02T20:18:32,438 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:32,509 WARN [KafkaSupervisor-supe_datasource_nfgacpoa] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Configured task count[2] for supervisor[supe_datasource_nfgacpoa] is greater than the number of partitions[1].
2026-01-02T20:18:32,510 INFO [KafkaSupervisor-supe_datasource_nfgacpoa] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Early stop requested for supervisor[supe_datasource_nfgacpoa], signalling tasks to complete.
2026-01-02T20:18:32,542 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:32,648 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:32,754 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:32,861 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:32,962 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:33,035 WARN [KafkaSupervisor-supe_datasource_lnedibpp] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Configured task count[2] for supervisor[supe_datasource_lnedibpp] is greater than the number of partitions[1].
2026-01-02T20:18:33,035 INFO [KafkaSupervisor-supe_datasource_lnedibpp] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Early stop requested for supervisor[supe_datasource_lnedibpp], signalling tasks to complete.
2026-01-02T20:18:33,066 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:33,169 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:33,275 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:33,378 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:33,413 WARN [KafkaSupervisor-supe_datasource_abegdknn] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Configured task count[2] for supervisor[supe_datasource_abegdknn] is greater than the number of partitions[1].
2026-01-02T20:18:33,414 INFO [KafkaSupervisor-supe_datasource_abegdknn] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Early stop requested for supervisor[supe_datasource_abegdknn], signalling tasks to complete.
2026-01-02T20:18:33,481 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:33,577 WARN [KafkaSupervisor-supe_datasource_dpcjklpc] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Configured task count[2] for supervisor[supe_datasource_dpcjklpc] is greater than the number of partitions[1].
2026-01-02T20:18:33,578 INFO [KafkaSupervisor-supe_datasource_dpcjklpc] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Early stop requested for supervisor[supe_datasource_dpcjklpc], signalling tasks to complete.
2026-01-02T20:18:33,584 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.

org.apache.druid.java.util.common.ISE: Timed out waiting for event after [60,000]ms

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see, let me check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abhishekrb19 , I tried to debug this test. The logs that you have shared occur even in success scenarios.

Could you try running with an increased timeout and see if it works then?

Copy link
Contributor

@abhishekrb19 abhishekrb19 Jan 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I tried running this test in a loop 10 times and I'm not able to reproduce the failure with the existing timeout. Iirc I noticed the failure the first time I ran it and maybe it was a one-off. We can keep an eye on the CI builds.

}

@Test
public void test_supervisorLaunchesNewTask_ifEarlyHandoff()
{
final boolean useTransactions = true;
totalRecords = publish1kRecords(topic, useTransactions);

waitUntilPublishedRecordsAreIngested(totalRecords);

final Set<String> taskIdsBeforeHandoff = getRunningTaskIds(dataSource);
Assertions.assertFalse(taskIdsBeforeHandoff.isEmpty());

final String path = StringUtils.format(
"/druid/indexer/v1/supervisor/%s/taskGroups/handoff",
supervisorSpec.getId()
);
cluster.callApi().serviceClient().onLeaderOverlord(
mapper -> new RequestBuilder(HttpMethod.POST, path)
.jsonContent(mapper, Map.of("taskGroupIds", List.of(0, 1))),
new TypeReference<>() {}
);

// Wait for the handoff notice to be processed
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName("ingest/notices/time")
.hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorSpec.getId())
.hasDimension("noticeType", "handoff_task_group_notice")
);

totalRecords += publish1kRecords(topic, useTransactions);
Comment on lines +156 to +162
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this test actually verify that the old task(s) have completed successfully on early handoff and new ones have been spun?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think I meant to do that initially but then just lazied out 😛 . Let me see if I can add that verification too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

waitUntilPublishedRecordsAreIngested(totalRecords);

// Verify that the running task IDs have changed
final Set<String> taskIdsAfterHandoff = getRunningTaskIds(dataSource);
Assertions.assertFalse(taskIdsAfterHandoff.isEmpty());
Assertions.assertFalse(taskIdsBeforeHandoff.stream().anyMatch(taskIdsAfterHandoff::contains));
}

/**
 * Returns the IDs of all currently-running tasks for the given datasource,
 * as reported by the cluster API.
 */
private Set<String> getRunningTaskIds(String dataSource)
{
  final List<TaskStatusPlus> runningTasks = cluster.callApi().getTasks(dataSource, "running");
  return runningTasks.stream().map(TaskStatusPlus::getId).collect(Collectors.toSet());
}
}
Loading
Loading