Skip to content

Commit 20ba8f2

Browse files
committed
Simplify topicFile creation
1 parent 01d76ff commit 20ba8f2

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

src/main/java/org/radarbase/output/source/S3SourceStorage.kt

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,17 @@ class S3SourceStorage(
3131
}
3232

3333
override fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile {
34-
val topicFile = super.createTopicFile(topic, status)
35-
return if (topicFile.range.range.to == null) {
34+
var topicFile = super.createTopicFile(topic, status)
35+
36+
if (topicFile.range.range.to == null) {
3637
val tags = s3Client.getObjectTags(GetObjectTagsArgs.Builder().objectBuild(bucket, status.path))
3738
val endOffset = tags.get()["endOffset"]?.toLongOrNull()
3839
if (endOffset != null) {
39-
topicFile.copy(range = topicFile.range.mapRange { it.copy(to = endOffset) })
40-
} else {
41-
topicFile
40+
topicFile = topicFile.copy(range = topicFile.range.mapRange { it.copy(to = endOffset) })
4241
}
43-
} else topicFile
42+
}
43+
44+
return topicFile
4445
}
4546

4647
override fun delete(path: Path) {

src/main/java/org/radarbase/output/source/SourceStorage.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.radarbase.output.source
22

33
import org.apache.avro.file.SeekableInput
4+
import org.radarbase.output.accounting.TopicPartitionOffsetRange
45
import java.io.Closeable
56
import java.nio.file.Path
67
import java.time.Instant
@@ -15,7 +16,9 @@ interface SourceStorage {
1516
/** Delete given file. Will not delete any directories. */
1617
fun delete(path: Path)
1718
fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile {
18-
return TopicFile(topic, status.path, status.lastModified ?: Instant.now())
19+
val lastModified = status.lastModified ?: Instant.now()
20+
val range = TopicPartitionOffsetRange.parseFilename(status.path.fileName.toString(), lastModified)
21+
return TopicFile(topic, status.path, lastModified, range)
1922
}
2023

2124
/** Find records and topics. */

0 commit comments

Comments
 (0)