Skip to content

Commit 85c1056

Browse files
authored
Merge pull request #70 from RADAR-base/release-1.0.1
Release 1.0.1
2 parents b168d12 + 2831165 commit 85c1056

File tree

9 files changed

+67
-33
lines changed

9 files changed

+67
-33
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ plugins {
99
}
1010

1111
group 'org.radarbase'
12-
version '1.0.0'
12+
version '1.0.1'
1313
mainClassName = 'org.radarbase.output.Application'
1414

1515
sourceCompatibility = '1.8'

src/main/java/org/radarbase/output/Application.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,7 @@ class Application(
6363
override val sourceStorage: SourceStorage
6464
get() = sourceStorageFactory.createSourceStorage()
6565

66-
private val targetStorageFactory = TargetStorageFactory(config.target)
67-
override val targetStorage: TargetStorage
68-
get() = targetStorageFactory.createTargetStorage()
66+
override val targetStorage: TargetStorage = TargetStorageFactory(config.target).createTargetStorage()
6967

7068
override val redisPool: JedisPool = JedisPool(config.redis.uri)
7169
override val remoteLockManager: RemoteLockManager = RedisRemoteLockManager(

src/main/java/org/radarbase/output/accounting/OffsetIntervals.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class OffsetIntervals {
2121
}
2222

2323
fun contains(range: OffsetRangeSet.Range): Boolean {
24+
checkNotNull(range.to)
2425
// -index-1 if not found
2526
val searchIndex = offsetsFrom.binarySearch(range.from)
2627
val index = if (searchIndex >= 0) searchIndex else -searchIndex - 2
@@ -80,6 +81,7 @@ class OffsetIntervals {
8081

8182
fun add(range: OffsetRangeSet.Range) {
8283
val (from, to, lastModified) = range
84+
checkNotNull(to)
8385
var index = offsetsFrom.binarySearch(from)
8486
if (index < 0) {
8587
// index where this range would be entered

src/main/java/org/radarbase/output/accounting/OffsetRangeSet.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ class OffsetRangeSet {
148148
.getOrDefault(this, EMPTY_VALUE)
149149
.read(function)
150150

151-
data class Range(val from: Long, val to: Long, val lastProcessed: Instant) {
152-
val size: Long = to - from + 1
151+
data class Range(val from: Long, val to: Long?, val lastProcessed: Instant) {
152+
val size: Long? = to?.let { it - from + 1 }
153153
override fun toString() = "($from - $to, $lastProcessed)"
154154
}
155155

src/main/java/org/radarbase/output/accounting/TopicPartitionOffsetRange.kt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,25 @@ data class TopicPartitionOffsetRange(
3030
val partition: Int = topicPartition.partition
3131

3232
/** Full constructor. */
33-
constructor(topic: String, partition: Int, offsetFrom: Long, offsetTo: Long, lastModified: Instant = Instant.now()) : this(
33+
constructor(topic: String, partition: Int, offsetFrom: Long, offsetTo: Long?, lastModified: Instant = Instant.now()) : this(
3434
TopicPartition(topic, partition),
3535
OffsetRangeSet.Range(offsetFrom, offsetTo, lastModified))
3636

3737
override fun toString(): String = "$topic+$partition+${range.from}+${range.to} (${range.lastProcessed})"
3838

3939
companion object {
40+
private val filenameSplitRegex = "[+.]".toRegex()
41+
4042
@Throws(NumberFormatException::class, IndexOutOfBoundsException::class)
4143
fun parseFilename(filename: String, lastModified: Instant): TopicPartitionOffsetRange {
42-
val fileNameParts = filename.split("[+.]".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray()
44+
val fileNameParts = filename.split(filenameSplitRegex)
45+
.dropLastWhile { it.isEmpty() || it == "avro" }
4346

4447
return TopicPartitionOffsetRange(
4548
fileNameParts[0],
46-
Integer.parseInt(fileNameParts[1]),
47-
java.lang.Long.parseLong(fileNameParts[2]),
48-
java.lang.Long.parseLong(fileNameParts[3]),
49+
fileNameParts[1].toInt(),
50+
fileNameParts[2].toLong(),
51+
fileNameParts.getOrNull(3)?.toLong(),
4952
lastModified)
5053
}
5154
}

src/main/java/org/radarbase/output/config/RestructureConfig.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,12 @@ data class WorkerConfig(
110110
* Number of files to simultaneously keep in cache, including open writer. A higher size will
111111
* decrease overhead but increase memory usage and open file descriptors.
112112
*/
113-
val cacheSize: Int = CACHE_SIZE_DEFAULT) {
113+
val cacheSize: Int = CACHE_SIZE_DEFAULT,
114+
/**
115+
* Number of offsets to simultaneously keep in cache. A higher size will
116+
* decrease overhead but increase memory usage.
117+
*/
118+
val cacheOffsetsSize: Long = 500_000) {
114119
init {
115120
check(cacheSize >= 1) { "Maximum files per topic must be strictly positive" }
116121
maxFilesPerTopic?.let { check(it >= 1) { "Maximum files per topic must be strictly positive" } }

src/main/java/org/radarbase/output/source/TopicFileList.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import java.nio.file.Path
55
import java.time.Instant
66

77
data class TopicFileList(val topic: String, val files: List<TopicFile>) {
8-
val numberOfOffsets: Long = this.files.fold(0L) { sum, f -> sum + f.size }
8+
val numberOfOffsets: Long? = this.files
9+
.takeIf { fs -> fs.none { it.size == null } }
10+
?.fold(0L) { sum, f -> sum + f.size!! }
911
val numberOfFiles: Int = this.files.size
1012
}
1113

1214
data class TopicFile(val topic: String, val path: Path, val lastModified: Instant) {
1315
val range: TopicPartitionOffsetRange = TopicPartitionOffsetRange.parseFilename(path.fileName.toString(), lastModified)
14-
val size: Long = range.range.size
16+
val size: Long? = range.range.size
1517
}
1618

1719
data class SimpleFileStatus(val path: Path, val isDirectory: Boolean, val lastModified: Instant)

src/main/java/org/radarbase/output/worker/RadarKafkaRestructure.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,12 @@ class RadarKafkaRestructure(
146146
.filter { f -> f.lastModified.isBefore(deleteThreshold) &&
147147
// ensure that there is a file with a larger offset also
148148
// processed, so the largest offset is never removed.
149-
seenFiles.contains(f.range.copy(range = f.range.range.copy(to = f.range.range.to + 1))) }
149+
if (f.range.range.to != null) {
150+
seenFiles.contains(f.range.copy(range = f.range.range.copy(to = f.range.range.to + 1)))
151+
} else {
152+
seenFiles.contains(f.range.topicPartition, f.range.range.from, f.range.range.lastProcessed)
153+
}
154+
}
150155
.take(maxFilesPerTopic)
151156
.map { kafkaStorage.delete(it.path) }
152157
.count()

src/main/java/org/radarbase/output/worker/RestructureWorker.kt

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,47 +35,59 @@ internal class RestructureWorker(
3535
var processedRecordsCount: Long = 0
3636
private val reader = storage.reader()
3737
private val pathFactory: RecordPathFactory = fileStoreFactory.pathFactory
38+
private val batchSize = fileStoreFactory.config.worker.cacheOffsetsSize
3839

3940
private val cacheStore = fileStoreFactory.newFileCacheStore(accountant)
4041

4142
fun processPaths(topicPaths: TopicFileList) {
4243
val numFiles = topicPaths.numberOfFiles
4344
val numOffsets = topicPaths.numberOfOffsets
45+
val totalProgress = numOffsets ?: numFiles.toLong()
4446

4547
val topic = topicPaths.topic
4648

4749
val numberFormat = NumberFormat.getNumberInstance()
4850

49-
logger.info("Processing topic {}: converting {} files with {} records",
50-
topic, numberFormat.format(numFiles), numberFormat.format(numOffsets))
51+
if (numOffsets == null) {
52+
logger.info("Processing topic {}: converting {} files",
53+
topic, numberFormat.format(numFiles))
54+
} else {
55+
logger.info("Processing topic {}: converting {} files with {} records",
56+
topic, numberFormat.format(numFiles), numberFormat.format(numOffsets))
57+
}
5158

5259
val seenOffsets = accountant.offsets
5360
.withFactory { ReadOnlyFunctionalValue(it) }
5461

55-
val progressBar = ProgressBar(topic, numOffsets, 50, 5, TimeUnit.SECONDS)
62+
val progressBar = ProgressBar(topic, totalProgress, 50, 5, TimeUnit.SECONDS)
5663
progressBar.update(0)
5764

58-
val batchSize = (BATCH_SIZE * ThreadLocalRandom.current().nextDouble(0.75, 1.25)).roundToLong()
59-
var currentSize: Long = 0
65+
val batchSize = generateBatchSize()
66+
var currentSize = 0L
67+
var currentFile = 0L
6068
try {
6169
for (file in topicPaths.files) {
6270
if (closed.get()) {
6371
break
6472
}
65-
try {
73+
val processedSize = try {
6674
this.processFile(file, progressBar, seenOffsets)
6775
} catch (exc: JsonMappingException) {
6876
logger.error("Cannot map values", exc)
77+
0L
6978
}
7079

71-
processedFileCount++
72-
progressBar.update(currentSize)
73-
74-
currentSize += file.size
80+
currentFile += 1
81+
currentSize += processedSize
7582
if (currentSize >= batchSize) {
7683
currentSize = 0
7784
cacheStore.flush()
7885
}
86+
87+
processedFileCount++
88+
if (numOffsets == null) {
89+
progressBar.update(currentFile)
90+
}
7991
}
8092
} catch (ex: IOException) {
8193
logger.error("Failed to process file", ex)
@@ -85,26 +97,28 @@ internal class RestructureWorker(
8597
logger.warn("Shutting down")
8698
}
8799

88-
progressBar.update(numOffsets)
100+
progressBar.update(totalProgress)
89101
}
90102

91103
@Throws(IOException::class)
92104
private fun processFile(file: TopicFile,
93-
progressBar: ProgressBar, seenOffsets: OffsetRangeSet) {
105+
progressBar: ProgressBar, seenOffsets: OffsetRangeSet): Long {
94106
logger.debug("Reading {}", file.path)
95107

96108
val offset = file.range.range.from
97109

98-
reader.newInput(file).use { input ->
110+
return reader.newInput(file).use { input ->
99111
// processing zero-length files may trigger a stall. See:
100112
// https://github.com/RADAR-base/Restructure-HDFS-topic/issues/3
101113
if (input.length() == 0L) {
102114
logger.warn("File {} has zero length, skipping.", file.path)
103-
return
115+
return 0L
104116
}
105117
val transaction = Accountant.Transaction(file.range.topicPartition, offset, file.lastModified)
118+
var recordsSeen = 0L
106119
extractRecords(input) { relativeOffset, record ->
107120
transaction.offset = offset + relativeOffset
121+
recordsSeen += 1L
108122
val alreadyContains = time("accounting.check") {
109123
seenOffsets.contains(file.range.topicPartition, transaction.offset, transaction.lastModified)
110124
}
@@ -113,8 +127,11 @@ internal class RestructureWorker(
113127
this.writeRecord(transaction, record)
114128
}
115129
processedRecordsCount++
116-
progressBar.update(processedRecordsCount)
130+
if (file.size != null) {
131+
progressBar.update(processedRecordsCount)
132+
}
117133
}
134+
recordsSeen
118135
}
119136
}
120137

@@ -153,15 +170,17 @@ internal class RestructureWorker(
153170
}
154171
}
155172

173+
private fun generateBatchSize(): Long {
174+
val modifier = ThreadLocalRandom.current().nextDouble(0.75, 1.25)
175+
return (batchSize * modifier).roundToLong()
176+
}
177+
156178
override fun close() {
157179
reader.close()
158180
cacheStore.close()
159181
}
160182

161183
companion object {
162184
private val logger = LoggerFactory.getLogger(RestructureWorker::class.java)
163-
164-
/** Number of offsets to process in a single task. */
165-
private const val BATCH_SIZE: Long = 500000
166185
}
167186
}

0 commit comments

Comments
 (0)