-
Notifications
You must be signed in to change notification settings - Fork 166
Disk buffering api changes #2084
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
20772f5
524b826
c0f9fde
0fb2a70
54f4d1f
094b462
7903f1c
1199711
61455e5
638db1d
6a52fd4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.disk.buffering; | ||
|
||
public enum SignalType { | ||
SPAN, | ||
LOG, | ||
METRIC | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.disk.buffering.exporters; | ||
|
||
import io.opentelemetry.contrib.disk.buffering.SignalType; | ||
import javax.annotation.Nullable; | ||
|
||
/** Notifies about exporter and storage-related operations from within a signal to disk exporter. */ | ||
public interface ExporterCallback { | ||
/** | ||
* Called when an export to disk operation succeeded. | ||
* | ||
* @param type The type of signal associated to the exporter. | ||
*/ | ||
void onExportSuccess(SignalType type); | ||
|
||
/** | ||
* Called when an export to disk operation failed. | ||
* | ||
* @param type The type of signal associated to the exporter. | ||
* @param error Optional - provides more information of why the operation failed. | ||
*/ | ||
void onExportError(SignalType type, @Nullable Throwable error); | ||
|
||
/** | ||
* Called when the exporter is closed. | ||
* | ||
* @param type The type of signal associated to the exporter. | ||
*/ | ||
void onShutdown(SignalType type); | ||
breedx-splk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
static ExporterCallback noop() { | ||
return NoopExporterCallback.INSTANCE; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.disk.buffering.exporters; | ||
|
||
import io.opentelemetry.contrib.disk.buffering.SignalType; | ||
import javax.annotation.Nullable; | ||
|
||
final class NoopExporterCallback implements ExporterCallback { | ||
static final NoopExporterCallback INSTANCE = new NoopExporterCallback(); | ||
|
||
private NoopExporterCallback() {} | ||
|
||
@Override | ||
public void onExportSuccess(SignalType type) {} | ||
|
||
@Override | ||
public void onExportError(SignalType type, @Nullable Throwable error) {} | ||
|
||
@Override | ||
public void onShutdown(SignalType type) {} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.disk.buffering.exporters; | ||
|
||
import io.opentelemetry.contrib.disk.buffering.SignalType; | ||
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; | ||
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; | ||
import io.opentelemetry.sdk.common.CompletableResultCode; | ||
import java.time.Duration; | ||
import java.util.Collection; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
/** Internal utility for common export to disk operations across all exporters. */ | ||
final class SignalStorageExporter<T> { | ||
private final SignalStorage<T> storage; | ||
private final ExporterCallback callback; | ||
private final Duration writeTimeout; | ||
private final SignalType type; | ||
|
||
public SignalStorageExporter( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be good to have unit test coverage for this new class. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've just added unit tests for it. |
||
SignalStorage<T> storage, ExporterCallback callback, Duration writeTimeout, SignalType type) { | ||
this.storage = storage; | ||
this.callback = callback; | ||
this.writeTimeout = writeTimeout; | ||
this.type = type; | ||
} | ||
|
||
public CompletableResultCode exportToStorage(Collection<T> items) { | ||
CompletableFuture<WriteResult> future = storage.write(items); | ||
try { | ||
WriteResult operation = future.get(writeTimeout.toMillis(), TimeUnit.MILLISECONDS); | ||
if (operation.isSuccessful()) { | ||
callback.onExportSuccess(type); | ||
return CompletableResultCode.ofSuccess(); | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. prefer removing the redundant There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, I've added the changes. |
||
Throwable error = operation.getError(); | ||
callback.onExportError(type, error); | ||
if (error != null) { | ||
return CompletableResultCode.ofExceptionalFailure(error); | ||
} | ||
return CompletableResultCode.ofFailure(); | ||
} | ||
} catch (ExecutionException | InterruptedException | TimeoutException e) { | ||
callback.onExportError(type, e); | ||
return CompletableResultCode.ofExceptionalFailure(e); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.disk.buffering.exporters; | ||
|
||
import com.google.errorprone.annotations.CanIgnoreReturnValue; | ||
import io.opentelemetry.contrib.disk.buffering.SignalType; | ||
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; | ||
import io.opentelemetry.sdk.common.CompletableResultCode; | ||
import io.opentelemetry.sdk.trace.data.SpanData; | ||
import io.opentelemetry.sdk.trace.export.SpanExporter; | ||
import java.time.Duration; | ||
import java.util.Collection; | ||
|
||
/** Exporter that stores spans into disk. */ | ||
public final class SpanToDiskExporter implements SpanExporter { | ||
private final SignalStorageExporter<SpanData> storageExporter; | ||
private final ExporterCallback callback; | ||
private static final SignalType TYPE = SignalType.SPAN; | ||
|
||
private SpanToDiskExporter( | ||
SignalStorageExporter<SpanData> storageExporter, ExporterCallback callback) { | ||
this.storageExporter = storageExporter; | ||
this.callback = callback; | ||
} | ||
|
||
public Builder builder(SignalStorage.Span storage) { | ||
return new Builder(storage); | ||
} | ||
|
||
@Override | ||
public CompletableResultCode export(Collection<SpanData> spans) { | ||
return storageExporter.exportToStorage(spans); | ||
} | ||
|
||
@Override | ||
public CompletableResultCode flush() { | ||
return CompletableResultCode.ofSuccess(); | ||
} | ||
|
||
@Override | ||
public CompletableResultCode shutdown() { | ||
callback.onShutdown(TYPE); | ||
return CompletableResultCode.ofSuccess(); | ||
} | ||
|
||
public static final class Builder { | ||
private final SignalStorage.Span storage; | ||
private ExporterCallback callback = ExporterCallback.noop(); | ||
private Duration writeTimeout = Duration.ofSeconds(10); | ||
|
||
@CanIgnoreReturnValue | ||
public Builder setExporterCallback(ExporterCallback value) { | ||
callback = value; | ||
return this; | ||
} | ||
|
||
@CanIgnoreReturnValue | ||
public Builder setWriteTimeout(Duration value) { | ||
writeTimeout = value; | ||
return this; | ||
} | ||
|
||
public SpanToDiskExporter build() { | ||
SignalStorageExporter<SpanData> storageExporter = | ||
new SignalStorageExporter<>(storage, callback, writeTimeout, TYPE); | ||
return new SpanToDiskExporter(storageExporter, callback); | ||
} | ||
|
||
private Builder(SignalStorage.Span storage) { | ||
this.storage = storage; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
@ParametersAreNonnullByDefault | ||
package io.opentelemetry.contrib.disk.buffering.exporters; | ||
|
||
import javax.annotation.ParametersAreNonnullByDefault; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.disk.buffering.internal.storage; | ||
|
||
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; | ||
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; | ||
import io.opentelemetry.sdk.trace.data.SpanData; | ||
import java.util.Collection; | ||
import java.util.Iterator; | ||
import java.util.concurrent.CompletableFuture; | ||
import javax.annotation.Nonnull; | ||
|
||
/** Default storage implementation where items are stored in multiple protobuf files. */ | ||
public final class FileSpanStorage implements SignalStorage.Span { | ||
breedx-splk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@Override | ||
public CompletableFuture<WriteResult> write(Collection<SpanData> items) { | ||
throw new UnsupportedOperationException("For next PR"); | ||
} | ||
|
||
@Override | ||
public CompletableFuture<WriteResult> clear() { | ||
throw new UnsupportedOperationException("For next PR"); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
throw new UnsupportedOperationException("For next PR"); | ||
} | ||
|
||
@Nonnull | ||
@Override | ||
public Iterator<Collection<SpanData>> iterator() { | ||
throw new UnsupportedOperationException("For next PR"); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.disk.buffering.storage; | ||
|
||
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; | ||
import io.opentelemetry.sdk.logs.data.LogRecordData; | ||
import io.opentelemetry.sdk.metrics.data.MetricData; | ||
import io.opentelemetry.sdk.trace.data.SpanData; | ||
import java.io.Closeable; | ||
import java.util.Collection; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
/** | ||
* Allows writing and iterating over written signal items. | ||
* | ||
* @param <T> The type of signal data supported. | ||
*/ | ||
public interface SignalStorage<T> extends Iterable<Collection<T>>, Closeable { | ||
|
||
/** | ||
* Stores signal items. | ||
* | ||
* @param items The items to be stored. | ||
* @return A future with {@link WriteResult}. | ||
*/ | ||
CompletableFuture<WriteResult> write(Collection<T> items); | ||
|
||
/** | ||
* Removes all the previously stored items. | ||
* | ||
* @return A future with {@link WriteResult}. | ||
*/ | ||
CompletableFuture<WriteResult> clear(); | ||
|
||
/** | ||
* Abstraction for Spans. Implementations should use this instead of {@link SignalStorage} | ||
* directly. | ||
*/ | ||
interface Span extends SignalStorage<SpanData> {} | ||
|
||
/** | ||
* Abstraction for Logs. Implementations should use this instead of {@link SignalStorage} | ||
* directly. | ||
*/ | ||
interface LogRecord extends SignalStorage<LogRecordData> {} | ||
|
||
/** | ||
* Abstraction for Metrics. Implementations should use this instead of {@link SignalStorage} | ||
* directly. | ||
*/ | ||
interface Metric extends SignalStorage<MetricData> {} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.disk.buffering.storage.result; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
final class DefaultWriteResult implements WriteResult { | ||
private final boolean successful; | ||
@Nullable private final Throwable error; | ||
|
||
DefaultWriteResult(boolean successful, @Nullable Throwable error) { | ||
this.successful = successful; | ||
this.error = error; | ||
} | ||
|
||
@Override | ||
public boolean isSuccessful() { | ||
return successful; | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public Throwable getError() { | ||
return error; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.disk.buffering.storage.result; | ||
|
||
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; | ||
import javax.annotation.Nullable; | ||
|
||
/** The result of a {@link SignalStorage} write operation. */ | ||
public interface WriteResult { | ||
/** | ||
* Whether the operation succeeded or not. | ||
* | ||
* @return `true` if the items have been successfully stored, `false` otherwise. | ||
*/ | ||
boolean isSuccessful(); | ||
|
||
/** | ||
* Provides details of why the operation failed. | ||
* | ||
* @return The error (if any) for the failed operation. It must be null for successful operations. | ||
*/ | ||
@Nullable | ||
Throwable getError(); | ||
|
||
static WriteResult create(boolean successful, @Nullable Throwable error) { | ||
return new DefaultWriteResult(successful, error); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@LikeTheSalad Any reason you added this instead of using
io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes
?