Skip to content

Commit d6c564e

Browse files
authored
Disk buffering api changes (#2084)
1 parent 2765b97 commit d6c564e

File tree

11 files changed

+502
-0
lines changed

11 files changed

+502
-0
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering;
7+
8+
public enum SignalType {
9+
SPAN,
10+
LOG,
11+
METRIC
12+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.exporters;
7+
8+
import io.opentelemetry.contrib.disk.buffering.SignalType;
9+
import javax.annotation.Nullable;
10+
11+
/** Notifies about exporter and storage-related operations from within a signal to disk exporter. */
12+
public interface ExporterCallback {
13+
/**
14+
* Called when an export to disk operation succeeded.
15+
*
16+
* @param type The type of signal associated to the exporter.
17+
*/
18+
void onExportSuccess(SignalType type);
19+
20+
/**
21+
* Called when an export to disk operation failed.
22+
*
23+
* @param type The type of signal associated to the exporter.
24+
* @param error Optional - provides more information of why the operation failed.
25+
*/
26+
void onExportError(SignalType type, @Nullable Throwable error);
27+
28+
/**
29+
* Called when the exporter is closed.
30+
*
31+
* @param type The type of signal associated to the exporter.
32+
*/
33+
void onShutdown(SignalType type);
34+
35+
static ExporterCallback noop() {
36+
return NoopExporterCallback.INSTANCE;
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.exporters;
7+
8+
import io.opentelemetry.contrib.disk.buffering.SignalType;
9+
import javax.annotation.Nullable;
10+
11+
final class NoopExporterCallback implements ExporterCallback {
12+
static final NoopExporterCallback INSTANCE = new NoopExporterCallback();
13+
14+
private NoopExporterCallback() {}
15+
16+
@Override
17+
public void onExportSuccess(SignalType type) {}
18+
19+
@Override
20+
public void onExportError(SignalType type, @Nullable Throwable error) {}
21+
22+
@Override
23+
public void onShutdown(SignalType type) {}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.exporters;
7+
8+
import io.opentelemetry.contrib.disk.buffering.SignalType;
9+
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
10+
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
11+
import io.opentelemetry.sdk.common.CompletableResultCode;
12+
import java.time.Duration;
13+
import java.util.Collection;
14+
import java.util.concurrent.CompletableFuture;
15+
import java.util.concurrent.ExecutionException;
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.TimeoutException;
18+
19+
/** Internal utility for common export to disk operations across all exporters. */
20+
final class SignalStorageExporter<T> {
21+
private final SignalStorage<T> storage;
22+
private final ExporterCallback callback;
23+
private final Duration writeTimeout;
24+
private final SignalType type;
25+
26+
public SignalStorageExporter(
27+
SignalStorage<T> storage, ExporterCallback callback, Duration writeTimeout, SignalType type) {
28+
this.storage = storage;
29+
this.callback = callback;
30+
this.writeTimeout = writeTimeout;
31+
this.type = type;
32+
}
33+
34+
public CompletableResultCode exportToStorage(Collection<T> items) {
35+
CompletableFuture<WriteResult> future = storage.write(items);
36+
try {
37+
WriteResult operation = future.get(writeTimeout.toMillis(), TimeUnit.MILLISECONDS);
38+
if (operation.isSuccessful()) {
39+
callback.onExportSuccess(type);
40+
return CompletableResultCode.ofSuccess();
41+
}
42+
43+
Throwable error = operation.getError();
44+
callback.onExportError(type, error);
45+
if (error != null) {
46+
return CompletableResultCode.ofExceptionalFailure(error);
47+
}
48+
return CompletableResultCode.ofFailure();
49+
} catch (ExecutionException | InterruptedException | TimeoutException e) {
50+
callback.onExportError(type, e);
51+
return CompletableResultCode.ofExceptionalFailure(e);
52+
}
53+
}
54+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.exporters;
7+
8+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
9+
import io.opentelemetry.contrib.disk.buffering.SignalType;
10+
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
11+
import io.opentelemetry.sdk.common.CompletableResultCode;
12+
import io.opentelemetry.sdk.trace.data.SpanData;
13+
import io.opentelemetry.sdk.trace.export.SpanExporter;
14+
import java.time.Duration;
15+
import java.util.Collection;
16+
17+
/** Exporter that stores spans into disk. */
18+
public final class SpanToDiskExporter implements SpanExporter {
19+
private final SignalStorageExporter<SpanData> storageExporter;
20+
private final ExporterCallback callback;
21+
private static final SignalType TYPE = SignalType.SPAN;
22+
23+
private SpanToDiskExporter(
24+
SignalStorageExporter<SpanData> storageExporter, ExporterCallback callback) {
25+
this.storageExporter = storageExporter;
26+
this.callback = callback;
27+
}
28+
29+
public Builder builder(SignalStorage.Span storage) {
30+
return new Builder(storage);
31+
}
32+
33+
@Override
34+
public CompletableResultCode export(Collection<SpanData> spans) {
35+
return storageExporter.exportToStorage(spans);
36+
}
37+
38+
@Override
39+
public CompletableResultCode flush() {
40+
return CompletableResultCode.ofSuccess();
41+
}
42+
43+
@Override
44+
public CompletableResultCode shutdown() {
45+
callback.onShutdown(TYPE);
46+
return CompletableResultCode.ofSuccess();
47+
}
48+
49+
public static final class Builder {
50+
private final SignalStorage.Span storage;
51+
private ExporterCallback callback = ExporterCallback.noop();
52+
private Duration writeTimeout = Duration.ofSeconds(10);
53+
54+
@CanIgnoreReturnValue
55+
public Builder setExporterCallback(ExporterCallback value) {
56+
callback = value;
57+
return this;
58+
}
59+
60+
@CanIgnoreReturnValue
61+
public Builder setWriteTimeout(Duration value) {
62+
writeTimeout = value;
63+
return this;
64+
}
65+
66+
public SpanToDiskExporter build() {
67+
SignalStorageExporter<SpanData> storageExporter =
68+
new SignalStorageExporter<>(storage, callback, writeTimeout, TYPE);
69+
return new SpanToDiskExporter(storageExporter, callback);
70+
}
71+
72+
private Builder(SignalStorage.Span storage) {
73+
this.storage = storage;
74+
}
75+
}
76+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
@ParametersAreNonnullByDefault
7+
package io.opentelemetry.contrib.disk.buffering.exporters;
8+
9+
import javax.annotation.ParametersAreNonnullByDefault;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.internal.storage;
7+
8+
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
9+
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
10+
import io.opentelemetry.sdk.trace.data.SpanData;
11+
import java.util.Collection;
12+
import java.util.Iterator;
13+
import java.util.concurrent.CompletableFuture;
14+
import javax.annotation.Nonnull;
15+
16+
/** Default storage implementation where items are stored in multiple protobuf files. */
17+
public final class FileSpanStorage implements SignalStorage.Span {
18+
19+
@Override
20+
public CompletableFuture<WriteResult> write(Collection<SpanData> items) {
21+
throw new UnsupportedOperationException("For next PR");
22+
}
23+
24+
@Override
25+
public CompletableFuture<WriteResult> clear() {
26+
throw new UnsupportedOperationException("For next PR");
27+
}
28+
29+
@Override
30+
public void close() {
31+
throw new UnsupportedOperationException("For next PR");
32+
}
33+
34+
@Nonnull
35+
@Override
36+
public Iterator<Collection<SpanData>> iterator() {
37+
throw new UnsupportedOperationException("For next PR");
38+
}
39+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.storage;
7+
8+
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
9+
import io.opentelemetry.sdk.logs.data.LogRecordData;
10+
import io.opentelemetry.sdk.metrics.data.MetricData;
11+
import io.opentelemetry.sdk.trace.data.SpanData;
12+
import java.io.Closeable;
13+
import java.util.Collection;
14+
import java.util.concurrent.CompletableFuture;
15+
16+
/**
17+
* Allows writing and iterating over written signal items.
18+
*
19+
* @param <T> The type of signal data supported.
20+
*/
21+
public interface SignalStorage<T> extends Iterable<Collection<T>>, Closeable {
22+
23+
/**
24+
* Stores signal items.
25+
*
26+
* @param items The items to be stored.
27+
* @return A future with {@link WriteResult}.
28+
*/
29+
CompletableFuture<WriteResult> write(Collection<T> items);
30+
31+
/**
32+
* Removes all the previously stored items.
33+
*
34+
* @return A future with {@link WriteResult}.
35+
*/
36+
CompletableFuture<WriteResult> clear();
37+
38+
/**
39+
* Abstraction for Spans. Implementations should use this instead of {@link SignalStorage}
40+
* directly.
41+
*/
42+
interface Span extends SignalStorage<SpanData> {}
43+
44+
/**
45+
* Abstraction for Logs. Implementations should use this instead of {@link SignalStorage}
46+
* directly.
47+
*/
48+
interface LogRecord extends SignalStorage<LogRecordData> {}
49+
50+
/**
51+
* Abstraction for Metrics. Implementations should use this instead of {@link SignalStorage}
52+
* directly.
53+
*/
54+
interface Metric extends SignalStorage<MetricData> {}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.storage.result;
7+
8+
import javax.annotation.Nullable;
9+
10+
final class DefaultWriteResult implements WriteResult {
11+
private final boolean successful;
12+
@Nullable private final Throwable error;
13+
14+
DefaultWriteResult(boolean successful, @Nullable Throwable error) {
15+
this.successful = successful;
16+
this.error = error;
17+
}
18+
19+
@Override
20+
public boolean isSuccessful() {
21+
return successful;
22+
}
23+
24+
@Nullable
25+
@Override
26+
public Throwable getError() {
27+
return error;
28+
}
29+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.storage.result;
7+
8+
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
9+
import javax.annotation.Nullable;
10+
11+
/** The result of a {@link SignalStorage} write operation. */
12+
public interface WriteResult {
13+
/**
14+
* Whether the operation succeeded or not.
15+
*
16+
* @return `true` if the items have been successfully stored, `false` otherwise.
17+
*/
18+
boolean isSuccessful();
19+
20+
/**
21+
* Provides details of why the operation failed.
22+
*
23+
* @return The error (if any) for the failed operation. It must be null for successful operations.
24+
*/
25+
@Nullable
26+
Throwable getError();
27+
28+
static WriteResult create(boolean successful, @Nullable Throwable error) {
29+
return new DefaultWriteResult(successful, error);
30+
}
31+
}

0 commit comments

Comments
 (0)