|
5 | 5 |
|
6 | 6 | package org.opensearch.ml.action.connector; |
7 | 7 |
|
8 | | -import static org.opensearch.ml.common.CommonValue.TENANT_ID_FIELD; |
9 | 8 | import static org.opensearch.ml.utils.RestActionUtils.wrapListenerToHandleSearchIndexNotFound; |
10 | 9 |
|
11 | 10 | import java.util.ArrayList; |
|
15 | 14 | import java.util.stream.Collectors; |
16 | 15 |
|
17 | 16 | import org.opensearch.ExceptionsHelper; |
| 17 | +import org.opensearch.OpenSearchStatusException; |
18 | 18 | import org.opensearch.action.search.SearchRequest; |
19 | 19 | import org.opensearch.action.search.SearchResponse; |
20 | 20 | import org.opensearch.action.search.ShardSearchFailure; |
|
25 | 25 | import org.opensearch.common.util.concurrent.ThreadContext; |
26 | 26 | import org.opensearch.commons.authuser.User; |
27 | 27 | import org.opensearch.core.action.ActionListener; |
| 28 | +import org.opensearch.core.rest.RestStatus; |
28 | 29 | import org.opensearch.index.IndexNotFoundException; |
29 | | -import org.opensearch.index.query.BoolQueryBuilder; |
30 | | -import org.opensearch.index.query.QueryBuilders; |
31 | 30 | import org.opensearch.ml.common.CommonValue; |
32 | 31 | import org.opensearch.ml.common.connector.HttpConnector; |
33 | 32 | import org.opensearch.ml.common.transport.connector.MLConnectorSearchAction; |
|
37 | 36 | import org.opensearch.ml.utils.RestActionUtils; |
38 | 37 | import org.opensearch.ml.utils.TenantAwareHelper; |
39 | 38 | import org.opensearch.remote.metadata.client.SdkClient; |
| 39 | +import org.opensearch.remote.metadata.client.SearchDataObjectRequest; |
| 40 | +import org.opensearch.remote.metadata.common.SdkClientUtils; |
40 | 41 | import org.opensearch.search.builder.SearchSourceBuilder; |
41 | 42 | import org.opensearch.search.fetch.subphase.FetchSourceContext; |
42 | 43 | import org.opensearch.search.internal.InternalSearchResponse; |
@@ -108,22 +109,34 @@ private void search(SearchRequest request, String tenantId, ActionListener<Searc |
108 | 109 | final ActionListener<SearchResponse> doubleWrappedListener = ActionListener |
109 | 110 | .wrap(wrappedListener::onResponse, e -> wrapListenerToHandleSearchIndexNotFound(e, wrappedListener)); |
110 | 111 |
|
111 | | - if (tenantId != null) { |
112 | | - BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); |
113 | | - if (request.source().query() != null) { |
114 | | - queryBuilder.must(request.source().query()); |
115 | | - } |
116 | | - queryBuilder.filter(QueryBuilders.termQuery(TENANT_ID_FIELD, tenantId)); // Replace with your tenant_id field |
117 | | - request.source().query(queryBuilder); |
118 | | - } |
119 | | - |
120 | | - if (connectorAccessControlHelper.skipConnectorAccessControl(user)) { |
121 | | - client.search(request, doubleWrappedListener); |
122 | | - } else { |
| 112 | + if (!connectorAccessControlHelper.skipConnectorAccessControl(user)) { |
123 | 113 | SearchSourceBuilder sourceBuilder = connectorAccessControlHelper.addUserBackendRolesFilter(user, request.source()); |
124 | 114 | request.source(sourceBuilder); |
125 | | - client.search(request, doubleWrappedListener); |
126 | 115 | } |
| 116 | + |
| 117 | + SearchDataObjectRequest searchDataObjectRequest = SearchDataObjectRequest |
| 118 | + .builder() |
| 119 | + .indices(request.indices()) |
| 120 | + .searchSourceBuilder(request.source()) |
| 121 | + .tenantId(tenantId) |
| 122 | + .build(); |
| 123 | + sdkClient.searchDataObjectAsync(searchDataObjectRequest).whenComplete((r, throwable) -> { |
| 124 | + if (throwable != null) { |
| 125 | + Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); |
| 126 | + log.error("Failed to search connector", cause); |
| 127 | + doubleWrappedListener.onFailure(cause); |
| 128 | + } else { |
| 129 | + try { |
| 130 | + SearchResponse searchResponse = SearchResponse.fromXContent(r.parser()); |
| 131 | + log.info("Connector search complete: {}", searchResponse.getHits().getTotalHits()); |
| 132 | + doubleWrappedListener.onResponse(searchResponse); |
| 133 | + } catch (Exception e) { |
| 134 | + log.error("Failed to parse search response", e); |
| 135 | + doubleWrappedListener |
| 136 | + .onFailure(new OpenSearchStatusException("Failed to parse search response", RestStatus.INTERNAL_SERVER_ERROR)); |
| 137 | + } |
| 138 | + } |
| 139 | + }); |
127 | 140 | } catch (Exception e) { |
128 | 141 | log.error(e.getMessage(), e); |
129 | 142 | actionListener.onFailure(e); |
|
0 commit comments