Skip to content

Commit bdf2544

Browse files
committed
Properly support multiple sources and targets
1 parent c5b6ab0 commit bdf2544

37 files changed

+503
-423
lines changed

src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import kotlinx.coroutines.test.runTest
1616
import kotlinx.coroutines.withContext
1717
import org.junit.jupiter.api.Assertions.assertEquals
1818
import org.junit.jupiter.api.Test
19-
import org.radarbase.output.config.PathConfig
2019
import org.radarbase.output.config.PathFormatterConfig
2120
import org.radarbase.output.config.ResourceConfig
2221
import org.radarbase.output.config.RestructureConfig
@@ -49,9 +48,8 @@ class RestructureS3IntegrationTest {
4948
),
5049
)
5150
val config = RestructureConfig(
52-
source = ResourceConfig("s3", s3 = sourceConfig),
53-
target = ResourceConfig("s3", s3 = targetConfig),
54-
paths = PathConfig(inputs = listOf(Paths.get("in"))),
51+
sources = listOf(ResourceConfig("s3", path = Paths.get("in"), s3 = sourceConfig)),
52+
targets = mapOf("radar-output-storage" to ResourceConfig("s3", path = Paths.get("output"), s3 = targetConfig)),
5553
worker = WorkerConfig(minimumFileAge = 0L),
5654
topics = topicConfig,
5755
)

src/main/java/org/radarbase/output/Application.kt

