
Commit a098528

Put all config in separate files and introduce ResourceContext
1 parent 602cd96 commit a098528


49 files changed: +799 -599 lines

src/main/java/org/radarbase/output/Application.kt

Lines changed: 2 additions & 2 deletions

@@ -36,14 +36,14 @@ import org.radarbase.output.worker.RadarKafkaRestructure
 import org.slf4j.LoggerFactory
 import redis.clients.jedis.JedisPool
 import java.io.IOException
-import java.nio.file.Files
 import java.text.NumberFormat
 import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
 import java.util.concurrent.Executors
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.LongAdder
 import kotlin.Long.Companion.MAX_VALUE
+import kotlin.io.path.createDirectories
 import kotlin.system.exitProcess

 /** Main application. */
@@ -84,7 +84,7 @@ class Application(
                 (config.worker.numThreads - 1).toString())

         try {
-            Files.createDirectories(config.paths.temp)
+            config.paths.temp.createDirectories()
         } catch (ex: IOException) {
            logger.error("Failed to create temporary directory")
            return
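
This change, like the matching ones in Accountant.kt and OffsetFilePersistence.kt below, replaces static java.nio.file.Files calls with the kotlin.io.path extension functions, so the operations read as methods on Path. A minimal standalone sketch of the pattern (the marker.csv path is made up for illustration):

    import java.nio.file.Path
    import kotlin.io.path.createDirectories
    import kotlin.io.path.deleteExisting
    import kotlin.io.path.exists

    fun prepareTempDir(temp: Path) {
        // Equivalent to Files.createDirectories(temp)
        temp.createDirectories()

        val marker = temp.resolve("marker.csv")
        if (marker.exists()) {
            // Equivalent to Files.delete(marker); throws NoSuchFileException if it is already gone
            marker.deleteExisting()
        }
    }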

src/main/java/org/radarbase/output/accounting/Accountant.kt

Lines changed: 4 additions & 3 deletions

@@ -23,9 +23,10 @@ import org.slf4j.LoggerFactory
 import java.io.Closeable
 import java.io.Flushable
 import java.io.IOException
-import java.nio.file.Files
 import java.nio.file.Paths
 import java.time.Instant
+import kotlin.io.path.deleteExisting
+import kotlin.io.path.exists

 open class Accountant @Throws(IOException::class)
 constructor(factory: FileStoreFactory, topic: String) : Flushable, Closeable {
@@ -53,9 +54,9 @@ constructor(factory: FileStoreFactory, topic: String) : Flushable, Closeable {
             .resolve(OFFSETS_FILE_NAME)
             .resolve("$topic.csv")

-        return if (Files.exists(offsetsPath)) {
+        return if (offsetsPath.exists()) {
             OffsetFilePersistence(factory.targetStorage).read(offsetsPath)
-                .also { Files.delete(offsetsPath) }
+                .also { offsetsPath.deleteExisting() }
         } else null
     }

src/main/java/org/radarbase/output/accounting/OffsetFilePersistence.kt

Lines changed: 9 additions & 10 deletions

@@ -21,11 +21,12 @@ import org.radarbase.output.util.PostponedWriter
 import org.radarbase.output.util.Timer.time
 import org.slf4j.LoggerFactory
 import java.io.IOException
-import java.nio.file.Files
 import java.nio.file.Path
 import java.time.Instant
 import java.util.concurrent.TimeUnit
 import java.util.regex.Pattern
+import kotlin.io.path.bufferedWriter
+import kotlin.io.path.createTempFile

 /**
  * Accesses a OffsetRange file using the CSV format. On writing, this will create the file if
@@ -38,13 +39,11 @@ class OffsetFilePersistence(
         return try {
             if (targetStorage.status(path) != null) {
                 OffsetRangeSet().also { set ->
-                    targetStorage.newBufferedReader(path).use { br ->
-                        // ignore header
-                        br.readLine() ?: return@use
-
-                        generateSequence { br.readLine() }
-                            .map(::parseLine)
-                            .forEach(set::add)
+                    targetStorage.newBufferedReader(path).useLines { lines ->
+                        lines
+                            .drop(1) // ignore header
+                            .map(::parseLine)
+                            .forEach(set::add)
                     }
                 }
             } else null
@@ -94,9 +93,9 @@

     override fun doWrite() = time("accounting.offsets") {
         try {
-            val tmpPath = Files.createTempFile("offsets", ".csv")
+            val tmpPath = createTempFile("offsets", ".csv")

-            Files.newBufferedWriter(tmpPath).use { writer ->
+            tmpPath.bufferedWriter().use { writer ->
                 writer.append("offsetFrom,offsetTo,partition,topic\n")
                 offsets.forEach { topicPartition, offsetIntervals ->
                     offsetIntervals.forEach { offsetFrom, offsetTo, lastModified ->
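
The reader now streams the file with useLines and drops the header line instead of reading it explicitly; doWrite still emits the offsetFrom,offsetTo,partition,topic header. A standalone sketch of the same read pattern, with a hypothetical stand-in for parseLine (the real parser is not part of this diff):

    import java.io.File

    data class OffsetLine(val offsetFrom: Long, val offsetTo: Long, val partition: Int, val topic: String)

    fun readOffsets(file: File): List<OffsetLine> =
        file.bufferedReader().useLines { lines ->
            lines
                .drop(1) // skip the "offsetFrom,offsetTo,partition,topic" header
                .map { line ->
                    // hypothetical equivalent of parseLine
                    val (from, to, partition, topic) = line.split(",", limit = 4)
                    OffsetLine(from.toLong(), to.toLong(), partition.toInt(), topic)
                }
                .toList() // materialize before useLines closes the reader
        }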

src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt

Lines changed: 5 additions & 4 deletions

@@ -2,6 +2,7 @@ package org.radarbase.output.cleaner

 import org.radarbase.output.FileStoreFactory
 import org.radarbase.output.accounting.Accountant
+import org.radarbase.output.util.ResourceContext.Companion.resourceContext
 import org.radarbase.output.util.Timer
 import org.slf4j.LoggerFactory
 import java.io.Closeable
@@ -61,10 +62,10 @@ class SourceDataCleaner(

         return try {
             lockManager.tryRunLocked(topic) {
-                Accountant(fileStoreFactory, topic).use { accountant ->
-                    TimestampExtractionCheck(sourceStorage, fileStoreFactory).use { extractionCheck ->
-                        deleteOldFiles(accountant, extractionCheck, topic, topicPath).toLong()
-                    }
+                resourceContext {
+                    val accountant = createResource { Accountant(fileStoreFactory, topic) }
+                    val extractionCheck = createResource { TimestampExtractionCheck(sourceStorage, fileStoreFactory) }
+                    deleteOldFiles(accountant, extractionCheck, topic, topicPath).toLong()
                 }
             }
         } catch (ex: IOException) {
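
ResourceContext itself does not appear in this part of the diff; from the call site above it seems to replace nested use blocks by registering each resource as it is created and closing all of them when the block ends. A rough sketch of what such a helper could look like (an assumption inferred from usage, not the committed implementation):

    import java.io.Closeable

    class ResourceContext : Closeable {
        private val resources = ArrayDeque<Closeable>()

        // Create a resource and register it for closing.
        fun <T : Closeable> createResource(create: () -> T): T =
            create().also { resources.addFirst(it) }

        // Close in reverse creation order; the real implementation presumably also
        // guards against exceptions thrown by an individual close().
        override fun close() = resources.forEach { it.close() }

        companion object {
            inline fun <T> resourceContext(block: ResourceContext.() -> T): T =
                ResourceContext().use { it.block() }
        }
    }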

src/main/java/org/radarbase/output/cleaner/TimestampFileCacheStore.kt

Lines changed: 0 additions & 1 deletion

@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory
 import java.io.FileNotFoundException
 import java.io.IOException
 import java.nio.file.Path
-import java.util.*

 /**
  * Caches open file handles. If more than the limit is cached, the half of the files that were used

src/main/java/org/radarbase/output/config/AzureConfig.kt

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+package org.radarbase.output.config
+
+import com.azure.core.credential.BasicAuthenticationCredential
+import com.azure.storage.blob.BlobServiceClient
+import com.azure.storage.blob.BlobServiceClientBuilder
+import com.azure.storage.common.StorageSharedKeyCredential
+import org.radarbase.output.config.RestructureConfig.Companion.copyEnv
+import org.slf4j.LoggerFactory
+
+data class AzureConfig(
+    /** URL to reach object store at. */
+    val endpoint: String,
+    /** Name of the Azure Blob Storage container. */
+    val container: String,
+    /** If no endOffset is in the filename, read it from object metadata. */
+    val endOffsetFromMetadata: Boolean = false,
+    /** Azure username. */
+    val username: String?,
+    /** Azure password. */
+    val password: String?,
+    /** Shared Azure Blob Storage account name. */
+    val accountName: String?,
+    /** Shared Azure Blob Storage account key. */
+    val accountKey: String?,
+    /** Azure SAS token for a configured service. */
+    val sasToken: String?,
+) {
+    fun createAzureClient(): BlobServiceClient = BlobServiceClientBuilder().apply {
+        endpoint(endpoint)
+        when {
+            !username.isNullOrEmpty() && !password.isNullOrEmpty() -> credential(
+                BasicAuthenticationCredential(username, password))
+            !accountName.isNullOrEmpty() && !accountKey.isNullOrEmpty() -> credential(
+                StorageSharedKeyCredential(accountName, accountKey))
+            !sasToken.isNullOrEmpty() -> sasToken(sasToken)
+            else -> logger.warn("No Azure credentials supplied. Assuming a public blob storage.")
+        }
+    }.buildClient()
+
+    fun withEnv(prefix: String): AzureConfig = this
+        .copyEnv("${prefix}AZURE_USERNAME") { copy(username = it) }
+        .copyEnv("${prefix}AZURE_PASSWORD") { copy(password = it) }
+        .copyEnv("${prefix}AZURE_ACCOUNT_NAME") { copy(accountName = it) }
+        .copyEnv("${prefix}AZURE_ACCOUNT_KEY") { copy(accountKey = it) }
+        .copyEnv("${prefix}AZURE_SAS_TOKEN") { copy(sasToken = it) }
+
+    companion object {
+        private val logger = LoggerFactory.getLogger(AzureConfig::class.java)
+    }
+}
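
For reference, a hedged sketch of constructing this config directly; every value and the TARGET_ environment prefix below are illustrative, not taken from the commit:

    val azure = AzureConfig(
        endpoint = "https://myaccount.blob.core.windows.net", // illustrative endpoint
        container = "radar-output",                           // illustrative container
        username = null,
        password = null,
        accountName = "myaccount",  // a non-empty name/key pair selects the StorageSharedKeyCredential branch
        accountKey = "bXlrZXk=",    // illustrative key
        sasToken = null,
    ).withEnv("TARGET_")            // e.g. TARGET_AZURE_ACCOUNT_KEY would override accountKey

    val client = azure.createAzureClient()
    val blobContainer = client.getBlobContainerClient(azure.container)
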
src/main/java/org/radarbase/output/config/CleanerConfig.kt

Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
+package org.radarbase.output.config
+
+data class CleanerConfig(
+    /** Whether to enable the cleaner. */
+    val enable: Boolean = false,
+    /** How often to run the cleaner in seconds. */
+    val interval: Long = 1260L,
+    /** Age in days after an avro file can be removed. Must be strictly positive. */
+    val age: Int = 7,
+    /** Maximum number of files to clean in a given topic. */
+    val maxFilesPerTopic: Int? = null,
+) {
+    fun validate() {
+        check(age > 0) { "Cleaner file age must be strictly positive" }
+        check(interval > 0) { "Cleaner interval must be strictly positive" }
+        if (maxFilesPerTopic != null) check(maxFilesPerTopic > 0) { "Maximum files per topic must be strictly positive" }
+    }
+}
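
Since validate() uses check(), a misconfigured cleaner fails fast with an IllegalStateException rather than running with a nonsensical setting. An illustrative use:

    val cleaner = CleanerConfig(enable = true, age = 0)
    try {
        cleaner.validate()
    } catch (ex: IllegalStateException) {
        println(ex.message) // "Cleaner file age must be strictly positive"
    }
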
src/main/java/org/radarbase/output/config/CompressionConfig.kt

Lines changed: 14 additions & 0 deletions

@@ -0,0 +1,14 @@
+package org.radarbase.output.config
+
+import org.radarbase.output.compression.Compression
+import org.radarbase.output.compression.CompressionFactory
+
+data class CompressionConfig(
+    override val factory: String = CompressionFactory::class.qualifiedName!!,
+    override val properties: Map<String, String> = emptyMap(),
+    /** Compression type. Currently one of gzip, zip or none. */
+    val type: String = "none",
+) : PluginConfig {
+    fun createFactory(): CompressionFactory = factory.toPluginInstance(properties)
+    fun createCompression(): Compression = createFactory()[type]
+}
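
createFactory() loads the configured factory class by name through toPluginInstance (the helper at the end of this commit), and createCompression() looks the type up on that factory. An illustrative use, assuming gzip is available as the doc comment states:

    // Uses the default CompressionFactory and selects gzip compression.
    val compression = CompressionConfig(type = "gzip").createCompression()
    // A custom factory can be plugged in by setting `factory` to its fully qualified class name.
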
src/main/java/org/radarbase/output/config/DeduplicationConfig.kt

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+package org.radarbase.output.config
+
+import org.radarbase.output.config.RestructureConfig.Companion.copyOnChange
+
+data class DeduplicationConfig(
+    /** Whether to enable deduplication. */
+    val enable: Boolean? = null,
+    /**
+     * Only deduplicate using given fields. Fields not specified here are ignored
+     * for determining duplication.
+     */
+    val distinctFields: Set<String>? = null,
+    /**
+     * Ignore given fields for determining whether a row is identical to another.
+     */
+    val ignoreFields: Set<String>? = null,
+) {
+    fun withDefaults(deduplicationDefaults: DeduplicationConfig): DeduplicationConfig =
+        deduplicationDefaults
+            .copyOnChange<DeduplicationConfig, Boolean?>(null, { enable }) { copy(enable = it) }
+            .copyOnChange<DeduplicationConfig, Set<String>?>(null, { distinctFields }) {
+                copy(distinctFields = it)
+            }
+            .copyOnChange<DeduplicationConfig, Set<String>?>(null, { ignoreFields }) {
+                copy(ignoreFields = it)
+            }
+}
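
withDefaults merges a topic-level deduplication setting onto the global one. Assuming copyOnChange (defined in RestructureConfig, not shown here) only applies a field when the override differs from the supplied default value (null), the merge behaves roughly like this (field values are illustrative):

    val globalDefaults = DeduplicationConfig(enable = false, ignoreFields = setOf("timeReceived"))
    val topicOverride = DeduplicationConfig(enable = true)

    val effective = topicOverride.withDefaults(globalDefaults)
    // effective.enable == true                          // explicitly set for the topic
    // effective.ignoreFields == setOf("timeReceived")   // inherited from the defaults
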
Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
+package org.radarbase.output.config
+
+import org.radarbase.output.Plugin
+
+internal inline fun <reified T : Plugin> String.toPluginInstance(properties: Map<String, String>): T {
+    return try {
+        (Class.forName(this).getConstructor().newInstance() as T)
+            .also { it.init(properties) }
+    } catch (ex: ReflectiveOperationException) {
+        throw IllegalStateException("Cannot map class $this to ${T::class.java.name}")
+    }
+}
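
This helper backs the createFactory-style calls in the configs above: it instantiates the configured class reflectively through its no-argument constructor and hands the properties map to Plugin.init. Because it is internal, it is only callable from within this module; a sketch mirroring what CompressionConfig.createFactory() does with its defaults:

    import org.radarbase.output.compression.CompressionFactory

    val factory: CompressionFactory = CompressionFactory::class.qualifiedName!!
        .toPluginInstance(emptyMap())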
