
Commit 46dc952

Added dedicated coroutine tree lister

1 parent d0ba1c1

16 files changed: +339 additions, -234 deletions

src/main/java/org/radarbase/output/Application.kt

Lines changed: 15 additions & 44 deletions

@@ -75,10 +75,15 @@ class Application(
 
     override val workerSemaphore = Semaphore(config.worker.numThreads * 2)
 
-    private val jobs = listOfNotNull(
-        Job("restructure", config.service.interval, ::runRestructure).takeIf { config.worker.enable },
-        Job("clean", config.cleaner.interval, ::runCleaner).takeIf { config.cleaner.enable },
-    )
+    private val jobs: List<Job>
+
+    init {
+        val serviceMutex = Mutex()
+        jobs = listOfNotNull(
+            RadarKafkaRestructure.job(config, serviceMutex),
+            SourceDataCleaner.job(config, serviceMutex),
+        )
+    }
 
     @Throws(IOException::class)
     override fun newFileCacheStore(accountant: Accountant) = FileCacheStore(this, accountant)

@@ -105,15 +110,10 @@ class Application(
         if (config.service.enable) {
             runService()
         } else {
-            val serviceMutex = Mutex()
             runBlocking {
-                jobs.map {
-                    async {
-                        serviceMutex.withLock {
-                            it.run()
-                        }
-                    }
-                }.awaitAll()
+                jobs.forEach { job ->
+                    launch { job.run(this@Application) }
+                }
             }
         }
     }

@@ -122,47 +122,18 @@ class Application(
         logger.info("Running as a Service with poll interval of {} seconds", config.service.interval)
         logger.info("Press Ctrl+C to exit...")
 
-        val serviceMutex = Mutex()
-
         runBlocking {
-            jobs.forEach {
-                launch { it.schedule(serviceMutex) }
-            }
-        }
-    }
-
-    private suspend fun runCleaner() {
-        SourceDataCleaner(this).useSuspended { cleaner ->
-            for (input in config.paths.inputs) {
-                logger.info("Cleaning {}", input)
-                cleaner.process(input.toString())
-            }
-            logger.info("Cleaned up {} files",
-                cleaner.deletedFileCount.format())
-        }
-    }
-
-    private suspend fun runRestructure() {
-        RadarKafkaRestructure(this).useSuspended { restructure ->
-            for (input in config.paths.inputs) {
-                logger.info("In: {}", input)
-                logger.info("Out: {}", pathFactory.root)
-                restructure.process(input.toString())
+            jobs.forEach { job ->
+                launch { job.schedule(this@Application) }
             }
-
-            logger.info(
-                "Processed {} files and {} records",
-                restructure.processedFileCount.format(),
-                restructure.processedRecordsCount.format(),
-            )
         }
     }
 
     companion object {
         private val logger = LoggerFactory.getLogger(Application::class.java)
         const val CACHE_SIZE_DEFAULT = 100
 
-        private fun LongAdder.format(): String =
+        internal fun LongAdder.format(): String =
             NumberFormat.getNumberInstance().format(sum())
 
         private fun parseArgs(args: Array<String>): CommandLineArgs {
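The diff builds its jobs through the new companion factories RadarKafkaRestructure.job(config, serviceMutex) and SourceDataCleaner.job(config, serviceMutex), then drives them with run and schedule. The Job class itself is among the 16 changed files but is not shown on this page; judging only from the call sites above, a minimal sketch might look like the following. The constructor shape, the seconds-based interval (suggested by the "poll interval of {} seconds" log line), and the method bodies are assumptions, not the commit's actual code:

package org.radarbase.output.worker

import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.radarbase.output.FileStoreFactory

// Hypothetical sketch: a named recurring task whose work is serialized
// against other jobs through a shared service mutex.
class Job(
    val name: String,
    private val intervalSeconds: Long, // assumed unit; the real type is not shown
    private val routine: suspend (FileStoreFactory) -> Unit,
    private val serviceMutex: Mutex,
) {
    // Run the routine once, holding the mutex so jobs never overlap.
    suspend fun run(factory: FileStoreFactory) {
        serviceMutex.withLock {
            routine(factory)
        }
    }

    // Run the routine repeatedly with a fixed delay, as service mode does.
    suspend fun schedule(factory: FileStoreFactory) {
        while (true) {
            run(factory)
            delay(intervalSeconds * 1_000)
        }
    }
}

Whatever its exact shape, moving the serviceMutex into the Job constructor is what lets Application drop the two per-mode mutex declarations seen in the deleted lines above.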

src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt

Lines changed: 66 additions & 38 deletions

@@ -1,13 +1,18 @@
 package org.radarbase.output.cleaner
 
 import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
 import kotlinx.coroutines.sync.withPermit
+import org.radarbase.output.Application.Companion.format
 import org.radarbase.output.FileStoreFactory
 import org.radarbase.output.accounting.Accountant
 import org.radarbase.output.accounting.AccountantImpl
+import org.radarbase.output.config.RestructureConfig
 import org.radarbase.output.util.ResourceContext.Companion.resourceContext
+import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
 import org.radarbase.output.util.Timer
+import org.radarbase.output.worker.Job
 import org.slf4j.LoggerFactory
 import java.io.Closeable
 import java.io.IOException

@@ -33,6 +38,7 @@ class SourceDataCleaner(
         .minus(fileStoreFactory.config.cleaner.age.toLong(), ChronoUnit.DAYS)
 
     val deletedFileCount = LongAdder()
+    private val scope = CoroutineScope(Dispatchers.Default)
 
     @Throws(IOException::class, InterruptedException::class)
     suspend fun process(directoryName: String) {

@@ -43,23 +49,21 @@ class SourceDataCleaner(
 
         logger.info("{} topics found", paths.size)
 
-        coroutineScope {
-            paths.forEach { p ->
-                launch {
-                    try {
-                        val deleteCount = fileStoreFactory.workerSemaphore.withPermit {
-                            mapTopic(p)
-                        }
-                        if (deleteCount > 0) {
-                            logger.info("Removed {} files in topic {}", deleteCount, p.fileName)
-                            deletedFileCount.add(deleteCount)
-                        }
-                    } catch (ex: Exception) {
-                        logger.warn("Failed to map topic", ex)
-                    }
-                }
-            }
-        }
+        paths.map { p ->
+            scope.launch {
+                try {
+                    val deleteCount = fileStoreFactory.workerSemaphore.withPermit {
+                        mapTopic(p)
+                    }
+                    if (deleteCount > 0) {
+                        logger.info("Removed {} files in topic {}", deleteCount, p.fileName)
+                        deletedFileCount.add(deleteCount)
+                    }
+                } catch (ex: Exception) {
+                    logger.warn("Failed to map topic", ex)
+                }
+            }
+        }.joinAll()
     }
 
     private suspend fun mapTopic(topicPath: Path): Long {

@@ -93,41 +97,65 @@ class SourceDataCleaner(
     ): Int {
         val offsets = accountant.offsets.copyForTopic(topic)
 
-        return sourceStorage.walker.walkRecords(topic, topicPath).consumeAsFlow()
-            .filter { f ->
-                f.lastModified.isBefore(deleteThreshold) &&
-                    // ensure that there is a file with a larger offset also
-                    // processed, so the largest offset is never removed.
-                    offsets.contains(f.range.mapRange { r -> r.incrementTo() })
-            }
-            .take(maxFilesPerTopic)
-            .takeWhile { !isClosed.get() }
-            .count { file ->
-                if (extractionCheck.isExtracted(file)) {
-                    logger.info("Removing {}", file.path)
-                    Timer.time("cleaner.delete") {
-                        sourceStorage.delete(file.path)
-                    }
-                    true
-                } else {
-                    logger.warn("Source file was not completely extracted: {}", file.path)
-                    // extract the file again at a later time
-                    accountant.remove(file.range.mapRange { it.ensureToOffset() })
-                    false
-                }
-            }
+        val paths = sourceStorage.listTopicFiles(topic, topicPath, maxFilesPerTopic) { f ->
+            f.lastModified.isBefore(deleteThreshold) &&
+                // ensure that there is a file with a larger offset also
+                // processed, so the largest offset is never removed.
+                offsets.contains(f.range.mapRange { r -> r.incrementTo() })
+        }
+
+        val accountantMutex = Mutex()
+
+        return coroutineScope {
+            paths
+                .map { file ->
+                    async {
+                        if (extractionCheck.isExtracted(file)) {
+                            logger.info("Removing {}", file.path)
+                            Timer.time("cleaner.delete") {
+                                sourceStorage.delete(file.path)
+                            }
+                            true
+                        } else {
+                            // extract the file again at a later time
+                            logger.warn("Source file was not completely extracted: {}", file.path)
+                            val fullRange = file.range.mapRange { it.ensureToOffset() }
+                            accountantMutex.withLock {
+                                accountant.remove(fullRange)
+                            }
+                            false
+                        }
+                    }
+                }
+                .awaitAll()
+                .count { it }
+        }
     }
 
-    private suspend fun topicPaths(path: Path): List<Path> = sourceStorage.walker.walkTopics(path, excludeTopics)
+    private suspend fun topicPaths(path: Path): List<Path> = sourceStorage.listTopics(path, excludeTopics)
         .toMutableList()
         // different services start on different topics to decrease lock contention
         .also { it.shuffle() }
 
     override fun close() {
-        isClosed.set(true)
+        scope.cancel()
    }
 
     companion object {
         private val logger = LoggerFactory.getLogger(SourceDataCleaner::class.java)
+
+        fun job(config: RestructureConfig, serviceMutex: Mutex): Job? = if (config.cleaner.enable) {
+            Job("cleaner", config.cleaner.interval, ::runCleaner, serviceMutex)
+        } else null
+
+        private suspend fun runCleaner(factory: FileStoreFactory) {
+            SourceDataCleaner(factory).useSuspended { cleaner ->
+                for (input in factory.config.paths.inputs) {
+                    logger.info("Cleaning {}", input)
+                    cleaner.process(input.toString())
+                }
+                logger.info("Cleaned up {} files", cleaner.deletedFileCount.format())
+            }
+        }
     }
 }
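The rewritten deleteExtractedFiles above replaces a Flow pipeline with a coroutine fan-out: one async task per candidate file, all accountant mutations funneled through a Mutex (the accountant is shared mutable state), then awaitAll() and a count of successful deletions. A self-contained sketch of that pattern, using illustrative names that are not from this codebase:

import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Check files in parallel; record failures in a shared journal under a mutex,
// and return how many files passed the check.
suspend fun countVerified(files: List<String>, isExtracted: (String) -> Boolean): Int {
    val retryLater = mutableListOf<String>() // shared state, like the accountant
    val journalMutex = Mutex()
    return coroutineScope {
        files
            .map { file ->
                async {
                    if (isExtracted(file)) {
                        true // safe to delete the source file
                    } else {
                        // schedule the file for re-extraction at a later time
                        journalMutex.withLock { retryLater += file }
                        false
                    }
                }
            }
            .awaitAll()
            .count { it }
    }
}

fun main() = runBlocking {
    val n = countVerified(listOf("a.avro", "b.avro", "c.avro")) { it != "b.avro" }
    println("verified $n of 3 files") // prints: verified 2 of 3 files
}

Because coroutineScope waits for all of its children, the count only completes once every per-file check has finished, which is what lets the diff drop the old isClosed flag in favor of cancelling the scope.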

src/main/java/org/radarbase/output/source/AzureSourceStorage.kt

Lines changed: 0 additions & 2 deletions

@@ -54,8 +54,6 @@ class AzureSourceStorage(
         blobClient(path).delete()
     }
 
-    override val walker: SourceStorageWalker = GeneralSourceStorageWalker(this)
-
     override fun createReader(): SourceStorage.SourceStorageReader = AzureSourceStorageReader()
 
     private inner class AzureSourceStorageReader: SourceStorage.SourceStorageReader {

src/main/java/org/radarbase/output/source/GeneralSourceStorageWalker.kt

Lines changed: 0 additions & 49 deletions
This file was deleted.

src/main/java/org/radarbase/output/source/S3SourceStorage.kt

Lines changed: 0 additions & 1 deletion

@@ -28,7 +28,6 @@ class S3SourceStorage(
     config: S3Config,
     private val tempPath: Path
 ): SourceStorage {
-    override val walker: SourceStorageWalker = GeneralSourceStorageWalker(this)
     private val bucket = config.bucket
     private val readEndOffset = config.endOffsetFromTags

src/main/java/org/radarbase/output/source/SourceStorage.kt

Lines changed: 25 additions & 6 deletions

@@ -1,11 +1,10 @@
 package org.radarbase.output.source
 
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.flow.Flow
 import org.apache.avro.file.SeekableInput
 import org.radarbase.output.accounting.TopicPartitionOffsetRange
-import org.radarbase.output.util.SuspendedCloseable
-import java.io.Closeable
+import org.radarbase.output.util.*
+import org.radarbase.output.util.AvroFileLister.Companion.avroFileTreeLister
+import org.radarbase.output.util.AvroTopicLister.Companion.avroTopicTreeLister
 import java.nio.file.Path
 import java.time.Instant

@@ -24,8 +23,28 @@ interface SourceStorage {
         return TopicFile(topic, status.path, lastModified, range)
     }
 
-    /** Find records and topics. */
-    val walker: SourceStorageWalker
+    /**
+     * Recursively returns all record files in a sequence of a given topic with path.
+     * The path must only contain records of a single topic, this is not verified.
+     */
+    suspend fun listTopicFiles(
+        topic: String,
+        topicPath: Path,
+        limit: Int,
+        predicate: (TopicFile) -> Boolean,
+    ): List<TopicFile> = avroFileTreeLister()
+        .list(TopicPath(topic, topicPath), limit, predicate)
+
+    /**
+     * Recursively find all topic root paths of records in the given path.
+     * Exclude paths belonging to the set of given excluded topics.
+     */
+    suspend fun listTopics(
+        root: Path,
+        exclude: Set<String>,
+    ): List<Path> = avroTopicTreeLister()
+        .listTo(LinkedHashSet(), root)
+        .filter { it.fileName.toString() !in exclude }
 
     /**
      * File reader for the storage medium.
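These default methods replace the deleted SourceStorageWalker with the avroFileTreeLister() and avroTopicTreeLister() factories, which come from new util classes counted among the 16 changed files but not shown on this page. In the spirit of the commit title, a generic coroutine tree lister could be sketched as follows; every name and signature here is an assumption for illustration, not the commit's actual implementation:

import java.nio.file.Path

// Hypothetical sketch of a suspending tree lister: walk a storage tree using a
// suspend directory-listing function, collecting matching entries up to a limit.
abstract class TreeLister<E>(
    private val listDirectory: suspend (Path) -> List<Path>,
) {
    // Convert a path to an entry, or return null to descend into it as a directory.
    protected abstract suspend fun toEntry(path: Path): E?

    suspend fun list(
        root: Path,
        limit: Int = Int.MAX_VALUE,
        predicate: (E) -> Boolean = { true },
    ): List<E> {
        val result = ArrayList<E>()
        suspend fun descend(path: Path) {
            for (child in listDirectory(path)) {
                if (result.size >= limit) return
                val entry = toEntry(child)
                if (entry == null) {
                    descend(child) // not a leaf: recurse into the directory
                } else if (predicate(entry)) {
                    result += entry
                }
            }
        }
        descend(root)
        return result
    }
}

Under that reading, avroFileTreeLister() would plausibly supply a toEntry that recognizes Avro record files and wraps them as TopicFile, while avroTopicTreeLister() would treat directories containing record files as topic roots, matching the limit and predicate arguments seen at the call sites above.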

src/main/java/org/radarbase/output/source/SourceStorageWalker.kt

Lines changed: 0 additions & 23 deletions
This file was deleted.
