Skip to content

Commit a3deeb2

Browse files
Disk buffering reading refactor (#1917)
Co-authored-by: otelbot <[email protected]>
1 parent 3be75ed commit a3deeb2

21 files changed

+227
-298
lines changed

disk-buffering/README.md

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,6 @@ The configurable parameters are provided **per exporter**, the available ones ar
2828
* Max age for file reading, defaults to 18 hours. After that time passes, the file will be
2929
considered stale and will be removed when new files are created. No more data will be read from a
3030
file past this time.
31-
* An instance
32-
of [TemporaryFileProvider](src/main/java/io/opentelemetry/contrib/disk/buffering/config/TemporaryFileProvider.java),
33-
defaults to calling `File.createTempFile`. This provider will be used when reading from the disk
34-
in order create a temporary file from which each line (batch of signals) will be read and
35-
sequentially get removed from the original cache file right after the data has been successfully
36-
exported.
3731

3832
## Usage
3933

disk-buffering/build.gradle.kts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,6 @@ plugins {
1313
description = "Exporter implementations that store signals on disk"
1414
otelJava.moduleName.set("io.opentelemetry.contrib.exporters.disk")
1515

16-
java {
17-
sourceCompatibility = JavaVersion.VERSION_1_8
18-
targetCompatibility = JavaVersion.VERSION_1_8
19-
}
20-
2116
val protos by configurations.creating
2217

2318
dependencies {
@@ -75,7 +70,8 @@ wire {
7570

7671
tasks.named<ShadowJar>("shadowJar") {
7772
archiveClassifier.set("")
78-
configurations = emptyList() // To avoid embedding any dependencies as we only need to rename some local packages.
73+
configurations =
74+
emptyList() // To avoid embedding any dependencies as we only need to rename some local packages.
7975
relocate("io.opentelemetry.proto", "io.opentelemetry.diskbuffering.proto")
8076
mustRunAfter("jar")
8177
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/config/StorageConfiguration.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package io.opentelemetry.contrib.disk.buffering.config;
77

88
import com.google.auto.value.AutoValue;
9-
import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider;
109
import java.io.File;
1110
import java.util.concurrent.TimeUnit;
1211

@@ -49,23 +48,18 @@ public abstract class StorageConfiguration {
4948
*/
5049
public abstract int getMaxFolderSize();
5150

52-
/** A creator of temporary files needed to do the disk reading process. */
53-
public abstract TemporaryFileProvider getTemporaryFileProvider();
54-
5551
public static StorageConfiguration getDefault(File rootDir) {
5652
return builder().setRootDir(rootDir).build();
5753
}
5854

5955
public static Builder builder() {
60-
TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance();
6156
return new AutoValue_StorageConfiguration.Builder()
6257
.setMaxFileSize(1024 * 1024) // 1MB
6358
.setMaxFolderSize(10 * 1024 * 1024) // 10MB
6459
.setMaxFileAgeForWriteMillis(TimeUnit.SECONDS.toMillis(30))
6560
.setMinFileAgeForReadMillis(TimeUnit.SECONDS.toMillis(33))
6661
.setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(18))
67-
.setDebugEnabled(false)
68-
.setTemporaryFileProvider(fileProvider);
62+
.setDebugEnabled(false);
6963
}
7064

7165
@AutoValue.Builder
@@ -80,8 +74,6 @@ public abstract static class Builder {
8074

8175
public abstract Builder setMaxFolderSize(int value);
8276

83-
public abstract Builder setTemporaryFileProvider(TemporaryFileProvider value);
84-
8577
public abstract Builder setRootDir(File rootDir);
8678

8779
public abstract Builder setDebugEnabled(boolean debugEnabled);

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/config/TemporaryFileProvider.java

Lines changed: 0 additions & 20 deletions
This file was deleted.

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/files/DefaultTemporaryFileProvider.java

Lines changed: 0 additions & 25 deletions
This file was deleted.

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/FileOperations.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,9 @@
99
import java.io.File;
1010

1111
public interface FileOperations extends Closeable {
12-
long getSize();
13-
1412
boolean hasExpired();
1513

1614
boolean isClosed();
1715

1816
File getFile();
19-
20-
default String getFileName() {
21-
return getFile().getName();
22-
}
2317
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java

Lines changed: 15 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,11 @@
1212
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
1313
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ReadResult;
1414
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.StreamReader;
15-
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileTransferUtil;
15+
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileStream;
1616
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
1717
import io.opentelemetry.sdk.common.Clock;
1818
import java.io.File;
19-
import java.io.FileInputStream;
20-
import java.io.FileOutputStream;
2119
import java.io.IOException;
22-
import java.io.InputStream;
23-
import java.io.OutputStream;
2420
import java.util.concurrent.atomic.AtomicBoolean;
2521
import java.util.function.Function;
2622
import javax.annotation.Nullable;
@@ -37,14 +33,11 @@
3733
*/
3834
public final class ReadableFile implements FileOperations {
3935
@NotNull private final File file;
40-
private final int originalFileSize;
36+
private final FileStream fileStream;
4137
private final StreamReader reader;
42-
private final FileTransferUtil fileTransferUtil;
43-
private final File temporaryFile;
4438
private final Clock clock;
4539
private final long expireTimeMillis;
4640
private final AtomicBoolean isClosed = new AtomicBoolean(false);
47-
private int readBytes = 0;
4841
@Nullable private ReadResult unconsumedResult;
4942

5043
public ReadableFile(
@@ -59,7 +52,7 @@ public ReadableFile(
5952
}
6053

6154
public ReadableFile(
62-
File file,
55+
@NotNull File file,
6356
long createdTimeMillis,
6457
Clock clock,
6558
StorageConfiguration configuration,
@@ -68,12 +61,8 @@ public ReadableFile(
6861
this.file = file;
6962
this.clock = clock;
7063
expireTimeMillis = createdTimeMillis + configuration.getMaxFileAgeForReadMillis();
71-
originalFileSize = (int) file.length();
72-
temporaryFile = configuration.getTemporaryFileProvider().createTemporaryFile(file.getName());
73-
copyFile(file, temporaryFile);
74-
FileInputStream tempInputStream = new FileInputStream(temporaryFile);
75-
fileTransferUtil = new FileTransferUtil(tempInputStream, file);
76-
reader = readerFactory.create(tempInputStream);
64+
fileStream = FileStream.create(file);
65+
reader = readerFactory.create(fileStream);
7766
}
7867

7968
/**
@@ -101,11 +90,8 @@ public synchronized ReadableResult readAndProcess(Function<byte[], ProcessResult
10190
switch (processing.apply(read.content)) {
10291
case SUCCEEDED:
10392
unconsumedResult = null;
104-
readBytes += read.totalReadLength;
105-
int amountOfBytesToTransfer = originalFileSize - readBytes;
106-
if (amountOfBytesToTransfer > 0) {
107-
fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer);
108-
} else {
93+
fileStream.truncateTop();
94+
if (fileStream.size() == 0) {
10995
cleanUp();
11096
}
11197
return ReadableResult.SUCCEEDED;
@@ -124,17 +110,7 @@ private ReadResult readNextItem() throws IOException {
124110
if (unconsumedResult != null) {
125111
return unconsumedResult;
126112
}
127-
return reader.read();
128-
}
129-
130-
private void cleanUp() throws IOException {
131-
file.delete();
132-
close();
133-
}
134-
135-
@Override
136-
public long getSize() {
137-
return originalFileSize;
113+
return reader.readNext();
138114
}
139115

140116
@Override
@@ -153,29 +129,18 @@ public File getFile() {
153129
return file;
154130
}
155131

132+
private void cleanUp() throws IOException {
133+
close();
134+
if (!file.delete()) {
135+
throw new IOException("Could not delete file: " + file);
136+
}
137+
}
138+
156139
@Override
157140
public synchronized void close() throws IOException {
158141
if (isClosed.compareAndSet(false, true)) {
159142
unconsumedResult = null;
160-
fileTransferUtil.close();
161143
reader.close();
162-
temporaryFile.delete();
163-
}
164-
}
165-
166-
/**
167-
* This is needed instead of using Files.copy in order to keep it compatible with Android api <
168-
* 26.
169-
*/
170-
private static void copyFile(File from, File to) throws IOException {
171-
try (InputStream in = new FileInputStream(from);
172-
OutputStream out = new FileOutputStream(to)) {
173-
174-
byte[] buffer = new byte[1024];
175-
int lengthRead;
176-
while ((lengthRead = in.read(buffer)) > 0) {
177-
out.write(buffer, 0, lengthRead);
178-
}
179144
}
180145
}
181146

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ public synchronized WritableResult append(byte[] data) throws IOException {
6363
return WritableResult.SUCCEEDED;
6464
}
6565

66-
@Override
6766
public synchronized long getSize() {
6867
return size;
6968
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/DelimitedProtoStreamReader.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,21 @@
55

66
package io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader;
77

8-
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.CountingInputStream;
98
import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools;
109
import java.io.IOException;
1110
import java.io.InputStream;
1211
import javax.annotation.Nullable;
1312

14-
public final class DelimitedProtoStreamReader extends StreamReader {
15-
private final CountingInputStream countingInputStream;
13+
public final class DelimitedProtoStreamReader implements StreamReader {
14+
private final InputStream inputStream;
1615

1716
public DelimitedProtoStreamReader(InputStream inputStream) {
18-
super(new CountingInputStream(inputStream));
19-
countingInputStream = (CountingInputStream) this.inputStream;
17+
this.inputStream = inputStream;
2018
}
2119

2220
@Override
2321
@Nullable
24-
public ReadResult read() throws IOException {
25-
int startingPosition = countingInputStream.getPosition();
22+
public ReadResult readNext() throws IOException {
2623
int itemSize = getNextItemSize();
2724
if (itemSize < 1) {
2825
return null;
@@ -31,7 +28,7 @@ public ReadResult read() throws IOException {
3128
if (inputStream.read(bytes) < 0) {
3229
return null;
3330
}
34-
return new ReadResult(bytes, countingInputStream.getPosition() - startingPosition);
31+
return new ReadResult(bytes);
3532
}
3633

3734
private int getNextItemSize() {
@@ -46,6 +43,11 @@ private int getNextItemSize() {
4643
}
4744
}
4845

46+
@Override
47+
public void close() throws IOException {
48+
inputStream.close();
49+
}
50+
4951
public static class Factory implements StreamReader.Factory {
5052

5153
private static final Factory INSTANCE = new DelimitedProtoStreamReader.Factory();
@@ -57,8 +59,8 @@ public static Factory getInstance() {
5759
private Factory() {}
5860

5961
@Override
60-
public StreamReader create(InputStream stream) {
61-
return new DelimitedProtoStreamReader(stream);
62+
public StreamReader create(InputStream inputStream) {
63+
return new DelimitedProtoStreamReader(inputStream);
6264
}
6365
}
6466
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ReadResult.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,7 @@ public final class ReadResult {
99
/** The consumable data. */
1010
public final byte[] content;
1111

12-
/**
13-
* The total amount of data read from the stream. This number can be greater than the content
14-
* length as it also takes into account any delimiters size.
15-
*/
16-
public final int totalReadLength;
17-
18-
public ReadResult(byte[] content, int totalReadLength) {
12+
public ReadResult(byte[] content) {
1913
this.content = content;
20-
this.totalReadLength = totalReadLength;
2114
}
2215
}

0 commit comments

Comments
 (0)