Skip to content

Commit 7344b1d

Browse files
authored
Merge pull request #68 from RADAR-base/offset-range-without-end
Offset range without end
2 parents 900c262 + 848b057 commit 7344b1d

File tree

6 files changed

+52
-24
lines changed

6 files changed

+52
-24
lines changed

src/main/java/org/radarbase/output/accounting/OffsetIntervals.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class OffsetIntervals {
2121
}
2222

2323
fun contains(range: OffsetRangeSet.Range): Boolean {
24+
checkNotNull(range.to)
2425
// -index-1 if not found
2526
val searchIndex = offsetsFrom.binarySearch(range.from)
2627
val index = if (searchIndex >= 0) searchIndex else -searchIndex - 2
@@ -80,6 +81,7 @@ class OffsetIntervals {
8081

8182
fun add(range: OffsetRangeSet.Range) {
8283
val (from, to, lastModified) = range
84+
checkNotNull(to)
8385
var index = offsetsFrom.binarySearch(from)
8486
if (index < 0) {
8587
// index where this range would be entered

src/main/java/org/radarbase/output/accounting/OffsetRangeSet.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ class OffsetRangeSet {
148148
.getOrDefault(this, EMPTY_VALUE)
149149
.read(function)
150150

151-
data class Range(val from: Long, val to: Long, val lastProcessed: Instant) {
152-
val size: Long = to - from + 1
151+
data class Range(val from: Long, val to: Long?, val lastProcessed: Instant) {
152+
val size: Long? = to?.let { it - from + 1 }
153153
override fun toString() = "($from - $to, $lastProcessed)"
154154
}
155155

src/main/java/org/radarbase/output/accounting/TopicPartitionOffsetRange.kt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,25 @@ data class TopicPartitionOffsetRange(
3030
val partition: Int = topicPartition.partition
3131

3232
/** Full constructor. */
33-
constructor(topic: String, partition: Int, offsetFrom: Long, offsetTo: Long, lastModified: Instant = Instant.now()) : this(
33+
constructor(topic: String, partition: Int, offsetFrom: Long, offsetTo: Long?, lastModified: Instant = Instant.now()) : this(
3434
TopicPartition(topic, partition),
3535
OffsetRangeSet.Range(offsetFrom, offsetTo, lastModified))
3636

3737
override fun toString(): String = "$topic+$partition+${range.from}+${range.to} (${range.lastProcessed})"
3838

3939
companion object {
40+
private val filenameSplitRegex = "[+.]".toRegex()
41+
4042
@Throws(NumberFormatException::class, IndexOutOfBoundsException::class)
4143
fun parseFilename(filename: String, lastModified: Instant): TopicPartitionOffsetRange {
42-
val fileNameParts = filename.split("[+.]".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray()
44+
val fileNameParts = filename.split(filenameSplitRegex)
45+
.dropLastWhile { it.isEmpty() || it == "avro" }
4346

4447
return TopicPartitionOffsetRange(
4548
fileNameParts[0],
46-
Integer.parseInt(fileNameParts[1]),
47-
java.lang.Long.parseLong(fileNameParts[2]),
48-
java.lang.Long.parseLong(fileNameParts[3]),
49+
fileNameParts[1].toInt(),
50+
fileNameParts[2].toLong(),
51+
fileNameParts.getOrNull(3)?.toLong(),
4952
lastModified)
5053
}
5154
}

src/main/java/org/radarbase/output/source/TopicFileList.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import java.nio.file.Path
55
import java.time.Instant
66

77
data class TopicFileList(val topic: String, val files: List<TopicFile>) {
8-
val numberOfOffsets: Long = this.files.fold(0L) { sum, f -> sum + f.size }
8+
val numberOfOffsets: Long? = this.files
9+
.takeIf { fs -> fs.none { it.size == null } }
10+
?.fold(0L) { sum, f -> sum + f.size!! }
911
val numberOfFiles: Int = this.files.size
1012
}
1113

1214
data class TopicFile(val topic: String, val path: Path, val lastModified: Instant) {
1315
val range: TopicPartitionOffsetRange = TopicPartitionOffsetRange.parseFilename(path.fileName.toString(), lastModified)
14-
val size: Long = range.range.size
16+
val size: Long? = range.range.size
1517
}
1618

1719
data class SimpleFileStatus(val path: Path, val isDirectory: Boolean, val lastModified: Instant)

src/main/java/org/radarbase/output/worker/RadarKafkaRestructure.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,12 @@ class RadarKafkaRestructure(
146146
.filter { f -> f.lastModified.isBefore(deleteThreshold) &&
147147
// ensure that there is a file with a larger offset also
148148
// processed, so the largest offset is never removed.
149-
seenFiles.contains(f.range.copy(range = f.range.range.copy(to = f.range.range.to + 1))) }
149+
if (f.range.range.to != null) {
150+
seenFiles.contains(f.range.copy(range = f.range.range.copy(to = f.range.range.to + 1)))
151+
} else {
152+
seenFiles.contains(f.range.topicPartition, f.range.range.from, f.range.range.lastProcessed)
153+
}
154+
}
150155
.take(maxFilesPerTopic)
151156
.map { kafkaStorage.delete(it.path) }
152157
.count()

src/main/java/org/radarbase/output/worker/RestructureWorker.kt

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,41 +41,52 @@ internal class RestructureWorker(
4141
fun processPaths(topicPaths: TopicFileList) {
4242
val numFiles = topicPaths.numberOfFiles
4343
val numOffsets = topicPaths.numberOfOffsets
44+
val totalProgress = numOffsets ?: numFiles.toLong()
4445

4546
val topic = topicPaths.topic
4647

4748
val numberFormat = NumberFormat.getNumberInstance()
4849

49-
logger.info("Processing topic {}: converting {} files with {} records",
50-
topic, numberFormat.format(numFiles), numberFormat.format(numOffsets))
50+
if (numOffsets == null) {
51+
logger.info("Processing topic {}: converting {} files",
52+
topic, numberFormat.format(numFiles))
53+
} else {
54+
logger.info("Processing topic {}: converting {} files with {} records",
55+
topic, numberFormat.format(numFiles), numberFormat.format(numOffsets))
56+
}
5157

5258
val seenOffsets = accountant.offsets
5359
.withFactory { ReadOnlyFunctionalValue(it) }
5460

55-
val progressBar = ProgressBar(topic, numOffsets, 50, 5, TimeUnit.SECONDS)
61+
val progressBar = ProgressBar(topic, totalProgress, 50, 5, TimeUnit.SECONDS)
5662
progressBar.update(0)
5763

5864
val batchSize = (BATCH_SIZE * ThreadLocalRandom.current().nextDouble(0.75, 1.25)).roundToLong()
59-
var currentSize: Long = 0
65+
var currentSize: Long = 0L
66+
var currentFile: Long = 0L
6067
try {
6168
for (file in topicPaths.files) {
6269
if (closed.get()) {
6370
break
6471
}
65-
try {
72+
val processedSize = try {
6673
this.processFile(file, progressBar, seenOffsets)
6774
} catch (exc: JsonMappingException) {
6875
logger.error("Cannot map values", exc)
76+
0L
6977
}
7078

71-
processedFileCount++
72-
progressBar.update(currentSize)
73-
74-
currentSize += file.size
79+
currentFile += 1
80+
currentSize += processedSize
7581
if (currentSize >= batchSize) {
7682
currentSize = 0
7783
cacheStore.flush()
7884
}
85+
86+
processedFileCount++
87+
if (numOffsets == null) {
88+
progressBar.update(currentFile)
89+
}
7990
}
8091
} catch (ex: IOException) {
8192
logger.error("Failed to process file", ex)
@@ -85,26 +96,28 @@ internal class RestructureWorker(
8596
logger.warn("Shutting down")
8697
}
8798

88-
progressBar.update(numOffsets)
99+
progressBar.update(totalProgress)
89100
}
90101

91102
@Throws(IOException::class)
92103
private fun processFile(file: TopicFile,
93-
progressBar: ProgressBar, seenOffsets: OffsetRangeSet) {
104+
progressBar: ProgressBar, seenOffsets: OffsetRangeSet): Long {
94105
logger.debug("Reading {}", file.path)
95106

96107
val offset = file.range.range.from
97108

98-
reader.newInput(file).use { input ->
109+
return reader.newInput(file).use { input ->
99110
// processing zero-length files may trigger a stall. See:
100111
// https://github.com/RADAR-base/Restructure-HDFS-topic/issues/3
101112
if (input.length() == 0L) {
102113
logger.warn("File {} has zero length, skipping.", file.path)
103-
return
114+
return 0L
104115
}
105116
val transaction = Accountant.Transaction(file.range.topicPartition, offset, file.lastModified)
117+
var recordsSeen = 0L
106118
extractRecords(input) { relativeOffset, record ->
107119
transaction.offset = offset + relativeOffset
120+
recordsSeen += 1L
108121
val alreadyContains = time("accounting.check") {
109122
seenOffsets.contains(file.range.topicPartition, transaction.offset, transaction.lastModified)
110123
}
@@ -113,8 +126,11 @@ internal class RestructureWorker(
113126
this.writeRecord(transaction, record)
114127
}
115128
processedRecordsCount++
116-
progressBar.update(processedRecordsCount)
129+
if (file.size != null) {
130+
progressBar.update(processedRecordsCount)
131+
}
117132
}
133+
recordsSeen
118134
}
119135
}
120136

0 commit comments

Comments
 (0)