Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0064a0b
Creating Request
LikeTheSalad May 29, 2025
84bd658
Adding periodic task executor
LikeTheSalad May 30, 2025
194a0ee
Adding RequestService and its http implementation
LikeTheSalad May 30, 2025
815b7c5
Adding tests
LikeTheSalad May 30, 2025
bb084c3
Updating HttpRequestService
LikeTheSalad May 30, 2025
61fb2ee
Creating RetryAfterParser
LikeTheSalad May 30, 2025
acce15e
Adding suggested delay
LikeTheSalad May 30, 2025
3dc6f12
Clean up
LikeTheSalad May 30, 2025
5657876
Spotless
LikeTheSalad May 30, 2025
d94ff62
Applying PR review suggestions
LikeTheSalad Jun 4, 2025
37bf5a9
Applying PR review suggestions
LikeTheSalad Jun 4, 2025
91f876a
Applying PR review suggestions
LikeTheSalad Jun 4, 2025
84d17d8
Applying PR review suggestions
LikeTheSalad Jun 4, 2025
8625f8f
Applying PR review suggestions
LikeTheSalad Jun 4, 2025
70e7d95
Avoid using SimpleDateFormat
LikeTheSalad Jun 4, 2025
b190ad1
Validating when retry-after time is older than current one
LikeTheSalad Jun 4, 2025
7afe527
Updating javadoc based on PR review
LikeTheSalad Jun 4, 2025
51c9e08
Using daemon threads for opamp client, as mentioned in pr comments
LikeTheSalad Jun 4, 2025
9a4e483
Update opamp-client/src/main/java/io/opentelemetry/opamp/client/inter…
LikeTheSalad Jun 4, 2025
b299d2c
Calling completable future when there's no response body
LikeTheSalad Jun 4, 2025
71e6710
Updating retry-after pattern for seconds
LikeTheSalad Jun 4, 2025
451812e
Simplifying retry-after parser patterns
LikeTheSalad Jun 4, 2025
3c3830b
Created BodyWriter to use in HttpSender
LikeTheSalad Jun 4, 2025
260cbc4
Removing nested try with resources as suggested in PR reviews
LikeTheSalad Jun 5, 2025
98d1b7f
Clean up
LikeTheSalad Jun 5, 2025
08c3bc2
Clean up
LikeTheSalad Jun 5, 2025
6d3baed
Updating tests
LikeTheSalad Jun 5, 2025
478e8c4
Updating tests
LikeTheSalad Jun 5, 2025
77ae328
Calling callback.onConnectionFailed
LikeTheSalad Jun 5, 2025
17ac9d6
Removing executor for HttpRequestServiceTest
LikeTheSalad Jun 6, 2025
c272f97
Updating tests
LikeTheSalad Jun 6, 2025
b709a81
Adding initial task validation
LikeTheSalad Jun 6, 2025
7b21419
Simplifying HttpRequestService as suggested in the PR reviews
LikeTheSalad Jun 6, 2025
f49fa67
Spotless
LikeTheSalad Jun 6, 2025
73cb995
Avoiding verifying task cancel call in tests
LikeTheSalad Jun 11, 2025
3bb8d7b
Ensuring only one task is executed at a time and it always schedules …
LikeTheSalad Jun 11, 2025
af0fc20
Clean up
LikeTheSalad Jun 11, 2025
0a62b94
Applying suggested changes in review
LikeTheSalad Jun 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions opamp-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ description = "Client implementation of the OpAMP spec."
otelJava.moduleName.set("io.opentelemetry.contrib.opamp.client")

dependencies {
implementation("com.squareup.okhttp3:okhttp")
annotationProcessor("com.google.auto.value:auto-value")
compileOnly("com.google.auto.value:auto-value-annotations")
testImplementation("org.mockito:mockito-inline")
}

val opampReleaseInfo = tasks.register<Download>("opampLastReleaseInfo") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.connectivity.http;