Lines changed: 17 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,8 @@ import org.radarbase.output.config.RestructureConfig
3535
import org.radarbase.output.format.RecordConverterFactory
3636
import org.radarbase.output.path.RecordPathFactory
3737
import org.radarbase.output.source.InMemoryStorageIndex
38-
import org.radarbase.output.source.SourceStorage
3938
import org.radarbase.output.source.SourceStorageFactory
40-
import org.radarbase.output.source.StorageIndexManager
39+
import org.radarbase.output.source.SourceStorageManager
4140
import org.radarbase.output.target.TargetStorage
4241
import org.radarbase.output.target.TargetStorageFactory
4342
import org.radarbase.output.util.Timer
@@ -47,9 +46,7 @@ import org.radarbase.output.worker.RadarKafkaRestructure
4746
import org.slf4j.LoggerFactory
4847
import redis.clients.jedis.JedisPool
4948
import java.io.IOException
50-
import java.nio.file.Path
5149
import java.text.NumberFormat
52-
import java.time.Duration
5350
import java.time.LocalDateTime
5451
import java.time.format.DateTimeFormatter
5552
import java.util.concurrent.atomic.LongAdder
@@ -64,18 +61,23 @@ class Application(
6461
override val config = config.apply { validate() }
6562
override val recordConverter: RecordConverterFactory = config.format.createConverter()
6663
override val compression: Compression = config.compression.createCompression()
67-
override val pathFactory: RecordPathFactory = config.paths.createFactory(
68-
config.target,
69-
recordConverter.extension + compression.extension,
70-
config.topics,
71-
)
7264

73-
private val sourceStorageFactory = SourceStorageFactory(config.source, config.paths.temp)
74-
override val sourceStorage: SourceStorage
75-
get() = sourceStorageFactory.createSourceStorage()
65+
private val sourceStorageFactory = SourceStorageFactory(config.paths.temp)
66+
override val sourceStorage: List<SourceStorageManager> = config.consolidatedSources
67+
.map { sourceConfig ->
68+
val storage = sourceStorageFactory.createSourceStorage(sourceConfig)
69+
SourceStorageManager(storage, InMemoryStorageIndex(), sourceConfig.index)
70+
}
71+
72+
override val targetStorage: TargetStorage = TargetStorageFactory()
73+
.createTargetStorage(config.paths.target.defaultName, config.consolidatedTargets)
7674

77-
override val targetStorage: TargetStorage =
78-
TargetStorageFactory(config.target).createTargetStorage()
75+
override val pathFactory: RecordPathFactory =
76+
config.paths.createFactory(
77+
targetStorage,
78+
recordConverter.extension + compression.extension,
79+
config.topics,
80+
)
7981

8082
override val redisHolder: RedisHolder = RedisHolder(JedisPool(config.redis.uri))
8183
override val remoteLockManager: RemoteLockManager = RedisRemoteLockManager(
@@ -88,27 +90,9 @@ class Application(
8890

8991
override val workerSemaphore = Semaphore(config.worker.numThreads * 2)
9092

91-
override val storageIndexManagers: Map<Path, StorageIndexManager>
92-
9393
private val jobs: List<Job>
9494

9595
init {
96-
val indexConfig = config.source.index
97-
val (fullScan, emptyScan) = if (indexConfig == null) {
98-
listOf(3600L, 900L)
99-
} else {
100-
listOf(indexConfig.fullSyncInterval, indexConfig.emptyDirectorySyncInterval)
101-
}.map { Duration.ofSeconds(it) }
102-
103-
storageIndexManagers = config.paths.inputs.associateWith { input ->
104-
StorageIndexManager(
105-
InMemoryStorageIndex(),
106-
sourceStorage,
107-
input,
108-
fullScan,
109-
emptyScan,
110-
)
111-
}
11296
val serviceMutex = Mutex()
11397
jobs = listOfNotNull(
11498
RadarKafkaRestructure.job(config, serviceMutex),
@@ -137,7 +121,7 @@ class Application(
137121
}
138122

139123
runBlocking {
140-
launch { targetStorage.initialize() }
124+
targetStorage.initialize()
141125
}
142126

143127
if (config.service.enable) {

src/main/java/org/radarbase/output/FileStoreFactory.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,14 @@ import org.radarbase.output.compression.Compression
2525
import org.radarbase.output.config.RestructureConfig
2626
import org.radarbase.output.format.RecordConverterFactory
2727
import org.radarbase.output.path.RecordPathFactory
28-
import org.radarbase.output.source.SourceStorage
29-
import org.radarbase.output.source.StorageIndexManager
28+
import org.radarbase.output.source.SourceStorageManager
3029
import org.radarbase.output.target.TargetStorage
3130
import org.radarbase.output.worker.FileCacheStore
3231
import java.io.IOException
33-
import java.nio.file.Path
3432

3533
/** Factory for all factory classes and settings. */
3634
interface FileStoreFactory {
37-
val sourceStorage: SourceStorage
35+
val sourceStorage: List<SourceStorageManager>
3836
val targetStorage: TargetStorage
3937
val pathFactory: RecordPathFactory
4038
val compression: Compression
@@ -44,7 +42,6 @@ interface FileStoreFactory {
4442
val redisHolder: RedisHolder
4543
val offsetPersistenceFactory: OffsetPersistenceFactory
4644
val workerSemaphore: Semaphore
47-
val storageIndexManagers: Map<Path, StorageIndexManager>
4845

4946
@Throws(IOException::class)
5047
fun newFileCacheStore(accountant: Accountant): FileCacheStore

src/main/java/org/radarbase/output/accounting/AccountantImpl.kt

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,10 @@ package org.radarbase.output.accounting
22

33
import kotlinx.coroutines.CoroutineScope
44
import org.radarbase.output.FileStoreFactory
5-
import org.radarbase.output.config.RestructureConfig
6-
import org.radarbase.output.target.TargetStorage
75
import org.radarbase.output.util.Timer
86
import org.slf4j.LoggerFactory
97
import java.io.IOException
108
import java.nio.file.Paths
11-
import kotlin.io.path.deleteExisting
12-
import kotlin.io.path.exists
139

1410
open class AccountantImpl(
1511
private val factory: FileStoreFactory,
@@ -27,29 +23,6 @@ open class AccountantImpl(
2723

2824
val offsets = offsetPersistence.read(offsetsKey)
2925
offsetFile = offsetPersistence.writer(scope, offsetsKey, offsets)
30-
readDeprecatedOffsets(factory.config, factory.targetStorage, topic)
31-
?.takeUnless { it.isEmpty }
32-
?.let {
33-
offsetFile.addAll(it)
34-
offsetFile.triggerWrite()
35-
}
36-
}
37-
38-
private suspend fun readDeprecatedOffsets(
39-
config: RestructureConfig,
40-
targetStorage: TargetStorage,
41-
topic: String,
42-
): OffsetRangeSet? {
43-
val offsetsPath = config.paths.output
44-
.resolve(OFFSETS_FILE_NAME)
45-
.resolve("$topic.csv")
46-
47-
return if (offsetsPath.exists()) {
48-
OffsetFilePersistence(targetStorage).read(offsetsPath)
49-
.also { offsetsPath.deleteExisting() }
50-
} else {
51-
null
52-
}
5326
}
5427

5528
override suspend fun remove(range: TopicPartitionOffsetRange) =

src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ import kotlinx.coroutines.launch
66
import kotlinx.coroutines.sync.Mutex
77
import kotlinx.coroutines.sync.withPermit
88
import kotlinx.coroutines.withContext
9+
import org.radarbase.kotlin.coroutines.launchJoin
910
import org.radarbase.output.Application.Companion.format
1011
import org.radarbase.output.FileStoreFactory
1112
import org.radarbase.output.accounting.Accountant
1213
import org.radarbase.output.accounting.AccountantImpl
1314
import org.radarbase.output.config.RestructureConfig
14-
import org.radarbase.output.source.StorageIndex
15-
import org.radarbase.output.source.StorageNode
15+
import org.radarbase.output.source.SourceStorageManager
1616
import org.radarbase.output.util.ResourceContext.Companion.resourceContext
1717
import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
1818
import org.radarbase.output.util.Timer
@@ -21,17 +21,17 @@ import org.slf4j.LoggerFactory
2121
import java.io.Closeable
2222
import java.io.IOException
2323
import java.nio.file.Path
24-
import java.nio.file.Paths
2524
import java.time.Instant
2625
import java.time.temporal.ChronoUnit
2726
import java.util.concurrent.atomic.LongAdder
2827
import kotlin.coroutines.coroutineContext
2928

3029
class SourceDataCleaner(
3130
private val fileStoreFactory: FileStoreFactory,
31+
private val sourceStorageManager: SourceStorageManager,
3232
) : Closeable {
33+
private val sourceStorage = sourceStorageManager.sourceStorage
3334
private val lockManager = fileStoreFactory.remoteLockManager
34-
private val sourceStorage = fileStoreFactory.sourceStorage
3535
private val excludeTopics: Set<String> = fileStoreFactory.config.topics
3636
.mapNotNullTo(HashSet()) { (topic, conf) ->
3737
topic.takeIf { conf.excludeFromDelete }
@@ -45,11 +45,9 @@ class SourceDataCleaner(
4545
private val supervisor = SupervisorJob()
4646

4747
@Throws(IOException::class, InterruptedException::class)
48-
suspend fun process(storageIndex: StorageIndex, directoryName: String) {
48+
suspend fun process() {
4949
// Get files and directories
50-
val absolutePath = Paths.get(directoryName)
51-
52-
val paths = topicPaths(storageIndex, absolutePath)
50+
val paths = topicPaths(sourceStorage.root)
5351

5452
logger.info("{} topics found", paths.size)
5553

@@ -58,7 +56,7 @@ class SourceDataCleaner(
5856
launch {
5957
try {
6058
val deleteCount = fileStoreFactory.workerSemaphore.withPermit {
61-
mapTopic(storageIndex, p)
59+
mapTopic(p)
6260
}
6361
if (deleteCount > 0) {
6462
logger.info("Removed {} files in topic {}", deleteCount, p.fileName)
@@ -72,7 +70,7 @@ class SourceDataCleaner(
7270
}
7371
}
7472

75-
private suspend fun mapTopic(storageIndex: StorageIndex, topicPath: Path): Long {
73+
private suspend fun mapTopic(topicPath: Path): Long {
7674
val topic = topicPath.fileName.toString()
7775
return try {
7876
lockManager.tryWithLock(topic) {
@@ -86,7 +84,7 @@ class SourceDataCleaner(
8684
fileStoreFactory,
8785
)
8886
}
89-
deleteOldFiles(storageIndex, accountant, extractionCheck, topic, topicPath).toLong()
87+
deleteOldFiles(accountant, extractionCheck, topic, topicPath).toLong()
9088
}
9189
}
9290
}
@@ -97,15 +95,14 @@ class SourceDataCleaner(
9795
}
9896

9997
private suspend fun deleteOldFiles(
100-
storageIndex: StorageIndex,
10198
accountant: Accountant,
10299
extractionCheck: ExtractionCheck,
103100
topic: String,
104101
topicPath: Path,
105102
): Int {
106103
val offsets = accountant.offsets.copyForTopic(topic)
107104

108-
val paths = sourceStorage.listTopicFiles(storageIndex, topic, topicPath, maxFilesPerTopic) { f ->
105+
val paths = sourceStorageManager.listTopicFiles(topic, topicPath, maxFilesPerTopic) { f ->
109106
f.lastModified.isBefore(deleteThreshold) &&
110107
// ensure that there is a file with a larger offset also
111108
// processed, so the largest offset is never removed.
@@ -117,8 +114,7 @@ class SourceDataCleaner(
117114
if (extractionCheck.isExtracted(file)) {
118115
logger.info("Removing {}", file.path)
119116
Timer.time("cleaner.delete") {
120-
sourceStorage.delete(file.path)
121-
storageIndex.remove(StorageNode.StorageFile(file.path, Instant.MIN))
117+
sourceStorageManager.delete(file.path)
122118
}
123119
true
124120
} else {
@@ -131,8 +127,8 @@ class SourceDataCleaner(
131127
}
132128
}
133129

134-
private suspend fun topicPaths(storageIndex: StorageIndex, path: Path): List<Path> =
135-
sourceStorage.listTopics(storageIndex, path, excludeTopics)
130+
private suspend fun topicPaths(path: Path): List<Path> =
131+
sourceStorageManager.listTopics(path, excludeTopics)
136132
// different services start on different topics to decrease lock contention
137133
.shuffled()
138134

@@ -149,14 +145,14 @@ class SourceDataCleaner(
149145
null
150146
}
151147

152-
private suspend fun runCleaner(factory: FileStoreFactory) {
153-
SourceDataCleaner(factory).useSuspended { cleaner ->
154-
for ((input, indexManager) in factory.storageIndexManagers) {
155-
indexManager.update()
156-
logger.info("Cleaning {}", input)
157-
cleaner.process(indexManager.storageIndex, input.toString())
148+
private suspend fun runCleaner(factory: FileStoreFactory) = coroutineScope {
149+
factory.sourceStorage.launchJoin { sourceStorage ->
150+
SourceDataCleaner(factory, sourceStorage).useSuspended { cleaner ->
151+
sourceStorage.storageIndexManager.update()
152+
logger.info("Cleaning {}", sourceStorage.sourceStorage.root)
153+
cleaner.process()
154+
logger.info("Cleaned up {} files", cleaner.deletedFileCount.format())
158155
}
159-
logger.info("Cleaned up {} files", cleaner.deletedFileCount.format())
160156
}
161157
}
162158
}

src/main/java/org/radarbase/output/config/CommandLineArgs.kt

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ import com.beust.jcommander.validators.PositiveInteger
2121
import org.radarbase.output.config.RestructureConfig.Companion.RESTRUCTURE_CONFIG_FILE_NAME
2222

2323
class CommandLineArgs {
24-
@Parameter(description = "<input_path_1> [<input_path_2> ...]", variableArity = true)
25-
var inputPaths: List<String>? = null
26-
2724
@Parameter(
2825
names = ["-F", "--config-file"],
2926
description = "Config file. By default, $RESTRUCTURE_CONFIG_FILE_NAME is tried.",
@@ -65,13 +62,6 @@ class CommandLineArgs {
6562
)
6663
var hdfsName: String? = null
6764

68-
@Parameter(
69-
names = ["-o", "--output-directory"],
70-
description = "The output folder where the files are to be extracted.",
71-
validateWith = [NonEmptyValidator::class],
72-
)
73-
var outputDirectory: String? = null
74-
7565
@Parameter(
7666
names = ["-h", "--help"],
7767
help = true,

0 commit comments

Comments
 (0)