diff --git a/CHANGELOG.md b/CHANGELOG.md index d3f261030e..900ad36d78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,10 +3,11 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## [Unreleased 3.x] ### Added - +- Metrics support includes Micrometer integration for custom client-side metrics. [Metrics](./guides/metrics.md) ### Dependencies - Bump `org.apache.httpcomponents.client5:httpclient5` from 5.4.4 to 5.5 ([#1578](https://github.com/opensearch-project/opensearch-java/pull/1578)) - Bump `org.junit:junit-bom` from 5.12.2 to 5.13.0 ([#1587](https://github.com/opensearch-project/opensearch-java/pull/1587)) +- Added micrometer dependency `io.micrometer:micrometer-core` version 1.13.13 ### Changed diff --git a/USER_GUIDE.md b/USER_GUIDE.md index dfe1399bfd..11dceb2479 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -200,6 +200,7 @@ You can find a working sample of the above code in [IndexingBasics.java](./sampl - [Search](./guides/search.md) - [Generic Client](./guides/generic.md) - [Json](./guides/json.md) +- [Metrics](./guides/metrics.md) ## Plugins diff --git a/guides/metrics.md b/guides/metrics.md new file mode 100644 index 0000000000..c2a92b6954 --- /dev/null +++ b/guides/metrics.md @@ -0,0 +1,77 @@ +- [SDK Metrics](#SDK-Metrics) + - [How to enable metrics](#get-client) + - [Metrics Collected](#metrics-collected) + +# SDK Metrics + +We've integrated a robust metrics solution into the OpenSearch Java client to provide comprehensive insights into its API usage and performance. This includes the collection of vital operational metrics, such as overall throughput, request latency, and error rate. Furthermore, we're capturing more granular details like request and response payload sizes, distinct success and failure rates for operations, and real-time OpenSearch node status to provide a holistic view of client behavior and cluster health. + +The implementation utilizes [Micrometer](https://mvnrepository.com/artifact/io.micrometer/micrometer-core), which supports integrations with various popular monitoring systems ([see their overview](https://docs.micrometer.io/micrometer/reference/implementations.html)). This enables seamless collection and export of metrics to tools like Prometheus and Grafana, allowing real-time monitoring of the OpenSearch client's performance and health. + +## How to enable metrics + +We should create a `MetricOptions` instance and set it in the `ApacheHttpClient5TransportBuilder` when creating the client. Take a look at below code snippet for an example of how to create a client with metrics enabled: + +```java +public class CreateClient { + public static OpenSearchClient createClientWithMetrics() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { + var env = System.getenv(); + var https = Boolean.parseBoolean(env.getOrDefault("HTTPS", "true")); + var hostname = env.getOrDefault("HOST", "localhost"); + var port = Integer.parseInt(env.getOrDefault("PORT", "9200")); + var user = env.getOrDefault("USERNAME", "admin"); + var pass = env.getOrDefault("PASSWORD", "admin"); + var metricsEnabled = true; + double PERCENTILE_99 = 0.99; + double PERCENTILE_95 = 0.95; + var meterRegistry = new SimpleMeterRegistry(); + + final var hosts = new HttpHost[]{new HttpHost(https ? "https" : "http", hostname, port)}; + + final var sslContext = SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build(); + + MetricOptions metricOptions = MetricOptions.builder() + .setMetricsEnabled(metricEnabled) // required to turn metrics on/off + .setMeterRegistry(meterRegistry) + .setPercentiles(PERCENTILE_99, PERCENTILE_95) + .setAdditionalMetricGroups(MetricGroup.NETWORK_DETAILS) + .setExcludedTags(MetricTag.HOST_CONTACTED) + .build(); + + final var transport = ApacheHttpClient5TransportBuilder.builder(hosts) + .setMapper(new JacksonJsonpMapper()) + .setMetricOptions(metricOptions) + .setHttpClientConfigCallback(httpClientBuilder -> { + final var credentialsProvider = new BasicCredentialsProvider(); + for (final var host : hosts) { + credentialsProvider.setCredentials(new AuthScope(host), new UsernamePasswordCredentials(user, pass.toCharArray())); + } + +// Disable SSL/TLS verification as our local testing clusters use self-signed certificates + final var tlsStrategy = ClientTlsStrategyBuilder.create() + .setSslContext(sslContext) + .setHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .build(); + + final var connectionManager = PoolingAsyncClientConnectionManagerBuilder.create().setTlsStrategy(tlsStrategy).build(); + + return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setConnectionManager(connectionManager); + }) + .build(); + return new OpenSearchClient(transport); + } +} +``` + +## Metrics Collected + +The OpenSearch Java client collects a variety of metrics to provide insights into its operations. Below is a summary of the key metrics collected, along with their descriptions and important dimensions, such as status code and request type, that can be used for filtering and analysis. + +| Metric | Description | Important Dimensions(tags) | +|-----------------------------------------|-----------------------------------------------------------------|------------------------------------| +| `os.client.request.seconds` | End-to-end request execution latency | `StatusCodeOrException`, `Request` | +| `os.client.request.seconds.count` | request throughput and error rate based on status tags | `StatusCodeOrException`, `Request` | +| `os.client.request.payload.size.bytes` | Request payload size in bytes | `Request` | +| `os.client.response.payload.size.bytes` | Response payload size in bytes | `Request` | +| `os.client.active.nodes` | Number of OpenSearch active nodes from a client's perspective | | +| `os.client.inactive.nodes` | Number of OpenSearch inactive nodes from a client's perspective | | \ No newline at end of file diff --git a/java-client/build.gradle.kts b/java-client/build.gradle.kts index d390126764..ba3886be76 100644 --- a/java-client/build.gradle.kts +++ b/java-client/build.gradle.kts @@ -246,6 +246,12 @@ dependencies { testImplementation("junit", "junit" , "4.13.2") { exclude(group = "org.hamcrest") } + + // Micrometer + implementation("io.micrometer:micrometer-core:1.13.13") + + // Awaitility + testImplementation("org.awaitility:awaitility:4.2.0") } licenseReport { diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/ExecutionMetricContext.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/ExecutionMetricContext.java new file mode 100644 index 0000000000..7e128edf38 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/ExecutionMetricContext.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import java.time.Duration; + +/** + * Contains necessary information about a request execution to be used for metric recordings. + */ +public abstract class ExecutionMetricContext { + + public static final int DEFAULT_EMPTY_STATUS_CODE = -1; + private Throwable throwable = null; + private int statusCode = DEFAULT_EMPTY_STATUS_CODE; + private Duration executionTime = null; + + protected ExecutionMetricContext() {} + + protected ExecutionMetricContext(Throwable throwable, int statusCode, Duration executionTime) { + this.throwable = throwable; + this.statusCode = statusCode; + this.executionTime = executionTime; + } + + public Throwable getThrowable() { + return throwable; + } + + public int getStatusCode() { + return statusCode; + } + + public Duration getRequestExecutionTime() { + return executionTime; + } + + public void setThrowable(Throwable throwable) { + this.throwable = throwable; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public void setRequestExecutionTime(Duration executionTime) { + this.executionTime = executionTime; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MeterOptions.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MeterOptions.java new file mode 100644 index 0000000000..ed9ccf03aa --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MeterOptions.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_EXCLUDED_TAGS; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_PERCENTILES; + +import io.micrometer.core.instrument.Tags; +import java.util.Set; + +/** + * Contains settings for configuring a meter + */ +public class MeterOptions { + private final double[] percentiles; + private final Tags commonTags; + private final Set excludedTagNames; + + public MeterOptions(double[] percentiles, Tags commonTags, Set excludedTagNames) { + this.percentiles = percentiles == null ? DEFAULT_PERCENTILES.clone() : percentiles.clone(); + this.commonTags = commonTags == null ? Tags.empty() : commonTags; + this.excludedTagNames = excludedTagNames == null ? DEFAULT_EXCLUDED_TAGS : excludedTagNames; + } + + /** + * Get percentiles to publish for Timer/Distribution meters + * @return a double array + */ + public double[] getPercentiles() { + return percentiles; + } + + /** + * Get common {@link io.micrometer.core.instrument.Tags} that this meter need to have + * @return a {@link io.micrometer.core.instrument.Tags} + */ + public Tags getCommonTags() { + return commonTags; + } + + /** + * Get tag names that a meter are excluded. + * @return a set of {@link MetricTag} + */ + public Set getExcludedTagNames() { + return excludedTagNames; + } + +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricConstants.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricConstants.java new file mode 100644 index 0000000000..ac988819c1 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricConstants.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import java.util.EnumSet; +import java.util.Set; + +public class MetricConstants { + public static final double[] DEFAULT_PERCENTILES = new double[] { 0.99, 0.95, 0.9, 0.75, 0.5 }; + public static final MeterRegistry DEFAULT_REGISTRY = Metrics.globalRegistry; + public static final Set DEFAULT_EXCLUDED_TAGS = EnumSet.noneOf(MetricTag.class); + public static final Set DEFAULT_ADDITIONAL_METRIC_GROUPS = EnumSet.noneOf(MetricGroup.class); +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricGroup.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricGroup.java new file mode 100644 index 0000000000..4c8bfa68f9 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricGroup.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import java.util.EnumSet; +import java.util.Set; + +public enum MetricGroup { + GENERAL, + NETWORK_DETAILS; + + public static final Set REQUIRED_GROUPS = EnumSet.of(GENERAL); + public static final Set ALL = EnumSet.allOf(MetricGroup.class); +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricName.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricName.java new file mode 100644 index 0000000000..b72d34f83a --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricName.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +public enum MetricName { + REQUEST("request"), + NETWORK_REQUEST("network.request"), + ACTIVE_NODES("active.nodes"), + INACTIVE_NODES("inactive.nodes"), + REQUEST_PAYLOAD_SIZE("request.payload.size"), + RESPONSE_PAYLOAD_SIZE("response.payload.size"); + + private static final String PREFIX = "os.client"; + private final String metricName; + + MetricName(String metricName) { + this.metricName = metricName; + } + + @Override + public String toString() { + return PREFIX + "." + metricName; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricOptions.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricOptions.java new file mode 100644 index 0000000000..2ae33f0027 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricOptions.java @@ -0,0 +1,149 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_ADDITIONAL_METRIC_GROUPS; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_EXCLUDED_TAGS; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_PERCENTILES; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_REGISTRY; + +import io.micrometer.core.instrument.MeterRegistry; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Set; + +/** + * Contains settings to configure metrics + */ +public class MetricOptions { + private final boolean isEnabled; + private final MeterRegistry meterRegistry; + private final String clientId; + private final double[] percentiles; + private final Set excludedTags; + private final Set metricGroups; + + public MetricOptions(MetricOptionsBuilder builder) { + meterRegistry = builder.meterRegistry == null ? DEFAULT_REGISTRY : builder.meterRegistry; + clientId = builder.clientId == null || builder.clientId.isEmpty() + ? String.valueOf(TelemetryMetricsManager.generateClientID()) + : builder.clientId; + percentiles = builder.percentiles == null ? DEFAULT_PERCENTILES : builder.percentiles; + isEnabled = builder.isEnabled; + excludedTags = builder.excludedTags == null ? DEFAULT_EXCLUDED_TAGS : builder.excludedTags; + metricGroups = builder.metricGroups == null ? DEFAULT_ADDITIONAL_METRIC_GROUPS : builder.metricGroups; + } + + public static MetricOptionsBuilder builder() { + return new MetricOptionsBuilder(); + } + + public boolean isMetricsEnabled() { + return isEnabled; + } + + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + + public String getClientId() { + return clientId; + } + + public double[] getPercentiles() { + return percentiles; + } + + public Set getExcludedTags() { + return excludedTags; + } + + public Set getMetricGroups() { + return metricGroups; + } + + public static class MetricOptionsBuilder { + private boolean isEnabled; + private MeterRegistry meterRegistry = null; + private String clientId = null; + private double[] percentiles = null; + private Set excludedTags = null; + private Set metricGroups = null; + + /** + * Set whether the metrics system is enabled + * @param enabled true to enable metrics; otherwise, false + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setMetricsEnabled(boolean enabled) { + this.isEnabled = enabled; + return this; + } + + /** + * Set a {@link MeterRegistry} for the client to propagate metrics + * @param meterRegistry a {@link MeterRegistry} instance + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setMeterRegistry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + return this; + } + + /** + * Give the client a name/ID + * @param clientId a name + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setClientId(String clientId) { + this.clientId = clientId; + return this; + } + + /** + * Set percentiles of distribution metrics to be published. For instance, pass {@code 0.95}, + * {@code 0.99} to publish request latency P99 and P95. + * + * @param percentiles percentile values to publish + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setPercentiles(double... percentiles) { + this.percentiles = percentiles; + return this; + } + + /** + * Set tags that metrics cannot have. Each metric has a set of required tags + * that cannot be excluded. However, for optional tags, they can be dropped based on + * the tags set here. This option allows users to exclude high cardinality tags. + * @param tags {@link MetricTag} names to exclude + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setExcludedTags(MetricTag... tags) { + this.excludedTags = EnumSet.copyOf(Arrays.asList(tags)); + return this; + } + + /** + * Set {@link MetricGroup} that you would like to be enabled in addition to groups under + * {@link MetricGroup#REQUIRED_GROUPS}. + * + * @param metricGroups {@link MetricGroup} names to be enabled + * @return current {@link MetricOptionsBuilder} + */ + public MetricOptionsBuilder setAdditionalMetricGroups(MetricGroup... metricGroups) { + this.metricGroups = EnumSet.copyOf(Arrays.asList(metricGroups)); + return this; + } + + public MetricOptions build() { + return new MetricOptions(this); + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricTag.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricTag.java new file mode 100644 index 0000000000..b857636e63 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/MetricTag.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +public enum MetricTag { + REQUEST("Request"), + CLIENT_ID("ClientID"), + STATUS_CODE_OR_EXCEPTION("StatusCodeOrException"), + HOST_CONTACTED("HostContacted"), + HOST("Host"); + + private String name; + + MetricTag(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/NetworkRequestMetricContext.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/NetworkRequestMetricContext.java new file mode 100644 index 0000000000..c8c0d28b81 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/NetworkRequestMetricContext.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import java.time.Duration; + +public class NetworkRequestMetricContext extends ExecutionMetricContext { + private String hostName; + private long responsePayloadSize = -1; + private long requestPayloadSize = -1; + + public NetworkRequestMetricContext(String hostName, Throwable throwable, int statusCode, Duration executionTime) { + super(throwable, statusCode, executionTime); + this.hostName = hostName; + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public long getResponsePayloadSize() { + return responsePayloadSize; + } + + public void setResponsePayloadSize(long responsePayloadSize) { + this.responsePayloadSize = responsePayloadSize; + } + + public long getRequestPayloadSize() { + return requestPayloadSize; + } + + public void setRequestPayloadSize(long requestPayloadSize) { + this.requestPayloadSize = requestPayloadSize; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/RequestMetricContext.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/RequestMetricContext.java new file mode 100644 index 0000000000..6d22ec7cd8 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/RequestMetricContext.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import java.util.ArrayList; +import java.util.List; + +public class RequestMetricContext extends ExecutionMetricContext { + private final List networkRequestContexts = new ArrayList<>(); + + public List getNetworkRequestContexts() { + return networkRequestContexts; + } + + public String getContactedHosts() { + StringBuilder builder = new StringBuilder(); + if (!networkRequestContexts.isEmpty()) { + networkRequestContexts.forEach(networkRequest -> builder.append(networkRequest.getHostName()).append(",")); + return builder.substring(0, builder.length() - 1); + } + return "NONE"; + } + + public void addNetworkRequestContext(NetworkRequestMetricContext nodeContext) { + networkRequestContexts.add(nodeContext); + } + +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/client_metrics/TelemetryMetricsManager.java b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/TelemetryMetricsManager.java new file mode 100644 index 0000000000..0094395513 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/client_metrics/TelemetryMetricsManager.java @@ -0,0 +1,252 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import static org.opensearch.client.transport.client_metrics.ExecutionMetricContext.DEFAULT_EMPTY_STATUS_CODE; +import static org.opensearch.client.transport.client_metrics.MetricName.ACTIVE_NODES; +import static org.opensearch.client.transport.client_metrics.MetricName.INACTIVE_NODES; +import static org.opensearch.client.transport.client_metrics.MetricName.NETWORK_REQUEST; +import static org.opensearch.client.transport.client_metrics.MetricName.REQUEST; +import static org.opensearch.client.transport.client_metrics.MetricName.REQUEST_PAYLOAD_SIZE; +import static org.opensearch.client.transport.client_metrics.MetricName.RESPONSE_PAYLOAD_SIZE; + +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import java.time.Duration; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class TelemetryMetricsManager { + private static final Log logger = LogFactory.getLog(TelemetryMetricsManager.class); + private static final String EXCEPTION_PREFIX = "ClientError-"; + private static final AtomicInteger CLIENT_ID_GENERATOR = new AtomicInteger(0); + private static final CompositeMeterRegistry mainRegistry = new CompositeMeterRegistry(); + + private TelemetryMetricsManager() { + throw new IllegalStateException("Cannot instantiate a utility class"); + } + + public static int generateClientID() { + return CLIENT_ID_GENERATOR.getAndIncrement(); + } + + /** + * Add the provided registry to the main composite registry + * @param registry a {@link MeterRegistry} + */ + public static synchronized void addRegistry(MeterRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("Cannot add a null registry"); + } + mainRegistry.add(registry); + } + + /** + * Remove the provided registry from the main composite registry. If the main registry has no + * child registries, it is cleared. + * @param registry a {@link MeterRegistry} + */ + public static synchronized void removeRegistry(MeterRegistry registry) { + if (registry == null) { + throw new IllegalArgumentException("Cannot remove a null registry"); + } + mainRegistry.remove(registry); + if (mainRegistry.getRegistries().isEmpty()) { + mainRegistry.clear(); + } + } + + /** + * Record information for request-related metrics. + * + * @param requestName name of the executed request + * @param meterOptions options for configuring its meter + * @param context a {@link RequestMetricContext} object about the execution + * @param metricGroups {@link MetricGroup} groups to record + */ + public static void recordRequestMetrics( + String requestName, + MeterOptions meterOptions, + RequestMetricContext context, + Set metricGroups + ) { + if (requestName == null) { + throw new IllegalArgumentException("Request name cannot be null"); + } + if (context == null) { + throw new IllegalArgumentException("Metric context cannot be null"); + } + if (meterOptions == null) { + throw new IllegalArgumentException("Meter options cannot be null"); + } + if (metricGroups == null) { + metricGroups = MetricGroup.REQUIRED_GROUPS; + } + + Set excludedTags = Optional.ofNullable(meterOptions.getExcludedTagNames()).orElse(EnumSet.noneOf(MetricTag.class)); + recordOverallRequestMetric(meterOptions, context, getRequestTags(requestName, context), excludedTags); + if (metricGroups.contains(MetricGroup.NETWORK_DETAILS)) { + Tags payloadSizeTags = Tags.of(MetricTag.REQUEST.toString(), requestName).and(meterOptions.getCommonTags()); + for (NetworkRequestMetricContext networkRequestMetricContext : context.getNetworkRequestContexts()) { + recordNetworkRequestMetric( + meterOptions, + networkRequestMetricContext, + getRequestTags(requestName, networkRequestMetricContext), + excludedTags + ); + recordPayloadSizes(networkRequestMetricContext, payloadSizeTags); + } + } + } + + public static void initializeNodeGauges( + MeterOptions meterOptions, + Supplier activeNodeUpdater, + Supplier inactiveNodeUpdater + ) { + if (meterOptions == null) { + throw new IllegalArgumentException("Meter options cannot be null"); + } + if (activeNodeUpdater == null) { + throw new IllegalArgumentException("activeNodeUpdater cannot be null"); + } + if (inactiveNodeUpdater == null) { + throw new IllegalArgumentException("inactiveNodeUpdater cannot be null"); + } + Tags tags = Tags.empty().and(meterOptions.getCommonTags()); + Gauge.builder(ACTIVE_NODES.toString(), activeNodeUpdater) + .description("Number of active nodes to serve traffic") + .tags(tags) + .baseUnit("nodes") + .register(mainRegistry); + Gauge.builder(INACTIVE_NODES.toString(), inactiveNodeUpdater) + .description("Number of inactive nodes that cannot serve traffic") + .tags(tags) + .baseUnit("nodes") + .register(mainRegistry); + } + + protected static void recordPayloadSizes(NetworkRequestMetricContext context, Tags tags) { + if (tags == null) { + tags = Tags.empty(); + } + if (context == null) { + throw new IllegalArgumentException("Metric context cannot be null"); + } + + long requestPayloadSize = context.getRequestPayloadSize(); + long responsePayloadSize = context.getResponsePayloadSize(); + if (requestPayloadSize > -1) { + DistributionSummary.builder(REQUEST_PAYLOAD_SIZE.toString()) + .tags(tags) + .description("Request payload size") + .baseUnit("bytes") + .register(mainRegistry) + .record(requestPayloadSize); + } + if (responsePayloadSize > -1) { + DistributionSummary.builder(RESPONSE_PAYLOAD_SIZE.toString()) + .tags(tags) + .description("Response payload size") + .baseUnit("bytes") + .register(mainRegistry) + .record(responsePayloadSize); + } + } + + private static void recordNetworkRequestMetric( + MeterOptions meterOptions, + NetworkRequestMetricContext context, + Tags requestTags, + Set excludedTags + ) { + if (context.getRequestExecutionTime() != null) { + Tags networkRequestTags = excludedTags.contains(MetricTag.HOST) + ? requestTags + : requestTags.and(Tag.of(MetricTag.HOST.toString(), context.getHostName())); + Timer.builder(NETWORK_REQUEST.toString()) + .description("Apache HttpClient request latency") + .publishPercentiles(meterOptions.getPercentiles()) + .tags(networkRequestTags.and(meterOptions.getCommonTags())) + .maximumExpectedValue(Duration.ofSeconds(30)) + .register(mainRegistry) + .record(context.getRequestExecutionTime()); + } else if (logger.isDebugEnabled()) { + logger.debug("Missing execution duration. Skipping " + NETWORK_REQUEST); + } + } + + private static void recordOverallRequestMetric( + MeterOptions meterOptions, + RequestMetricContext context, + Tags requiredRequestTags, + Set excludedTags + ) { + if (context.getRequestExecutionTime() != null) { + Tags requestMeterTags = excludedTags.contains(MetricTag.HOST_CONTACTED) + ? requiredRequestTags + : requiredRequestTags.and(Tag.of(MetricTag.HOST_CONTACTED.toString(), context.getContactedHosts())); + Timer.builder(REQUEST.toString()) + .description("End-to-end request execution latency") + .publishPercentiles(meterOptions.getPercentiles()) + .tags(requestMeterTags.and(meterOptions.getCommonTags())) + .maximumExpectedValue(Duration.ofSeconds(30)) + .register(mainRegistry) + .record(context.getRequestExecutionTime()); + } else if (logger.isDebugEnabled()) { + logger.debug("Missing execution duration. Skipping " + REQUEST); + } + } + + /** + * Compose required {@link Tags} for metrics {@link MetricName#REQUEST} and {@link MetricName#NETWORK_REQUEST} + * based on the provided information. + * + *

