Skip to content

Disk buffering api changes #2084

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Aug 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering;

public enum SignalType {
SPAN,
LOG,
METRIC
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.exporters;

import io.opentelemetry.contrib.disk.buffering.SignalType;
import javax.annotation.Nullable;

/** Notifies about exporter and storage-related operations from within a signal to disk exporter. */
public interface ExporterCallback {
/**
* Called when an export to disk operation succeeded.
*
* @param type The type of signal associated to the exporter.
*/
void onExportSuccess(SignalType type);

/**
* Called when an export to disk operation failed.
*
* @param type The type of signal associated to the exporter.
* @param error Optional - provides more information of why the operation failed.
*/
void onExportError(SignalType type, @Nullable Throwable error);

/**
* Called when the exporter is closed.
*
* @param type The type of signal associated to the exporter.
*/
void onShutdown(SignalType type);

static ExporterCallback noop() {
return NoopExporterCallback.INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.exporters;

import io.opentelemetry.contrib.disk.buffering.SignalType;
import javax.annotation.Nullable;

final class NoopExporterCallback implements ExporterCallback {
static final NoopExporterCallback INSTANCE = new NoopExporterCallback();

private NoopExporterCallback() {}

@Override
public void onExportSuccess(SignalType type) {}

@Override
public void onExportError(SignalType type, @Nullable Throwable error) {}

@Override
public void onShutdown(SignalType type) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.exporters;

import io.opentelemetry.contrib.disk.buffering.SignalType;
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Internal utility for common export to disk operations across all exporters. */
final class SignalStorageExporter<T> {
private final SignalStorage<T> storage;
private final ExporterCallback callback;
private final Duration writeTimeout;
private final SignalType type;

public SignalStorageExporter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to have unit test coverage for this new class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just added unit tests for it.

SignalStorage<T> storage, ExporterCallback callback, Duration writeTimeout, SignalType type) {
this.storage = storage;
this.callback = callback;
this.writeTimeout = writeTimeout;
this.type = type;
}

public CompletableResultCode exportToStorage(Collection<T> items) {
CompletableFuture<WriteResult> future = storage.write(items);
try {
WriteResult operation = future.get(writeTimeout.toMillis(), TimeUnit.MILLISECONDS);
if (operation.isSuccessful()) {
callback.onExportSuccess(type);
return CompletableResultCode.ofSuccess();
}

Throwable error = operation.getError();
callback.onExportError(type, error);
if (error != null) {
return CompletableResultCode.ofExceptionalFailure(error);
}
return CompletableResultCode.ofFailure();
} catch (ExecutionException | InterruptedException | TimeoutException e) {
callback.onExportError(type, e);
return CompletableResultCode.ofExceptionalFailure(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.exporters;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.contrib.disk.buffering.SignalType;
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.time.Duration;
import java.util.Collection;

/** Exporter that stores spans into disk. */
public final class SpanToDiskExporter implements SpanExporter {
private final SignalStorageExporter<SpanData> storageExporter;
private final ExporterCallback callback;
private static final SignalType TYPE = SignalType.SPAN;

private SpanToDiskExporter(
SignalStorageExporter<SpanData> storageExporter, ExporterCallback callback) {
this.storageExporter = storageExporter;
this.callback = callback;
}

public Builder builder(SignalStorage.Span storage) {
return new Builder(storage);
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return storageExporter.exportToStorage(spans);
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
callback.onShutdown(TYPE);
return CompletableResultCode.ofSuccess();
}

public static final class Builder {
private final SignalStorage.Span storage;
private ExporterCallback callback = ExporterCallback.noop();
private Duration writeTimeout = Duration.ofSeconds(10);

@CanIgnoreReturnValue
public Builder setExporterCallback(ExporterCallback value) {
callback = value;
return this;
}

@CanIgnoreReturnValue
public Builder setWriteTimeout(Duration value) {
writeTimeout = value;
return this;
}

public SpanToDiskExporter build() {
SignalStorageExporter<SpanData> storageExporter =
new SignalStorageExporter<>(storage, callback, writeTimeout, TYPE);
return new SpanToDiskExporter(storageExporter, callback);
}

private Builder(SignalStorage.Span storage) {
this.storage = storage;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

@ParametersAreNonnullByDefault
package io.opentelemetry.contrib.disk.buffering.exporters;

import javax.annotation.ParametersAreNonnullByDefault;
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.storage;

import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

/** Default storage implementation where items are stored in multiple protobuf files. */
public final class FileSpanStorage implements SignalStorage.Span {

@Override
public CompletableFuture<WriteResult> write(Collection<SpanData> items) {
throw new UnsupportedOperationException("For next PR");
}

@Override
public CompletableFuture<WriteResult> clear() {
throw new UnsupportedOperationException("For next PR");
}

@Override
public void close() {
throw new UnsupportedOperationException("For next PR");
}

@Nonnull
@Override
public Iterator<Collection<SpanData>> iterator() {
throw new UnsupportedOperationException("For next PR");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.storage;

import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.Closeable;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
* Allows writing and iterating over written signal items.
*
* @param <T> The type of signal data supported.
*/
public interface SignalStorage<T> extends Iterable<Collection<T>>, Closeable {

/**
* Stores signal items.
*
* @param items The items to be stored.
* @return A future with {@link WriteResult}.
*/
CompletableFuture<WriteResult> write(Collection<T> items);

/**
* Removes all the previously stored items.
*
* @return A future with {@link WriteResult}.
*/
CompletableFuture<WriteResult> clear();

/**
* Abstraction for Spans. Implementations should use this instead of {@link SignalStorage}
* directly.
*/
interface Span extends SignalStorage<SpanData> {}

/**
* Abstraction for Logs. Implementations should use this instead of {@link SignalStorage}
* directly.
*/
interface LogRecord extends SignalStorage<LogRecordData> {}

/**
* Abstraction for Metrics. Implementations should use this instead of {@link SignalStorage}
* directly.
*/
interface Metric extends SignalStorage<MetricData> {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.storage.result;

import javax.annotation.Nullable;

final class DefaultWriteResult implements WriteResult {
private final boolean successful;
@Nullable private final Throwable error;

DefaultWriteResult(boolean successful, @Nullable Throwable error) {
this.successful = successful;
this.error = error;
}

@Override
public boolean isSuccessful() {
return successful;
}

@Nullable
@Override
public Throwable getError() {
return error;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.storage.result;

import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
import javax.annotation.Nullable;

/** The result of a {@link SignalStorage} write operation. */
public interface WriteResult {
/**
* Whether the operation succeeded or not.
*
* @return `true` if the items have been successfully stored, `false` otherwise.
*/
boolean isSuccessful();

/**
* Provides details of why the operation failed.
*
* @return The error (if any) for the failed operation. It must be null for successful operations.
*/
@Nullable
Throwable getError();

static WriteResult create(boolean successful, @Nullable Throwable error) {
return new DefaultWriteResult(successful, error);
}
}
Loading
Loading