public class HttpErrorException extends Exception {
private final int errorCode;

private static final long serialVersionUID = 1L;

public int getErrorCode() {
return errorCode;
}

/**
* Constructs an HTTP error related exception.
*
* @param errorCode The HTTP error code.
* @param message The HTTP error message associated with the code.
*/
public HttpErrorException(int errorCode, String message) {
super(message);
this.errorCode = errorCode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.connectivity.http;

import java.io.Closeable;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public interface HttpSender {

CompletableFuture<Response> send(Consumer<OutputStream> writer, int contentLength);

interface Response extends Closeable {
int statusCode();

String statusMessage();

InputStream bodyInputStream();

String getHeader(String name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.connectivity.http;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okio.BufferedSink;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public final class OkHttpSender implements HttpSender {
private final OkHttpClient client;
private final String url;

public static OkHttpSender create(String url) {
return create(url, new OkHttpClient());
}

public static OkHttpSender create(String url, OkHttpClient client) {
return new OkHttpSender(url, client);
}

private static final String CONTENT_TYPE = "application/x-protobuf";
private static final MediaType MEDIA_TYPE = MediaType.parse(CONTENT_TYPE);

private OkHttpSender(String url, OkHttpClient client) {
this.url = url;
this.client = client;
}

@Override
public CompletableFuture<Response> send(Consumer<OutputStream> writer, int contentLength) {
CompletableFuture<Response> future = new CompletableFuture<>();
okhttp3.Request.Builder builder = new okhttp3.Request.Builder().url(url);
builder.addHeader("Content-Type", CONTENT_TYPE);

RequestBody body = new RawRequestBody(writer, contentLength, MEDIA_TYPE);
builder.post(body);

client
.newCall(builder.build())
.enqueue(
new Callback() {
@Override
public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) {
if (response.isSuccessful()) {
if (response.body() != null) {
future.complete(new OkHttpResponse(response));
}
} else {
future.completeExceptionally(
new HttpErrorException(response.code(), response.message()));
}
}

@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
future.completeExceptionally(e);
}
});

return future;
}

private static class OkHttpResponse implements Response {
private final okhttp3.Response response;

private OkHttpResponse(okhttp3.Response response) {
if (response.body() == null) {
throw new IllegalStateException();
}
this.response = response;
}

@Override
public int statusCode() {
return response.code();
}

@Override
public String statusMessage() {
return response.message();
}

@Override
public InputStream bodyInputStream() {
return response.body().byteStream();
}

@Override
public String getHeader(String name) {
return response.headers().get(name);
}

@Override
public void close() {
response.close();
}
}

private static class RawRequestBody extends RequestBody {
private final Consumer<OutputStream> writer;
private final int contentLength;
private final MediaType contentType;

private RawRequestBody(
Consumer<OutputStream> writer, int contentLength, MediaType contentType) {
this.writer = writer;
this.contentLength = contentLength;
this.contentType = contentType;
}

@Nullable
@Override
public MediaType contentType() {
return contentType;
}

@Override
public long contentLength() {
return contentLength;
}

@Override
public void writeTo(@NotNull BufferedSink bufferedSink) {
writer.accept(bufferedSink.outputStream());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.connectivity.http;

import io.opentelemetry.opamp.client.internal.tools.SystemTime;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Optional;
import java.util.regex.Pattern;

public final class RetryAfterParser {
private final SystemTime systemTime;
public static final Pattern SECONDS_PATTERN = Pattern.compile("\\d+");
public static final Pattern DATE_PATTERN =
Pattern.compile(
"^([A-Za-z]{3}, [0-3][0-9] [A-Za-z]{3} [0-9]{4} [0-2][0-9]:[0-5][0-9]:[0-5][0-9] GMT)$");
private static final DateTimeFormatter DATE_FORMAT =
DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss z", Locale.US);

public static RetryAfterParser getInstance() {
return new RetryAfterParser(SystemTime.getInstance());
}

RetryAfterParser(SystemTime systemTime) {
this.systemTime = systemTime;
}

public Optional<Duration> tryParse(String value) {
Duration duration = null;
if (SECONDS_PATTERN.matcher(value).matches()) {
duration = Duration.ofSeconds(Long.parseLong(value));
} else if (DATE_PATTERN.matcher(value).matches()) {
long difference = toMilliseconds(value) - systemTime.getCurrentTimeMillis();
if (difference > 0) {
duration = Duration.ofMillis(difference);
}
}
return Optional.ofNullable(duration);
}

private static long toMilliseconds(String value) {
return ZonedDateTime.parse(value, DATE_FORMAT).toInstant().toEpochMilli();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.request;

import com.google.auto.value.AutoValue;
import opamp.proto.AgentToServer;

/** Wrapper class for "AgentToServer" request body. */
@AutoValue
public abstract class Request {
public abstract AgentToServer getAgentToServer();

public static Request create(AgentToServer agentToServer) {
return new AutoValue_Request(agentToServer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.request.delay;

import java.time.Duration;

/**
* A {@link PeriodicDelay} implementation that wants to accept delay time suggestions, as explained
* <a
* href="https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#throttling">here</a>,
* must implement this interface.
*/
public interface AcceptsDelaySuggestion {
void suggestDelay(Duration delay);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.request.delay;

import java.time.Duration;

final class FixedPeriodicDelay implements PeriodicDelay {
private final Duration duration;

public FixedPeriodicDelay(Duration duration) {
this.duration = duration;
}

@Override
public Duration getNextDelay() {
return duration;
}

@Override
public void reset() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.request.delay;

import java.time.Duration;

public interface PeriodicDelay {
static PeriodicDelay ofFixedDuration(Duration duration) {
return new FixedPeriodicDelay(duration);
}

Duration getNextDelay();

void reset();
}
Loading
Loading