26
26
import org .elasticsearch .index .shard .ShardId ;
27
27
import org .elasticsearch .index .shard .ShardLongFieldRange ;
28
28
29
- import java .util .ArrayList ;
30
29
import java .util .List ;
31
30
import java .util .stream .IntStream ;
32
31
import java .util .stream .Stream ;
@@ -142,45 +141,57 @@ public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception {
142
141
assertSame (clusterState , executeTasks (clusterState , tasks ));
143
142
}
144
143
145
- public void testStartedShards () throws Exception {
144
+ public void testStartPrimary () throws Exception {
146
145
final String indexName = "test" ;
147
- final ClusterState clusterState = state (indexName , randomBoolean (), ShardRoutingState .INITIALIZING , ShardRoutingState . INITIALIZING );
146
+ final ClusterState clusterState = state (indexName , randomBoolean (), ShardRoutingState .INITIALIZING );
148
147
149
148
final IndexMetadata indexMetadata = clusterState .metadata ().index (indexName );
150
149
final ShardId shardId = new ShardId (indexMetadata .getIndex (), 0 );
151
150
final long primaryTerm = indexMetadata .primaryTerm (shardId .id ());
152
151
final ShardRouting primaryShard = clusterState .routingTable ().shardRoutingTable (shardId ).primaryShard ();
153
152
final String primaryAllocationId = primaryShard .allocationId ().getId ();
154
153
155
- final List <StartedShardUpdateTask > tasks = new ArrayList <>();
156
- tasks .add (
157
- new StartedShardUpdateTask (
158
- new StartedShardEntry (shardId , primaryAllocationId , primaryTerm , "test" , ShardLongFieldRange .UNKNOWN ),
159
- createTestListener ()
160
- )
154
+ final var task = new StartedShardUpdateTask (
155
+ new StartedShardEntry (shardId , primaryAllocationId , primaryTerm , "test" , ShardLongFieldRange .UNKNOWN ),
156
+ createTestListener ()
161
157
);
162
- if (randomBoolean ()) {
163
- final ShardRouting replicaShard = clusterState .routingTable ().shardRoutingTable (shardId ).replicaShards ().iterator ().next ();
164
- final String replicaAllocationId = replicaShard .allocationId ().getId ();
165
- tasks .add (
166
- new StartedShardUpdateTask (
167
- new StartedShardEntry (shardId , replicaAllocationId , primaryTerm , "test" , ShardLongFieldRange .UNKNOWN ),
168
- createTestListener ()
169
- )
170
- );
171
- }
172
158
173
- final var resultingState = executeTasks (clusterState , tasks );
159
+ final var resultingState = executeTasks (clusterState , List . of ( task ) );
174
160
assertNotSame (clusterState , resultingState );
175
- for (final var task : tasks ) {
176
- assertThat (
177
- resultingState .routingTable ()
178
- .shardRoutingTable (task .getEntry ().shardId )
179
- .getByAllocationId (task .getEntry ().allocationId )
180
- .state (),
181
- is (ShardRoutingState .STARTED )
182
- );
183
- }
161
+ assertThat (
162
+ resultingState .routingTable ()
163
+ .shardRoutingTable (task .getEntry ().shardId )
164
+ .getByAllocationId (task .getEntry ().allocationId )
165
+ .state (),
166
+ is (ShardRoutingState .STARTED )
167
+ );
168
+ }
169
+
170
+ public void testStartReplica () throws Exception {
171
+ final String indexName = "test" ;
172
+ final ClusterState clusterState = state (indexName , randomBoolean (), ShardRoutingState .STARTED , ShardRoutingState .INITIALIZING );
173
+
174
+ final IndexMetadata indexMetadata = clusterState .metadata ().index (indexName );
175
+ final ShardId shardId = new ShardId (indexMetadata .getIndex (), 0 );
176
+ final long primaryTerm = indexMetadata .primaryTerm (shardId .id ());
177
+ final ShardRouting primaryShard = clusterState .routingTable ().shardRoutingTable (shardId ).primaryShard ();
178
+
179
+ final ShardRouting replicaShard = clusterState .routingTable ().shardRoutingTable (shardId ).replicaShards ().iterator ().next ();
180
+ final String replicaAllocationId = replicaShard .allocationId ().getId ();
181
+ final var task = new StartedShardUpdateTask (
182
+ new StartedShardEntry (shardId , replicaAllocationId , primaryTerm , "test" , ShardLongFieldRange .UNKNOWN ),
183
+ createTestListener ()
184
+ );
185
+
186
+ final var resultingState = executeTasks (clusterState , List .of (task ));
187
+ assertNotSame (clusterState , resultingState );
188
+ assertThat (
189
+ resultingState .routingTable ()
190
+ .shardRoutingTable (task .getEntry ().shardId )
191
+ .getByAllocationId (task .getEntry ().allocationId )
192
+ .state (),
193
+ is (ShardRoutingState .STARTED )
194
+ );
184
195
}
185
196
186
197
public void testDuplicateStartsAreOkay () throws Exception {
@@ -215,12 +226,12 @@ public void testDuplicateStartsAreOkay() throws Exception {
215
226
}
216
227
}
217
228
218
- public void testPrimaryTermsMismatch () throws Exception {
229
+ public void testPrimaryTermsMismatchOnPrimary () throws Exception {
219
230
final String indexName = "test" ;
220
231
final int shard = 0 ;
221
232
final int primaryTerm = 2 + randomInt (200 );
222
233
223
- ClusterState clusterState = state (indexName , randomBoolean (), ShardRoutingState .INITIALIZING , ShardRoutingState . INITIALIZING );
234
+ ClusterState clusterState = state (indexName , randomBoolean (), ShardRoutingState .INITIALIZING );
224
235
clusterState = ClusterState .builder (clusterState )
225
236
.metadata (
226
237
Metadata .builder (clusterState .metadata ())
@@ -272,8 +283,23 @@ public void testPrimaryTermsMismatch() throws Exception {
272
283
.state (),
273
284
is (ShardRoutingState .STARTED )
274
285
);
275
- clusterState = resultingState ;
276
286
}
287
+ }
288
+
289
+ public void testPrimaryTermsMismatchOnReplica () throws Exception {
290
+ final String indexName = "test" ;
291
+ final int shard = 0 ;
292
+ final int primaryTerm = 2 + randomInt (200 );
293
+
294
+ ClusterState clusterState = state (indexName , randomBoolean (), ShardRoutingState .STARTED , ShardRoutingState .INITIALIZING );
295
+ clusterState = ClusterState .builder (clusterState )
296
+ .metadata (
297
+ Metadata .builder (clusterState .metadata ())
298
+ .put (IndexMetadata .builder (clusterState .metadata ().index (indexName )).primaryTerm (shard , primaryTerm ).build (), true )
299
+ .build ()
300
+ )
301
+ .build ();
302
+ final ShardId shardId = new ShardId (clusterState .metadata ().index (indexName ).getIndex (), shard );
277
303
{
278
304
final long replicaPrimaryTerm = randomBoolean () ? primaryTerm : primaryTerm - 1 ;
279
305
final String replicaAllocationId = clusterState .routingTable ()
@@ -301,9 +327,9 @@ public void testPrimaryTermsMismatch() throws Exception {
301
327
}
302
328
}
303
329
304
- public void testExpandsTimestampRange () throws Exception {
330
+ public void testExpandsTimestampRangeForPrimary () throws Exception {
305
331
final String indexName = "test" ;
306
- final ClusterState clusterState = state (indexName , randomBoolean (), ShardRoutingState .INITIALIZING , ShardRoutingState . INITIALIZING );
332
+ final ClusterState clusterState = state (indexName , randomBoolean (), ShardRoutingState .INITIALIZING );
307
333
308
334
final IndexMetadata indexMetadata = clusterState .metadata ().index (indexName );
309
335
final ShardId shardId = new ShardId (indexMetadata .getIndex (), 0 );
@@ -317,47 +343,66 @@ public void testExpandsTimestampRange() throws Exception {
317
343
: randomBoolean () ? ShardLongFieldRange .EMPTY
318
344
: ShardLongFieldRange .of (1606407943000L , 1606407944000L );
319
345
320
- final List <StartedShardUpdateTask > tasks = new ArrayList <>();
321
- tasks .add (
322
- new StartedShardUpdateTask (
323
- new StartedShardEntry (shardId , primaryAllocationId , primaryTerm , "test" , shardTimestampRange ),
324
- createTestListener ()
325
- )
346
+ final var task = new StartedShardUpdateTask (
347
+ new StartedShardEntry (shardId , primaryAllocationId , primaryTerm , "test" , shardTimestampRange ),
348
+ createTestListener ()
326
349
);
327
- if (randomBoolean ()) {
328
- final ShardRouting replicaShard = clusterState .routingTable ().shardRoutingTable (shardId ).replicaShards ().iterator ().next ();
329
- final String replicaAllocationId = replicaShard .allocationId ().getId ();
330
- tasks .add (
331
- new StartedShardUpdateTask (
332
- new StartedShardEntry (shardId , replicaAllocationId , primaryTerm , "test" , shardTimestampRange ),
333
- createTestListener ()
334
- )
335
- );
336
- }
337
- final var resultingState = executeTasks (clusterState , tasks );
350
+
351
+ final var resultingState = executeTasks (clusterState , List .of (task ));
338
352
assertNotSame (clusterState , resultingState );
339
- for (final var task : tasks ) {
340
- assertThat (
341
- resultingState .routingTable ()
342
- .shardRoutingTable (task .getEntry ().shardId )
343
- .getByAllocationId (task .getEntry ().allocationId )
344
- .state (),
345
- is (ShardRoutingState .STARTED )
346
- );
353
+ assertThat (
354
+ resultingState .routingTable ()
355
+ .shardRoutingTable (task .getEntry ().shardId )
356
+ .getByAllocationId (task .getEntry ().allocationId )
357
+ .state (),
358
+ is (ShardRoutingState .STARTED )
359
+ );
347
360
348
- final var timestampRange = resultingState .metadata ().index (indexName ).getTimestampRange ();
349
- if (shardTimestampRange == ShardLongFieldRange .UNKNOWN ) {
350
- assertThat (timestampRange , sameInstance (IndexLongFieldRange .UNKNOWN ));
351
- } else if (shardTimestampRange == ShardLongFieldRange .EMPTY ) {
352
- assertThat (timestampRange , sameInstance (IndexLongFieldRange .EMPTY ));
353
- } else {
354
- assertTrue (timestampRange .isComplete ());
355
- assertThat (timestampRange .getMin (), equalTo (shardTimestampRange .getMin ()));
356
- assertThat (timestampRange .getMax (), equalTo (shardTimestampRange .getMax ()));
357
- }
361
+ final var timestampRange = resultingState .metadata ().index (indexName ).getTimestampRange ();
362
+ if (shardTimestampRange == ShardLongFieldRange .UNKNOWN ) {
363
+ assertThat (timestampRange , sameInstance (IndexLongFieldRange .UNKNOWN ));
364
+ } else if (shardTimestampRange == ShardLongFieldRange .EMPTY ) {
365
+ assertThat (timestampRange , sameInstance (IndexLongFieldRange .EMPTY ));
366
+ } else {
367
+ assertTrue (timestampRange .isComplete ());
368
+ assertThat (timestampRange .getMin (), equalTo (shardTimestampRange .getMin ()));
369
+ assertThat (timestampRange .getMax (), equalTo (shardTimestampRange .getMax ()));
358
370
}
359
371
}
360
372
373
+ public void testExpandsTimestampRangeForReplica () throws Exception {
374
+ final String indexName = "test" ;
375
+ final ClusterState clusterState = state (indexName , randomBoolean (), ShardRoutingState .STARTED , ShardRoutingState .INITIALIZING );
376
+
377
+ final IndexMetadata indexMetadata = clusterState .metadata ().index (indexName );
378
+ final ShardId shardId = new ShardId (indexMetadata .getIndex (), 0 );
379
+ final long primaryTerm = indexMetadata .primaryTerm (shardId .id ());
380
+
381
+ assertThat (indexMetadata .getTimestampRange (), sameInstance (IndexLongFieldRange .UNKNOWN ));
382
+
383
+ final ShardLongFieldRange shardTimestampRange = randomBoolean () ? ShardLongFieldRange .UNKNOWN
384
+ : randomBoolean () ? ShardLongFieldRange .EMPTY
385
+ : ShardLongFieldRange .of (1606407943000L , 1606407944000L );
386
+
387
+ final ShardRouting replicaShard = clusterState .routingTable ().shardRoutingTable (shardId ).replicaShards ().iterator ().next ();
388
+ final String replicaAllocationId = replicaShard .allocationId ().getId ();
389
+ final var task = new StartedShardUpdateTask (
390
+ new StartedShardEntry (shardId , replicaAllocationId , primaryTerm , "test" , shardTimestampRange ),
391
+ createTestListener ()
392
+ );
393
+ final var resultingState = executeTasks (clusterState , List .of (task ));
394
+ assertNotSame (clusterState , resultingState );
395
+ assertThat (
396
+ resultingState .routingTable ()
397
+ .shardRoutingTable (task .getEntry ().shardId )
398
+ .getByAllocationId (task .getEntry ().allocationId )
399
+ .state (),
400
+ is (ShardRoutingState .STARTED )
401
+ );
402
+
403
+ assertThat (resultingState .metadata ().index (indexName ).getTimestampRange (), sameInstance (IndexLongFieldRange .UNKNOWN ));
404
+ }
405
+
361
406
private ClusterState executeTasks (final ClusterState state , final List <StartedShardUpdateTask > tasks ) throws Exception {
362
407
return ClusterStateTaskExecutorUtils .executeAndAssertSuccessful (state , executor , tasks );
363
408
}
0 commit comments