Skip to content

Commit 1d64de8

Browse files
committed
Making callbacks aware of the signal type
1 parent 8153313 commit 1d64de8

File tree

9 files changed

+78
-103
lines changed

9 files changed

+78
-103
lines changed

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SignalType.java

Lines changed: 0 additions & 12 deletions
This file was deleted.

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/LogRecordToDiskExporter.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package io.opentelemetry.contrib.disk.buffering.exporters;
77

88
import com.google.errorprone.annotations.CanIgnoreReturnValue;
9-
import io.opentelemetry.contrib.disk.buffering.SignalType;
109
import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback;
1110
import io.opentelemetry.contrib.disk.buffering.internal.exporters.SignalStorageExporter;
1211
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
@@ -19,11 +18,11 @@
1918
/** Exporter that stores logs into disk. */
2019
public final class LogRecordToDiskExporter implements LogRecordExporter {
2120
private final SignalStorageExporter<LogRecordData> storageExporter;
22-
private final ExporterCallback callback;
23-
private static final SignalType TYPE = SignalType.LOG;
21+
private final ExporterCallback<LogRecordData> callback;
2422

2523
private LogRecordToDiskExporter(
26-
SignalStorageExporter<LogRecordData> storageExporter, ExporterCallback callback) {
24+
SignalStorageExporter<LogRecordData> storageExporter,
25+
ExporterCallback<LogRecordData> callback) {
2726
this.storageExporter = storageExporter;
2827
this.callback = callback;
2928
}
@@ -44,17 +43,17 @@ public CompletableResultCode flush() {
4443

4544
@Override
4645
public CompletableResultCode shutdown() {
47-
callback.onShutdown(TYPE);
46+
callback.onShutdown();
4847
return CompletableResultCode.ofSuccess();
4948
}
5049

5150
public static final class Builder {
5251
private final SignalStorage.LogRecord storage;
53-
private ExporterCallback callback = ExporterCallback.noop();
52+
private ExporterCallback<LogRecordData> callback = ExporterCallback.noop();
5453
private Duration writeTimeout = Duration.ofSeconds(10);
5554

5655
@CanIgnoreReturnValue
57-
public Builder setExporterCallback(ExporterCallback value) {
56+
public Builder setExporterCallback(ExporterCallback<LogRecordData> value) {
5857
callback = value;
5958
return this;
6059
}
@@ -67,7 +66,7 @@ public Builder setWriteTimeout(Duration value) {
6766

6867
public LogRecordToDiskExporter build() {
6968
SignalStorageExporter<LogRecordData> storageExporter =
70-
new SignalStorageExporter<>(storage, callback, writeTimeout, TYPE);
69+
new SignalStorageExporter<>(storage, callback, writeTimeout);
7170
return new LogRecordToDiskExporter(storageExporter, callback);
7271
}
7372

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/MetricToDiskExporter.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package io.opentelemetry.contrib.disk.buffering.exporters;
77

88
import com.google.errorprone.annotations.CanIgnoreReturnValue;
9-
import io.opentelemetry.contrib.disk.buffering.SignalType;
109
import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback;
1110
import io.opentelemetry.contrib.disk.buffering.internal.exporters.SignalStorageExporter;
1211
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
@@ -23,13 +22,12 @@
2322
public final class MetricToDiskExporter implements MetricExporter {
2423
private final SignalStorageExporter<MetricData> storageExporter;
2524
private final AggregationTemporalitySelector aggregationTemporalitySelector;
26-
private final ExporterCallback callback;
27-
private static final SignalType TYPE = SignalType.METRIC;
25+
private final ExporterCallback<MetricData> callback;
2826

2927
private MetricToDiskExporter(
3028
SignalStorageExporter<MetricData> storageExporter,
3129
AggregationTemporalitySelector aggregationTemporalitySelector,
32-
ExporterCallback callback) {
30+
ExporterCallback<MetricData> callback) {
3331
this.storageExporter = storageExporter;
3432
this.aggregationTemporalitySelector = aggregationTemporalitySelector;
3533
this.callback = callback;
@@ -51,7 +49,7 @@ public CompletableResultCode flush() {
5149

5250
@Override
5351
public CompletableResultCode shutdown() {
54-
callback.onShutdown(TYPE);
52+
callback.onShutdown();
5553
return CompletableResultCode.ofSuccess();
5654
}
5755

@@ -64,11 +62,11 @@ public static final class Builder {
6462
private final SignalStorage.Metric storage;
6563
private AggregationTemporalitySelector aggregationTemporalitySelector =
6664
AggregationTemporalitySelector.alwaysCumulative();
67-
private ExporterCallback callback = ExporterCallback.noop();
65+
private ExporterCallback<MetricData> callback = ExporterCallback.noop();
6866
private Duration writeTimeout = Duration.ofSeconds(10);
6967

7068
@CanIgnoreReturnValue
71-
public Builder setExporterCallback(ExporterCallback value) {
69+
public Builder setExporterCallback(ExporterCallback<MetricData> value) {
7270
callback = value;
7371
return this;
7472
}
@@ -87,7 +85,7 @@ public Builder setAggregationTemporalitySelector(AggregationTemporalitySelector
8785

8886
public MetricToDiskExporter build() {
8987
SignalStorageExporter<MetricData> storageExporter =
90-
new SignalStorageExporter<>(storage, callback, writeTimeout, TYPE);
88+
new SignalStorageExporter<>(storage, callback, writeTimeout);
9189
return new MetricToDiskExporter(storageExporter, aggregationTemporalitySelector, callback);
9290
}
9391

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SpanToDiskExporter.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package io.opentelemetry.contrib.disk.buffering.exporters;
77

88
import com.google.errorprone.annotations.CanIgnoreReturnValue;
9-
import io.opentelemetry.contrib.disk.buffering.SignalType;
109
import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback;
1110
import io.opentelemetry.contrib.disk.buffering.internal.exporters.SignalStorageExporter;
1211
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
@@ -19,11 +18,10 @@
1918
/** Exporter that stores spans into disk. */
2019
public final class SpanToDiskExporter implements SpanExporter {
2120
private final SignalStorageExporter<SpanData> storageExporter;
22-
private final ExporterCallback callback;
23-
private static final SignalType TYPE = SignalType.SPAN;
21+
private final ExporterCallback<SpanData> callback;
2422

2523
private SpanToDiskExporter(
26-
SignalStorageExporter<SpanData> storageExporter, ExporterCallback callback) {
24+
SignalStorageExporter<SpanData> storageExporter, ExporterCallback<SpanData> callback) {
2725
this.storageExporter = storageExporter;
2826
this.callback = callback;
2927
}
@@ -44,17 +42,17 @@ public CompletableResultCode flush() {
4442

4543
@Override
4644
public CompletableResultCode shutdown() {
47-
callback.onShutdown(TYPE);
45+
callback.onShutdown();
4846
return CompletableResultCode.ofSuccess();
4947
}
5048

5149
public static final class Builder {
5250
private final SignalStorage.Span storage;
53-
private ExporterCallback callback = ExporterCallback.noop();
51+
private ExporterCallback<SpanData> callback = ExporterCallback.noop();
5452
private Duration writeTimeout = Duration.ofSeconds(10);
5553

5654
@CanIgnoreReturnValue
57-
public Builder setExporterCallback(ExporterCallback value) {
55+
public Builder setExporterCallback(ExporterCallback<SpanData> value) {
5856
callback = value;
5957
return this;
6058
}
@@ -67,7 +65,7 @@ public Builder setWriteTimeout(Duration value) {
6765

6866
public SpanToDiskExporter build() {
6967
SignalStorageExporter<SpanData> storageExporter =
70-
new SignalStorageExporter<>(storage, callback, writeTimeout, TYPE);
68+
new SignalStorageExporter<>(storage, callback, writeTimeout);
7169
return new SpanToDiskExporter(storageExporter, callback);
7270
}
7371

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/callback/ExporterCallback.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,30 @@
55

66
package io.opentelemetry.contrib.disk.buffering.exporters.callback;
77

8-
import io.opentelemetry.contrib.disk.buffering.SignalType;
8+
import java.util.Collection;
99
import javax.annotation.Nullable;
1010

1111
/** Notifies about exporter and storage-related operations from within a signal to disk exporter. */
12-
public interface ExporterCallback {
12+
public interface ExporterCallback<T> {
1313
/**
1414
* Called when an export to disk operation succeeded.
1515
*
16-
* @param type The type of signal associated to the exporter.
16+
* @param items The items successfully stored in disk.
1717
*/
18-
void onExportSuccess(SignalType type);
18+
void onExportSuccess(Collection<T> items);
1919

2020
/**
2121
* Called when an export to disk operation failed.
2222
*
23-
* @param type The type of signal associated to the exporter.
23+
* @param items The items that couldn't get stored in disk.
2424
* @param error Optional - provides more information of why the operation failed.
2525
*/
26-
void onExportError(SignalType type, @Nullable Throwable error);
26+
void onExportError(Collection<T> items, @Nullable Throwable error);
2727

28-
/**
29-
* Called when the exporter is closed.
30-
*
31-
* @param type The type of signal associated to the exporter.
32-
*/
33-
void onShutdown(SignalType type);
28+
/** Called when the exporter is closed. */
29+
void onShutdown();
3430

35-
static ExporterCallback noop() {
36-
return NoopExporterCallback.INSTANCE;
31+
static <T> ExporterCallback<T> noop() {
32+
return new NoopExporterCallback<>();
3733
}
3834
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/callback/NoopExporterCallback.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,19 @@
55

66
package io.opentelemetry.contrib.disk.buffering.exporters.callback;
77

8-
import io.opentelemetry.contrib.disk.buffering.SignalType;
8+
import java.util.Collection;
99
import javax.annotation.Nullable;
1010

11-
final class NoopExporterCallback implements ExporterCallback {
12-
static final NoopExporterCallback INSTANCE = new NoopExporterCallback();
11+
final class NoopExporterCallback<T> implements ExporterCallback<T> {
1312

14-
private NoopExporterCallback() {}
13+
NoopExporterCallback() {}
1514

1615
@Override
17-
public void onExportSuccess(SignalType type) {}
16+
public void onExportSuccess(Collection<T> items) {}
1817

1918
@Override
20-
public void onExportError(SignalType type, @Nullable Throwable error) {}
19+
public void onExportError(Collection<T> items, @Nullable Throwable error) {}
2120

2221
@Override
23-
public void onShutdown(SignalType type) {}
22+
public void onShutdown() {}
2423
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/SignalStorageExporter.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
package io.opentelemetry.contrib.disk.buffering.internal.exporters;
77

8-
import io.opentelemetry.contrib.disk.buffering.SignalType;
98
import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback;
109
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
1110
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
@@ -20,35 +19,33 @@
2019
/** Internal utility for common export to disk operations across all exporters. */
2120
public final class SignalStorageExporter<T> {
2221
private final SignalStorage<T> storage;
23-
private final ExporterCallback callback;
22+
private final ExporterCallback<T> callback;
2423
private final Duration writeTimeout;
25-
private final SignalType type;
2624

2725
public SignalStorageExporter(
28-
SignalStorage<T> storage, ExporterCallback callback, Duration writeTimeout, SignalType type) {
26+
SignalStorage<T> storage, ExporterCallback<T> callback, Duration writeTimeout) {
2927
this.storage = storage;
3028
this.callback = callback;
3129
this.writeTimeout = writeTimeout;
32-
this.type = type;
3330
}
3431

3532
public CompletableResultCode exportToStorage(Collection<T> items) {
3633
CompletableFuture<WriteResult> future = storage.write(items);
3734
try {
3835
WriteResult operation = future.get(writeTimeout.toMillis(), TimeUnit.MILLISECONDS);
3936
if (operation.isSuccessful()) {
40-
callback.onExportSuccess(type);
37+
callback.onExportSuccess(items);
4138
return CompletableResultCode.ofSuccess();
4239
}
4340

4441
Throwable error = operation.getError();
45-
callback.onExportError(type, error);
42+
callback.onExportError(items, error);
4643
if (error != null) {
4744
return CompletableResultCode.ofExceptionalFailure(error);
4845
}
4946
return CompletableResultCode.ofFailure();
5047
} catch (ExecutionException | InterruptedException | TimeoutException e) {
51-
callback.onExportError(type, e);
48+
callback.onExportError(items, e);
5249
return CompletableResultCode.ofExceptionalFailure(e);
5350
}
5451
}

0 commit comments

Comments
 (0)