Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions docs/api-reference/supervisor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3662,9 +3662,8 @@ Content-Type: application/json
<details>
<summary>202 Accepted</summary>

```json
{}
```
*Empty response*

</details>

### Shut down a supervisor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.testing.embedded.indexing;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -65,7 +64,7 @@ public void verifyAndTearDown()
verifyRowCount(totalRecords);
}

@ParameterizedTest
@ParameterizedTest(name = "useTransactions={0}")
@ValueSource(booleans = {true, false})
public void test_supervisorRecovers_afterOverlordRestart(boolean useTransactions) throws Exception
{
Expand Down Expand Up @@ -107,7 +106,7 @@ public void test_supervisorRecovers_afterHistoricalRestart() throws Exception
totalRecords += publish1kRecords(topic, useTransactions);
}

@ParameterizedTest
@ParameterizedTest(name = "useTransactions={0}")
@ValueSource(booleans = {true, false})
public void test_supervisorRecovers_afterSuspendResume(boolean useTransactions)
{
Expand All @@ -121,7 +120,7 @@ public void test_supervisorRecovers_afterSuspendResume(boolean useTransactions)
totalRecords += publish1kRecords(topic, useTransactions);
}

@ParameterizedTest
@ParameterizedTest(name = "useTransactions={0}")
@ValueSource(booleans = {true, false})
public void test_supervisorRecovers_afterChangeInTopicPartitions(boolean useTransactions)
{
Expand Down Expand Up @@ -149,7 +148,7 @@ public void test_supervisorLaunchesNewTask_ifEarlyHandoff()
cluster.callApi().serviceClient().onLeaderOverlord(
mapper -> new RequestBuilder(HttpMethod.POST, path)
.jsonContent(mapper, Map.of("taskGroupIds", List.of(0, 1))),
new TypeReference<>() {}
null
);

// Wait for the handoff notice to be processed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void test_overlord_skipsCleanupOfPendingSegments()
cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
kafkaServer.deleteTopic(topic);

cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
verifyRowCount(recordCount);

// Verify that pending segments are not cleaned up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,7 @@ public Response handoffTaskGroups(
manager -> {
try {
if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
return Response.status(Response.Status.ACCEPTED)
.entity(Map.of()) // empty json object to allow deserialization by the client
.build();
return Response.status(Response.Status.ACCEPTED).build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error", StringUtils.format("Supervisor was not found [%s]", id)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,28 @@ public void testSpecPostMergeFallsBackToProvidedTaskCountMin()
EasyMock.verify(newSpec.getIoConfig());
}

@Test
public void test_handoffTaskGroups_returnsAccepted()
{
final SupervisorSpec spec = createTestSpec(1, 1);
final SupervisorResource.HandoffTaskGroupsRequest handoffRequest
= new SupervisorResource.HandoffTaskGroupsRequest(List.of(0));

EasyMock.expect(taskMaster.getSupervisorManager())
.andReturn(Optional.of(supervisorManager))
.times(1);
EasyMock.expect(supervisorManager.handoffTaskGroupsEarly(spec.getId(), handoffRequest.getTaskGroupIds()))
.andReturn(true)
.times(1);
replayAll();

final Response response = supervisorResource.handoffTaskGroups(spec.getId(), handoffRequest);
Assert.assertEquals(202, response.getStatus());
Assert.assertNull(response.getEntity());

verifyAll();
}

private TestSeekableStreamSupervisorSpec createTestSpec(Integer taskCount, int taskCountMin)
{
HashMap<String, Object> autoScalerConfig = new HashMap<>();
Expand Down
Loading