Skip to content

Commit b1ffae2

Browse files
committed
Cluster state and recovery constructs for in-place shard split
Signed-off-by: vikasvb90 <vikasvb@amazon.com>
1 parent dbe98aa commit b1ffae2

File tree

13 files changed

+928
-39
lines changed

13 files changed

+928
-39
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.split;
10+
11+
import org.opensearch.cluster.ack.ClusterStateUpdateRequest;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
14+
/**
15+
* Cluster state update request for in-place shard split.
16+
*
17+
* @opensearch.experimental
18+
*/
19+
@ExperimentalApi
20+
public class InPlaceShardSplitClusterStateUpdateRequest extends ClusterStateUpdateRequest<InPlaceShardSplitClusterStateUpdateRequest> {
21+
22+
private final String index;
23+
private final int shardId;
24+
private final int splitInto;
25+
private final String cause;
26+
27+
public InPlaceShardSplitClusterStateUpdateRequest(String cause, String index, int shardId, int splitInto) {
28+
this.index = index;
29+
this.shardId = shardId;
30+
this.splitInto = splitInto;
31+
this.cause = cause;
32+
}
33+
34+
public String getIndex() {
35+
return index;
36+
}
37+
38+
public int getSplitInto() {
39+
return splitInto;
40+
}
41+
42+
public String cause() {
43+
return cause;
44+
}
45+
46+
public int getShardId() {
47+
return shardId;
48+
}
49+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* This package contains actions and requests for splitting the shards of an index in place.
11+
*/
12+
package org.opensearch.action.admin.indices.split;
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.metadata;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.opensearch.Version;
15+
import org.opensearch.action.admin.indices.split.InPlaceShardSplitClusterStateUpdateRequest;
16+
import org.opensearch.cluster.AckedClusterStateUpdateTask;
17+
import org.opensearch.cluster.ClusterState;
18+
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
19+
import org.opensearch.cluster.node.DiscoveryNode;
20+
import org.opensearch.cluster.routing.RoutingTable;
21+
import org.opensearch.cluster.routing.ShardRouting;
22+
import org.opensearch.cluster.routing.allocation.AllocationService;
23+
import org.opensearch.cluster.service.ClusterManagerTask;
24+
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
25+
import org.opensearch.cluster.service.ClusterService;
26+
import org.opensearch.common.Priority;
27+
import org.opensearch.common.annotation.ExperimentalApi;
28+
import org.opensearch.core.action.ActionListener;
29+
30+
import java.util.function.BiFunction;
31+
32+
/**
33+
* Service responsible for applying in-place shard split requests to cluster state.
34+
*
35+
* @opensearch.experimental
36+
*/
37+
@ExperimentalApi
38+
public class MetadataInPlaceShardSplitService {
39+
private static final Logger logger = LogManager.getLogger(MetadataInPlaceShardSplitService.class);
40+
41+
private final ClusterService clusterService;
42+
private final AllocationService allocationService;
43+
private final ClusterManagerTaskThrottler.ThrottlingKey splitShardTaskKey;
44+
45+
public MetadataInPlaceShardSplitService(final ClusterService clusterService, final AllocationService allocationService) {
46+
this.clusterService = clusterService;
47+
this.allocationService = allocationService;
48+
this.splitShardTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTask.IN_PLACE_SHARD_SPLIT, true);
49+
}
50+
51+
/**
52+
* Submits a cluster state update task to split a shard in-place.
53+
*/
54+
public void split(final InPlaceShardSplitClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
55+
clusterService.submitStateUpdateTask(
56+
"in-place-split-shard [" + request.getShardId() + "] of index [" + request.getIndex() + "], cause [" + request.cause() + "]",
57+
new AckedClusterStateUpdateTask<>(Priority.URGENT, request, listener) {
58+
@Override
59+
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
60+
return new ClusterStateUpdateResponse(acknowledged);
61+
}
62+
63+
@Override
64+
public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() {
65+
return splitShardTaskKey;
66+
}
67+
68+
@Override
69+
public ClusterState execute(ClusterState currentState) {
70+
return applyShardSplitRequest(currentState, request, allocationService::reroute);
71+
}
72+
73+
@Override
74+
public void onFailure(String source, Exception e) {
75+
logger.trace(
76+
() -> new ParameterizedMessage(
77+
"[{}] of index [{}] failed to split online",
78+
request.getShardId(),
79+
request.getIndex()
80+
),
81+
e
82+
);
83+
super.onFailure(source, e);
84+
}
85+
}
86+
);
87+
}
88+
89+
/**
90+
* Applies a shard split request to the given cluster state. Updates the split metadata
91+
* in the index metadata and triggers a reroute so that child shards get allocated.
92+
*/
93+
static ClusterState applyShardSplitRequest(
94+
ClusterState currentState,
95+
InPlaceShardSplitClusterStateUpdateRequest request,
96+
BiFunction<ClusterState, String, ClusterState> rerouteRoutingTable
97+
) {
98+
IndexMetadata curIndexMetadata = currentState.metadata().index(request.getIndex());
99+
if (curIndexMetadata == null) {
100+
throw new IllegalArgumentException("Index [" + request.getIndex() + "] not found");
101+
}
102+
103+
if (curIndexMetadata.getNumberOfVirtualShards() != -1) {
104+
throw new IllegalArgumentException(
105+
"In-place shard split is not supported on index [" + request.getIndex() + "] with virtual shards enabled"
106+
);
107+
}
108+
109+
if (currentState.nodes().getMinNodeVersion().equals(currentState.nodes().getMaxNodeVersion()) == false) {
110+
throw new IllegalArgumentException("In-place shard split is not supported in a mixed-version cluster");
111+
}
112+
113+
for (DiscoveryNode node : currentState.nodes()) {
114+
if (node.getVersion().before(Version.V_3_6_0)) {
115+
throw new IllegalArgumentException(
116+
"In-place shard split requires all nodes to be on version " + Version.V_3_6_0 + " or later"
117+
);
118+
}
119+
}
120+
121+
int shardId = request.getShardId();
122+
SplitShardsMetadata splitShardsMetadata = curIndexMetadata.getSplitShardsMetadata();
123+
124+
if (splitShardsMetadata.getInProgressSplitShardIds().contains(shardId)) {
125+
throw new IllegalArgumentException("Splitting of shard [" + shardId + "] is already in progress");
126+
}
127+
128+
if (splitShardsMetadata.isSplitParent(shardId)) {
129+
throw new IllegalArgumentException("Shard [" + shardId + "] has already been split.");
130+
}
131+
132+
ShardRouting primaryShard = currentState.routingTable()
133+
.shardRoutingTable(curIndexMetadata.getIndex().getName(), shardId)
134+
.primaryShard();
135+
if (primaryShard.relocating()) {
136+
throw new IllegalArgumentException(
137+
"Cannot split shard [" + shardId + "] on index [" + request.getIndex() + "] because it is currently relocating"
138+
);
139+
}
140+
141+
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
142+
Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());
143+
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(curIndexMetadata);
144+
145+
SplitShardsMetadata.Builder splitMetadataBuilder = new SplitShardsMetadata.Builder(splitShardsMetadata);
146+
splitMetadataBuilder.splitShard(shardId, request.getSplitInto());
147+
indexMetadataBuilder.splitShardsMetadata(splitMetadataBuilder.build());
148+
149+
RoutingTable routingTable = routingTableBuilder.build();
150+
metadataBuilder.put(indexMetadataBuilder);
151+
152+
ClusterState updatedState = ClusterState.builder(currentState).metadata(metadataBuilder).routingTable(routingTable).build();
153+
return rerouteRoutingTable.apply(updatedState, "shard [" + shardId + "] of index [" + request.getIndex() + "] split");
154+
}
155+
}

0 commit comments

Comments
 (0)