diff --git a/disk-buffering/README.md b/disk-buffering/README.md index 67dbb1f52..72e17144d 100644 --- a/disk-buffering/README.md +++ b/disk-buffering/README.md @@ -28,12 +28,6 @@ The configurable parameters are provided **per exporter**, the available ones ar * Max age for file reading, defaults to 18 hours. After that time passes, the file will be considered stale and will be removed when new files are created. No more data will be read from a file past this time. -* An instance - of [TemporaryFileProvider](src/main/java/io/opentelemetry/contrib/disk/buffering/config/TemporaryFileProvider.java), - defaults to calling `File.createTempFile`. This provider will be used when reading from the disk - in order create a temporary file from which each line (batch of signals) will be read and - sequentially get removed from the original cache file right after the data has been successfully - exported. ## Usage diff --git a/disk-buffering/build.gradle.kts b/disk-buffering/build.gradle.kts index ef86e9043..09f424aee 100644 --- a/disk-buffering/build.gradle.kts +++ b/disk-buffering/build.gradle.kts @@ -13,11 +13,6 @@ plugins { description = "Exporter implementations that store signals on disk" otelJava.moduleName.set("io.opentelemetry.contrib.exporters.disk") -java { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 -} - val protos by configurations.creating dependencies { @@ -75,7 +70,8 @@ wire { tasks.named("shadowJar") { archiveClassifier.set("") - configurations = emptyList() // To avoid embedding any dependencies as we only need to rename some local packages. + configurations = + emptyList() // To avoid embedding any dependencies as we only need to rename some local packages. relocate("io.opentelemetry.proto", "io.opentelemetry.diskbuffering.proto") mustRunAfter("jar") } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/config/StorageConfiguration.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/config/StorageConfiguration.java index 2470b263a..4853ee72f 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/config/StorageConfiguration.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/config/StorageConfiguration.java @@ -6,7 +6,6 @@ package io.opentelemetry.contrib.disk.buffering.config; import com.google.auto.value.AutoValue; -import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider; import java.io.File; import java.util.concurrent.TimeUnit; @@ -49,23 +48,18 @@ public abstract class StorageConfiguration { */ public abstract int getMaxFolderSize(); - /** A creator of temporary files needed to do the disk reading process. */ - public abstract TemporaryFileProvider getTemporaryFileProvider(); - public static StorageConfiguration getDefault(File rootDir) { return builder().setRootDir(rootDir).build(); } public static Builder builder() { - TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance(); return new AutoValue_StorageConfiguration.Builder() .setMaxFileSize(1024 * 1024) // 1MB .setMaxFolderSize(10 * 1024 * 1024) // 10MB .setMaxFileAgeForWriteMillis(TimeUnit.SECONDS.toMillis(30)) .setMinFileAgeForReadMillis(TimeUnit.SECONDS.toMillis(33)) .setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(18)) - .setDebugEnabled(false) - .setTemporaryFileProvider(fileProvider); + .setDebugEnabled(false); } @AutoValue.Builder @@ -80,8 +74,6 @@ public abstract static class Builder { public abstract Builder setMaxFolderSize(int value); - public abstract Builder setTemporaryFileProvider(TemporaryFileProvider value); - public abstract Builder setRootDir(File rootDir); public abstract Builder setDebugEnabled(boolean debugEnabled); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/config/TemporaryFileProvider.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/config/TemporaryFileProvider.java deleted file mode 100644 index 3cf803f9f..000000000 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/config/TemporaryFileProvider.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering.config; - -import java.io.File; -import java.io.IOException; - -/** Provides a temporary file needed to do the disk reading process. */ -public interface TemporaryFileProvider { - - /** - * Creates a temporary file. - * - * @param prefix The prefix for the provided file name. - */ - File createTemporaryFile(String prefix) throws IOException; -} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/files/DefaultTemporaryFileProvider.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/files/DefaultTemporaryFileProvider.java deleted file mode 100644 index 9a9dfb8e6..000000000 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/files/DefaultTemporaryFileProvider.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering.internal.files; - -import io.opentelemetry.contrib.disk.buffering.config.TemporaryFileProvider; -import java.io.File; -import java.io.IOException; - -public final class DefaultTemporaryFileProvider implements TemporaryFileProvider { - private static final TemporaryFileProvider INSTANCE = new DefaultTemporaryFileProvider(); - - public static TemporaryFileProvider getInstance() { - return INSTANCE; - } - - private DefaultTemporaryFileProvider() {} - - @Override - public File createTemporaryFile(String prefix) throws IOException { - return File.createTempFile(prefix + "_", ".tmp"); - } -} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/FileOperations.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/FileOperations.java index 21544a991..316f51157 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/FileOperations.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/FileOperations.java @@ -9,15 +9,9 @@ import java.io.File; public interface FileOperations extends Closeable { - long getSize(); - boolean hasExpired(); boolean isClosed(); File getFile(); - - default String getFileName() { - return getFile().getName(); - } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java index f7383d60f..710e192bb 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java @@ -12,15 +12,11 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ReadResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.StreamReader; -import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileTransferUtil; +import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileStream; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.sdk.common.Clock; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import javax.annotation.Nullable; @@ -37,14 +33,11 @@ */ public final class ReadableFile implements FileOperations { @NotNull private final File file; - private final int originalFileSize; + private final FileStream fileStream; private final StreamReader reader; - private final FileTransferUtil fileTransferUtil; - private final File temporaryFile; private final Clock clock; private final long expireTimeMillis; private final AtomicBoolean isClosed = new AtomicBoolean(false); - private int readBytes = 0; @Nullable private ReadResult unconsumedResult; public ReadableFile( @@ -59,7 +52,7 @@ public ReadableFile( } public ReadableFile( - File file, + @NotNull File file, long createdTimeMillis, Clock clock, StorageConfiguration configuration, @@ -68,12 +61,8 @@ public ReadableFile( this.file = file; this.clock = clock; expireTimeMillis = createdTimeMillis + configuration.getMaxFileAgeForReadMillis(); - originalFileSize = (int) file.length(); - temporaryFile = configuration.getTemporaryFileProvider().createTemporaryFile(file.getName()); - copyFile(file, temporaryFile); - FileInputStream tempInputStream = new FileInputStream(temporaryFile); - fileTransferUtil = new FileTransferUtil(tempInputStream, file); - reader = readerFactory.create(tempInputStream); + fileStream = FileStream.create(file); + reader = readerFactory.create(fileStream); } /** @@ -101,11 +90,8 @@ public synchronized ReadableResult readAndProcess(Function 0) { - fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer); - } else { + fileStream.truncateTop(); + if (fileStream.size() == 0) { cleanUp(); } return ReadableResult.SUCCEEDED; @@ -124,17 +110,7 @@ private ReadResult readNextItem() throws IOException { if (unconsumedResult != null) { return unconsumedResult; } - return reader.read(); - } - - private void cleanUp() throws IOException { - file.delete(); - close(); - } - - @Override - public long getSize() { - return originalFileSize; + return reader.readNext(); } @Override @@ -153,29 +129,18 @@ public File getFile() { return file; } + private void cleanUp() throws IOException { + close(); + if (!file.delete()) { + throw new IOException("Could not delete file: " + file); + } + } + @Override public synchronized void close() throws IOException { if (isClosed.compareAndSet(false, true)) { unconsumedResult = null; - fileTransferUtil.close(); reader.close(); - temporaryFile.delete(); - } - } - - /** - * This is needed instead of using Files.copy in order to keep it compatible with Android api < - * 26. - */ - private static void copyFile(File from, File to) throws IOException { - try (InputStream in = new FileInputStream(from); - OutputStream out = new FileOutputStream(to)) { - - byte[] buffer = new byte[1024]; - int lengthRead; - while ((lengthRead = in.read(buffer)) > 0) { - out.write(buffer, 0, lengthRead); - } } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java index 6bf082ca5..0f3d1d475 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java @@ -63,7 +63,6 @@ public synchronized WritableResult append(byte[] data) throws IOException { return WritableResult.SUCCEEDED; } - @Override public synchronized long getSize() { return size; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/DelimitedProtoStreamReader.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/DelimitedProtoStreamReader.java index 0f9723c4c..60a8e4f45 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/DelimitedProtoStreamReader.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/DelimitedProtoStreamReader.java @@ -5,24 +5,21 @@ package io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader; -import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.CountingInputStream; import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools; import java.io.IOException; import java.io.InputStream; import javax.annotation.Nullable; -public final class DelimitedProtoStreamReader extends StreamReader { - private final CountingInputStream countingInputStream; +public final class DelimitedProtoStreamReader implements StreamReader { + private final InputStream inputStream; public DelimitedProtoStreamReader(InputStream inputStream) { - super(new CountingInputStream(inputStream)); - countingInputStream = (CountingInputStream) this.inputStream; + this.inputStream = inputStream; } @Override @Nullable - public ReadResult read() throws IOException { - int startingPosition = countingInputStream.getPosition(); + public ReadResult readNext() throws IOException { int itemSize = getNextItemSize(); if (itemSize < 1) { return null; @@ -31,7 +28,7 @@ public ReadResult read() throws IOException { if (inputStream.read(bytes) < 0) { return null; } - return new ReadResult(bytes, countingInputStream.getPosition() - startingPosition); + return new ReadResult(bytes); } private int getNextItemSize() { @@ -46,6 +43,11 @@ private int getNextItemSize() { } } + @Override + public void close() throws IOException { + inputStream.close(); + } + public static class Factory implements StreamReader.Factory { private static final Factory INSTANCE = new DelimitedProtoStreamReader.Factory(); @@ -57,8 +59,8 @@ public static Factory getInstance() { private Factory() {} @Override - public StreamReader create(InputStream stream) { - return new DelimitedProtoStreamReader(stream); + public StreamReader create(InputStream inputStream) { + return new DelimitedProtoStreamReader(inputStream); } } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ReadResult.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ReadResult.java index 079c2396c..a9f5d1116 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ReadResult.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ReadResult.java @@ -9,14 +9,7 @@ public final class ReadResult { /** The consumable data. */ public final byte[] content; - /** - * The total amount of data read from the stream. This number can be greater than the content - * length as it also takes into account any delimiters size. - */ - public final int totalReadLength; - - public ReadResult(byte[] content, int totalReadLength) { + public ReadResult(byte[] content) { this.content = content; - this.totalReadLength = totalReadLength; } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/StreamReader.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/StreamReader.java index d263aad71..447315f1e 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/StreamReader.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/StreamReader.java @@ -10,22 +10,11 @@ import java.io.InputStream; import javax.annotation.Nullable; -public abstract class StreamReader implements Closeable { - protected final InputStream inputStream; - - protected StreamReader(InputStream inputStream) { - this.inputStream = inputStream; - } - +public interface StreamReader extends Closeable { @Nullable - public abstract ReadResult read() throws IOException; - - @Override - public void close() throws IOException { - inputStream.close(); - } + ReadResult readNext() throws IOException; - public interface Factory { + interface Factory { StreamReader create(InputStream stream); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/CountingInputStream.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/CountingInputStream.java deleted file mode 100644 index 9faa2c018..000000000 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/CountingInputStream.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils; - -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; - -public final class CountingInputStream extends FilterInputStream { - - private int position; - private int mark = -1; - - public CountingInputStream(InputStream in) { - super(in); - } - - public int getPosition() { - return position; - } - - @Override - public synchronized void mark(int readlimit) { - in.mark(readlimit); - mark = position; - } - - @Override - public long skip(long n) throws IOException { - long result = in.skip(n); - position = (int) (position + result); - return result; - } - - @Override - public int read() throws IOException { - int result = in.read(); - if (result != -1) { - position++; - } - return result; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - int result = in.read(b, off, len); - if (result != -1) { - position += result; - } - return result; - } - - @Override - public synchronized void reset() throws IOException { - if (!in.markSupported()) { - throw new IOException("Mark is not supported"); - } - if (mark == -1) { - throw new IOException("Mark is not set"); - } - - in.reset(); - position = mark; - } -} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/FileStream.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/FileStream.java new file mode 100644 index 000000000..c49570922 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/FileStream.java @@ -0,0 +1,82 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import org.jetbrains.annotations.NotNull; + +public class FileStream extends InputStream { + private final RandomAccessFile file; + private final FileChannel channel; + + public static FileStream create(File file) throws IOException { + RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rwd"); + FileChannel channel = randomAccessFile.getChannel(); + channel.force(false); + return new FileStream(randomAccessFile, channel); + } + + private FileStream(RandomAccessFile file, FileChannel channel) { + this.file = file; + this.channel = channel; + } + + @Override + public int read() throws IOException { + return file.read(); + } + + @Override + public int read(@NotNull byte[] bytes) throws IOException { + return file.read(bytes); + } + + @Override + public int read(@NotNull byte[] b, int off, int len) throws IOException { + return file.read(b, off, len); + } + + public long size() throws IOException { + return channel.size(); + } + + @Override + public void close() throws IOException { + channel.close(); + file.close(); + } + + public void truncateTop(long size) throws IOException { + file.seek(Math.min(size(), size)); + truncateTop(); + } + + public void truncateTop() throws IOException { + long position = file.getFilePointer(); + if (position == 0) { + return; + } + long remainingSize = size() - position; + if (remainingSize > 0) { + byte[] remainingBytes = new byte[(int) remainingSize]; + file.read(remainingBytes); + file.seek(0); + channel.truncate(remainingSize); + file.write(remainingBytes); + file.seek(0); + } else { + channel.truncate(0); + } + } + + public long getPosition() throws IOException { + return file.getFilePointer(); + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/FileTransferUtil.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/FileTransferUtil.java deleted file mode 100644 index e4729cb53..000000000 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/FileTransferUtil.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils; - -import java.io.Closeable; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.channels.FileChannel; - -public final class FileTransferUtil implements Closeable { - private final File output; - - private final FileChannel inputChannel; - - public FileTransferUtil(FileInputStream input, File output) { - this.output = output; - inputChannel = input.getChannel(); - } - - public void transferBytes(int offset, int length) throws IOException { - try (FileOutputStream out = new FileOutputStream(output, false)) { - inputChannel.transferTo(offset, length, out.getChannel()); - } - } - - @Override - public void close() throws IOException { - inputChannel.close(); - } -} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java index 65c81b842..e7995c675 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java @@ -51,7 +51,7 @@ void setUp() throws IOException { wrapped = mock(); exporter = FromDiskExporterImpl.builder( - TestData.getDefaultStorage(rootDir, SignalTypes.spans, clock)) + TestData.getStorage(rootDir, SignalTypes.spans, clock)) .setDeserializer(deserializer) .setExportFunction(wrapped::export) .build(); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java index 0d9e16723..d2b73f13d 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java @@ -37,7 +37,7 @@ class FolderManagerTest { @BeforeEach void setUp() { clock = mock(); - folderManager = new FolderManager(rootDir, TestData.getDefaultConfiguration(rootDir), clock); + folderManager = new FolderManager(rootDir, TestData.getConfiguration(rootDir), clock); } @Test @@ -45,7 +45,7 @@ void createWritableFile_withTimeMillisAsName() throws IOException { when(clock.now()).thenReturn(MILLISECONDS.toNanos(1000L)); WritableFile file = folderManager.createWritableFile(); - assertEquals("1000", file.getFileName()); + assertEquals("1000", file.getFile().getName()); } @Test diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java index 8e66dde04..75d86726e 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java @@ -6,8 +6,6 @@ package io.opentelemetry.contrib.disk.buffering.internal.storage; import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; -import io.opentelemetry.contrib.disk.buffering.config.TemporaryFileProvider; -import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.Clock; import java.io.File; @@ -21,22 +19,15 @@ public final class TestData { public static final int MAX_FILE_SIZE = 100; public static final int MAX_FOLDER_SIZE = 300; - public static StorageConfiguration getDefaultConfiguration(File rootDir) { - TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance(); - return getConfiguration(fileProvider, rootDir); - } - - public static Storage getDefaultStorage(File rootDir, SignalTypes types, Clock clock) + public static Storage getStorage(File rootDir, SignalTypes types, Clock clock) throws IOException { - TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance(); return Storage.builder(types) - .setStorageConfiguration(getConfiguration(fileProvider, rootDir)) + .setStorageConfiguration(getConfiguration(rootDir)) .setStorageClock(clock) .build(); } - public static StorageConfiguration getConfiguration( - TemporaryFileProvider fileProvider, File rootDir) { + public static StorageConfiguration getConfiguration(File rootDir) { return StorageConfiguration.builder() .setRootDir(rootDir) .setMaxFileAgeForWriteMillis(MAX_FILE_AGE_FOR_WRITE_MILLIS) @@ -44,7 +35,6 @@ public static StorageConfiguration getConfiguration( .setMaxFileAgeForReadMillis(MAX_FILE_AGE_FOR_READ_MILLIS) .setMaxFileSize(MAX_FILE_SIZE) .setMaxFolderSize(MAX_FOLDER_SIZE) - .setTemporaryFileProvider(fileProvider) .build(); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java index dd8cb02aa..6565543dd 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java @@ -12,13 +12,11 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import io.opentelemetry.api.common.Value; import io.opentelemetry.api.logs.Severity; -import io.opentelemetry.contrib.disk.buffering.config.TemporaryFileProvider; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.models.LogRecordDataImpl; @@ -42,10 +40,8 @@ class ReadableFileTest { @TempDir File dir; private File source; - private File temporaryFile; private ReadableFile readableFile; private Clock clock; - private TemporaryFileProvider temporaryFileProvider; private static final long CREATED_TIME_MILLIS = 1000L; private static final SignalSerializer SERIALIZER = SignalSerializer.ofLogs(); private static final SignalDeserializer DESERIALIZER = SignalDeserializer.ofLogs(); @@ -94,14 +90,9 @@ class ReadableFileTest { @BeforeEach void setUp() throws IOException { source = new File(dir, "sourceFile"); - temporaryFile = new File(dir, "temporaryFile"); addFileContents(source); - temporaryFileProvider = mock(); - when(temporaryFileProvider.createTemporaryFile(anyString())).thenReturn(temporaryFile); clock = mock(); - readableFile = - new ReadableFile( - source, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider, dir)); + readableFile = new ReadableFile(source, CREATED_TIME_MILLIS, clock, getConfiguration(dir)); } private static void addFileContents(File source) throws IOException { @@ -144,18 +135,18 @@ void whenProcessingFails_returnTryLaterStatus() throws IOException { ReadableResult.TRY_LATER, readableFile.readAndProcess(bytes -> ProcessResult.TRY_LATER)); } - @Test - void deleteTemporaryFileWhenClosing() throws IOException { - readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); - readableFile.close(); - - assertFalse(temporaryFile.exists()); - } - @Test void readMultipleLinesAndRemoveThem() throws IOException { - readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); - readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); + readableFile.readAndProcess( + bytes -> { + assertDeserializedData(FIRST_LOG_RECORD, bytes); + return ProcessResult.SUCCEEDED; + }); + readableFile.readAndProcess( + bytes -> { + assertDeserializedData(SECOND_LOG_RECORD, bytes); + return ProcessResult.SUCCEEDED; + }); List logs = getRemainingDataAndClose(readableFile); @@ -198,8 +189,7 @@ void whenNoMoreLinesAvailableToRead_deleteOriginalFile_close_and_returnNoContent } ReadableFile emptyReadableFile = - new ReadableFile( - emptyFile, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider, dir)); + new ReadableFile(emptyFile, CREATED_TIME_MILLIS, clock, getConfiguration(dir)); assertEquals( ReadableResult.FAILED, emptyReadableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED)); @@ -231,6 +221,15 @@ void whenReadingAfterClosed_returnFailedStatus() throws IOException { ReadableResult.FAILED, readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED)); } + private static void assertDeserializedData(LogRecordData expected, byte[] bytes) { + try { + List deserialized = DESERIALIZER.deserialize(bytes); + assertEquals(expected, deserialized.get(0)); + } catch (DeserializationException e) { + throw new RuntimeException(e); + } + } + private static List getRemainingDataAndClose(ReadableFile readableFile) throws IOException { List result = new ArrayList<>(); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java index 8ff749c1e..6830b471a 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java @@ -43,7 +43,7 @@ void setUp() throws IOException { new WritableFile( new File(rootDir, String.valueOf(CREATED_TIME_MILLIS)), CREATED_TIME_MILLIS, - TestData.getDefaultConfiguration(rootDir), + TestData.getConfiguration(rootDir), clock); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/FileStreamTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/FileStreamTest.java new file mode 100644 index 000000000..1d801db7b --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/utils/FileStreamTest.java @@ -0,0 +1,82 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class FileStreamTest { + @TempDir File dir; + + @Test + void truncateTop() throws IOException { + String initialText = "1,2,3,4,5"; + byte[] readBuffer; + File temporaryFile = new File(dir, "temporaryFile"); + writeString(temporaryFile, initialText); + + FileStream stream = FileStream.create(temporaryFile); + + // Assert initial size + assertThat(stream.size()).isEqualTo(9); + + assertThat((char) stream.read()).asString().isEqualTo("1"); + assertThat(readString(temporaryFile)).isEqualTo(initialText); + assertThat(stream.size()).isEqualTo(9); + + // Truncate until current position + stream.truncateTop(); + assertThat(readString(temporaryFile)).isEqualTo(",2,3,4,5"); + assertThat(stream.size()).isEqualTo(8); + + // Truncate fixed size from the top + stream.truncateTop(3); + + // Ensure that the changes are made before closing the stream. + assertThat(readString(temporaryFile)).isEqualTo("3,4,5"); + assertThat(stream.size()).isEqualTo(5); + + // Truncate again + readBuffer = new byte[3]; + stream.read(readBuffer); + assertThat(readBuffer).asString().isEqualTo("3,4"); + assertThat(stream.size()).isEqualTo(5); + + stream.truncateTop(2); + + // Ensure that the changes are made before closing the stream. + assertThat(readString(temporaryFile)).isEqualTo("4,5"); + assertThat(stream.size()).isEqualTo(3); + + // Truncate all available data + stream.truncateTop(3); + assertThat(stream.size()).isEqualTo(0); + assertThat(readString(temporaryFile)).isEqualTo(""); + + stream.close(); + + // Ensure that the changes are kept after closing the stream. + assertThat(readString(temporaryFile)).isEqualTo(""); + } + + private static void writeString(File file, String text) throws IOException { + Files.write( + file.toPath(), text.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE_NEW); + } + + @NotNull + private static String readString(File file) throws IOException { + return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8); + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java index d413c4aa6..adfc8fb2f 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java @@ -26,7 +26,7 @@ protected byte[] serialize(SIGNAL_SDK_ITEM... items) { protected List deserialize(byte[] source) { try (ByteArrayInputStream in = new ByteArrayInputStream(source)) { StreamReader streamReader = DelimitedProtoStreamReader.Factory.getInstance().create(in); - return getDeserializer().deserialize(Objects.requireNonNull(streamReader.read()).content); + return getDeserializer().deserialize(Objects.requireNonNull(streamReader.readNext()).content); } catch (IOException e) { throw new RuntimeException(e); }