Skip to content

Commit 04d0b0f

Browse files
authored
Simplified plain json call to opensearch, refactored cluster adapter for os3 (#24948)
1 parent c13fe0e commit 04d0b0f

20 files changed

+523
-177
lines changed

graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/ClusterAdapterOS.java

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -76,19 +76,12 @@ public class ClusterAdapterOS implements ClusterAdapter {
7676
private static final Logger LOG = LoggerFactory.getLogger(ClusterAdapterOS.class);
7777
private final Duration requestTimeout;
7878
private final OfficialOpensearchClient opensearchClient;
79-
private final PlainJsonApi jsonApi;
80-
private final OpenSearchCatClient catClient;
81-
private final OpenSearchClusterClient clusterClient;
8279

8380
@Inject
8481
public ClusterAdapterOS(OfficialOpensearchClient opensearchClient,
85-
@Named("elasticsearch_socket_timeout") Duration requestTimeout,
86-
PlainJsonApi jsonApi) {
82+
@Named("elasticsearch_socket_timeout") Duration requestTimeout) {
8783
this.requestTimeout = requestTimeout;
88-
this.jsonApi = jsonApi;
8984
this.opensearchClient = opensearchClient;
90-
this.catClient = opensearchClient.sync().cat();
91-
this.clusterClient = opensearchClient.sync().cluster();
9285
}
9386

9487
@Override
@@ -117,8 +110,8 @@ public Set<NodeFileDescriptorStats> fileDescriptorStats() {
117110
}
118111

119112
List<NodesRecord> nodes() {
120-
List<NodesRecord> allNodes = opensearchClient.execute(() ->
121-
catClient.nodes(NodesRequest.builder()
113+
List<NodesRecord> allNodes = opensearchClient.sync(c ->
114+
c.cat().nodes(NodesRequest.builder()
122115
.fullId(true)
123116
.headers("id", "name", "node.role", "ip", "version",
124117
"file_desc.max", "disk.used", "disk.total", "disk.used_percent")
@@ -153,8 +146,8 @@ public ClusterShardAllocation clusterShardAllocation() {
153146
LOG.warn("Could not retrieve max_shards_per_node setting from cluster settings. Threshold warnings disabled.", e);
154147
}
155148

156-
List<NodeShardAllocation> nodeShardAllocations = opensearchClient.execute(() ->
157-
catClient.allocation().valueBody().stream().map(this::toNodeShardAllocation).toList(),
149+
List<NodeShardAllocation> nodeShardAllocations = opensearchClient.sync(c ->
150+
c.cat().allocation().valueBody().stream().map(this::toNodeShardAllocation).toList(),
158151
"Unable to retrieve node shard allocation"
159152
);
160153
return new ClusterShardAllocation(maxShardsPerNode, nodeShardAllocations);
@@ -216,7 +209,7 @@ private Optional<JsonNode> nodeById(String nodeId) {
216209
.endpoint("/_nodes/" + nodeId)
217210
.method("GET")
218211
.build();
219-
return Optional.of(jsonApi.performRequest(request, "Couldn't read Opensearch nodes data!"))
212+
return Optional.of(opensearchClient.performRequest(request, "Couldn't read Opensearch nodes data!"))
220213
.map(jsonNode -> jsonNode.path("nodes").path(nodeId))
221214
.filter(node -> !node.isMissingNode());
222215
}
@@ -250,8 +243,8 @@ private ClusterHealth clusterHealthFrom(HealthResponse response) {
250243

251244
@Override
252245
public PendingTasksStats pendingTasks() {
253-
PendingTasksResponse pendingTasks = opensearchClient.execute(
254-
clusterClient::pendingTasks,
246+
PendingTasksResponse pendingTasks = opensearchClient.sync(c ->
247+
c.cluster().pendingTasks(),
255248
"Unable to retrieve pending tasks"
256249
);
257250

@@ -302,7 +295,7 @@ public JsonNode rawClusterStats() {
302295
.endpoint("/_cluster/stats/nodes/*")
303296
.method("GET")
304297
.build();
305-
return jsonApi.performRequest(request, "Couldn't read Elasticsearch cluster stats");
298+
return opensearchClient.performRequest(request, "Couldn't read Elasticsearch cluster stats");
306299
}
307300

308301
@Override
@@ -311,7 +304,7 @@ public Map<String, org.graylog2.system.stats.elasticsearch.NodeInfo> nodesInfo()
311304
.endpoint("/_nodes")
312305
.method("GET")
313306
.build();
314-
JsonNode json = jsonApi.performRequest(request, "Couldn't read Opensearch nodes data!");
307+
JsonNode json = opensearchClient.performRequest(request, "Couldn't read Opensearch nodes data!");
315308
JsonNode nodes = json.at("/nodes");
316309
return toStream(nodes.fieldNames()).collect(Collectors.toMap(
317310
n -> n,
@@ -334,7 +327,7 @@ public Map<String, NodeOSInfo> nodesHostInfo() {
334327
.endpoint("/_nodes/stats/os")
335328
.method("GET")
336329
.build();
337-
JsonNode json = jsonApi.performRequest(request, "Couldn't read Opensearch nodes os data!");
330+
JsonNode json = opensearchClient.performRequest(request, "Couldn't read Opensearch nodes os data!");
338331
JsonNode nodes = json.at("/nodes");
339332
return toStream(nodes.fieldNames())
340333
.collect(Collectors.toMap(name -> name, name -> createNodeHostInfo(nodes.get(name))));
@@ -370,7 +363,7 @@ public ShardStats shardStats() {
370363
private Optional<HealthResponse> clusterHealth() {
371364
final Time timeout = new Time.Builder().time(requestTimeout.toSeconds() + "s").build();
372365
try {
373-
HealthResponse health = clusterClient.health(HealthRequest.builder()
366+
HealthResponse health = opensearchClient.syncWithoutErrorMapping().cluster().health(HealthRequest.builder()
374367
.timeout(timeout)
375368
.build()
376369
);
@@ -392,7 +385,7 @@ public Optional<HealthStatus> deflectorHealth(Collection<String> indices) {
392385
}
393386

394387
final Map<String, String> aliasMapping;
395-
aliasMapping = opensearchClient.execute(() -> catClient.aliases().valueBody()
388+
aliasMapping = opensearchClient.sync(c -> c.cat().aliases().valueBody()
396389
.stream()
397390
.filter(alias -> Objects.nonNull(alias.index()))
398391
.collect(Collectors.toMap(AliasesRecord::alias, AliasesRecord::index)),
@@ -404,8 +397,8 @@ public Optional<HealthStatus> deflectorHealth(Collection<String> indices) {
404397
.map(index -> aliasMapping.getOrDefault(index, index))
405398
.collect(Collectors.toSet());
406399

407-
final Set<IndicesRecord> indexSummaries = opensearchClient.execute(() ->
408-
catClient.indices().valueBody()
400+
final Set<IndicesRecord> indexSummaries = opensearchClient.sync(client ->
401+
client.cat().indices().valueBody()
409402
.stream()
410403
.filter(indexSummary -> mappedIndices.contains(indexSummary.index()))
411404
.collect(Collectors.toSet()),
@@ -423,7 +416,7 @@ public Optional<HealthStatus> deflectorHealth(Collection<String> indices) {
423416
}
424417

425418
private GetClusterSettingsResponse getClusterSettings() {
426-
return opensearchClient.execute(() -> clusterClient.getSettings(
419+
return opensearchClient.sync(c -> c.cluster().getSettings(
427420
GetClusterSettingsRequest.builder()
428421
.includeDefaults(true)
429422
.flatSettings(true)

graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/OfficialOpensearchClient.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
*/
1717
package org.graylog.storage.opensearch3;
1818

19+
import com.fasterxml.jackson.core.JsonProcessingException;
20+
import com.fasterxml.jackson.databind.JsonNode;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
1922
import com.github.joschi.jadconfig.util.Duration;
2023
import org.apache.hc.core5.http.ContentTooLongException;
2124
import org.graylog.storage.exceptions.ParsedOpenSearchException;
@@ -30,22 +33,36 @@
3033
import org.opensearch.client.opensearch.OpenSearchClient;
3134
import org.opensearch.client.opensearch._types.ErrorCause;
3235
import org.opensearch.client.opensearch._types.OpenSearchException;
36+
import org.opensearch.client.opensearch.generic.Body;
37+
import org.opensearch.client.opensearch.generic.Request;
38+
import org.opensearch.client.opensearch.generic.Response;
3339
import org.opensearch.client.transport.httpclient5.ResponseException;
3440
import org.slf4j.Logger;
3541
import org.slf4j.LoggerFactory;
3642

3743
import java.io.IOException;
44+
import java.util.Objects;
3845
import java.util.Optional;
3946
import java.util.concurrent.CompletableFuture;
4047
import java.util.concurrent.TimeUnit;
4148
import java.util.regex.Matcher;
4249
import java.util.regex.Pattern;
4350

44-
public record OfficialOpensearchClient(OpenSearchClient sync, OpenSearchAsyncClient async) {
51+
public final class OfficialOpensearchClient {
4552

4653
private static final Logger LOG = LoggerFactory.getLogger(OfficialOpensearchClient.class);
4754
private static final Pattern invalidWriteTarget = Pattern.compile("no write index is defined for alias \\[(?<target>[\\w_]+)\\]");
4855

56+
private final OpenSearchClient sync;
57+
private final OpenSearchAsyncClient async;
58+
private final ObjectMapper objectMapper;
59+
60+
public OfficialOpensearchClient(OpenSearchClient sync, OpenSearchAsyncClient async, ObjectMapper objectMapper) {
61+
this.sync = sync;
62+
this.async = async;
63+
this.objectMapper = objectMapper;
64+
}
65+
4966
public <T> T execute(ThrowingSupplier<T> operation, String errorMessage) {
5067
try {
5168
return operation.get();
@@ -95,6 +112,19 @@ public <T> CompletableFuture<T> async(ThrowingAsyncFunction<CompletableFuture<T>
95112
}
96113
}
97114

115+
/**
116+
* This shouldn't be used unless you have very specific needs and require JsonNode as a response from plain json API
117+
*/
118+
public JsonNode performRequest(Request request, String errorMessage) {
119+
String rawJson;
120+
try (Response response = sync.generic().execute(request)) {
121+
rawJson = response.getBody().map(Body::bodyAsString).orElse("");
122+
return objectMapper.readTree(rawJson);
123+
} catch (Exception e) {
124+
throw mapException(e, errorMessage);
125+
}
126+
}
127+
98128
public void close() {
99129
try {
100130
sync()._transport().close();
@@ -220,4 +250,22 @@ private static boolean isParentCircuitBreakingException(OpenSearchException open
220250
}
221251
return false;
222252
}
253+
254+
@Deprecated
255+
public OpenSearchClient sync() {
256+
return sync;
257+
}
258+
259+
@Deprecated
260+
public OpenSearchAsyncClient async() {
261+
return async;
262+
}
263+
264+
public OpenSearchClient syncWithoutErrorMapping() {
265+
return sync;
266+
}
267+
268+
public OpenSearchAsyncClient asyncWithoutErrorMapping() {
269+
return async;
270+
}
223271
}

graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/OfficialOpensearchClientProvider.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.graylog.storage.opensearch3;
1818

19+
import com.fasterxml.jackson.databind.ObjectMapper;
1920
import com.github.joschi.jadconfig.util.Duration;
2021
import com.google.common.base.Supplier;
2122
import com.google.common.base.Suppliers;
@@ -63,7 +64,6 @@
6364
* along with this program. If not, see
6465
* <http://www.mongodb.com/licensing/server-side-public-license>.
6566
*/
66-
6767
public class OfficialOpensearchClientProvider implements Provider<OfficialOpensearchClient> {
6868

6969
private static Logger log = LoggerFactory.getLogger(OfficialOpensearchClientProvider.class);
@@ -76,8 +76,10 @@ public OfficialOpensearchClientProvider(
7676
IndexerJwtAuthToken indexerJwtAuthToken,
7777
CredentialsProvider credentialsProvider,
7878
ElasticsearchClientConfiguration clientConfiguration,
79-
@Nullable TrustManagerAndSocketFactoryProvider trustManagerAndSocketFactoryProvider) {
80-
clientCache = Suppliers.memoize(() -> createClient(hosts, indexerJwtAuthToken, credentialsProvider, clientConfiguration, trustManagerAndSocketFactoryProvider));
79+
ObjectMapper objectMapper,
80+
@Nullable TrustManagerAndSocketFactoryProvider trustManagerAndSocketFactoryProvider
81+
) {
82+
clientCache = Suppliers.memoize(() -> createClient(hosts, indexerJwtAuthToken, credentialsProvider, clientConfiguration, trustManagerAndSocketFactoryProvider, objectMapper));
8183
}
8284

8385
@Override
@@ -86,7 +88,7 @@ public OfficialOpensearchClient get() {
8688
}
8789

8890
@Nonnull
89-
private static OfficialOpensearchClient createClient(List<URI> uris, IndexerJwtAuthToken indexerJwtAuthToken, CredentialsProvider credentialsProvider, ElasticsearchClientConfiguration clientConfiguration, TrustManagerAndSocketFactoryProvider trustManagerAndSocketFactoryProvider) {
91+
private static OfficialOpensearchClient createClient(List<URI> uris, IndexerJwtAuthToken indexerJwtAuthToken, CredentialsProvider credentialsProvider, ElasticsearchClientConfiguration clientConfiguration, TrustManagerAndSocketFactoryProvider trustManagerAndSocketFactoryProvider, ObjectMapper objectMapper) {
9092

9193
log.info("Initializing OpenSearch client");
9294

@@ -134,7 +136,7 @@ private static OfficialOpensearchClient createClient(List<URI> uris, IndexerJwtA
134136
});
135137

136138
final OpenSearchTransport transport = builder.build();
137-
return new OfficialOpensearchClient(new CustomOpenSearchClient(transport), new CustomAsyncOpenSearchClient(transport));
139+
return new OfficialOpensearchClient(new CustomOpenSearchClient(transport), new CustomAsyncOpenSearchClient(transport), objectMapper);
138140
}
139141

140142
private static Optional<TlsStrategy> tlsStrategy(@Nullable TrustManagerAndSocketFactoryProvider trustManagerAndSocketFactoryProvider) {

graylog-storage-opensearch3/src/main/java/org/graylog/storage/opensearch3/PlainJsonApi.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import static org.graylog.storage.opensearch3.OfficialOpensearchClient.mapException;
2727

28+
@Deprecated
2829
public class PlainJsonApi {
2930
private final ObjectMapper objectMapper;
3031
private final OfficialOpensearchClient client;
@@ -40,10 +41,6 @@ public PlainJsonApi(ObjectMapper objectMapper,
4041
}
4142

4243
@Deprecated
43-
public PlainJsonApi(ObjectMapper objectMapper, OpenSearchClient client) {
44-
this(objectMapper, client, null);
45-
}
46-
4744
public JsonNode performRequest(Request request, String errorMessage) {
4845
try {
4946
Response response = client.sync().generic().execute(request);

0 commit comments

Comments
 (0)