Skip to content

Commit 1ca3cb2

Browse files
committed
Creating export to disk implementations
1 parent 9af7917 commit 1ca3cb2

File tree

7 files changed

+188
-8
lines changed

7 files changed

+188
-8
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.exporters;
7+
8+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
9+
import io.opentelemetry.contrib.disk.buffering.SignalType;
10+
import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback;
11+
import io.opentelemetry.contrib.disk.buffering.internal.exporters.SignalStorageExporter;
12+
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
13+
import io.opentelemetry.sdk.common.CompletableResultCode;
14+
import io.opentelemetry.sdk.logs.data.LogRecordData;
15+
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
16+
import java.time.Duration;
17+
import java.util.Collection;
18+
19+
/** Exporter that stores logs into disk. */
20+
public final class LogRecordToDiskExporter implements LogRecordExporter {
21+
private final SignalStorageExporter<LogRecordData> storageExporter;
22+
private final ExporterCallback callback;
23+
private static final SignalType TYPE = SignalType.LOG;
24+
25+
private LogRecordToDiskExporter(
26+
SignalStorageExporter<LogRecordData> storageExporter, ExporterCallback callback) {
27+
this.storageExporter = storageExporter;
28+
this.callback = callback;
29+
}
30+
31+
public static Builder builder(SignalStorage.LogRecord storage) {
32+
return new Builder(storage);
33+
}
34+
35+
@Override
36+
public CompletableResultCode export(Collection<LogRecordData> logs) {
37+
return storageExporter.exportToStorage(logs);
38+
}
39+
40+
@Override
41+
public CompletableResultCode flush() {
42+
return CompletableResultCode.ofSuccess();
43+
}
44+
45+
@Override
46+
public CompletableResultCode shutdown() {
47+
callback.onShutdown(TYPE);
48+
return CompletableResultCode.ofSuccess();
49+
}
50+
51+
public static final class Builder {
52+
private final SignalStorage.LogRecord storage;
53+
private ExporterCallback callback = ExporterCallback.noop();
54+
private Duration writeTimeout = Duration.ofSeconds(10);
55+
56+
@CanIgnoreReturnValue
57+
public Builder setExporterCallback(ExporterCallback value) {
58+
callback = value;
59+
return this;
60+
}
61+
62+
@CanIgnoreReturnValue
63+
public Builder setWriteTimeout(Duration value) {
64+
writeTimeout = value;
65+
return this;
66+
}
67+
68+
public LogRecordToDiskExporter build() {
69+
SignalStorageExporter<LogRecordData> storageExporter =
70+
new SignalStorageExporter<>(storage, callback, writeTimeout, TYPE);
71+
return new LogRecordToDiskExporter(storageExporter, callback);
72+
}
73+
74+
private Builder(SignalStorage.LogRecord storage) {
75+
this.storage = storage;
76+
}
77+
}
78+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.exporters;
7+
8+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
9+
import io.opentelemetry.contrib.disk.buffering.SignalType;
10+
import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback;
11+
import io.opentelemetry.contrib.disk.buffering.internal.exporters.SignalStorageExporter;
12+
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
13+
import io.opentelemetry.sdk.common.CompletableResultCode;
14+
import io.opentelemetry.sdk.metrics.InstrumentType;
15+
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
16+
import io.opentelemetry.sdk.metrics.data.MetricData;
17+
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
18+
import io.opentelemetry.sdk.metrics.export.MetricExporter;
19+
import java.time.Duration;
20+
import java.util.Collection;
21+
22+
/** Exporter that stores metrics into disk. */
23+
public final class MetricToDiskExporter implements MetricExporter {
24+
private final SignalStorageExporter<MetricData> storageExporter;
25+
private final AggregationTemporalitySelector aggregationTemporalitySelector;
26+
private final ExporterCallback callback;
27+
private static final SignalType TYPE = SignalType.METRIC;
28+
29+
private MetricToDiskExporter(
30+
SignalStorageExporter<MetricData> storageExporter,
31+
AggregationTemporalitySelector aggregationTemporalitySelector,
32+
ExporterCallback callback) {
33+
this.storageExporter = storageExporter;
34+
this.aggregationTemporalitySelector = aggregationTemporalitySelector;
35+
this.callback = callback;
36+
}
37+
38+
public static Builder builder(SignalStorage.Metric storage) {
39+
return new Builder(storage);
40+
}
41+
42+
@Override
43+
public CompletableResultCode export(Collection<MetricData> metrics) {
44+
return storageExporter.exportToStorage(metrics);
45+
}
46+
47+
@Override
48+
public CompletableResultCode flush() {
49+
return CompletableResultCode.ofSuccess();
50+
}
51+
52+
@Override
53+
public CompletableResultCode shutdown() {
54+
callback.onShutdown(TYPE);
55+
return CompletableResultCode.ofSuccess();
56+
}
57+
58+
@Override
59+
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
60+
return aggregationTemporalitySelector.getAggregationTemporality(instrumentType);
61+
}
62+
63+
public static final class Builder {
64+
private final SignalStorage.Metric storage;
65+
private AggregationTemporalitySelector aggregationTemporalitySelector =
66+
AggregationTemporalitySelector.alwaysCumulative();
67+
private ExporterCallback callback = ExporterCallback.noop();
68+
private Duration writeTimeout = Duration.ofSeconds(10);
69+
70+
@CanIgnoreReturnValue
71+
public Builder setExporterCallback(ExporterCallback value) {
72+
callback = value;
73+
return this;
74+
}
75+
76+
@CanIgnoreReturnValue
77+
public Builder setWriteTimeout(Duration value) {
78+
writeTimeout = value;
79+
return this;
80+
}
81+
82+
@CanIgnoreReturnValue
83+
public Builder setAggregationTemporalitySelector(AggregationTemporalitySelector value) {
84+
aggregationTemporalitySelector = value;
85+
return this;
86+
}
87+
88+
public MetricToDiskExporter build() {
89+
SignalStorageExporter<MetricData> storageExporter =
90+
new SignalStorageExporter<>(storage, callback, writeTimeout, TYPE);
91+
return new MetricToDiskExporter(storageExporter, aggregationTemporalitySelector, callback);
92+
}
93+
94+
private Builder(SignalStorage.Metric storage) {
95+
this.storage = storage;
96+
}
97+
}
98+
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SpanToDiskExporter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import com.google.errorprone.annotations.CanIgnoreReturnValue;
99
import io.opentelemetry.contrib.disk.buffering.SignalType;
10+
import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback;
11+
import io.opentelemetry.contrib.disk.buffering.internal.exporters.SignalStorageExporter;
1012
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
1113
import io.opentelemetry.sdk.common.CompletableResultCode;
1214
import io.opentelemetry.sdk.trace.data.SpanData;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package io.opentelemetry.contrib.disk.buffering.exporters;
6+
package io.opentelemetry.contrib.disk.buffering.exporters.callback;
77

88
import io.opentelemetry.contrib.disk.buffering.SignalType;
99
import javax.annotation.Nullable;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package io.opentelemetry.contrib.disk.buffering.exporters;
6+
package io.opentelemetry.contrib.disk.buffering.exporters.callback;
77

88
import io.opentelemetry.contrib.disk.buffering.SignalType;
99
import javax.annotation.Nullable;
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package io.opentelemetry.contrib.disk.buffering.exporters;
6+
package io.opentelemetry.contrib.disk.buffering.internal.exporters;
77

88
import io.opentelemetry.contrib.disk.buffering.SignalType;
9+
import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback;
910
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
1011
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
1112
import io.opentelemetry.sdk.common.CompletableResultCode;
@@ -17,7 +18,7 @@
1718
import java.util.concurrent.TimeoutException;
1819

1920
/** Internal utility for common export to disk operations across all exporters. */
20-
final class SignalStorageExporter<T> {
21+
public final class SignalStorageExporter<T> {
2122
private final SignalStorage<T> storage;
2223
private final ExporterCallback callback;
2324
private final Duration writeTimeout;

disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/exporters/SignalStorageExporterTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import static org.mockito.Mockito.when;
1515

1616
import io.opentelemetry.contrib.disk.buffering.SignalType;
17+
import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback;
1718
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
1819
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
1920
import io.opentelemetry.sdk.common.CompletableResultCode;
@@ -40,8 +41,8 @@ class SignalStorageExporterTest {
4041
void verifyExportToStorage_success() {
4142
SignalStorage.Span storage = new TestSpanStorage();
4243
SignalType signalType = SignalType.SPAN;
43-
SignalStorageExporter<SpanData> storageExporter =
44-
new SignalStorageExporter<>(storage, callback, Duration.ofSeconds(1), signalType);
44+
io.opentelemetry.contrib.disk.buffering.internal.exporters.SignalStorageExporter<SpanData> storageExporter =
45+
new io.opentelemetry.contrib.disk.buffering.internal.exporters.SignalStorageExporter<>(storage, callback, Duration.ofSeconds(1), signalType);
4546
SpanData item1 = mock();
4647
SpanData item2 = mock();
4748
SpanData item3 = mock();
@@ -72,8 +73,8 @@ void verifyExportToStorage_success() {
7273
void verifyExportToStorage_failure() {
7374
SignalStorage.Span storage = mock();
7475
SignalType signalType = SignalType.SPAN;
75-
SignalStorageExporter<SpanData> storageExporter =
76-
new SignalStorageExporter<>(storage, callback, Duration.ofSeconds(1), signalType);
76+
io.opentelemetry.contrib.disk.buffering.internal.exporters.SignalStorageExporter<SpanData> storageExporter =
77+
new io.opentelemetry.contrib.disk.buffering.internal.exporters.SignalStorageExporter<>(storage, callback, Duration.ofSeconds(1), signalType);
7778
SpanData item1 = mock();
7879

7980
// Without exception

0 commit comments

Comments
 (0)