Skip to content

Commit 3f7db8b

Browse files
authored
[disk-buffering] - Single responsibility for disk exporters (#1161)
1 parent fa5d161 commit 3f7db8b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1823
-1077
lines changed

disk-buffering/CONTRIBUTING.md renamed to disk-buffering/DESIGN.md

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,25 @@
1-
# Contributor Guide
1+
# Design Overview
22

3-
Each one of the three exporters provided by this
4-
tool ([LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java), [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java)
5-
and [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java))
6-
is responsible of performing 2 actions, `write` and `read/delegate`, the `write` one happens
7-
automatically as a set of signals are provided from the processor, while the `read/delegate` one has
8-
to be triggered manually by the consumer of this library as explained in the [README](README.md).
3+
There are three main disk-writing exporters provided by this module:
4+
5+
* [LogRecordToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java)
6+
* [MetricToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java)
7+
* [SpanToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java))
8+
9+
Each is responsible for writing a specific type of telemetry to disk storage for later
10+
harvest/ingest.
11+
12+
For later reading, there are:
13+
14+
* [LogRecordFromToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java)
15+
* [MetricFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java)
16+
* [SpanFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java))
17+
18+
Each one of those has a `create()` method that takes a delegate exporter (to send data
19+
to ingest) and the `StorageConfiguration` that tells them where to find buffered data.
20+
21+
As explained in the [README](README.md), this has to be triggered manually by the consumer of
22+
this library and does not happen automatically.
923

1024
## Writing overview
1125

@@ -14,7 +28,7 @@ to be triggered manually by the consumer of this library as explained in the [RE
1428
* The writing process happens automatically within its `export(Collection<SignalData> signals)`
1529
method, which is called by the configured signal processor.
1630
* When a set of signals is received, these are delegated over to
17-
the [DiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporter.java)
31+
a type-specific wrapper of [ToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java)
1832
class which then serializes them using an implementation
1933
of [SignalSerializer](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java)
2034
and then the serialized data is appended into a File using an instance of

disk-buffering/README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
# Disk buffering
22

3-
This module provides signal exporter wrappers that intercept and store signals in files which can be
3+
This module provides exporters that store telemetry data in files which can be
44
sent later on demand. A high level description of how it works is that there are two separate
55
processes in place, one for writing data in disk, and one for reading/exporting the previously
66
stored data.
77

88
* Each exporter stores the received data automatically in disk right after it's received from its
99
processor.
1010
* The reading of the data back from disk and exporting process has to be done manually. At
11-
the moment there's no automatic mechanism to do so. There's more information on it can be
11+
the moment there's no automatic mechanism to do so. There's more information on how it can be
1212
achieved, under [Reading data](#reading-data).
1313

1414
> For a more detailed information on how the whole process works, take a look at
15-
> the [CONTRIBUTING](CONTRIBUTING.md) file.
15+
> the [DESIGN.md](DESIGN.md) file.
1616
1717
## Configuration
1818

@@ -43,11 +43,11 @@ In order to use it, you need to wrap your own exporter with a new instance of
4343
the ones provided in here:
4444

4545
* For a LogRecordExporter, it must be wrapped within
46-
a [LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java).
46+
a [LogRecordToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java).
4747
* For a MetricExporter, it must be wrapped within
48-
a [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java).
48+
a [MetricToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java).
4949
* For a SpanExporter, it must be wrapped within
50-
a [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java).
50+
a [SpanToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java).
5151

5252
Each wrapper will need the following when instantiating them:
5353

disk-buffering/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ wire {
5353
java {}
5454

5555
sourcePath {
56-
srcJar("io.opentelemetry.proto:opentelemetry-proto:0.20.0-alpha")
56+
srcJar("io.opentelemetry.proto:opentelemetry-proto:1.1.0-alpha")
5757
}
5858

5959
root(

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java

Lines changed: 0 additions & 95 deletions
This file was deleted.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering;
7+
8+
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
9+
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
10+
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
11+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
12+
import io.opentelemetry.sdk.logs.data.LogRecordData;
13+
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
14+
import java.io.IOException;
15+
import java.util.concurrent.TimeUnit;
16+
17+
public class LogRecordFromDiskExporter implements FromDiskExporter {
18+
19+
private final FromDiskExporterImpl<LogRecordData> delegate;
20+
21+
public static LogRecordFromDiskExporter create(
22+
LogRecordExporter exporter, StorageConfiguration config) throws IOException {
23+
FromDiskExporterImpl<LogRecordData> delegate =
24+
FromDiskExporterImpl.<LogRecordData>builder()
25+
.setFolderName("logs")
26+
.setStorageConfiguration(config)
27+
.setDeserializer(SignalSerializer.ofLogs())
28+
.setExportFunction(exporter::export)
29+
.build();
30+
return new LogRecordFromDiskExporter(delegate);
31+
}
32+
33+
private LogRecordFromDiskExporter(FromDiskExporterImpl<LogRecordData> delegate) {
34+
this.delegate = delegate;
35+
}
36+
37+
@Override
38+
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
39+
return delegate.exportStoredBatch(timeout, unit);
40+
}
41+
42+
@Override
43+
public void shutdown() throws IOException {
44+
delegate.shutdown();
45+
}
46+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering;
7+
8+
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
9+
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
10+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
11+
import io.opentelemetry.sdk.common.CompletableResultCode;
12+
import io.opentelemetry.sdk.logs.data.LogRecordData;
13+
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
14+
import java.io.IOException;
15+
import java.util.Collection;
16+
17+
/**
18+
* This class implements a {@link LogRecordExporter} that delegates to an instance of {@code
19+
* ToDiskExporter<LogRecordData>}.
20+
*/
21+
public class LogRecordToDiskExporter implements LogRecordExporter {
22+
private final ToDiskExporter<LogRecordData> delegate;
23+
24+
/**
25+
* Creates a new LogRecordToDiskExporter that will buffer LogRecordData telemetry on disk storage.
26+
*
27+
* @param delegate - The LogRecordExporter to delegate to if disk writing fails.
28+
* @param config - The StorageConfiguration that specifies how storage is managed.
29+
* @return A new LogRecordToDiskExporter instance.
30+
* @throws IOException if the delegate ToDiskExporter could not be created.
31+
*/
32+
public static LogRecordToDiskExporter create(
33+
LogRecordExporter delegate, StorageConfiguration config) throws IOException {
34+
ToDiskExporter<LogRecordData> toDisk =
35+
ToDiskExporter.<LogRecordData>builder()
36+
.setFolderName("logs")
37+
.setStorageConfiguration(config)
38+
.setSerializer(SignalSerializer.ofLogs())
39+
.setExportFunction(delegate::export)
40+
.build();
41+
return new LogRecordToDiskExporter(toDisk);
42+
}
43+
44+
// Visible for testing
45+
LogRecordToDiskExporter(ToDiskExporter<LogRecordData> delegate) {
46+
this.delegate = delegate;
47+
}
48+
49+
@Override
50+
public CompletableResultCode export(Collection<LogRecordData> logs) {
51+
return delegate.export(logs);
52+
}
53+
54+
@Override
55+
public CompletableResultCode flush() {
56+
return CompletableResultCode.ofSuccess();
57+
}
58+
59+
@Override
60+
public CompletableResultCode shutdown() {
61+
try {
62+
delegate.shutdown();
63+
return CompletableResultCode.ofSuccess();
64+
} catch (IOException e) {
65+
return CompletableResultCode.ofFailure();
66+
}
67+
}
68+
}

0 commit comments

Comments
 (0)