2222
2323import java .util .Arrays ;
2424import java .util .Comparator ;
25+ import java .util .List ;
26+ import java .util .Objects ;
2527import java .util .concurrent .CountDownLatch ;
2628import java .util .concurrent .TimeUnit ;
2729import java .util .stream .Collectors ;
30+ import java .util .stream .IntStream ;
2831
2932import static org .hamcrest .Matchers .anEmptyMap ;
3033import static org .hamcrest .Matchers .instanceOf ;
@@ -39,10 +42,10 @@ public class TransformGetCheckpointIT extends TransformSingleNodeTestCase {
3942 public void testGetCheckpoint () throws Exception {
4043 final String indexNamePrefix = "test_index-" ;
4144 final int shards = randomIntBetween (1 , 5 );
42- final int indices = randomIntBetween (1 , 5 );
45+ var indices = indices ( indexNamePrefix , randomIntBetween (1 , 5 ) );
4346
44- for (int i = 0 ; i < indices ; ++ i ) {
45- indicesAdmin ().prepareCreate (indexNamePrefix + i ).setSettings (indexSettings (shards , 1 )).get ();
47+ for (var index : indices ) {
48+ indicesAdmin ().prepareCreate (index ).setSettings (indexSettings (shards , 1 )).get ();
4649 }
4750
4851 final GetCheckpointAction .Request request = new GetCheckpointAction .Request (
@@ -54,7 +57,7 @@ public void testGetCheckpoint() throws Exception {
5457 );
5558
5659 final GetCheckpointAction .Response response = client ().execute (GetCheckpointAction .INSTANCE , request ).get ();
57- assertEquals (indices , response .getCheckpoints ().size ());
60+ assertEquals (indices . size () , response .getCheckpoints ().size ());
5861
5962 // empty indices should report -1 as sequence id
6063 assertFalse (
@@ -63,30 +66,30 @@ public void testGetCheckpoint() throws Exception {
6366
6467 final int docsToCreatePerShard = randomIntBetween (0 , 10 );
6568 for (int d = 0 ; d < docsToCreatePerShard ; ++d ) {
66- for (int i = 0 ; i < indices ; ++ i ) {
69+ for (var index : indices ) {
6770 for (int j = 0 ; j < shards ; ++j ) {
68- prepareIndex (indexNamePrefix + i ).setSource ("{" + "\" field\" :" + j + "}" , XContentType .JSON ).get ();
71+ prepareIndex (index ).setSource ("{" + "\" field\" :" + j + "}" , XContentType .JSON ).get ();
6972 }
7073 }
7174 }
7275
7376 indicesAdmin ().refresh (new RefreshRequest (indexNamePrefix + "*" ));
7477
7578 final GetCheckpointAction .Response response2 = client ().execute (GetCheckpointAction .INSTANCE , request ).get ();
76- assertEquals (indices , response2 .getCheckpoints ().size ());
79+ assertEquals (indices . size () , response2 .getCheckpoints ().size ());
7780
7881 // check the sum, counting starts with 0, so we have to take docsToCreatePerShard - 1
7982 long checkpointSum = response2 .getCheckpoints ().values ().stream ().map (l -> Arrays .stream (l ).sum ()).mapToLong (Long ::valueOf ).sum ();
8083 assertEquals (
8184 "Expected "
82- + (docsToCreatePerShard - 1 ) * shards * indices
85+ + (docsToCreatePerShard - 1 ) * shards * indices . size ()
8386 + " as sum of "
8487 + response2 .getCheckpoints ()
8588 .entrySet ()
8689 .stream ()
8790 .map (e -> e .getKey () + ": {" + Strings .arrayToCommaDelimitedString (Arrays .stream (e .getValue ()).boxed ().toArray ()) + "}" )
8891 .collect (Collectors .joining ("," )),
89- (docsToCreatePerShard - 1 ) * shards * indices ,
92+ (docsToCreatePerShard - 1 ) * shards * indices . size () ,
9093 checkpointSum
9194 );
9295
@@ -98,25 +101,28 @@ public void testGetCheckpoint() throws Exception {
98101 .filter (i -> i .getShardRouting ().primary ())
99102 .sorted (Comparator .comparingInt (value -> value .getShardRouting ().id ()))
100103 .mapToLong (s -> s .getSeqNoStats ().getGlobalCheckpoint ())
104+ .filter (Objects ::nonNull )
101105 .sum (),
102106 checkpointSum
103107 );
108+ deleteIndices (indices );
104109 }
105110
106111 public void testGetCheckpointWithQueryThatFiltersOutEverything () throws Exception {
107112 final String indexNamePrefix = "test_index-" ;
108- final int indices = randomIntBetween (1 , 5 );
113+ var indices = indices ( indexNamePrefix , randomIntBetween (1 , 5 ) );
109114 final int shards = randomIntBetween (1 , 5 );
110115 final int docsToCreatePerShard = randomIntBetween (0 , 10 );
111116
112- for (int i = 0 ; i < indices ; ++i ) {
113- indicesAdmin ().prepareCreate (indexNamePrefix + i )
117+ for (int i = 0 ; i < indices .size (); ++i ) {
118+ var index = indices .get (i );
119+ indicesAdmin ().prepareCreate (index )
114120 .setSettings (indexSettings (shards , 1 ))
115121 .setMapping ("field" , "type=long" , "@timestamp" , "type=date" )
116122 .get ();
117123 for (int j = 0 ; j < shards ; ++j ) {
118124 for (int d = 0 ; d < docsToCreatePerShard ; ++d ) {
119- client ().prepareIndex (indexNamePrefix + i )
125+ client ().prepareIndex (index )
120126 .setSource (Strings .format ("{ \" field\" :%d, \" @timestamp\" : %d }" , j , 10_000_000 + d + i + j ), XContentType .JSON )
121127 .get ();
122128 }
@@ -135,6 +141,7 @@ public void testGetCheckpointWithQueryThatFiltersOutEverything() throws Exceptio
135141
136142 final GetCheckpointAction .Response response = client ().execute (GetCheckpointAction .INSTANCE , request ).get ();
137143 assertThat ("Response was: " + response .getCheckpoints (), response .getCheckpoints (), is (anEmptyMap ()));
144+ deleteIndices (indices );
138145 }
139146
140147 public void testGetCheckpointWithMissingIndex () throws Exception {
@@ -163,11 +170,11 @@ public void testGetCheckpointWithMissingIndex() throws Exception {
163170
164171 public void testGetCheckpointTimeoutExceeded () throws Exception {
165172 final String indexNamePrefix = "test_index-" ;
166- final int indices = 100 ;
173+ var indices = indices ( indexNamePrefix , 100 ) ;
167174 final int shards = 5 ;
168175
169- for (int i = 0 ; i < indices ; ++ i ) {
170- indicesAdmin ().prepareCreate (indexNamePrefix + i ).setSettings (indexSettings (shards , 0 )).get ();
176+ for (var index : indices ) {
177+ indicesAdmin ().prepareCreate (index ).setSettings (indexSettings (shards , 0 )).get ();
171178 }
172179
173180 final GetCheckpointAction .Request request = new GetCheckpointAction .Request (
@@ -184,7 +191,7 @@ public void testGetCheckpointTimeoutExceeded() throws Exception {
184191 finalException .set (e );
185192 latch .countDown ();
186193 }));
187- latch .await (10 , TimeUnit .SECONDS );
194+ assertTrue ( latch .await (10 , TimeUnit .SECONDS ) );
188195
189196 Exception e = finalException .get ();
190197 if (e != null ) {
@@ -198,5 +205,19 @@ public void testGetCheckpointTimeoutExceeded() throws Exception {
198205 // Due to system clock usage, the timeout does not always occur where it should.
199206 // We cannot mock the clock so we just have to live with it.
200207 }
208+ deleteIndices (indices );
209+ }
210+
211+ private List <String > indices (String prefix , int numberOfIndices ) {
212+ return IntStream .range (0 , numberOfIndices ).mapToObj (i -> prefix + i ).toList ();
213+ }
214+
215+ private void deleteIndices (List <String > indices ) {
216+ try {
217+ indicesAdmin ().prepareDelete (indices .toArray (new String [0 ])).get ();
218+ } catch (Exception e ) {
219+ // we can fail to clean up the indices, but this wouldn't impact other tests since the node gets torn down anyway
220+ // the index delete is to help the node tear down go smoother
221+ }
201222 }
202223}
0 commit comments