Skip to content

Commit 8600fa4

Browse files
committed
Cluster state and recovery constructs for in-place shard split
Signed-off-by: vikasvb90 <vikasvb@amazon.com>
1 parent 568189b commit 8600fa4

File tree

12 files changed

+865
-37
lines changed

12 files changed

+865
-37
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.split;
10+
11+
import org.opensearch.cluster.ack.ClusterStateUpdateRequest;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
14+
/**
15+
* Cluster state update request for in-place shard split.
16+
*
17+
* @opensearch.experimental
18+
*/
19+
@ExperimentalApi
20+
public class InPlaceShardSplitClusterStateUpdateRequest extends ClusterStateUpdateRequest<InPlaceShardSplitClusterStateUpdateRequest> {
21+
22+
private final String index;
23+
private final int shardId;
24+
private final int splitInto;
25+
private final String cause;
26+
27+
public InPlaceShardSplitClusterStateUpdateRequest(String cause, String index, int shardId, int splitInto) {
28+
this.index = index;
29+
this.shardId = shardId;
30+
this.splitInto = splitInto;
31+
this.cause = cause;
32+
}
33+
34+
public String getIndex() {
35+
return index;
36+
}
37+
38+
public int getSplitInto() {
39+
return splitInto;
40+
}
41+
42+
public String cause() {
43+
return cause;
44+
}
45+
46+
public int getShardId() {
47+
return shardId;
48+
}
49+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.metadata;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.opensearch.action.admin.indices.split.InPlaceShardSplitClusterStateUpdateRequest;
15+
import org.opensearch.cluster.AckedClusterStateUpdateTask;
16+
import org.opensearch.cluster.ClusterState;
17+
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
18+
import org.opensearch.cluster.routing.RoutingTable;
19+
import org.opensearch.cluster.routing.allocation.AllocationService;
20+
import org.opensearch.cluster.service.ClusterManagerTask;
21+
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
22+
import org.opensearch.cluster.service.ClusterService;
23+
import org.opensearch.common.Priority;
24+
import org.opensearch.common.annotation.ExperimentalApi;
25+
import org.opensearch.core.action.ActionListener;
26+
27+
import java.util.function.BiFunction;
28+
29+
/**
30+
* Service responsible for applying in-place shard split requests to cluster state.
31+
*
32+
* @opensearch.experimental
33+
*/
34+
@ExperimentalApi
35+
public class MetadataInPlaceShardSplitService {
36+
private static final Logger logger = LogManager.getLogger(MetadataInPlaceShardSplitService.class);
37+
38+
private final ClusterService clusterService;
39+
private final AllocationService allocationService;
40+
private final ClusterManagerTaskThrottler.ThrottlingKey splitShardTaskKey;
41+
42+
public MetadataInPlaceShardSplitService(final ClusterService clusterService, final AllocationService allocationService) {
43+
this.clusterService = clusterService;
44+
this.allocationService = allocationService;
45+
this.splitShardTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTask.IN_PLACE_SHARD_SPLIT, true);
46+
}
47+
48+
/**
49+
* Submits a cluster state update task to split a shard in-place.
50+
*/
51+
public void split(final InPlaceShardSplitClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
52+
clusterService.submitStateUpdateTask(
53+
"in-place-split-shard [" + request.getShardId() + "] of index [" + request.getIndex() + "], cause [" + request.cause() + "]",
54+
new AckedClusterStateUpdateTask<>(Priority.URGENT, request, listener) {
55+
@Override
56+
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
57+
return new ClusterStateUpdateResponse(acknowledged);
58+
}
59+
60+
@Override
61+
public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() {
62+
return splitShardTaskKey;
63+
}
64+
65+
@Override
66+
public ClusterState execute(ClusterState currentState) {
67+
return applyShardSplitRequest(currentState, request, allocationService::reroute);
68+
}
69+
70+
@Override
71+
public void onFailure(String source, Exception e) {
72+
logger.trace(
73+
() -> new ParameterizedMessage(
74+
"[{}] of index [{}] failed to split online",
75+
request.getShardId(),
76+
request.getIndex()
77+
),
78+
e
79+
);
80+
super.onFailure(source, e);
81+
}
82+
}
83+
);
84+
}
85+
86+
/**
87+
* Applies a shard split request to the given cluster state. Updates the split metadata
88+
* in the index metadata and triggers a reroute so that child shards get allocated.
89+
*/
90+
static ClusterState applyShardSplitRequest(
91+
ClusterState currentState,
92+
InPlaceShardSplitClusterStateUpdateRequest request,
93+
BiFunction<ClusterState, String, ClusterState> rerouteRoutingTable
94+
) {
95+
IndexMetadata curIndexMetadata = currentState.metadata().index(request.getIndex());
96+
if (curIndexMetadata == null) {
97+
throw new IllegalArgumentException("Index [" + request.getIndex() + "] not found");
98+
}
99+
100+
if (curIndexMetadata.getNumberOfVirtualShards() != -1) {
101+
throw new IllegalArgumentException(
102+
"In-place shard split is not supported on index [" + request.getIndex() + "] with virtual shards enabled"
103+
);
104+
}
105+
106+
int shardId = request.getShardId();
107+
SplitShardsMetadata splitShardsMetadata = curIndexMetadata.getSplitShardsMetadata();
108+
109+
if (splitShardsMetadata.getInProgressSplitShardIds().contains(shardId)) {
110+
throw new IllegalArgumentException("Splitting of shard [" + shardId + "] is already in progress");
111+
}
112+
113+
if (splitShardsMetadata.isSplitParent(shardId)) {
114+
throw new IllegalArgumentException("Shard [" + shardId + "] has already been split.");
115+
}
116+
117+
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
118+
Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());
119+
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(curIndexMetadata);
120+
121+
SplitShardsMetadata.Builder splitMetadataBuilder = new SplitShardsMetadata.Builder(splitShardsMetadata);
122+
splitMetadataBuilder.splitShard(shardId, request.getSplitInto());
123+
indexMetadataBuilder.splitShardsMetadata(splitMetadataBuilder.build());
124+
125+
RoutingTable routingTable = routingTableBuilder.build();
126+
metadataBuilder.put(indexMetadataBuilder);
127+
128+
ClusterState updatedState = ClusterState.builder(currentState).metadata(metadataBuilder).routingTable(routingTable).build();
129+
return rerouteRoutingTable.apply(updatedState, "shard [" + shardId + "] of index [" + request.getIndex() + "] split");
130+
}
131+
}

0 commit comments

Comments
 (0)