Skip to content

Commit 8b460cb

Browse files
A few additional TransportAction subclasses implements interface TransportIndicesResolvingAction
1 parent 9c61172 commit 8b460cb

File tree

9 files changed

+178
-12
lines changed

9 files changed

+178
-12
lines changed

server/src/main/java/org/opensearch/action/OriginalIndices.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939

4040
import java.io.IOException;
4141
import java.util.Arrays;
42+
import java.util.LinkedHashSet;
43+
import java.util.Set;
4244

4345
/**
4446
* Used to keep track of original indices within internal (e.g. shard level) requests
@@ -92,4 +94,20 @@ public static void writeOriginalIndices(OriginalIndices originalIndices, StreamO
9294
public String toString() {
9395
return "OriginalIndices{" + "indices=" + Arrays.toString(indices) + ", indicesOptions=" + indicesOptions + '}';
9496
}
97+
98+
public OriginalIndices mergeWith(OriginalIndices value) {
99+
if(value == null) {
100+
return this;
101+
}
102+
Set<String> set = new LinkedHashSet<>();
103+
if (this.indices != null) {
104+
java.util.Collections.addAll(set, this.indices);
105+
}
106+
if (value.indices != null) {
107+
java.util.Collections.addAll(set, value.indices);
108+
}
109+
String[] newIndices = set.toArray(new String[0]);
110+
IndicesOptions newOptions = this.indicesOptions.mergeWith(value.indicesOptions);
111+
return new OriginalIndices(newIndices, newOptions);
112+
}
95113
}

server/src/main/java/org/opensearch/action/get/TransportMultiGetAction.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@
3535
import org.opensearch.action.RoutingMissingException;
3636
import org.opensearch.action.support.ActionFilters;
3737
import org.opensearch.action.support.HandledTransportAction;
38+
import org.opensearch.action.support.TransportIndicesResolvingAction;
3839
import org.opensearch.cluster.ClusterState;
3940
import org.opensearch.cluster.block.ClusterBlockLevel;
4041
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
4142
import org.opensearch.cluster.metadata.Metadata;
43+
import org.opensearch.cluster.metadata.ResolvedIndices;
4244
import org.opensearch.cluster.routing.Preference;
4345
import org.opensearch.cluster.service.ClusterService;
4446
import org.opensearch.common.inject.Inject;
@@ -48,7 +50,9 @@
4850
import org.opensearch.tasks.Task;
4951
import org.opensearch.transport.TransportService;
5052

53+
import java.util.ArrayList;
5154
import java.util.HashMap;
55+
import java.util.List;
5256
import java.util.Map;
5357
import java.util.concurrent.atomic.AtomicInteger;
5458

@@ -57,7 +61,7 @@
5761
*
5862
* @opensearch.internal
5963
*/
60-
public class TransportMultiGetAction extends HandledTransportAction<MultiGetRequest, MultiGetResponse> {
64+
public class TransportMultiGetAction extends HandledTransportAction<MultiGetRequest, MultiGetResponse> implements TransportIndicesResolvingAction<MultiGetRequest> {
6165

6266
private final ClusterService clusterService;
6367
private final TransportShardMultiGetAction shardAction;
@@ -174,4 +178,24 @@ private void finishHim() {
174178
private static MultiGetItemResponse newItemFailure(String index, String id, Exception exception) {
175179
return new MultiGetItemResponse(null, new MultiGetResponse.Failure(index, id, exception));
176180
}
181+
182+
@Override
183+
public ResolvedIndices resolveIndices(MultiGetRequest request) {
184+
// TODO do we need these preconditions?
185+
ClusterState clusterState = clusterService.state();
186+
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
187+
188+
List<String> indices = new ArrayList<>();
189+
190+
for (MultiGetRequest.Item item : request.items) {
191+
try {
192+
String concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, item).getName();
193+
indices.add(concreteSingleIndex);
194+
} catch (Exception e) {
195+
// Multi get should not fail if one request is invalid
196+
}
197+
198+
}
199+
return ResolvedIndices.of(indices);
200+
}
177201
}

server/src/main/java/org/opensearch/action/search/CreatePitController.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.action.StepListener;
1616
import org.opensearch.action.support.GroupedActionListener;
1717
import org.opensearch.cluster.ClusterState;
18+
import org.opensearch.cluster.metadata.ResolvedIndices;
1819
import org.opensearch.cluster.node.DiscoveryNode;
1920
import org.opensearch.cluster.service.ClusterService;
2021
import org.opensearch.common.inject.Inject;
@@ -86,11 +87,7 @@ public void executeCreatePit(
8687
StepListener<SearchResponse> createPitListener,
8788
ActionListener<CreatePitResponse> updatePitIdListener
8889
) {
89-
SearchRequest searchRequest = new SearchRequest(request.getIndices());
90-
searchRequest.preference(request.getPreference());
91-
searchRequest.routing(request.getRouting());
92-
searchRequest.indicesOptions(request.getIndicesOptions());
93-
searchRequest.allowPartialSearchResults(request.shouldAllowPartialPitCreation());
90+
SearchRequest searchRequest = request.toSearchRequest();
9491
SearchTask searchTask = searchRequest.createTask(
9592
task.getId(),
9693
task.getType(),
@@ -326,4 +323,10 @@ public void onFailure(Exception e) {
326323
}
327324
pitService.deletePitContexts(nodeToContextsMap, deleteListener);
328325
}
326+
327+
ResolvedIndices resolveIndices(CreatePitRequest request) {
328+
SearchRequest searchRequest = request.toSearchRequest();
329+
searchRequest.setCcsMinimizeRoundtrips(false);
330+
return transportSearchAction.resolveIndices(searchRequest);
331+
}
329332
}

server/src/main/java/org/opensearch/action/search/CreatePitRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,4 +196,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
196196
}
197197
return builder;
198198
}
199+
200+
SearchRequest toSearchRequest() {
201+
SearchRequest searchRequest = new SearchRequest(this.getIndices());
202+
searchRequest.preference(this.getPreference());
203+
searchRequest.routing(this.getRouting());
204+
searchRequest.indicesOptions(this.getIndicesOptions());
205+
searchRequest.allowPartialSearchResults(this.shouldAllowPartialPitCreation());
206+
return searchRequest;
207+
}
199208
}

