Skip to content

Commit 3aebaa9

Browse files
committed
Using enum to denote more than two outcomes from reading a signal file
1 parent 1d7f482 commit 3aebaa9

File tree

6 files changed

+62
-34
lines changed

6 files changed

+62
-34
lines changed

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
99
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
10+
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
1011
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
1112
import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger;
1213
import io.opentelemetry.sdk.common.CompletableResultCode;
@@ -64,11 +65,15 @@ public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException
6465
+ " "
6566
+ deserializer.signalType()
6667
+ " bytes from storage.");
67-
List<EXPORT_DATA> telemetry = deserializer.deserialize(bytes);
68-
logger.log(
69-
"Now exporting batch of " + telemetry.size() + " " + deserializer.signalType());
70-
CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit);
71-
return join.isSuccess();
68+
try {
69+
List<EXPORT_DATA> telemetry = deserializer.deserialize(bytes);
70+
logger.log(
71+
"Now exporting batch of " + telemetry.size() + " " + deserializer.signalType());
72+
CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit);
73+
return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER;
74+
} catch (IllegalArgumentException e) {
75+
return ProcessResult.CONTENT_INVALID;
76+
}
7277
});
7378
return result == ReadableResult.SUCCEEDED;
7479
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
1111
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile;
1212
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile;
13+
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
1314
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
1415
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult;
1516
import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger;
@@ -77,11 +78,11 @@ private boolean write(byte[] item, int attemptNumber) throws IOException {
7778
* @param processing Is passed over to {@link ReadableFile#readAndProcess(Function)}.
7879
* @throws IOException If an unexpected error happens.
7980
*/
80-
public ReadableResult readAndProcess(Function<byte[], Boolean> processing) throws IOException {
81+
public ReadableResult readAndProcess(Function<byte[], ProcessResult> processing) throws IOException {
8182
return readAndProcess(processing, 1);
8283
}
8384

84-
private ReadableResult readAndProcess(Function<byte[], Boolean> processing, int attemptNumber)
85+
private ReadableResult readAndProcess(Function<byte[], ProcessResult> processing, int attemptNumber)
8586
throws IOException {
8687
if (isClosed.get()) {
8788
logger.log("Refusing to read from storage after being closed.");

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import io.opentelemetry.contrib.disk.buffering.StorageConfiguration;
1111
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.DelimitedProtoStreamReader;
12+
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
1213
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ReadResult;
1314
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.StreamReader;
1415
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileTransferUtil;
@@ -83,7 +84,7 @@ public ReadableFile(
8384
* If the processing function returns TRUE, then the provided line will be deleted from the
8485
* source file. If the function returns FALSE, no changes will be applied to the source file.
8586
*/
86-
public synchronized ReadableResult readAndProcess(Function<byte[], Boolean> processing)
87+
public synchronized ReadableResult readAndProcess(Function<byte[], ProcessResult> processing)
8788
throws IOException {
8889
if (isClosed.get()) {
8990
return ReadableResult.FAILED;
@@ -97,20 +98,25 @@ public synchronized ReadableResult readAndProcess(Function<byte[], Boolean> proc
9798
cleanUp();
9899
return ReadableResult.FAILED;
99100
}
100-
if (processing.apply(read.content)) {
101-
unconsumedResult = null;
102-
readBytes += read.totalReadLength;
103-
int amountOfBytesToTransfer = originalFileSize - readBytes;
104-
if (amountOfBytesToTransfer > 0) {
105-
fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer);
106-
} else {
101+
switch (processing.apply(read.content)) {
102+
case SUCCEEDED:
103+
unconsumedResult = null;
104+
readBytes += read.totalReadLength;
105+
int amountOfBytesToTransfer = originalFileSize - readBytes;
106+
if (amountOfBytesToTransfer > 0) {
107+
fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer);
108+
} else {
109+
cleanUp();
110+
}
111+
return ReadableResult.SUCCEEDED;
112+
case TRY_LATER:
113+
unconsumedResult = read;
114+
return ReadableResult.PROCESSING_FAILED;
115+
case CONTENT_INVALID:
107116
cleanUp();
108-
}
109-
return ReadableResult.SUCCEEDED;
110-
} else {
111-
unconsumedResult = read;
112-
return ReadableResult.PROCESSING_FAILED;
117+
return ReadableResult.PROCESSING_FAILED;
113118
}
119+
return ReadableResult.FAILED;
114120
}
115121

116122
@Nullable
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader;
2+
3+
/** Result of processing the contents of a file. */
4+
public enum ProcessResult {
5+
SUCCEEDED,
6+
TRY_LATER,
7+
CONTENT_INVALID
8+
}

disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile;
1919
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile;
20+
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
2021
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
2122
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult;
2223
import java.io.IOException;
@@ -28,7 +29,7 @@
2829
class StorageTest {
2930
private FolderManager folderManager;
3031
private Storage storage;
31-
private Function<byte[], Boolean> processing;
32+
private Function<byte[], ProcessResult> processing;
3233
private ReadableFile readableFile;
3334
private WritableFile writableFile;
3435

disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
2323
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.models.LogRecordDataImpl;
2424
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
25+
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
2526
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
2627
import io.opentelemetry.contrib.disk.buffering.testutils.TestData;
2728
import io.opentelemetry.sdk.common.Clock;
@@ -120,7 +121,7 @@ void readSingleItemAndRemoveIt() throws IOException {
120121
readableFile.readAndProcess(
121122
bytes -> {
122123
assertEquals(FIRST_LOG_RECORD, deserialize(bytes));
123-
return true;
124+
return ProcessResult.SUCCEEDED;
124125
});
125126

126127
List<LogRecordData> logs = getRemainingDataAndClose(readableFile);
@@ -132,26 +133,29 @@ void readSingleItemAndRemoveIt() throws IOException {
132133

133134
@Test
134135
void whenProcessingSucceeds_returnSuccessStatus() throws IOException {
135-
assertEquals(ReadableResult.SUCCEEDED, readableFile.readAndProcess(bytes -> true));
136+
assertEquals(
137+
ReadableResult.SUCCEEDED, readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED));
136138
}
137139

138140
@Test
139141
void whenProcessingFails_returnProcessFailedStatus() throws IOException {
140-
assertEquals(ReadableResult.PROCESSING_FAILED, readableFile.readAndProcess(bytes -> false));
142+
assertEquals(
143+
ReadableResult.PROCESSING_FAILED,
144+
readableFile.readAndProcess(bytes -> ProcessResult.TRY_LATER));
141145
}
142146

143147
@Test
144148
void deleteTemporaryFileWhenClosing() throws IOException {
145-
readableFile.readAndProcess(bytes -> true);
149+
readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED);
146150
readableFile.close();
147151

148152
assertFalse(temporaryFile.exists());
149153
}
150154

151155
@Test
152156
void readMultipleLinesAndRemoveThem() throws IOException {
153-
readableFile.readAndProcess(bytes -> true);
154-
readableFile.readAndProcess(bytes -> true);
157+
readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED);
158+
readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED);
155159

156160
List<LogRecordData> logs = getRemainingDataAndClose(readableFile);
157161

@@ -161,7 +165,7 @@ void readMultipleLinesAndRemoveThem() throws IOException {
161165

162166
@Test
163167
void whenConsumerReturnsFalse_doNotRemoveLineFromSource() throws IOException {
164-
readableFile.readAndProcess(bytes -> false);
168+
readableFile.readAndProcess(bytes -> ProcessResult.TRY_LATER);
165169

166170
List<LogRecordData> logs = getRemainingDataAndClose(readableFile);
167171

@@ -188,7 +192,8 @@ void whenNoMoreLinesAvailableToRead_deleteOriginalFile_close_and_returnNoContent
188192
new ReadableFile(
189193
emptyFile, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider, dir));
190194

191-
assertEquals(ReadableResult.FAILED, emptyReadableFile.readAndProcess(bytes -> true));
195+
assertEquals(
196+
ReadableResult.FAILED, emptyReadableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED));
192197

193198
assertTrue(emptyReadableFile.isClosed());
194199
assertFalse(emptyFile.exists());
@@ -198,21 +203,23 @@ void whenNoMoreLinesAvailableToRead_deleteOriginalFile_close_and_returnNoContent
198203
void
199204
whenReadingAfterTheConfiguredReadingTimeExpired_deleteOriginalFile_close_and_returnFileExpiredException()
200205
throws IOException {
201-
readableFile.readAndProcess(bytes -> true);
206+
readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED);
202207
when(clock.now())
203208
.thenReturn(MILLISECONDS.toNanos(CREATED_TIME_MILLIS + MAX_FILE_AGE_FOR_READ_MILLIS));
204209

205-
assertEquals(ReadableResult.FAILED, readableFile.readAndProcess(bytes -> true));
210+
assertEquals(
211+
ReadableResult.FAILED, readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED));
206212

207213
assertTrue(readableFile.isClosed());
208214
}
209215

210216
@Test
211217
void whenReadingAfterClosed_returnFailedStatus() throws IOException {
212-
readableFile.readAndProcess(bytes -> true);
218+
readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED);
213219
readableFile.close();
214220

215-
assertEquals(ReadableResult.FAILED, readableFile.readAndProcess(bytes -> true));
221+
assertEquals(
222+
ReadableResult.FAILED, readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED));
216223
}
217224

218225
private static List<LogRecordData> getRemainingDataAndClose(ReadableFile readableFile)
@@ -224,7 +231,7 @@ private static List<LogRecordData> getRemainingDataAndClose(ReadableFile readabl
224231
readableFile.readAndProcess(
225232
bytes -> {
226233
result.add(deserialize(bytes));
227-
return true;
234+
return ProcessResult.SUCCEEDED;
228235
});
229236
}
230237

0 commit comments

Comments
 (0)