@@ -63,6 +63,7 @@
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.threadpool.ThreadPool;

 import java.nio.channels.SeekableByteChannel;
@@ -76,8 +77,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -87,6 +90,7 @@
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
 import static org.elasticsearch.index.shard.IndexShardTests.getEngineFromShard;
+import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.METADATA_NAME_FORMAT;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_NAME_FORMAT;
@@ -1405,6 +1409,88 @@ public void testCloseOrDeleteIndexDuringSnapshot() throws Exception {
         assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS)));
     }

+    public void testCloseOrReallocateDuringPartialSnapshot() {
+        final var repoName = randomIdentifier();
+        createRepository(repoName, "mock");
+
+        final var blockingNode = internalCluster().startNode(
+            Settings.builder().put(NODE_ROLES_SETTING.getKey(), "data").put("thread_pool." + ThreadPool.Names.SNAPSHOT + ".max", 1)
+        );
+
+        // blocking the snapshot thread pool to ensure that we only retain the shard lock while actively running snapshot tasks
+        final var barrier = new CyclicBarrier(2);
+        final var keepGoing = new AtomicBoolean(true);
+        final var blockingNodeExecutor = internalCluster().getInstance(ThreadPool.class, blockingNode).executor(ThreadPool.Names.SNAPSHOT);
+        blockingNodeExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                safeAwait(barrier);
+                safeAwait(barrier);
+                if (keepGoing.get()) {
+                    blockingNodeExecutor.execute(this);
+                }
+            }
+        });
+
+        final var indexName = randomIdentifier();
+        createIndex(indexName, indexSettings(1, 0).put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", blockingNode).build());
+        indexRandomDocs(indexName, between(1, 100));
+        flushAndRefresh(indexName);
+
+        safeAwait(barrier);
+
+        final var snapshotName = randomIdentifier();
+        final var partialSnapshot = randomBoolean();
+        ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin().prepareCreateSnapshot(
+            TEST_REQUEST_TIMEOUT,
+            repoName,
+            snapshotName
+        ).setIndices(indexName).setWaitForCompletion(true).setPartial(partialSnapshot).execute();
+
+        // we have currently blocked the start-snapshot task from running, and it will be followed by at least three blob uploads
+        // (segments_N, .cfe, .cfs), executed one-at-a-time because of throttling to the max threadpool size, so it's safe to let up to
+        // three tasks through without the snapshot being able to complete
+        final var snapshotTasks = between(0, 3);
+        logger.info("--> running (at most) {} tasks", snapshotTasks);
+        for (int i = 0; i < snapshotTasks; i++) {
+            safeAwait(barrier);
+            safeAwait(barrier);
+        }
+        assertFalse(snapshotFuture.isDone());
+
+        try {
+            if (partialSnapshot && randomBoolean()) {
+                logger.info("--> closing index [{}]", indexName);
+                safeGet(indicesAdmin().prepareClose(indexName).execute());
+                ensureGreen(indexName);
+            } else {
+                logger.info("--> failing index [{}] to trigger recovery", indexName);
+                IndexShard indexShard = null;
+                for (IndexService indexService : internalCluster().getInstance(IndicesService.class, blockingNode)) {
+                    if (indexService.index().getName().equals(indexName)) {
+                        indexShard = indexService.getShard(0);
+                        break;
+                    }
+                }
+                assertNotNull(indexShard);
+                final var primaryTerm = indexShard.getOperationPrimaryTerm();
+                indexShard.failShard("simulated", new ElasticsearchException("simulated"));
+                safeAwait(
+                    ClusterServiceUtils.addTemporaryStateListener(
+                        internalCluster().getInstance(ClusterService.class),
+                        cs -> cs.metadata().index(indexName).primaryTerm(0) > primaryTerm
+                    )
+                );
+                ensureGreen(indexName);
+            }
+        } finally {
+            keepGoing.set(false);
+            safeAwait(barrier);
+        }
+
+        safeGet(snapshotFuture);
+    }
+
     public void testCloseIndexDuringRestore() throws Exception {
         Client client = client();
