Skip to content

Commit e7ecd51

Browse files
authored
Replace usages of ThreadContext.stashContext with pluginSubject.runAs (opensearch-project#715)
* Replace usages of ThreadContext.stashContext with pluginSubject.runAs Signed-off-by: Craig Perkins <cwperx@amazon.com> * rearrange Signed-off-by: Craig Perkins <cwperx@amazon.com> * Add initialize to Ip2GeoProcessor Signed-off-by: Craig Perkins <cwperx@amazon.com> * Add System Index Descriptor Signed-off-by: Craig Perkins <cwperx@amazon.com> * Fix test Signed-off-by: Craig Perkins <cwperx@amazon.com> * Add wildcard Signed-off-by: Craig Perkins <cwperx@amazon.com> * fix test Signed-off-by: Craig Perkins <cwperx@amazon.com> * Add to CHANGELOG Signed-off-by: Craig Perkins <cwperx@amazon.com> * Rename to PluginSubject Signed-off-by: Craig Perkins <cwperx@amazon.com> * Run spotlessApply Signed-off-by: Craig Perkins <cwperx@amazon.com> * Allow mget Signed-off-by: Craig Perkins <cwperx@amazon.com> * Rename to pluginClient for clarity and replace instances of AccessController Signed-off-by: Craig Perkins <cwperx@amazon.com> * Use more generic Client Signed-off-by: Craig Perkins <cwperx@amazon.com> * Fix visibility change Signed-off-by: Craig Perkins <cwperx@amazon.com> * Fix visibility change Signed-off-by: Craig Perkins <cwperx@amazon.com> --------- Signed-off-by: Craig Perkins <cwperx@amazon.com>
1 parent dfd6fe6 commit e7ecd51

File tree

15 files changed

+188
-165
lines changed

15 files changed

+188
-165
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ See the [CONTRIBUTING guide](./CONTRIBUTING.md#Changelog) for instructions on ho
1212
### Documentation
1313
### Maintenance
1414
### Refactoring
15+
- Replace usages of ThreadContext.stashContext with pluginSubject.runAs ([#715](https://github.com/opensearch-project/geospatial/pull/715))

src/main/java/org/opensearch/geospatial/index/mapper/xypoint/XYPointFieldMapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,4 +187,8 @@ public Query shapeQuery(Geometry geometry, String fieldName, ShapeRelation relat
187187
return queryProcessor.shapeQuery(geometry, fieldName, relation, context);
188188
}
189189
}
190+
191+
public Explicit<Boolean> shouldIgnoreMalformed() {
192+
return ignoreMalformed;
193+
}
190194
}

src/main/java/org/opensearch/geospatial/index/mapper/xyshape/XYShapeFieldMapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,8 @@ public Query shapeQuery(Geometry geometry, String fieldName, ShapeRelation relat
164164

165165
}
166166

167+
public Explicit<Boolean> shouldIgnoreMalformed() {
168+
return ignoreMalformed;
169+
}
170+
167171
}

src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceManifest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import java.net.URL;
1212
import java.net.URLConnection;
1313
import java.nio.CharBuffer;
14-
import java.security.AccessController;
15-
import java.security.PrivilegedAction;
1614

1715
import org.opensearch.SpecialPermission;
1816
import org.opensearch.common.SuppressForbidden;
@@ -24,6 +22,7 @@
2422
import org.opensearch.core.xcontent.XContentParser;
2523
import org.opensearch.geospatial.annotation.VisibleForTesting;
2624
import org.opensearch.geospatial.shared.Constants;
25+
import org.opensearch.secure_sm.AccessController;
2726

2827
import lombok.AllArgsConstructor;
2928
import lombok.Getter;
@@ -116,7 +115,7 @@ public static class Builder {
116115
@SuppressForbidden(reason = "Need to connect to http endpoint to read manifest file")
117116
public static DatasourceManifest build(final URL url) {
118117
SpecialPermission.check();
119-
return AccessController.doPrivileged((PrivilegedAction<DatasourceManifest>) () -> {
118+
return AccessController.doPrivileged(() -> {
120119
try {
121120
URLConnection connection = url.openConnection();
122121
return internalBuild(connection);

src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java

Lines changed: 45 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
5050
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
5151
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
52-
import org.opensearch.geospatial.shared.StashedThreadContext;
5352
import org.opensearch.index.IndexNotFoundException;
5453
import org.opensearch.index.query.QueryBuilders;
5554
import org.opensearch.search.SearchHit;
@@ -63,12 +62,12 @@
6362
@Log4j2
6463
public class DatasourceDao {
6564
private static final Integer MAX_SIZE = 1000;
66-
private final Client client;
65+
private final Client pluginClient;
6766
private final ClusterService clusterService;
6867
private final ClusterSettings clusterSettings;
6968

70-
public DatasourceDao(final Client client, final ClusterService clusterService) {
71-
this.client = client;
69+
public DatasourceDao(final Client pluginClient, final ClusterService clusterService) {
70+
this.pluginClient = pluginClient;
7271
this.clusterService = clusterService;
7372
this.clusterSettings = clusterService.getClusterSettings();
7473
}
@@ -85,7 +84,7 @@ public void createIndexIfNotExists(final StepListener<Void> stepListener) {
8584
}
8685
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping())
8786
.settings(DatasourceExtension.INDEX_SETTING);
88-
StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() {
87+
pluginClient.admin().indices().create(createIndexRequest, new ActionListener<>() {
8988
@Override
9089
public void onResponse(final CreateIndexResponse createIndexResponse) {
9190
stepListener.onResponse(null);
@@ -100,7 +99,7 @@ public void onFailure(final Exception e) {
10099
}
101100
stepListener.onFailure(e);
102101
}
103-
}));
102+
});
104103
}
105104

106105
private String getIndexMapping() {
@@ -122,19 +121,17 @@ private String getIndexMapping() {
122121
*/
123122
public IndexResponse updateDatasource(final Datasource datasource) {
124123
datasource.setLastUpdateTime(Instant.now());
125-
return StashedThreadContext.run(client, () -> {
126-
try {
127-
return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
128-
.setId(datasource.getName())
129-
.setOpType(DocWriteRequest.OpType.INDEX)
130-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
131-
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
132-
.execute()
133-
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
134-
} catch (IOException e) {
135-
throw new RuntimeException(e);
136-
}
137-
});
124+
try {
125+
return pluginClient.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
126+
.setId(datasource.getName())
127+
.setOpType(DocWriteRequest.OpType.INDEX)
128+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
129+
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
130+
.execute()
131+
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
132+
} catch (IOException e) {
133+
throw new RuntimeException(e);
134+
}
138135
}
139136

140137
/**
@@ -148,7 +145,7 @@ public void updateDatasource(final List<Datasource> datasources, final ActionLis
148145
datasource.setLastUpdateTime(Instant.now());
149146
return datasource;
150147
}).map(this::toIndexRequest).forEach(indexRequest -> bulkRequest.add(indexRequest));
151-
StashedThreadContext.run(client, () -> client.bulk(bulkRequest, listener));
148+
pluginClient.bulk(bulkRequest, listener);
152149
}
153150

154151
private IndexRequest toIndexRequest(Datasource datasource) {
@@ -173,18 +170,16 @@ private IndexRequest toIndexRequest(Datasource datasource) {
173170
*/
174171
public void putDatasource(final Datasource datasource, final ActionListener listener) {
175172
datasource.setLastUpdateTime(Instant.now());
176-
StashedThreadContext.run(client, () -> {
177-
try {
178-
client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
179-
.setId(datasource.getName())
180-
.setOpType(DocWriteRequest.OpType.CREATE)
181-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
182-
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
183-
.execute(listener);
184-
} catch (IOException e) {
185-
new RuntimeException(e);
186-
}
187-
});
173+
try {
174+
pluginClient.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
175+
.setId(datasource.getName())
176+
.setOpType(DocWriteRequest.OpType.CREATE)
177+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
178+
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
179+
.execute(listener);
180+
} catch (IOException e) {
181+
new RuntimeException(e);
182+
}
188183
}
189184

190185
/**
@@ -194,7 +189,7 @@ public void putDatasource(final Datasource datasource, final ActionListener list
194189
*
195190
*/
196191
public void deleteDatasource(final Datasource datasource) {
197-
DeleteResponse response = client.prepareDelete()
192+
DeleteResponse response = pluginClient.prepareDelete()
198193
.setIndex(DatasourceExtension.JOB_INDEX_NAME)
199194
.setId(datasource.getName())
200195
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
@@ -220,7 +215,7 @@ public Datasource getDatasource(final String name) throws IOException {
220215
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
221216
GetResponse response;
222217
try {
223-
response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)));
218+
response = pluginClient.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
224219
if (response.isExists() == false) {
225220
log.error("Datasource[{}] does not exist in an index[{}]", name, DatasourceExtension.JOB_INDEX_NAME);
226221
return null;
@@ -245,7 +240,7 @@ public Datasource getDatasource(final String name) throws IOException {
245240
*/
246241
public void getDatasource(final String name, final ActionListener<Datasource> actionListener) {
247242
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
248-
StashedThreadContext.run(client, () -> client.get(request, new ActionListener<>() {
243+
pluginClient.get(request, new ActionListener<>() {
249244
@Override
250245
public void onResponse(final GetResponse response) {
251246
if (response.isExists() == false) {
@@ -269,7 +264,7 @@ public void onResponse(final GetResponse response) {
269264
public void onFailure(final Exception e) {
270265
actionListener.onFailure(e);
271266
}
272-
}));
267+
});
273268
}
274269

275270
/**
@@ -278,42 +273,33 @@ public void onFailure(final Exception e) {
278273
* @param actionListener the action listener
279274
*/
280275
public void getDatasources(final String[] names, final ActionListener<List<Datasource>> actionListener) {
281-
StashedThreadContext.run(
282-
client,
283-
() -> client.prepareMultiGet()
284-
.add(DatasourceExtension.JOB_INDEX_NAME, names)
285-
.execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener))
286-
);
276+
pluginClient.prepareMultiGet()
277+
.add(DatasourceExtension.JOB_INDEX_NAME, names)
278+
.execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener));
287279
}
288280

289281
/**
290282
* Get all datasources up to {@code MAX_SIZE} from an index {@code DatasourceExtension.JOB_INDEX_NAME}
291283
* @param actionListener the action listener
292284
*/
293285
public void getAllDatasources(final ActionListener<List<Datasource>> actionListener) {
294-
StashedThreadContext.run(
295-
client,
296-
() -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
297-
.setQuery(QueryBuilders.matchAllQuery())
298-
.setPreference(Preference.PRIMARY.type())
299-
.setSize(MAX_SIZE)
300-
.execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener))
301-
);
286+
pluginClient.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
287+
.setQuery(QueryBuilders.matchAllQuery())
288+
.setPreference(Preference.PRIMARY.type())
289+
.setSize(MAX_SIZE)
290+
.execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener));
302291
}
303292

304293
/**
305294
* Get all datasources up to {@code MAX_SIZE} from an index {@code DatasourceExtension.JOB_INDEX_NAME}
306295
*/
307296
public List<Datasource> getAllDatasources() {
308-
SearchResponse response = StashedThreadContext.run(
309-
client,
310-
() -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
311-
.setQuery(QueryBuilders.matchAllQuery())
312-
.setPreference(Preference.PRIMARY.type())
313-
.setSize(MAX_SIZE)
314-
.execute()
315-
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
316-
);
297+
SearchResponse response = pluginClient.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
298+
.setQuery(QueryBuilders.matchAllQuery())
299+
.setPreference(Preference.PRIMARY.type())
300+
.setSize(MAX_SIZE)
301+
.execute()
302+
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
317303

318304
List<BytesReference> bytesReferences = toBytesReferences(response);
319305
return bytesReferences.stream().map(bytesRef -> toDatasource(bytesRef)).collect(Collectors.toList());

src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java

Lines changed: 27 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
import java.net.URL;
1515
import java.net.URLConnection;
1616
import java.nio.charset.StandardCharsets;
17-
import java.security.AccessController;
18-
import java.security.PrivilegedAction;
1917
import java.util.Arrays;
2018
import java.util.Collections;
2119
import java.util.Iterator;
@@ -58,8 +56,8 @@
5856
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
5957
import org.opensearch.geospatial.ip2geo.common.URLDenyListChecker;
6058
import org.opensearch.geospatial.shared.Constants;
61-
import org.opensearch.geospatial.shared.StashedThreadContext;
6259
import org.opensearch.index.query.QueryBuilders;
60+
import org.opensearch.secure_sm.AccessController;
6361
import org.opensearch.transport.client.Client;
6462
import org.opensearch.transport.client.Requests;
6563

@@ -91,13 +89,13 @@ public class GeoIpDataDao {
9189
);
9290
private final ClusterService clusterService;
9391
private final ClusterSettings clusterSettings;
94-
private final Client client;
92+
private final Client pluginClient;
9593
private final URLDenyListChecker urlDenyListChecker;
9694

97-
public GeoIpDataDao(final ClusterService clusterService, final Client client, final URLDenyListChecker urlDenyListChecker) {
95+
public GeoIpDataDao(final ClusterService clusterService, final Client pluginClient, final URLDenyListChecker urlDenyListChecker) {
9896
this.clusterService = clusterService;
9997
this.clusterSettings = clusterService.getClusterSettings();
100-
this.client = client;
98+
this.pluginClient = pluginClient;
10199
this.urlDenyListChecker = urlDenyListChecker;
102100
}
103101

@@ -117,24 +115,19 @@ public void createIndexIfNotExists(final String indexName) {
117115
}
118116
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(INDEX_SETTING_TO_CREATE)
119117
.mapping(getIndexMapping());
120-
StashedThreadContext.run(
121-
client,
122-
() -> client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
123-
);
118+
pluginClient.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
124119
}
125120

126121
private void freezeIndex(final String indexName) {
127122
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
128-
StashedThreadContext.run(client, () -> {
129-
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
130-
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
131-
client.admin()
132-
.indices()
133-
.prepareUpdateSettings(indexName)
134-
.setSettings(INDEX_SETTING_TO_FREEZE)
135-
.execute()
136-
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
137-
});
123+
pluginClient.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
124+
pluginClient.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
125+
pluginClient.admin()
126+
.indices()
127+
.prepareUpdateSettings(indexName)
128+
.setSettings(INDEX_SETTING_TO_FREEZE)
129+
.execute()
130+
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
138131
}
139132

140133
/**
@@ -173,7 +166,7 @@ private String getIndexMapping() {
173166
@SuppressForbidden(reason = "Need to connect to http endpoint to read GeoIP database file")
174167
public CSVParser getDatabaseReader(final DatasourceManifest manifest) {
175168
SpecialPermission.check();
176-
return AccessController.doPrivileged((PrivilegedAction<CSVParser>) () -> {
169+
return AccessController.doPrivileged(() -> {
177170
try {
178171
URL zipUrl = urlDenyListChecker.toUrlIfNotInDenyList(manifest.getUrl());
179172
return internalGetDatabaseReader(manifest, zipUrl.openConnection());
@@ -249,15 +242,12 @@ public XContentBuilder createDocument(final String[] fields, final String[] valu
249242
* @return geoIP data
250243
*/
251244
public Map<String, Object> getGeoIpData(final String indexName, final String ip) {
252-
SearchResponse response = StashedThreadContext.run(
253-
client,
254-
() -> client.prepareSearch(indexName)
255-
.setSize(1)
256-
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
257-
.setPreference(Preference.LOCAL.type())
258-
.setRequestCache(true)
259-
.get(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
260-
);
245+
SearchResponse response = pluginClient.prepareSearch(indexName)
246+
.setSize(1)
247+
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
248+
.setPreference(Preference.LOCAL.type())
249+
.setRequestCache(true)
250+
.get(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
261251

262252
if (response.getHits().getHits().length == 0) {
263253
return Collections.emptyMap();
@@ -297,7 +287,7 @@ public void putGeoIpData(
297287
indexRequest.id(record.get(0));
298288
bulkRequest.add(indexRequest);
299289
if (iterator.hasNext() == false || bulkRequest.requests().size() == batchSize) {
300-
BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout));
290+
BulkResponse response = pluginClient.bulk(bulkRequest).actionGet(timeout);
301291
if (response.hasFailures()) {
302292
throw new OpenSearchException(
303293
"error occurred while ingesting GeoIP data in {} with an error {}",
@@ -334,15 +324,12 @@ public void deleteIp2GeoDataIndex(final List<String> indices) {
334324
);
335325
}
336326

337-
AcknowledgedResponse response = StashedThreadContext.run(
338-
client,
339-
() -> client.admin()
340-
.indices()
341-
.prepareDelete(indices.toArray(new String[0]))
342-
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
343-
.execute()
344-
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
345-
);
327+
AcknowledgedResponse response = pluginClient.admin()
328+
.indices()
329+
.prepareDelete(indices.toArray(new String[0]))
330+
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
331+
.execute()
332+
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
346333

347334
if (response.isAcknowledged() == false) {
348335
throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", indices));

0 commit comments

Comments
 (0)