server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.opensearch.action.StepListener;
1313
import org.opensearch.action.support.ActionFilters;
1414
import org.opensearch.action.support.HandledTransportAction;
15+
import org.opensearch.action.support.TransportIndicesResolvingAction;
16+
import org.opensearch.cluster.metadata.ResolvedIndices;
1517
import org.opensearch.cluster.service.ClusterService;
1618
import org.opensearch.common.inject.Inject;
1719
import org.opensearch.common.unit.TimeValue;
@@ -32,7 +34,8 @@
3234
/**
3335
* Transport action for creating PIT reader context
3436
*/
35-
public class TransportCreatePitAction extends HandledTransportAction<CreatePitRequest, CreatePitResponse> {
37+
public class TransportCreatePitAction extends HandledTransportAction<CreatePitRequest, CreatePitResponse> implements
38+
TransportIndicesResolvingAction<CreatePitRequest> {
3639

3740
public static final String CREATE_PIT_ACTION = "create_pit";
3841
private final TransportService transportService;
@@ -76,6 +79,11 @@ protected void doExecute(Task task, CreatePitRequest request, ActionListener<Cre
7679
createPitController.executeCreatePit(request, task, createPitListener, updatePitIdListener);
7780
}
7881

82+
@Override
83+
public ResolvedIndices resolveIndices(CreatePitRequest request) {
84+
return createPitController.resolveIndices(request);
85+
}
86+
7987
/**
8088
* Request to create pit reader context with keep alive
8189
*/

server/src/main/java/org/opensearch/action/search/TransportMultiSearchAction.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434

3535
import org.opensearch.action.support.ActionFilters;
3636
import org.opensearch.action.support.HandledTransportAction;
37+
import org.opensearch.action.support.TransportIndicesResolvingAction;
3738
import org.opensearch.cluster.ClusterState;
3839
import org.opensearch.cluster.block.ClusterBlockLevel;
40+
import org.opensearch.cluster.metadata.ResolvedIndices;
3941
import org.opensearch.cluster.service.ClusterService;
4042
import org.opensearch.common.inject.Inject;
4143
import org.opensearch.common.settings.Settings;
@@ -51,6 +53,9 @@
5153
import org.opensearch.transport.TransportService;
5254
import org.opensearch.transport.client.node.NodeClient;
5355

56+
import java.util.ArrayList;
57+
import java.util.Collections;
58+
import java.util.List;
5459
import java.util.Queue;
5560
import java.util.concurrent.ConcurrentLinkedQueue;
5661
import java.util.concurrent.TimeUnit;
@@ -62,13 +67,15 @@
6267
*
6368
* @opensearch.internal
6469
*/
65-
public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> {
70+
public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> implements
71+
TransportIndicesResolvingAction<MultiSearchRequest> {
6672

6773
private final int allocatedProcessors;
6874
private final ThreadPool threadPool;
6975
private final ClusterService clusterService;
7076
private final LongSupplier relativeTimeProvider;
7177
private final NodeClient client;
78+
private final TransportSearchAction transportSearchAction;
7279

7380
@Inject
7481
public TransportMultiSearchAction(
@@ -77,14 +84,16 @@ public TransportMultiSearchAction(
7784
TransportService transportService,
7885
ClusterService clusterService,
7986
ActionFilters actionFilters,
80-
NodeClient client
87+
NodeClient client,
88+
TransportSearchAction transportSearchAction
8189
) {
8290
super(MultiSearchAction.NAME, transportService, actionFilters, (Writeable.Reader<MultiSearchRequest>) MultiSearchRequest::new);
8391
this.threadPool = threadPool;
8492
this.clusterService = clusterService;
8593
this.allocatedProcessors = OpenSearchExecutors.allocatedProcessors(settings);
8694
this.relativeTimeProvider = System::nanoTime;
8795
this.client = client;
96+
this.transportSearchAction = transportSearchAction;
8897
}
8998

9099
TransportMultiSearchAction(
@@ -94,14 +103,16 @@ public TransportMultiSearchAction(
94103
ClusterService clusterService,
95104
int allocatedProcessors,
96105
LongSupplier relativeTimeProvider,
97-
NodeClient client
106+
NodeClient client,
107+
TransportSearchAction transportSearchAction
98108
) {
99109
super(MultiSearchAction.NAME, transportService, actionFilters, (Writeable.Reader<MultiSearchRequest>) MultiSearchRequest::new);
100110
this.threadPool = threadPool;
101111
this.clusterService = clusterService;
102112
this.allocatedProcessors = allocatedProcessors;
103113
this.relativeTimeProvider = relativeTimeProvider;
104114
this.client = client;
115+
this.transportSearchAction = transportSearchAction;
105116
}
106117

107118
@Override
@@ -244,6 +255,17 @@ private boolean isCancelled(TaskId taskId) {
244255
return false;
245256
}
246257

258+
@Override
259+
public ResolvedIndices resolveIndices(MultiSearchRequest request) {
260+
List<SearchRequest> requests = request.requests();
261+
List<ResolvedIndices> resolvedIndicesList = new ArrayList<>();
262+
for(SearchRequest searchRequest : requests) {
263+
ResolvedIndices resolvedIndices = transportSearchAction.resolveIndices(searchRequest);
264+
resolvedIndicesList.add(resolvedIndices);
265+
}
266+
return resolvedIndicesList.stream().reduce(ResolvedIndices.of(Collections.emptySet()), ResolvedIndices::add);
267+
}
268+
247269
/**
248270
* Slots a search request
249271
*

server/src/main/java/org/opensearch/action/support/IndicesOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,14 @@ public static IndicesOptions lenientExpandHidden() {
654654
return LENIENT_EXPAND_OPEN_CLOSED_HIDDEN;
655655
}
656656

657+
public IndicesOptions mergeWith(IndicesOptions indicesOptions) {
658+
EnumSet<Option> newOptions = EnumSet.copyOf(this.options);
659+
EnumSet<WildcardStates> newExpandWildcards = EnumSet.copyOf(this.expandWildcards);
660+
newOptions.addAll(indicesOptions.options);
661+
newExpandWildcards.addAll(indicesOptions.expandWildcards);
662+
return new IndicesOptions(newOptions, newExpandWildcards);
663+
}
664+
657665
@Override
658666
public boolean equals(Object obj) {
659667
if (obj == null) {

server/src/main/java/org/opensearch/action/termvectors/TransportMultiTermVectorsAction.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
import org.opensearch.action.RoutingMissingException;
3636
import org.opensearch.action.support.ActionFilters;
3737
import org.opensearch.action.support.HandledTransportAction;
38+
import org.opensearch.action.support.TransportIndicesResolvingAction;
3839
import org.opensearch.cluster.ClusterState;
3940
import org.opensearch.cluster.block.ClusterBlockLevel;
4041
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
42+
import org.opensearch.cluster.metadata.ResolvedIndices;
4143
import org.opensearch.cluster.service.ClusterService;
4244
import org.opensearch.common.inject.Inject;
4345
import org.opensearch.common.util.concurrent.AtomicArray;
@@ -47,7 +49,9 @@
4749
import org.opensearch.tasks.Task;
4850
import org.opensearch.transport.TransportService;
4951

52+
import java.util.ArrayList;
5053
import java.util.HashMap;
54+
import java.util.List;
5155
import java.util.Map;
5256
import java.util.concurrent.atomic.AtomicInteger;
5357

@@ -56,7 +60,8 @@
5660
*
5761
* @opensearch.internal
5862
*/
59-
public class TransportMultiTermVectorsAction extends HandledTransportAction<MultiTermVectorsRequest, MultiTermVectorsResponse> {
63+
public class TransportMultiTermVectorsAction extends HandledTransportAction<MultiTermVectorsRequest, MultiTermVectorsResponse> implements
64+
TransportIndicesResolvingAction<MultiTermVectorsRequest> {
6065

6166
private final ClusterService clusterService;
6267
private final TransportShardMultiTermsVectorAction shardAction;
@@ -186,4 +191,27 @@ private void finishHim() {
186191
});
187192
}
188193
}
194+
195+
@Override
196+
public ResolvedIndices resolveIndices(MultiTermVectorsRequest request) {
197+
ClusterState clusterState = clusterService.state();
198+
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
199+
List<String> indices = new ArrayList<>();
200+
201+
202+
Map<ShardId, MultiTermVectorsShardRequest> shardRequests = new HashMap<>();
203+
for (int i = 0; i < request.requests.size(); i++) {
204+
TermVectorsRequest termVectorsRequest = request.requests.get(i);
205+
String routing = clusterState.metadata().resolveIndexRouting(termVectorsRequest.routing(), termVectorsRequest.index());
206+
if (!clusterState.metadata().hasConcreteIndex(termVectorsRequest.index())) {
207+
continue;
208+
}
209+
String concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, termVectorsRequest).getName();
210+
if (routing == null && clusterState.getMetadata().routingRequired(concreteSingleIndex)) {
211+
continue;
212+
}
213+
indices.add(concreteSingleIndex);
214+
}
215+
return ResolvedIndices.of(indices);
216+
}
189217
}

server/src/main/java/org/opensearch/cluster/metadata/ResolvedIndices.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ public static ResolvedIndices ofNonNull(String... indices) {
7171
return new ResolvedIndices(new Local(Collections.unmodifiableSet(indexSet), null, false), Collections.emptyMap());
7272
}
7373

74-
private static final ResolvedIndices ALL = new ResolvedIndices(new Local(Set.of(Metadata.ALL), null, true), Collections.emptyMap());
74+
private static final Local LOCAL_ALL = new Local(Set.of(Metadata.ALL), null, true);
75+
private static final ResolvedIndices ALL = new ResolvedIndices(LOCAL_ALL, Collections.emptyMap());
76+
7577

7678
private final Local local;
7779
private final Map<String, OriginalIndices> remote;
@@ -81,6 +83,37 @@ private ResolvedIndices(Local local, Map<String, OriginalIndices> remote) {
8183
this.remote = remote;
8284
}
8385

86+
private ResolvedIndices addLocal(Local local) {
87+
if(local == null || local().isEmpty()) {
88+
return this;
89+
}
90+
return new ResolvedIndices(this.local.addLocal(local), this.remote);
91+
}
92+
93+
private ResolvedIndices addRemote(Map<String, OriginalIndices> remote) {
94+
if(remote == null || remote.isEmpty()) {
95+
return this;
96+
}
97+
Map<String, OriginalIndices> newRemote = new HashMap<>(this.remote);
98+
for(Map.Entry<String, OriginalIndices> entry : remote.entrySet()) {
99+
if (newRemote.containsKey(entry.getKey())) {
100+
OriginalIndices originalIndices = newRemote.get(entry.getKey());
101+
OriginalIndices newOriginalIndices = originalIndices.mergeWith(entry.getValue());
102+
newRemote.put(entry.getKey(), newOriginalIndices);
103+
} else {
104+
newRemote.put(entry.getKey(), entry.getValue());
105+
}
106+
}
107+
return new ResolvedIndices(this.local, Collections.unmodifiableMap(newRemote));
108+
}
109+
110+
public ResolvedIndices add(ResolvedIndices other) {
111+
if (other == null || other.isEmpty()) {
112+
return this;
113+
}
114+
return this.addLocal(other.local).addRemote(other.remote);
115+
}
116+
84117
public Local local() {
85118
return this.local;
86119
}
@@ -149,6 +182,19 @@ public boolean contains(String index) {
149182
return this.names.contains(index);
150183
}
151184
}
185+
186+
public Local addLocal(Local local) {
187+
if(this.isAll) {
188+
return this;
189+
}
190+
if(local.isAll) {
191+
return local;
192+
}
193+
Set<String> namesSum = new HashSet<>(this.names);
194+
namesSum.addAll(local.names);
195+
OriginalIndices newOriginalIndices = this.originalIndices == null ? local.originalIndices : this.originalIndices.mergeWith(local.originalIndices);
196+
return new Local(Collections.unmodifiableSet(namesSum), newOriginalIndices, isAll);
197+
}
152198
}
153199

154200
}

0 commit comments

Comments
 (0)