Skip to content

Commit 34debe8

Browse files
committed
Make "to" range optional
1 parent 900c262 commit 34debe8

File tree

6 files changed

+45
-22
lines changed

6 files changed

+45
-22
lines changed

src/main/java/org/radarbase/output/accounting/OffsetIntervals.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class OffsetIntervals {
2121
}
2222

2323
fun contains(range: OffsetRangeSet.Range): Boolean {
24+
checkNotNull(range.to)
2425
// -index-1 if not found
2526
val searchIndex = offsetsFrom.binarySearch(range.from)
2627
val index = if (searchIndex >= 0) searchIndex else -searchIndex - 2
@@ -80,6 +81,7 @@ class OffsetIntervals {
8081

8182
fun add(range: OffsetRangeSet.Range) {
8283
val (from, to, lastModified) = range
84+
checkNotNull(to)
8385
var index = offsetsFrom.binarySearch(from)
8486
if (index < 0) {
8587
// index where this range would be entered

src/main/java/org/radarbase/output/accounting/OffsetRangeSet.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ class OffsetRangeSet {
148148
.getOrDefault(this, EMPTY_VALUE)
149149
.read(function)
150150

151-
data class Range(val from: Long, val to: Long, val lastProcessed: Instant) {
152-
val size: Long = to - from + 1
151+
data class Range(val from: Long, val to: Long?, val lastProcessed: Instant) {
152+
val size: Long? = to?.let { it - from + 1 }
153153
override fun toString() = "($from - $to, $lastProcessed)"
154154
}
155155

src/main/java/org/radarbase/output/accounting/TopicPartitionOffsetRange.kt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,25 @@ data class TopicPartitionOffsetRange(
3030
val partition: Int = topicPartition.partition
3131

3232
/** Full constructor. */
33-
constructor(topic: String, partition: Int, offsetFrom: Long, offsetTo: Long, lastModified: Instant = Instant.now()) : this(
33+
constructor(topic: String, partition: Int, offsetFrom: Long, offsetTo: Long?, lastModified: Instant = Instant.now()) : this(
3434
TopicPartition(topic, partition),
3535
OffsetRangeSet.Range(offsetFrom, offsetTo, lastModified))
3636

3737
override fun toString(): String = "$topic+$partition+${range.from}+${range.to} (${range.lastProcessed})"
3838

3939
companion object {
40+
private val filenameSplitRegex = "[+.]".toRegex()
41+
4042
@Throws(NumberFormatException::class, IndexOutOfBoundsException::class)
4143
fun parseFilename(filename: String, lastModified: Instant): TopicPartitionOffsetRange {
42-
val fileNameParts = filename.split("[+.]".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray()
44+
val fileNameParts = filename.split(filenameSplitRegex)
45+
.dropLastWhile { it.isEmpty() || it == "avro" }
4346

4447
return TopicPartitionOffsetRange(
4548
fileNameParts[0],
46-
Integer.parseInt(fileNameParts[1]),
47-
java.lang.Long.parseLong(fileNameParts[2]),
48-
java.lang.Long.parseLong(fileNameParts[3]),
49+
fileNameParts[1].toInt(),
50+
fileNameParts[2].toLong(),
51+
fileNameParts.getOrNull(3)?.toLong(),
4952
lastModified)
5053
}
5154
}

src/main/java/org/radarbase/output/source/TopicFileList.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import java.nio.file.Path
55
import java.time.Instant
66

77
data class TopicFileList(val topic: String, val files: List<TopicFile>) {
8-
val numberOfOffsets: Long = this.files.fold(0L) { sum, f -> sum + f.size }
8+
val numberOfOffsets: Long? = this.files
9+
.takeIf { fs -> fs.none { it.size == null } }
10+
?.fold(0L) { sum, f -> sum + f.size!! }
911
val numberOfFiles: Int = this.files.size
1012
}
1113

1214
data class TopicFile(val topic: String, val path: Path, val lastModified: Instant) {
1315
val range: TopicPartitionOffsetRange = TopicPartitionOffsetRange.parseFilename(path.fileName.toString(), lastModified)
14-
val size: Long = range.range.size
16+
val size: Long? = range.range.size
1517
}
1618

1719
data class SimpleFileStatus(val path: Path, val isDirectory: Boolean, val lastModified: Instant)

src/main/java/org/radarbase/output/worker/RadarKafkaRestructure.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,12 @@ class RadarKafkaRestructure(
146146
.filter { f -> f.lastModified.isBefore(deleteThreshold) &&
147147
// ensure that there is a file with a larger offset also
148148
// processed, so the largest offset is never removed.
149-
seenFiles.contains(f.range.copy(range = f.range.range.copy(to = f.range.range.to + 1))) }
149+
if (f.range.range.to != null) {
150+
seenFiles.contains(f.range.copy(range = f.range.range.copy(to = f.range.range.to + 1)))
151+
} else {
152+
seenFiles.contains(f.range.topicPartition, f.range.range.from, f.range.range.lastProcessed)
153+
}
154+
}
150155
.take(maxFilesPerTopic)
151156
.map { kafkaStorage.delete(it.path) }
152157
.count()

src/main/java/org/radarbase/output/worker/RestructureWorker.kt

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ internal class RestructureWorker(
4141
fun processPaths(topicPaths: TopicFileList) {
4242
val numFiles = topicPaths.numberOfFiles
4343
val numOffsets = topicPaths.numberOfOffsets
44+
val totalProgress = numOffsets ?: numFiles.toLong()
4445

4546
val topic = topicPaths.topic
4647

@@ -52,30 +53,35 @@ internal class RestructureWorker(
5253
val seenOffsets = accountant.offsets
5354
.withFactory { ReadOnlyFunctionalValue(it) }
5455

55-
val progressBar = ProgressBar(topic, numOffsets, 50, 5, TimeUnit.SECONDS)
56+
val progressBar = ProgressBar(topic, totalProgress, 50, 5, TimeUnit.SECONDS)
5657
progressBar.update(0)
5758

5859
val batchSize = (BATCH_SIZE * ThreadLocalRandom.current().nextDouble(0.75, 1.25)).roundToLong()
59-
var currentSize: Long = 0
60+
var currentSize: Long = 0L
61+
var currentFile: Long = 0L
6062
try {
6163
for (file in topicPaths.files) {
6264
if (closed.get()) {
6365
break
6466
}
65-
try {
67+
val processedSize = try {
6668
this.processFile(file, progressBar, seenOffsets)
6769
} catch (exc: JsonMappingException) {
6870
logger.error("Cannot map values", exc)
71+
0L
6972
}
7073

71-
processedFileCount++
72-
progressBar.update(currentSize)
73-
74-
currentSize += file.size
74+
currentFile += 1
75+
currentSize += processedSize
7576
if (currentSize >= batchSize) {
7677
currentSize = 0
7778
cacheStore.flush()
7879
}
80+
81+
processedFileCount++
82+
if (numOffsets == null) {
83+
progressBar.update(currentFile)
84+
}
7985
}
8086
} catch (ex: IOException) {
8187
logger.error("Failed to process file", ex)
@@ -85,26 +91,28 @@ internal class RestructureWorker(
8591
logger.warn("Shutting down")
8692
}
8793

88-
progressBar.update(numOffsets)
94+
progressBar.update(totalProgress)
8995
}
9096

9197
@Throws(IOException::class)
9298
private fun processFile(file: TopicFile,
93-
progressBar: ProgressBar, seenOffsets: OffsetRangeSet) {
99+
progressBar: ProgressBar, seenOffsets: OffsetRangeSet): Long {
94100
logger.debug("Reading {}", file.path)
95101

96102
val offset = file.range.range.from
97103

98-
reader.newInput(file).use { input ->
104+
return reader.newInput(file).use { input ->
99105
// processing zero-length files may trigger a stall. See:
100106
// https://github.com/RADAR-base/Restructure-HDFS-topic/issues/3
101107
if (input.length() == 0L) {
102108
logger.warn("File {} has zero length, skipping.", file.path)
103-
return
109+
return 0L
104110
}
105111
val transaction = Accountant.Transaction(file.range.topicPartition, offset, file.lastModified)
112+
var recordsSeen = 0L
106113
extractRecords(input) { relativeOffset, record ->
107114
transaction.offset = offset + relativeOffset
115+
recordsSeen += 1L
108116
val alreadyContains = time("accounting.check") {
109117
seenOffsets.contains(file.range.topicPartition, transaction.offset, transaction.lastModified)
110118
}
@@ -113,8 +121,11 @@ internal class RestructureWorker(
113121
this.writeRecord(transaction, record)
114122
}
115123
processedRecordsCount++
116-
progressBar.update(processedRecordsCount)
124+
if (file.size != null) {
125+
progressBar.update(processedRecordsCount)
126+
}
117127
}
128+
recordsSeen
118129
}
119130
}
120131

0 commit comments

Comments
 (0)