Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -39,10 +42,10 @@ public class TransformGetCheckpointIT extends TransformSingleNodeTestCase {
public void testGetCheckpoint() throws Exception {
final String indexNamePrefix = "test_index-";
final int shards = randomIntBetween(1, 5);
final int indices = randomIntBetween(1, 5);
var indices = indices(indexNamePrefix, randomIntBetween(1, 5));

for (int i = 0; i < indices; ++i) {
indicesAdmin().prepareCreate(indexNamePrefix + i).setSettings(indexSettings(shards, 1)).get();
for (var index : indices) {
indicesAdmin().prepareCreate(index).setSettings(indexSettings(shards, 1)).get();
}

final GetCheckpointAction.Request request = new GetCheckpointAction.Request(
Expand All @@ -54,7 +57,7 @@ public void testGetCheckpoint() throws Exception {
);

final GetCheckpointAction.Response response = client().execute(GetCheckpointAction.INSTANCE, request).get();
assertEquals(indices, response.getCheckpoints().size());
assertEquals(indices.size(), response.getCheckpoints().size());

// empty indices should report -1 as sequence id
assertFalse(
Expand All @@ -63,30 +66,30 @@ public void testGetCheckpoint() throws Exception {

final int docsToCreatePerShard = randomIntBetween(0, 10);
for (int d = 0; d < docsToCreatePerShard; ++d) {
for (int i = 0; i < indices; ++i) {
for (var index : indices) {
for (int j = 0; j < shards; ++j) {
prepareIndex(indexNamePrefix + i).setSource("{" + "\"field\":" + j + "}", XContentType.JSON).get();
prepareIndex(index).setSource("{" + "\"field\":" + j + "}", XContentType.JSON).get();
}
}
}

indicesAdmin().refresh(new RefreshRequest(indexNamePrefix + "*"));

final GetCheckpointAction.Response response2 = client().execute(GetCheckpointAction.INSTANCE, request).get();
assertEquals(indices, response2.getCheckpoints().size());
assertEquals(indices.size(), response2.getCheckpoints().size());

// check the sum, counting starts with 0, so we have to take docsToCreatePerShard - 1
long checkpointSum = response2.getCheckpoints().values().stream().map(l -> Arrays.stream(l).sum()).mapToLong(Long::valueOf).sum();
assertEquals(
"Expected "
+ (docsToCreatePerShard - 1) * shards * indices
+ (docsToCreatePerShard - 1) * shards * indices.size()
+ " as sum of "
+ response2.getCheckpoints()
.entrySet()
.stream()
.map(e -> e.getKey() + ": {" + Strings.arrayToCommaDelimitedString(Arrays.stream(e.getValue()).boxed().toArray()) + "}")
.collect(Collectors.joining(",")),
(docsToCreatePerShard - 1) * shards * indices,
(docsToCreatePerShard - 1) * shards * indices.size(),
checkpointSum
);

Expand All @@ -98,25 +101,28 @@ public void testGetCheckpoint() throws Exception {
.filter(i -> i.getShardRouting().primary())
.sorted(Comparator.comparingInt(value -> value.getShardRouting().id()))
.mapToLong(s -> s.getSeqNoStats().getGlobalCheckpoint())
.filter(Objects::nonNull)
.sum(),
checkpointSum
);
deleteIndices(indices);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the test fails will the deleteIndices still be called? Would it make sense to move these to an After method. I guess not all tests need to delete the indices, but maybe we could just ignore a failure on a delete attempt in that situation and always try to delete the indices.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah let's ignore the failure from delete - the cleanup code may still pass (this seems to be a rare issue anyway)

}

public void testGetCheckpointWithQueryThatFiltersOutEverything() throws Exception {
final String indexNamePrefix = "test_index-";
final int indices = randomIntBetween(1, 5);
var indices = indices(indexNamePrefix, randomIntBetween(1, 5));
final int shards = randomIntBetween(1, 5);
final int docsToCreatePerShard = randomIntBetween(0, 10);

for (int i = 0; i < indices; ++i) {
indicesAdmin().prepareCreate(indexNamePrefix + i)
for (int i = 0; i < indices.size(); ++i) {
var index = indices.get(i);
indicesAdmin().prepareCreate(index)
.setSettings(indexSettings(shards, 1))
.setMapping("field", "type=long", "@timestamp", "type=date")
.get();
for (int j = 0; j < shards; ++j) {
for (int d = 0; d < docsToCreatePerShard; ++d) {
client().prepareIndex(indexNamePrefix + i)
client().prepareIndex(index)
.setSource(Strings.format("{ \"field\":%d, \"@timestamp\": %d }", j, 10_000_000 + d + i + j), XContentType.JSON)
.get();
}
Expand All @@ -135,6 +141,7 @@ public void testGetCheckpointWithQueryThatFiltersOutEverything() throws Exceptio

final GetCheckpointAction.Response response = client().execute(GetCheckpointAction.INSTANCE, request).get();
assertThat("Response was: " + response.getCheckpoints(), response.getCheckpoints(), is(anEmptyMap()));
deleteIndices(indices);
}

public void testGetCheckpointWithMissingIndex() throws Exception {
Expand Down Expand Up @@ -163,11 +170,11 @@ public void testGetCheckpointWithMissingIndex() throws Exception {

public void testGetCheckpointTimeoutExceeded() throws Exception {
final String indexNamePrefix = "test_index-";
final int indices = 100;
var indices = indices(indexNamePrefix, 100);
final int shards = 5;

for (int i = 0; i < indices; ++i) {
indicesAdmin().prepareCreate(indexNamePrefix + i).setSettings(indexSettings(shards, 0)).get();
for (var index : indices) {
indicesAdmin().prepareCreate(index).setSettings(indexSettings(shards, 0)).get();
}

final GetCheckpointAction.Request request = new GetCheckpointAction.Request(
Expand All @@ -184,7 +191,7 @@ public void testGetCheckpointTimeoutExceeded() throws Exception {
finalException.set(e);
latch.countDown();
}));
latch.await(10, TimeUnit.SECONDS);
assertTrue(latch.await(10, TimeUnit.SECONDS));

Exception e = finalException.get();
if (e != null) {
Expand All @@ -198,5 +205,14 @@ public void testGetCheckpointTimeoutExceeded() throws Exception {
// Due to system clock usage, the timeout does not always occur where it should.
// We cannot mock the clock so we just have to live with it.
}
deleteIndices(indices);
}

private List<String> indices(String prefix, int numberOfIndices) {
return IntStream.range(0, numberOfIndices).mapToObj(i -> prefix + i).toList();
}

private void deleteIndices(List<String> indices) {
indicesAdmin().prepareDelete(indices.toArray(new String[0])).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public class TransformGetCheckpointTests extends ESSingleNodeTestCase {
private ThreadPool threadPool;
private IndexNameExpressionResolver indexNameExpressionResolver;
private Client client;
private MockTransport mockTransport;
private Task transformTask;
private final String indexNamePattern = "test_index-";
private String[] testIndices;
Expand All @@ -99,7 +98,7 @@ public void setUp() throws Exception {
indexNameExpressionResolver = new MockResolver();
clusterService = getInstanceFromNode(ClusterService.class);
indicesService = getInstanceFromNode(IndicesService.class);
mockTransport = new MockTransport() {
MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
if (action.equals(GetCheckpointNodeAction.NAME)) {
Expand Down