Skip to content

Disk buffering api changes #2084

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering;

public enum SignalType {
SPAN,
LOG,
METRIC
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.exporters;

import io.opentelemetry.contrib.disk.buffering.SignalType;
import javax.annotation.Nullable;

/** Notifies about exporter and storage-related operations from within a signal to disk exporter. */
public interface ExporterCallback {
/**
* Called when an export to disk operation succeeded.
*
* @param type The type of signal associated to the exporter.
*/
void onExportSuccess(SignalType type);

/**
* Called when an export to disk operation failed.
*
* @param type The type of signal associated to the exporter.
* @param error Optional - provides more information of why the operation failed.
*/
void onExportError(SignalType type, @Nullable Throwable error);

/**
* Called when the exporter is closed.
*
* @param type The type of signal associated to the exporter.
*/
void onShutdown(SignalType type);
Comment on lines +18 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a huge deal, but I often find it convenient to provide default empty methods for implementations that may not necessarily care about all of the interface methods. That way, implementers can choose to implement the ones that they are interested in, and it doesn't require many implementations to repeat empty bodies.

Copy link
Contributor Author

@LikeTheSalad LikeTheSalad Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. I think I see what you mean, though considering that this interface is only for callback purposes, do you think that also applies here? And if so, how can we know what callbacks to set a default body to and which ones to enforce? Given that none of them are required to make the exporter work? - Usually, my concern with default impls, is that people might miss available options as the compiler won't complain if they're not implemented, unless users read the docs and take the time to scan through the lib's source code. It's not a big deal if we can provide defaults that work for everyone in case an implementation is used as part of the business logic, but for merely informative ones, such as this one, I'm not sure it applies.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO default method bodies should be avoided in interfaces that form public APIs. It makes discoverability harder, doesn't cleanly separate the interface from the implementation, and the behavior can become part of the public API's contract which can constrain changes in future.

Having said that, an empty body is less of a big deal to me than a body containing actual business logic would be


static ExporterCallback noop() {
return NoopExporterCallback.INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.exporters;

import io.opentelemetry.contrib.disk.buffering.SignalType;
import javax.annotation.Nullable;

final class NoopExporterCallback implements ExporterCallback {
static final NoopExporterCallback INSTANCE = new NoopExporterCallback();

private NoopExporterCallback() {}

@Override
public void onExportSuccess(SignalType type) {}

@Override
public void onExportError(SignalType type, @Nullable Throwable error) {}

@Override
public void onShutdown(SignalType type) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.exporters;

import io.opentelemetry.contrib.disk.buffering.SignalType;
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Internal utility for common export to disk operations across all exporters. */
final class SignalStorageExporter<T> {
private final SignalStorage<T> storage;
private final ExporterCallback callback;
private final Duration writeTimeout;
private final SignalType type;

public SignalStorageExporter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to have unit test coverage for this new class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just added unit tests for it.

SignalStorage<T> storage, ExporterCallback callback, Duration writeTimeout, SignalType type) {
this.storage = storage;
this.callback = callback;
this.writeTimeout = writeTimeout;
this.type = type;
}

public CompletableResultCode exportToStorage(Collection<T> items) {
CompletableFuture<WriteResult> future = storage.write(items);
try {
WriteResult operation = future.get(writeTimeout.toMillis(), TimeUnit.MILLISECONDS);
if (operation.isSuccessful()) {
callback.onExportSuccess(type);
return CompletableResultCode.ofSuccess();
}

Throwable error = operation.getError();
callback.onExportError(type, error);
if (error != null) {
return CompletableResultCode.ofExceptionalFailure(error);
}
return CompletableResultCode.ofFailure();
} catch (ExecutionException | InterruptedException | TimeoutException e) {
callback.onExportError(type, e);
return CompletableResultCode.ofExceptionalFailure(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.exporters;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.contrib.disk.buffering.SignalType;
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.time.Duration;
import java.util.Collection;

/** Exporter that stores spans into disk. */
public final class SpanToDiskExporter implements SpanExporter {
private final SignalStorageExporter<SpanData> storageExporter;
private final ExporterCallback callback;
private static final SignalType TYPE = SignalType.SPAN;

private SpanToDiskExporter(
SignalStorageExporter<SpanData> storageExporter, ExporterCallback callback) {
this.storageExporter = storageExporter;
this.callback = callback;
}

public Builder builder(SignalStorage.Span storage) {
return new Builder(storage);
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return storageExporter.exportToStorage(spans);
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
callback.onShutdown(TYPE);
return CompletableResultCode.ofSuccess();
}

public static final class Builder {
private final SignalStorage.Span storage;
private ExporterCallback callback = ExporterCallback.noop();
private Duration writeTimeout = Duration.ofSeconds(10);

@CanIgnoreReturnValue
public Builder setExporterCallback(ExporterCallback value) {
callback = value;
return this;
}

@CanIgnoreReturnValue
public Builder setWriteTimeout(Duration value) {
writeTimeout = value;
return this;
}

public SpanToDiskExporter build() {
SignalStorageExporter<SpanData> storageExporter =
new SignalStorageExporter<>(storage, callback, writeTimeout, TYPE);
return new SpanToDiskExporter(storageExporter, callback);
}

private Builder(SignalStorage.Span storage) {
this.storage = storage;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

@ParametersAreNonnullByDefault
package io.opentelemetry.contrib.disk.buffering.exporters;

import javax.annotation.ParametersAreNonnullByDefault;
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.storage;

import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

/** Default storage implementation where items are stored in multiple protobuf files. */
public final class FileSpanStorage implements SignalStorage.Span {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably there will be analogs for the other signal types? Why not include those in this PR as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably there will be analogs for the other signal types?

Yes, that's the idea.

Why not include those in this PR as well?

I just wanted to put the new API design to test with this PR, not to fully implement it yet; this is to avoid wasting time in case some design changes were requested during the review. If there are no issues with the design itself, I'm planning to create a follow-up PR with all the implementations and refactorings needed to use the new approach.


@Override
public CompletableFuture<WriteResult> write(Collection<SpanData> items) {
throw new UnsupportedOperationException("For next PR");
}

@Override
public CompletableFuture<WriteResult> clear() {
throw new UnsupportedOperationException("For next PR");
}

@Override
public void close() {
throw new UnsupportedOperationException("For next PR");
}

@Nonnull
@Override
public Iterator<Collection<SpanData>> iterator() {
throw new UnsupportedOperationException("For next PR");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.storage;

import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.Closeable;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
* Allows writing and iterating over written signal items.
*
* @param <T> The type of signal data supported.
*/
public interface SignalStorage<T> extends Iterable<Collection<T>>, Closeable {

/**
* Stores signal items.
*
* @param items The items to be stored.
* @return A future with {@link WriteResult}.
*/
CompletableFuture<WriteResult> write(Collection<T> items);

/**
* Removes all the previously stored items.
*
* @return A future with {@link WriteResult}.
*/
CompletableFuture<WriteResult> clear();

/**
* Abstraction for Spans. Implementations should use this instead of {@link SignalStorage}
* directly.
*/
interface Span extends SignalStorage<SpanData> {}

/**
* Abstraction for Logs. Implementations should use this instead of {@link SignalStorage}
* directly.
*/
interface LogRecord extends SignalStorage<LogRecordData> {}

/**
* Abstraction for Metrics. Implementations should use this instead of {@link SignalStorage}
* directly.
*/
interface Metric extends SignalStorage<MetricData> {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.storage.result;

import javax.annotation.Nullable;

final class DefaultWriteResult implements WriteResult {
private final boolean successful;
@Nullable private final Throwable error;

DefaultWriteResult(boolean successful, @Nullable Throwable error) {
this.successful = successful;
this.error = error;
}

@Override
public boolean isSuccessful() {
return successful;
}

@Nullable
@Override
public Throwable getError() {
return error;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.storage.result;

import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
import javax.annotation.Nullable;

/** The result of a {@link SignalStorage} write operation. */
public interface WriteResult {
/**
* Whether the operation succeeded or not.
*
* @return `true` if the items have been successfully stored, `false` otherwise.
*/
boolean isSuccessful();

/**
* Provides details of why the operation failed.
*
* @return The error (if any) for the failed operation. It must be null for successful operations.
*/
@Nullable
Throwable getError();

static WriteResult create(boolean successful, @Nullable Throwable error) {
return new DefaultWriteResult(successful, error);
}
}
Loading
Loading