+ * Required tags: {@link MetricTag#REQUEST}, {@link MetricTag#STATUS_CODE_OR_EXCEPTION} + *

+ * + * @param requestName name of the executed request + * @param context a {@link RequestMetricContext} object about the execution + * @return {@link Tags} + */ + private static Tags getRequestTags(String requestName, ExecutionMetricContext context) { + List tagList = new ArrayList<>(); + tagList.add(Tag.of(MetricTag.REQUEST.toString(), requestName == null ? "" : requestName)); + tagList.add(Tag.of(MetricTag.STATUS_CODE_OR_EXCEPTION.toString(), extractStatusCodeOrException(context))); + return Tags.of(tagList); + } + + private static String extractStatusCodeOrException(ExecutionMetricContext context) { + if (context != null) { + if (context.getStatusCode() != DEFAULT_EMPTY_STATUS_CODE) { + return String.valueOf(context.getStatusCode()); + } else if (context.getThrowable() != null) { + Throwable error = context.getThrowable(); + if (error.getCause() != null) { + return EXCEPTION_PREFIX + error.getCause().getClass().getSimpleName(); + } + return EXCEPTION_PREFIX + error.getClass().getSimpleName(); + } + } + return "UNKNOWN"; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java index 2966473dba..77b7346234 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java @@ -8,6 +8,12 @@ package org.opensearch.client.transport.httpclient5; +import static org.opensearch.client.transport.client_metrics.ExecutionMetricContext.DEFAULT_EMPTY_STATUS_CODE; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_REGISTRY; +import static org.opensearch.client.transport.client_metrics.MetricTag.CLIENT_ID; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; import jakarta.json.stream.JsonGenerator; import jakarta.json.stream.JsonParser; import java.io.ByteArrayInputStream; @@ -20,11 +26,13 @@ import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -42,6 +50,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; @@ -93,6 +102,12 @@ import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.TransportException; import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.client_metrics.MeterOptions; +import org.opensearch.client.transport.client_metrics.MetricGroup; +import org.opensearch.client.transport.client_metrics.MetricOptions; +import org.opensearch.client.transport.client_metrics.NetworkRequestMetricContext; +import org.opensearch.client.transport.client_metrics.RequestMetricContext; +import org.opensearch.client.transport.client_metrics.TelemetryMetricsManager; import org.opensearch.client.transport.endpoints.BooleanEndpoint; import org.opensearch.client.transport.endpoints.BooleanResponse; import org.opensearch.client.transport.httpclient5.internal.HttpUriRequestProducer; @@ -121,6 +136,12 @@ public class ApacheHttpClient5Transport implements OpenSearchTransport { private final String pathPrefix; private final List
defaultHeaders; + private final boolean isMetricsEnabled; + private MeterOptions meterOptions; + private MeterRegistry meterRegistry; + private Set metricGroups = EnumSet.copyOf(MetricGroup.REQUIRED_GROUPS); + private String clientID = "NONE"; + public ApacheHttpClient5Transport( final CloseableHttpAsyncClient client, final Header[] defaultHeaders, @@ -132,7 +153,8 @@ public ApacheHttpClient5Transport( final NodeSelector nodeSelector, final boolean strictDeprecationMode, final boolean compressionEnabled, - final boolean chunkedEnabled + final boolean chunkedEnabled, + final MetricOptions metricOptions ) { this.mapper = mapper; this.client = client; @@ -145,6 +167,22 @@ public ApacheHttpClient5Transport( this.chunkedEnabled = chunkedEnabled; this.compressionEnabled = compressionEnabled; setNodes(nodes); + + if (metricOptions != null && metricOptions.isMetricsEnabled()) { + if (metricOptions.getClientId() != null && !metricOptions.getClientId().isEmpty()) { + this.clientID = metricOptions.getClientId(); + } + if (metricOptions.getMetricGroups() != null) { + this.metricGroups.addAll(metricOptions.getMetricGroups()); + } + this.isMetricsEnabled = metricOptions.isMetricsEnabled(); + this.meterRegistry = metricOptions.getMeterRegistry() == null ? DEFAULT_REGISTRY : metricOptions.getMeterRegistry(); + this.meterOptions = new MeterOptions(metricOptions.getPercentiles(), getCommonTags(), metricOptions.getExcludedTags()); + TelemetryMetricsManager.addRegistry(this.meterRegistry); + TelemetryMetricsManager.initializeNodeGauges(this.meterOptions, getActiveNodeGaugeUpdater(), getInactiveNodeGaugeUpdater()); + } else { + this.isMetricsEnabled = false; + } } @Override @@ -175,12 +213,14 @@ public CompletableFuture performRequest ) { final ApacheHttpClient5Options requestOptions = (options == null) ? transportOptions : ApacheHttpClient5Options.of(options); - final CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture future = new OpenSearchRequestFuture<>(); final HttpUriRequestBase clientReq = prepareLowLevelRequest(request, endpoint, requestOptions); final WarningsHandler warningsHandler = (requestOptions.getWarningsHandler() == null) ? this.warningsHandler : requestOptions.getWarningsHandler(); + final long executionStartTime = System.currentTimeMillis(); + try { performRequestAsync(nextNodes(), requestOptions, clientReq, warningsHandler, future); } catch (final IOException ex) { @@ -193,6 +233,15 @@ public CompletableFuture performRequest } catch (final IOException ex) { throw new CompletionException(ex); } + }).whenComplete((responseT, throwable) -> { + RequestMetricContext context = ((OpenSearchRequestFuture) future).getContext(); + if (throwable != null) { + context.setThrowable(throwable); + } + context.setRequestExecutionTime(Duration.ofMillis(System.currentTimeMillis() - executionStartTime)); + if (isMetricsEnabled) { + TelemetryMetricsManager.recordRequestMetrics(request.getClass().getSimpleName(), meterOptions, context, metricGroups); + } }); } @@ -206,9 +255,24 @@ public TransportOptions options() { return transportOptions; } + public boolean isMetricsEnabled() { + return isMetricsEnabled; + } + + public String getClientID() { + return clientID; + } + + public MeterOptions getMeterOptions() { + return meterOptions; + } + @Override public void close() throws IOException { client.close(); + if (meterRegistry != null) { + TelemetryMetricsManager.removeRegistry(meterRegistry); + } } private void performRequestAsync( @@ -219,6 +283,8 @@ private void performRequestAsync( final CompletableFuture listener ) { final RequestContext context = createContextForNextAttempt(options, request, nodeTuple.nodes.next(), nodeTuple.authCache); + final String hostName = context.node.getHost().getHostName() + ":" + context.node.getHost().getPort(); + final long startTime = System.currentTimeMillis(); Future future = client.execute( context.requestProducer, context.asyncResponseConsumer, @@ -226,7 +292,15 @@ private void performRequestAsync( new FutureCallback() { @Override public void completed(ClassicHttpResponse httpResponse) { + long endTime = System.currentTimeMillis(); try { + prepareRequestMetricContext( + ((OpenSearchRequestFuture) listener).getContext(), + request, + httpResponse, + hostName, + endTime - startTime + ); ResponseOrResponseException responseOrResponseException = convertResponse( request, context.node, @@ -249,7 +323,17 @@ public void completed(ClassicHttpResponse httpResponse) { @Override public void failed(Exception failure) { + long endTime = System.currentTimeMillis(); try { + ((OpenSearchRequestFuture) listener).getContext() + .addNetworkRequestContext( + new NetworkRequestMetricContext( + hostName, + failure, + DEFAULT_EMPTY_STATUS_CODE, + Duration.ofMillis(endTime - startTime) + ) + ); onFailure(context.node); if (nodeTuple.nodes.hasNext()) { performRequestAsync(nodeTuple, options, request, warningsHandler, listener); @@ -263,7 +347,17 @@ public void failed(Exception failure) { @Override public void cancelled() { - listener.completeExceptionally(new CancellationException("request was cancelled")); + CancellationException cancellationException = new CancellationException("request was cancelled"); + ((OpenSearchRequestFuture) listener).getContext() + .addNetworkRequestContext( + new NetworkRequestMetricContext( + hostName, + cancellationException, + DEFAULT_EMPTY_STATUS_CODE, + Duration.ofMillis(System.currentTimeMillis() - startTime) + ) + ); + listener.completeExceptionally(cancellationException); } } ); @@ -273,6 +367,30 @@ public void cancelled() { } } + private void prepareRequestMetricContext( + RequestMetricContext requestMetricContext, + HttpUriRequestBase request, + ClassicHttpResponse httpResponse, + String hostName, + long executionTimeMS + ) { + NetworkRequestMetricContext networkRequestMetricContext = new NetworkRequestMetricContext( + hostName, + null, + httpResponse.getCode(), + Duration.ofMillis(executionTimeMS) + ); + // Because we compress data as it is being streamed, we can't obtain compressed size efficiently + if (request.getEntity() != null && !compressionEnabled) { + networkRequestMetricContext.setRequestPayloadSize(request.getEntity().getContentLength()); + } + if (httpResponse.getEntity() != null) { + networkRequestMetricContext.setResponsePayloadSize(httpResponse.getEntity().getContentLength()); + } + requestMetricContext.setStatusCode(httpResponse.getCode()); + requestMetricContext.addNetworkRequestContext(networkRequestMetricContext); + } + /** * Replaces the nodes with which the client communicates. * @@ -824,6 +942,34 @@ private static URI buildUri(String pathPrefix, String path, Map } } + private Tags getCommonTags() { + return Tags.of(CLIENT_ID.toString(), getClientID()); + } + + private Supplier getActiveNodeGaugeUpdater() { + return () -> { + int numActive = 0; + for (Node node : this.nodeTuple.nodes) { + if (!denylist.containsKey(node.getHost())) { + numActive += 1; + } + } + return numActive; + }; + } + + private Supplier getInactiveNodeGaugeUpdater() { + return () -> { + int numInactive = 0; + for (Node node : this.nodeTuple.nodes) { + if (denylist.containsKey(node.getHost())) { + numInactive += 1; + } + } + return numInactive; + }; + } + private static class RequestContext { private final Node node; private final AsyncRequestProducer requestProducer; diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java index 334a48a89b..05946acf7d 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java @@ -39,6 +39,7 @@ import org.opensearch.client.json.JsonpMapper; import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.client_metrics.MetricOptions; import org.opensearch.client.transport.httpclient5.internal.Node; import org.opensearch.client.transport.httpclient5.internal.NodeSelector; @@ -77,6 +78,7 @@ public class ApacheHttpClient5TransportBuilder { private Optional chunkedEnabled; private JsonpMapper mapper; private TransportOptions options; + private MetricOptions metricOptions; /** * Creates a new builder instance and sets the hosts that the client will send requests to. @@ -261,6 +263,11 @@ public ApacheHttpClient5TransportBuilder setChunkedEnabled(boolean chunkedEnable return this; } + public ApacheHttpClient5TransportBuilder setMetricOptions(MetricOptions metricOptions) { + this.metricOptions = metricOptions; + return this; + } + /** * Creates a new {@link RestClient} based on the provided configuration. */ @@ -287,7 +294,8 @@ public ApacheHttpClient5Transport build() { nodeSelector, strictDeprecationMode, compressionEnabled, - chunkedEnabled.orElse(false) + chunkedEnabled.orElse(false), + metricOptions ); httpClient.start(); diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/OpenSearchRequestFuture.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/OpenSearchRequestFuture.java new file mode 100644 index 0000000000..8a8a6740ee --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/OpenSearchRequestFuture.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.util.concurrent.CompletableFuture; +import org.opensearch.client.transport.client_metrics.RequestMetricContext; + +public class OpenSearchRequestFuture extends CompletableFuture { + private final RequestMetricContext context = new RequestMetricContext(); + + public RequestMetricContext getContext() { + return context; + } +} diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java new file mode 100644 index 0000000000..37b7362ca1 --- /dev/null +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/AbstractClientMetricsIT.java @@ -0,0 +1,466 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest; + +import static org.awaitility.Awaitility.await; + +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hc.client5.http.HttpHostConnectException; +import org.apache.hc.core5.http.HttpHost; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch._types.Refresh; +import org.opensearch.client.opensearch.cluster.HealthRequest; +import org.opensearch.client.opensearch.cluster.HealthResponse; +import org.opensearch.client.opensearch.core.CreateRequest; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.core.IndexResponse; +import org.opensearch.client.opensearch.core.UpdateRequest; +import org.opensearch.client.opensearch.core.UpdateResponse; +import org.opensearch.client.opensearch.integTest.AbstractCrudIT.AppData; +import org.opensearch.client.transport.client_metrics.MetricGroup; +import org.opensearch.client.transport.client_metrics.MetricName; +import org.opensearch.client.transport.client_metrics.MetricTag; +import org.opensearch.client.transport.client_metrics.TelemetryMetricsManager; +import org.opensearch.client.transport.httpclient5.ResponseException; +import org.opensearch.common.settings.Settings; + +public abstract class AbstractClientMetricsIT extends OpenSearchJavaClientTestCase { + private static final HttpHost[] BAD_HOSTS = new HttpHost[] { new HttpHost("localhost", 9201), new HttpHost("localhost", 9202) }; + private static final List ALL_GROUPS = MetricGroup.ALL.stream().map(Enum::toString).collect(Collectors.toList()); + private static final Settings ALL_METRIC_GROUP_SETTING = Settings.builder().putList(METRICS_GROUPS, ALL_GROUPS).build(); + + public void testDefaultMetricGroup() throws IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + Future clusterHealthFuture = getCustomAsyncClient(getDefaultHosts(), restClientSettingsWithMetrics(Settings.EMPTY)) + .cluster() + .health(); + await().atMost(2, TimeUnit.SECONDS).until(clusterHealthFuture::isDone); + assertEquals( + 1, + findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) + ).size() + ); + assertEquals(1, findMeter(getStubRegistry(), MetricName.ACTIVE_NODES.toString(), Tags.empty()).size()); + assertEquals(1, findMeter(getStubRegistry(), MetricName.INACTIVE_NODES.toString(), Tags.empty()).size()); + } + + public void testAllMetricGroups() throws IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String index = "index_metrics"; + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg("testMetrics"); + Future indexFuture = getCustomAsyncClient(getDefaultHosts(), restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING)) + .index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)); + await().atMost(2, TimeUnit.SECONDS).until(indexFuture::isDone); + + assertEquals( + 1, + findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() + ); + assertEquals(1, findMeter(getStubRegistry(), MetricName.ACTIVE_NODES.toString(), Tags.empty()).size()); + assertEquals(1, findMeter(getStubRegistry(), MetricName.INACTIVE_NODES.toString(), Tags.empty()).size()); + assertEquals( + 1, + findMeter( + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() + ); + assertEquals( + 1, + findMeter( + getStubRegistry(), + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() + ); + assertEquals( + 1, + findMeter( + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).size() + ); + } + + public void testRequestMetricResponse() throws IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + Future clusterHealthFuture = getCustomAsyncClient( + getDefaultHosts(), + restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING) + ).cluster().health(); + await().atMost(2, TimeUnit.SECONDS).until(clusterHealthFuture::isDone); + List meter = findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Meter clusterHealthMeter = meter.get(0); + + meter = findMeter( + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), HealthRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Meter clusterHealthNetworkMeter = meter.get(0); + + for (Meter requestMeter : Arrays.asList(clusterHealthMeter, clusterHealthNetworkMeter)) { + String statusCodeTag = requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString()); + assertEquals(1, ((Timer) requestMeter).count()); + assertEquals("200", statusCodeTag); + } + + String[] hostsContacted = clusterHealthMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString()).split(","); + assertArrayEquals(new String[] { getTestRestCluster() }, hostsContacted); + String hostContacted = clusterHealthNetworkMeter.getId().getTag(MetricTag.HOST.toString()); + assertEquals(getTestRestCluster(), hostContacted); + } + + public void testRequestMetricResponseExceptionThrown() throws IOException, ExecutionException, InterruptedException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String index = "index_metrics"; + OpenSearchAsyncClient client = getCustomAsyncClient(getDefaultHosts(), restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING)); + // Force 409 + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg("testRequestMetricResponseException"); + client.index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)).get(); + try { + CreateRequest duplicateCreateRequest = new CreateRequest.Builder().index(index) + .id(id) + .document(appData) + .build(); + client.create(duplicateCreateRequest).get(); + fail("Should have failed due to version conflict"); + } catch (ExecutionException e) { + if (e.getCause() instanceof ResponseException) { + assertTrue(e.getMessage().contains("version conflict")); + } else { + fail("Unrecognized cause: " + e.getCause()); + } + } + // Check metrics + List meter = findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), CreateRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Timer createMeter = (Timer) meter.get(0); + + meter = findMeter( + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), CreateRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Timer createNetworkMeter = (Timer) meter.get(0); + + for (Timer requestMeter : Arrays.asList(createMeter, createNetworkMeter)) { + String statusCodeTag = requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString()); + assertEquals("409", statusCodeTag); + assertEquals(1, requestMeter.count()); + } + + String[] hostsContacted = createMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString()).split(","); + assertArrayEquals(new String[] { getTestRestCluster() }, hostsContacted); + + String hostContacted = createNetworkMeter.getId().getTag(MetricTag.HOST.toString()); + assertEquals(getTestRestCluster(), hostContacted); + } + + public void testRequestMetricResponseExceptionHandled() throws IOException, InterruptedException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String malformedIndexName = "index_Metrics"; + // Force 400 for SimpleEndpoint + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg("testRequestMetricResponseExceptionHandled"); + try { + getCustomAsyncClient(getDefaultHosts(), restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING)).index( + b -> b.index(malformedIndexName).id(id).document(appData).refresh(Refresh.True) + ).get(); + fail("Should have failed due to invalid index name"); + } catch (ExecutionException e) { + if (e.getCause() instanceof OpenSearchException) { + assertTrue(e.getMessage().contains("invalid_index_name_exception")); + } else { + fail("Unrecognized cause: " + e.getCause()); + } + } + // Check metrics + List meter = findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Timer indexMeter = (Timer) meter.get(0); + + meter = findMeter( + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Timer indexNetworkMeter = (Timer) meter.get(0); + + for (Timer requestMeter : Arrays.asList(indexMeter, indexNetworkMeter)) { + String statusCodeTag = requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString()); + assertEquals("400", statusCodeTag); + assertEquals(1, requestMeter.count()); + } + + String[] hostsContacted = indexMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString()).split(","); + assertArrayEquals(new String[] { getTestRestCluster() }, hostsContacted); + + String hostContacted = indexNetworkMeter.getId().getTag(MetricTag.HOST.toString()); + assertEquals(getTestRestCluster(), hostContacted); + } + + public void testRequestMetricExceptionThrown() throws IOException, InterruptedException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String indexName = "index_metrics"; + // Force HttpHostConnectException + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg("testRequestMetricClientExceptionThrown"); + + try { + getCustomAsyncClient(BAD_HOSTS, restClientSettingsWithMetrics(ALL_METRIC_GROUP_SETTING)).index( + b -> b.index(indexName).id(id).document(appData).refresh(Refresh.True) + ).get(); + fail("Should have failed due to bad host"); + } catch (ExecutionException e) { + System.out.println("Caught " + e.getMessage()); + if (!(e.getCause() instanceof HttpHostConnectException)) { + fail("Recognized cause: " + e.getCause()); + } + } + // Check metric + List meter = findMeter( + getStubRegistry(), + MetricName.REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ); + assertEquals(1, meter.size()); + Timer indexMeter = (Timer) meter.get(0); + + meter = findMeter( + getStubRegistry(), + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ); + assertEquals(2, meter.size()); + + for (Timer requestMeter : Arrays.asList(indexMeter, (Timer) meter.get(0), (Timer) meter.get(1))) { + String statusCodeTag = requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString()); + assertEquals("ClientError-HttpHostConnectException", statusCodeTag); + assertEquals(1, requestMeter.count()); + } + + String[] hostsContacted = indexMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString()).split(","); + String[] expectedHosts = new String[] { "localhost:9201", "localhost:9202" }; + assertArrayEquals(expectedHosts, hostsContacted); + + List networkContactedHosts = new ArrayList<>(List.of(expectedHosts)); + networkContactedHosts.remove(meter.get(0).getId().getTag(MetricTag.HOST.toString())); + networkContactedHosts.remove(meter.get(1).getId().getTag(MetricTag.HOST.toString())); + assertTrue(networkContactedHosts.isEmpty()); + } + + public void testNodeGauges() throws InterruptedException, IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String clientID = "badHostClient"; + OpenSearchAsyncClient client = getCustomAsyncClient( + BAD_HOSTS, + restClientSettingsWithMetrics(Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).build()) + ); + // Verify active nodes + List activeNodeMeter = findMeter( + getStubRegistry(), + MetricName.ACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + ); + assertEquals(1, activeNodeMeter.size()); + assertEquals(2, ((Gauge) activeNodeMeter.get(0)).value(), 0); + // Verify inactive nodes + List inactiveNodeMeter = findMeter( + getStubRegistry(), + MetricName.INACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + ); + assertEquals(1, inactiveNodeMeter.size()); + assertEquals(0, ((Gauge) inactiveNodeMeter.get(0)).value(), 0); + + // Force HttpHostConnectException + String indexName = "index_metrics"; + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg("testRequestMetricClientExceptionThrown"); + try { + client.index(b -> b.index(indexName).id(id).document(appData).refresh(Refresh.True)).get(); + fail("Should have failed due to bad host"); + } catch (ExecutionException | IOException e) { + System.out.println("Caught " + e.getMessage()); + if (!(e.getCause() instanceof HttpHostConnectException)) { + fail("Recognized cause: " + e.getCause()); + } + } + + // Verify active nodes + activeNodeMeter = findMeter( + getStubRegistry(), + MetricName.ACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + ); + assertEquals(1, activeNodeMeter.size()); + assertEquals(0, ((Gauge) activeNodeMeter.get(0)).value(), 0); + // Verify inactive nodes + inactiveNodeMeter = findMeter( + getStubRegistry(), + MetricName.INACTIVE_NODES.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID) + ); + assertEquals(1, inactiveNodeMeter.size()); + assertEquals(2, ((Gauge) inactiveNodeMeter.get(0)).value(), 0); + } + + public void testPayloadSize() throws IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String clientID = "compressingClient"; + OpenSearchAsyncClient client = getCustomAsyncClient( + Stream.concat(Arrays.stream(BAD_HOSTS), Arrays.stream(getDefaultHosts())).toArray(HttpHost[]::new), + restClientSettingsWithMetrics(Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).build()) + ); + String index = "index_metrics"; + String id = UUID.randomUUID().toString(); + + // Index + AppData appData = new AppData(); + appData.setMsg(generatePayload(500)); + Future indexFuture = client.index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)); + await().atMost(Duration.ofSeconds(2)).until(indexFuture::isDone); + DistributionSummary indexRequestPayload = (DistributionSummary) findMeter( + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).get(0); + assertTrue(500 <= indexRequestPayload.max() && indexRequestPayload.max() <= 600); + assertEquals(1, indexRequestPayload.count()); + DistributionSummary indexResponsePayload = (DistributionSummary) findMeter( + getStubRegistry(), + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).get(0); + assertEquals(1, indexResponsePayload.count()); + + // Update + appData.setMsg(generatePayload(5 * 1024)); + CompletableFuture> updateFuture = client.update(b -> b.doc(appData).index(index).id(id), AppData.class); + await().atMost(Duration.ofSeconds(2)).until(updateFuture::isDone); + DistributionSummary updateRequestPayload = (DistributionSummary) findMeter( + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), UpdateRequest.class.getSimpleName()) + ).get(0); + assertTrue(5 * 1024 <= updateRequestPayload.max() && updateRequestPayload.max() <= (5 * 1024 + 100)); + assertEquals(1, updateRequestPayload.count()); + DistributionSummary updateResponsePayload = (DistributionSummary) findMeter( + getStubRegistry(), + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), UpdateRequest.class.getSimpleName()) + ).get(0); + assertTrue(updateResponsePayload.max() > 0); + assertEquals(1, updateResponsePayload.count()); + } + + public void testNoRequestPayloadSizeCompressionEnabled() throws IOException { + TelemetryMetricsManager.addRegistry(getStubRegistry()); + String clientID = "compressingClient"; + OpenSearchAsyncClient client = getCustomAsyncClient( + getDefaultHosts(), + restClientSettingsWithMetrics( + Settings.builder().put(CUSTOM_CLIENT_ID, clientID).put(ALL_METRIC_GROUP_SETTING).put(COMPRESSION_ENABLED, true).build() + ) + ); + String index = "index_metrics"; + String id = UUID.randomUUID().toString(); + AppData appData = new AppData(); + appData.setMsg(generatePayload(5 * 1024)); + Future indexFuture = client.index(b -> b.index(index).id(id).document(appData).refresh(Refresh.True)); + await().atMost(Duration.ofSeconds(2)).until(indexFuture::isDone); + assertTrue( + findMeter( + getStubRegistry(), + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.of(MetricTag.CLIENT_ID.toString(), clientID, MetricTag.REQUEST.toString(), IndexRequest.class.getSimpleName()) + ).isEmpty() + ); + } + + private List findMeter(MeterRegistry registry, String meterName, Tags tags) { + return registry.getMeters().stream().filter(meter -> { + if (meter.getId().getName().equals(meterName)) { + if (tags != null && !tags.equals(Tags.empty())) { + for (Tag tag : tags) { + String tagValue = meter.getId().getTag(tag.getKey()); + if (tagValue == null || !tagValue.equalsIgnoreCase(tag.getValue())) { + return false; + } + } + } + return true; + } + return false; + }).collect(Collectors.toList()); + } + + private String generatePayload(int numBytes) { + if (numBytes < 0) { + throw new IllegalArgumentException("Negative payload size"); + } + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < numBytes; i++) { + builder.append("o"); + } + return builder.toString(); + } +} diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java index e4825420a8..a61bafc419 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchJavaClientTestCase.java @@ -8,6 +8,8 @@ package org.opensearch.client.opensearch.integTest; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -34,6 +36,7 @@ import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.opensearch.IOUtils; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.ExpandWildcard; import org.opensearch.client.opensearch.cat.IndicesResponse; @@ -42,6 +45,7 @@ import org.opensearch.client.opensearch.indices.DeleteIndexRequest; import org.opensearch.client.opensearch.nodes.NodesInfoResponse; import org.opensearch.client.opensearch.nodes.info.NodeInfo; +import org.opensearch.client.transport.client_metrics.TelemetryMetricsManager; import org.opensearch.common.settings.Settings; import org.opensearch.test.rest.OpenSearchRestTestCase; @@ -53,12 +57,20 @@ public abstract class OpenSearchJavaClientTestCase extends OpenSearchRestTestCas ".plugins-ml-model-group", ".ql-datasources" ); + + public static String METRICS_ENABLED = "metrics.enabled"; + public static String CUSTOM_CLIENT_ID = "custom.client.id"; + public static String METRICS_GROUPS = "metrics.groups"; + public static String COMPRESSION_ENABLED = "compression.enabled"; + private static final List customAsyncClients = new ArrayList<>(); private static OpenSearchClient javaClient; private static OpenSearchClient adminJavaClient; private static TreeSet nodeVersions; private static List clusterHosts; + private MeterRegistry stubRegistry = new SimpleMeterRegistry(); + @Before public void initJavaClient() throws IOException { if (javaClient == null) { @@ -88,6 +100,21 @@ public void initJavaClient() throws IOException { } } + protected HttpHost[] getDefaultHosts() { + if (clusterHosts != null) { + return clusterHosts.toArray(new HttpHost[clusterHosts.size()]); + } + return new HttpHost[0]; + } + + protected Settings restClientSettingsWithMetrics(Settings additionalMetricsSettings) { + Settings defaultSettings = Settings.builder().put(restClientSettings()).put(METRICS_ENABLED, true).build(); + if (additionalMetricsSettings == null || additionalMetricsSettings.isEmpty()) { + return defaultSettings; + } + return Settings.builder().put(defaultSettings).put(additionalMetricsSettings).build(); + } + @Override protected String getProtocol() { return isHttps() ? "https" : "http"; @@ -147,6 +174,12 @@ protected static OpenSearchClient adminJavaClient() { return adminJavaClient; } + protected synchronized OpenSearchAsyncClient getCustomAsyncClient(HttpHost[] hosts, Settings clientSettings) throws IOException { + OpenSearchAsyncClient customAsyncClient = buildAsyncJavaClient(clientSettings, hosts); + customAsyncClients.add(customAsyncClient); + return customAsyncClient; + } + protected String getTestRestCluster() { String cluster = System.getProperty("tests.rest.cluster"); if (cluster == null) { @@ -155,6 +188,10 @@ protected String getTestRestCluster() { return cluster; } + public MeterRegistry getStubRegistry() { + return stubRegistry; + } + @After protected void wipeAllOSIndices() throws IOException { // wipe all data streams first, otherwise deleting backing indices will encounter exception @@ -169,6 +206,8 @@ protected void wipeAllOSIndices() throws IOException { adminJavaClient().indices().delete(new DeleteIndexRequest.Builder().index(index.index()).build()); } } + TelemetryMetricsManager.removeRegistry(stubRegistry); + cleanUpCustomAsyncClients(); } @AfterClass @@ -194,6 +233,17 @@ protected boolean preserveIndicesUponCompletion() { return true; } + private synchronized void cleanUpCustomAsyncClients() { + if (!customAsyncClients.isEmpty()) { + for (OpenSearchAsyncClient client : customAsyncClients) { + try { + IOUtils.closeQueitly(client._transport()); + } catch (Exception ignored) {} + } + customAsyncClients.clear(); + } + } + protected Version getServerVersion() throws IOException { final InfoResponse info = javaClient().info(); diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java index 22b6db5de1..8d52bb5a47 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/OpenSearchTransportSupport.java @@ -11,6 +11,7 @@ import java.io.IOException; import java.util.Optional; import org.apache.hc.core5.http.HttpHost; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.common.settings.Settings; @@ -24,5 +25,9 @@ default OpenSearchClient buildJavaClient(Settings settings, HttpHost[] hosts) th return new OpenSearchClient(buildTransport(settings, hosts)); } + default OpenSearchAsyncClient buildAsyncJavaClient(Settings settings, HttpHost[] hosts) throws IOException { + return new OpenSearchAsyncClient(buildTransport(settings, hosts)); + } + OpenSearchTransport buildTransport(Settings settings, HttpHost[] hosts) throws IOException; } diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/ApacheHttpClient5TransportTest.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/ApacheHttpClient5TransportTest.java new file mode 100644 index 0000000000..40755db018 --- /dev/null +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/ApacheHttpClient5TransportTest.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.opensearch.integTest.httpclient5; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.io.IOException; +import java.util.Optional; +import org.apache.hc.core5.http.HttpHost; +import org.junit.Test; +import org.opensearch.client.transport.client_metrics.MetricOptions; +import org.opensearch.client.transport.client_metrics.MetricTag; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; + +public class ApacheHttpClient5TransportTest { + @Test + public void testInitWithMetricsOptions() throws IOException { + MeterRegistry meterRegistry = new SimpleMeterRegistry(); + String clientID = "testClient"; + HttpHost host = new HttpHost("localhost", 9200); + MetricOptions metricOptions = MetricOptions.builder() + .setMetricsEnabled(true) + .setMeterRegistry(meterRegistry) + .setClientId(clientID) + .setPercentiles(0.90, 0.8, 0.5) + .build(); + ApacheHttpClient5TransportBuilder builder = ApacheHttpClient5TransportBuilder.builder(host); + builder.setMetricOptions(metricOptions); + try (ApacheHttpClient5Transport transport = builder.build()) { + assertTrue(transport.isMetricsEnabled()); + assertArrayEquals(new double[] { 0.90, 0.8, 0.5 }, transport.getMeterOptions().getPercentiles(), 0); + Optional clientIDTag = transport.getMeterOptions() + .getCommonTags() + .stream() + .filter(tag -> tag.getKey().equals(MetricTag.CLIENT_ID.toString())) + .findFirst(); + assertFalse(clientIDTag.isEmpty()); + assertEquals(clientID, clientIDTag.get().getValue()); + assertEquals(clientID, transport.getClientID()); + } + } +} diff --git a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java index 617304f83b..eb7e91bf83 100644 --- a/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java +++ b/java-client/src/test/java11/org/opensearch/client/opensearch/integTest/httpclient5/HttpClient5TransportSupport.java @@ -8,13 +8,19 @@ package org.opensearch.client.opensearch.integTest.httpclient5; +import static org.opensearch.client.opensearch.integTest.OpenSearchJavaClientTestCase.COMPRESSION_ENABLED; +import static org.opensearch.client.opensearch.integTest.OpenSearchJavaClientTestCase.CUSTOM_CLIENT_ID; +import static org.opensearch.client.opensearch.integTest.OpenSearchJavaClientTestCase.METRICS_ENABLED; +import static org.opensearch.client.opensearch.integTest.OpenSearchJavaClientTestCase.METRICS_GROUPS; import static org.opensearch.test.rest.OpenSearchRestTestCase.CLIENT_PATH_PREFIX; import static org.opensearch.test.rest.OpenSearchRestTestCase.CLIENT_SOCKET_TIMEOUT; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.IOException; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; +import java.util.List; import java.util.Map; import java.util.Optional; import javax.net.ssl.SSLContext; @@ -36,6 +42,8 @@ import org.apache.hc.core5.util.Timeout; import org.opensearch.client.opensearch.integTest.OpenSearchTransportSupport; import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.client_metrics.MetricGroup; +import org.opensearch.client.transport.client_metrics.MetricOptions; import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -108,5 +116,26 @@ public TlsDetails create(final SSLEngine sslEngine) { if (settings.hasValue(CLIENT_PATH_PREFIX)) { builder.setPathPrefix(settings.get(CLIENT_PATH_PREFIX)); } + if (settings.hasValue(METRICS_ENABLED) && settings.getAsBoolean(METRICS_ENABLED, false)) { + MetricOptions.MetricOptionsBuilder metricOptionsBuilder = MetricOptions.builder() + .setMeterRegistry(new SimpleMeterRegistry()) + .setPercentiles(0.95) + .setMetricsEnabled(true); + if (settings.hasValue(CUSTOM_CLIENT_ID)) { + metricOptionsBuilder.setClientId(settings.get(CUSTOM_CLIENT_ID)); + } + if (settings.hasValue(METRICS_GROUPS)) { + List metricGroupStrList = settings.getAsList(METRICS_GROUPS); + MetricGroup[] metricGroups = new MetricGroup[metricGroupStrList.size()]; + for (int j = 0; j < metricGroupStrList.size(); j++) { + metricGroups[j] = MetricGroup.valueOf(metricGroupStrList.get(j)); + } + metricOptionsBuilder.setAdditionalMetricGroups(metricGroups); + } + builder.setMetricOptions(metricOptionsBuilder.build()); + } + if (settings.hasValue(COMPRESSION_ENABLED)) { + builder.setCompressionEnabled(settings.getAsBoolean(COMPRESSION_ENABLED, false)); + } } } diff --git a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MeterOptionsTest.java b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MeterOptionsTest.java new file mode 100644 index 0000000000..8ae3942792 --- /dev/null +++ b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MeterOptionsTest.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import io.micrometer.core.instrument.Tags; +import java.util.EnumSet; +import org.junit.Test; + +public class MeterOptionsTest { + @Test + public void testMeterOptions() { + MeterOptions meterOptions = new MeterOptions( + new double[] { 0.80, 0.85 }, + Tags.of("test1", "test2"), + EnumSet.of(MetricTag.CLIENT_ID) + ); + assertArrayEquals(new double[] { 0.80, 0.85 }, meterOptions.getPercentiles(), 0); + assertEquals(Tags.of("test1", "test2"), meterOptions.getCommonTags()); + assertEquals(EnumSet.of(MetricTag.CLIENT_ID), meterOptions.getExcludedTagNames()); + } + + @Test + public void testMeterOptionsNoNull() { + MeterOptions meterOptions = new MeterOptions(null, null, null); + assertArrayEquals(MetricConstants.DEFAULT_PERCENTILES, meterOptions.getPercentiles(), 0.0); + assertEquals(Tags.empty(), meterOptions.getCommonTags()); + assertEquals(MetricConstants.DEFAULT_EXCLUDED_TAGS, meterOptions.getExcludedTagNames()); + } +} diff --git a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MetricOptionsTest.java b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MetricOptionsTest.java new file mode 100644 index 0000000000..c404e8986c --- /dev/null +++ b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/MetricOptionsTest.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_ADDITIONAL_METRIC_GROUPS; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_EXCLUDED_TAGS; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_PERCENTILES; +import static org.opensearch.client.transport.client_metrics.MetricConstants.DEFAULT_REGISTRY; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.util.EnumSet; +import org.junit.Test; + +public class MetricOptionsTest { + @Test + public void testBuildMetricOptions() { + MeterRegistry meterRegistry = new SimpleMeterRegistry(); + String clientID = "testClient"; + MetricOptions metricOptions = MetricOptions.builder() + .setMetricsEnabled(true) + .setMeterRegistry(meterRegistry) + .setClientId(clientID) + .setPercentiles(0.90, 0.8, 0.5) + .setExcludedTags(MetricTag.HOST_CONTACTED, MetricTag.STATUS_CODE_OR_EXCEPTION) + .setAdditionalMetricGroups(MetricGroup.NETWORK_DETAILS) + .build(); + assertTrue(metricOptions.isMetricsEnabled()); + assertEquals(meterRegistry, metricOptions.getMeterRegistry()); + assertEquals(clientID, metricOptions.getClientId()); + assertArrayEquals(new double[] { 0.90, 0.8, 0.5 }, metricOptions.getPercentiles(), 0); + assertEquals(EnumSet.of(MetricTag.HOST_CONTACTED, MetricTag.STATUS_CODE_OR_EXCEPTION), metricOptions.getExcludedTags()); + assertEquals(EnumSet.of(MetricGroup.NETWORK_DETAILS), metricOptions.getMetricGroups()); + } + + @Test + public void testClientIDAlwaysNotNullOrEmpty() { + MetricOptions metricOptions = MetricOptions.builder().build(); + assertNotNull(metricOptions.getClientId()); + assertFalse(metricOptions.getClientId().isEmpty()); + + metricOptions = MetricOptions.builder().setClientId(null).build(); + assertNotNull(metricOptions.getClientId()); + assertFalse(metricOptions.getClientId().isEmpty()); + + metricOptions = MetricOptions.builder().setClientId("").build(); + assertNotNull(metricOptions.getClientId()); + assertFalse(metricOptions.getClientId().isEmpty()); + } + + @Test + public void testDefaults() { + MetricOptions metricOptions = MetricOptions.builder().build(); + validateDefaults(metricOptions); + + metricOptions = MetricOptions.builder().setMeterRegistry(null).setClientId(null).build(); + validateDefaults(metricOptions); + } + + private void validateDefaults(MetricOptions metricOptions) { + assertFalse(metricOptions.isMetricsEnabled()); + assertEquals(DEFAULT_REGISTRY, metricOptions.getMeterRegistry()); + assertNotNull(metricOptions.getClientId()); + assertFalse(metricOptions.getClientId().isEmpty()); + assertArrayEquals(DEFAULT_PERCENTILES, metricOptions.getPercentiles(), 0); + assertEquals(DEFAULT_EXCLUDED_TAGS, metricOptions.getExcludedTags()); + assertEquals(DEFAULT_ADDITIONAL_METRIC_GROUPS, metricOptions.getMetricGroups()); + } +} diff --git a/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/TelemetryMetricsManagerTest.java b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/TelemetryMetricsManagerTest.java new file mode 100644 index 0000000000..5482573491 --- /dev/null +++ b/java-client/src/test/java11/org/opensearch/client/transport/client_metrics/TelemetryMetricsManagerTest.java @@ -0,0 +1,288 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.client_metrics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.io.IOException; +import java.net.http.HttpConnectTimeoutException; +import java.time.Duration; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.client.transport.TransportException; + +public class TelemetryMetricsManagerTest { + private MeterRegistry stubRegistry; + + @Before + public void setUp() { + stubRegistry = new SimpleMeterRegistry(); + TelemetryMetricsManager.addRegistry(stubRegistry); + } + + @After + public void cleanUp() { + TelemetryMetricsManager.removeRegistry(stubRegistry); + } + + @Test + public void testRecordingRequestMetric() { + String requestName = "testRequest"; + RequestMetricContext context = new RequestMetricContext(); + Duration totalExecLatency = Duration.ofMillis(100); + Duration errorLatency = Duration.ofMillis(1); + MeterOptions meterOptions = new MeterOptions(new double[] { 0.80, 0.85 }, Tags.of("CommonTag", "CommonTagValue"), null); + context.setRequestExecutionTime(totalExecLatency); + context.setStatusCode(200); + context.addNetworkRequestContext( + new NetworkRequestMetricContext("localhost", new HttpConnectTimeoutException("error"), -1, errorLatency) + ); + context.addNetworkRequestContext(new NetworkRequestMetricContext("localhost2", null, 200, totalExecLatency.minus(errorLatency))); + + TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, context, MetricGroup.ALL); + + // Verify REQUEST meter + Optional meter = findMeter(stubRegistry, MetricName.REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer requestMeter = (Timer) meter.get(); + assertEquals(requestName, requestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("200", requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertEquals("localhost,localhost2", requestMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString())); + assertEquals("CommonTagValue", requestMeter.getId().getTag("CommonTag")); + assertEquals(totalExecLatency.toMillis(), requestMeter.totalTime(TimeUnit.MILLISECONDS), 0); + assertEquals(1, requestMeter.count()); + + // Verify NETWORK_REQUEST meter + List meters = findMeters( + stubRegistry, + MetricName.NETWORK_REQUEST.toString(), + Tags.of(MetricTag.REQUEST.toString(), requestName) + ); + Set hosts = new HashSet<>(Arrays.asList("localhost", "localhost2")); + assertEquals(2, meters.size()); + for (Meter networkRequestMeter : meters) { + assertEquals(requestName, networkRequestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("CommonTagValue", networkRequestMeter.getId().getTag("CommonTag")); + assertEquals(1, ((Timer) networkRequestMeter).count()); + + String host = networkRequestMeter.getId().getTag(MetricTag.HOST.toString()); + assertNotNull(host); + hosts.remove(host); + String status = networkRequestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString()); + if (host.equals("localhost")) { + assertEquals("ClientError-HttpConnectTimeoutException", status); + assertEquals(errorLatency.toMillis(), ((Timer) networkRequestMeter).totalTime(TimeUnit.MILLISECONDS), 0); + } else { + assertEquals("200", status); + assertEquals( + totalExecLatency.minus(errorLatency).toMillis(), + ((Timer) networkRequestMeter).totalTime(TimeUnit.MILLISECONDS), + 0 + ); + } + } + assertTrue("There are unaccounted hosts", hosts.isEmpty()); + } + + @Test + public void testRecordingRequestMetricWithExceptionNoStatus() { + String requestName = "testRequest"; + RequestMetricContext context = new RequestMetricContext(); + MeterOptions meterOptions = new MeterOptions(new double[] { 0.80, 0.85 }, Tags.of("CommonTag", "CommonTagValue"), null); + context.setRequestExecutionTime(Duration.ofMillis(50)); + context.setThrowable(new IOException("IO Errors")); + context.addNetworkRequestContext( + new NetworkRequestMetricContext("localhost", new IOException("IO Errors"), -1, Duration.ofMillis(1)) + ); + + TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, context, MetricGroup.ALL); + + // Verify REQUEST meter + Optional meter = findMeter(stubRegistry, MetricName.REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer requestMeter = (Timer) meter.get(); + assertEquals(requestName, requestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("ClientError-IOException", requestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertEquals("localhost", requestMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString())); + assertEquals("CommonTagValue", requestMeter.getId().getTag("CommonTag")); + + // Verify NETWORK_REQUEST meter + meter = findMeter(stubRegistry, MetricName.NETWORK_REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer networkRequestMeter = (Timer) meter.get(); + assertEquals("localhost", networkRequestMeter.getId().getTag(MetricTag.HOST.toString())); + assertEquals(requestName, networkRequestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("ClientError-IOException", networkRequestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertEquals("CommonTagValue", networkRequestMeter.getId().getTag("CommonTag")); + } + + @Test + public void testRecordingRequestMetricWithBothStatusAndException() { + String requestName = "testRequest"; + RequestMetricContext context = new RequestMetricContext(); + MeterOptions meterOptions = new MeterOptions(new double[] { 0.80, 0.85 }, Tags.of("CommonTag", "CommonTagValue"), null); + context.setRequestExecutionTime(Duration.ofMillis(150)); + context.setStatusCode(409); + context.setThrowable(new TransportException("Error")); + context.addNetworkRequestContext( + new NetworkRequestMetricContext("localhost", new TransportException("Error"), 409, Duration.ofMillis(1)) + ); + + TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, context, MetricGroup.ALL); + + // Verify REQUEST meter + Optional meter = findMeter(stubRegistry, MetricName.REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer testMeter = (Timer) meter.get(); + assertEquals(requestName, testMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("409", testMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertEquals("localhost", testMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString())); + assertEquals("CommonTagValue", testMeter.getId().getTag("CommonTag")); + + // Verify NETWORK_REQUEST meter + meter = findMeter(stubRegistry, MetricName.NETWORK_REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer networkRequestMeter = (Timer) meter.get(); + assertEquals("localhost", networkRequestMeter.getId().getTag(MetricTag.HOST.toString())); + assertEquals(requestName, networkRequestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("409", networkRequestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertEquals("CommonTagValue", networkRequestMeter.getId().getTag("CommonTag")); + } + + @Test + public void testRecordingRequestMetricWithTagExclusion() { + String requestName = "testRequest"; + RequestMetricContext context = new RequestMetricContext(); + MeterOptions meterOptions = new MeterOptions( + new double[] { 0.80, 0.85 }, + Tags.of("CommonTag", "CommonTagValue"), + EnumSet.of(MetricTag.HOST_CONTACTED, MetricTag.HOST) + ); + context.setRequestExecutionTime(Duration.ofMillis(150)); + context.setStatusCode(409); + context.setThrowable(new TransportException("Error")); + context.addNetworkRequestContext(new NetworkRequestMetricContext("localhost", null, 409, Duration.ofMillis(1))); + context.addNetworkRequestContext(new NetworkRequestMetricContext("localhost1", null, 409, Duration.ofMillis(1))); + context.addNetworkRequestContext( + new NetworkRequestMetricContext("localhost2", new TransportException("Error"), 409, Duration.ofMillis(1)) + ); + + TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, context, MetricGroup.ALL); + + // Verify REQUEST meter + Optional meter = findMeter(stubRegistry, MetricName.REQUEST.toString(), Tags.of(MetricTag.REQUEST.toString(), requestName)); + assertFalse(meter.isEmpty()); + Timer testMeter = (Timer) meter.get(); + assertEquals(requestName, testMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("409", testMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertNull(testMeter.getId().getTag(MetricTag.HOST_CONTACTED.toString())); + assertEquals("CommonTagValue", testMeter.getId().getTag("CommonTag")); + + // Verify NETWORK_REQUEST meter + List meters = findMeters(stubRegistry, MetricName.NETWORK_REQUEST.toString(), Tags.of("CommonTag", "CommonTagValue")); + assertEquals(1, meters.size()); + Timer networkRequestMeter = (Timer) meters.get(0); + assertEquals(requestName, networkRequestMeter.getId().getTag(MetricTag.REQUEST.toString())); + assertEquals("CommonTagValue", networkRequestMeter.getId().getTag("CommonTag")); + assertEquals("409", networkRequestMeter.getId().getTag(MetricTag.STATUS_CODE_OR_EXCEPTION.toString())); + assertNull(networkRequestMeter.getId().getTag(MetricTag.HOST.toString())); + } + + @Test + public void testNoPayLoadSize() { + NetworkRequestMetricContext context = new NetworkRequestMetricContext("temp", null, 0, Duration.ZERO); + TelemetryMetricsManager.recordPayloadSizes(context, Tags.empty()); + assertTrue(findMeter(stubRegistry, MetricName.REQUEST_PAYLOAD_SIZE.toString(), Tags.empty()).isEmpty()); + assertTrue(findMeter(stubRegistry, MetricName.RESPONSE_PAYLOAD_SIZE.toString(), Tags.empty()).isEmpty()); + } + + @Test + public void testMultipleNetworkContextsWithServerResponses() { + String requestName = "testRequest"; + MeterOptions meterOptions = new MeterOptions(new double[] { 0.80, 0.85 }, Tags.of("CommonTag", "CommonTagValue"), null); + + NetworkRequestMetricContext networkRequestMetricContextFail = new NetworkRequestMetricContext( + "host1", + null, + 500, + Duration.ofMillis(10) + ); + networkRequestMetricContextFail.setRequestPayloadSize(500); + + NetworkRequestMetricContext networkRequestMetricContextSuccess = new NetworkRequestMetricContext( + "host1", + null, + 200, + Duration.ofMillis(20) + ); + networkRequestMetricContextSuccess.setRequestPayloadSize(500); + networkRequestMetricContextSuccess.setResponsePayloadSize(200); + + RequestMetricContext requestMetricContext = new RequestMetricContext(); + requestMetricContext.addNetworkRequestContext(networkRequestMetricContextFail); + requestMetricContext.addNetworkRequestContext(networkRequestMetricContextSuccess); + requestMetricContext.setRequestExecutionTime(Duration.ofMillis(30)); + + TelemetryMetricsManager.recordRequestMetrics(requestName, meterOptions, requestMetricContext, MetricGroup.ALL); + + DistributionSummary requestPayloadMeter = (DistributionSummary) findMeter( + stubRegistry, + MetricName.REQUEST_PAYLOAD_SIZE.toString(), + Tags.empty() + ).get(); + assertEquals(2, requestPayloadMeter.count()); + DistributionSummary responsePayloadMeter = (DistributionSummary) findMeter( + stubRegistry, + MetricName.RESPONSE_PAYLOAD_SIZE.toString(), + Tags.empty() + ).get(); + assertEquals(1, responsePayloadMeter.count()); + } + + private List findMeters(MeterRegistry registry, String meterName, Tags tags) { + return registry.getMeters().stream().filter(meter -> { + if (meter.getId().getName().equals(meterName)) { + if (tags != null && !tags.equals(Tags.empty())) { + for (Tag tag : tags) { + String tagValue = meter.getId().getTag(tag.getKey()); + if (tagValue == null || !tagValue.equalsIgnoreCase(tag.getValue())) { + return false; + } + } + } + return true; + } + return false; + }).collect(Collectors.toList()); + } + + private Optional findMeter(MeterRegistry registry, String meterName, Tags tags) { + return findMeters(registry, meterName, tags).stream().findFirst(); + } +}