Skip to content

Commit 2831165

Browse files
committed
Merge branch 'dev' of github.com:RADAR-base/Restructure-HDFS-topic into dev
2 parents bea5ac7 + 1160f56 commit 2831165

File tree

2 files changed

+15
-7
lines changed

2 files changed

+15
-7
lines changed

src/main/java/org/radarbase/output/config/RestructureConfig.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,12 @@ data class WorkerConfig(
110110
* Number of files to simultaneously keep in cache, including open writer. A higher size will
111111
* decrease overhead but increase memory usage and open file descriptors.
112112
*/
113-
val cacheSize: Int = CACHE_SIZE_DEFAULT) {
113+
val cacheSize: Int = CACHE_SIZE_DEFAULT,
114+
/**
115+
* Number of offsets to simultaneously keep in cache. A higher size will
116+
* decrease overhead but increase memory usage.
117+
*/
118+
val cacheOffsetsSize: Long = 500_000) {
114119
init {
115120
check(cacheSize >= 1) { "Maximum files per topic must be strictly positive" }
116121
maxFilesPerTopic?.let { check(it >= 1) { "Maximum files per topic must be strictly positive" } }

src/main/java/org/radarbase/output/worker/RestructureWorker.kt

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ internal class RestructureWorker(
3535
var processedRecordsCount: Long = 0
3636
private val reader = storage.reader()
3737
private val pathFactory: RecordPathFactory = fileStoreFactory.pathFactory
38+
private val batchSize = fileStoreFactory.config.worker.cacheOffsetsSize
3839

3940
private val cacheStore = fileStoreFactory.newFileCacheStore(accountant)
4041

@@ -61,9 +62,9 @@ internal class RestructureWorker(
6162
val progressBar = ProgressBar(topic, totalProgress, 50, 5, TimeUnit.SECONDS)
6263
progressBar.update(0)
6364

64-
val batchSize = (BATCH_SIZE * ThreadLocalRandom.current().nextDouble(0.75, 1.25)).roundToLong()
65-
var currentSize: Long = 0L
66-
var currentFile: Long = 0L
65+
val batchSize = generateBatchSize()
66+
var currentSize = 0L
67+
var currentFile = 0L
6768
try {
6869
for (file in topicPaths.files) {
6970
if (closed.get()) {
@@ -169,15 +170,17 @@ internal class RestructureWorker(
169170
}
170171
}
171172

173+
private fun generateBatchSize(): Long {
174+
val modifier = ThreadLocalRandom.current().nextDouble(0.75, 1.25)
175+
return (batchSize * modifier).roundToLong()
176+
}
177+
172178
override fun close() {
173179
reader.close()
174180
cacheStore.close()
175181
}
176182

177183
companion object {
178184
private val logger = LoggerFactory.getLogger(RestructureWorker::class.java)
179-
180-
/** Number of offsets to process in a single task. */
181-
private const val BATCH_SIZE: Long = 500000
182185
}
183186
}

0 commit comments

Comments
 (0)