@@ -11,6 +11,8 @@ import org.radarbase.output.Application.Companion.CACHE_SIZE_DEFAULT
import org.radarbase.output.Plugin
import org.radarbase.output.compression.Compression
import org.radarbase.output.compression.CompressionFactory
+ import org.radarbase.output.config.RestructureConfig.Companion.copyEnv
+ import org.radarbase.output.config.RestructureConfig.Companion.copyOnChange
import org.radarbase.output.format.FormatFactory
import org.radarbase.output.format.RecordConverterFactory
import org.radarbase.output.path.ObservationKeyPathFactory
@@ -22,26 +24,27 @@ import java.nio.file.Path
import java.nio.file.Paths

data class RestructureConfig(
-         /** Whether and how to run as a service. */
-         val service: ServiceConfig = ServiceConfig(enable = false),
-         /** Cleaner of old files. */
-         val cleaner: CleanerConfig = CleanerConfig(),
-         /** Work limits. */
-         val worker: WorkerConfig = WorkerConfig(),
-         /** Topic exceptional handling. */
-         val topics: Map<String, TopicConfig> = emptyMap(),
-         /** Source data resource configuration. */
-         val source: ResourceConfig = ResourceConfig("s3"),
-         /** Target data resource configuration. */
-         val target: ResourceConfig = ResourceConfig("local", local = LocalConfig()),
-         /** Redis configuration for synchronization and storing offsets. */
-         val redis: RedisConfig = RedisConfig(),
-         /** Paths to use for processing. */
-         val paths: PathConfig = PathConfig(),
-         /** File compression to use for output files. */
-         val compression: CompressionConfig = CompressionConfig(),
-         /** File format to use for output files. */
-         val format: FormatConfig = FormatConfig()) {
+     /** Whether and how to run as a service. */
+     val service: ServiceConfig = ServiceConfig(enable = false),
+     /** Cleaner of old files. */
+     val cleaner: CleanerConfig = CleanerConfig(),
+     /** Work limits. */
+     val worker: WorkerConfig = WorkerConfig(),
+     /** Topic exceptional handling. */
+     val topics: Map<String, TopicConfig> = emptyMap(),
+     /** Source data resource configuration. */
+     val source: ResourceConfig = ResourceConfig("s3"),
+     /** Target data resource configuration. */
+     val target: ResourceConfig = ResourceConfig("local", local = LocalConfig()),
+     /** Redis configuration for synchronization and storing offsets. */
+     val redis: RedisConfig = RedisConfig(),
+     /** Paths to use for processing. */
+     val paths: PathConfig = PathConfig(),
+     /** File compression to use for output files. */
+     val compression: CompressionConfig = CompressionConfig(),
+     /** File format to use for output files. */
+     val format: FormatConfig = FormatConfig(),
+ ) {

    fun validate() {
        source.validate()
@@ -69,6 +72,11 @@ data class RestructureConfig(
        args.noRestructure?.let { copy(worker = worker.copy(enable = !it)) }
    }

+     fun withEnv(): RestructureConfig = this
+         .copyOnChange(source, { it.withEnv("SOURCE_") }) { copy(source = it) }
+         .copyOnChange(target, { it.withEnv("TARGET_") }) { copy(target = it) }
+         .copyOnChange(redis, { it.withEnv() }) { copy(redis = it) }
+
    companion object {
        fun load(path: String?): RestructureConfig = YAMLConfigLoader.load(path, RESTRUCTURE_CONFIG_FILE_NAME) {
            logger.info("No config file found. Using default configuration.")
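For orientation, a minimal sketch of how the new withEnv hook is meant to be used after loading; the call site below is hypothetical and not part of this diff:

    // Hypothetical call site: load the YAML configuration, then let environment
    // variables such as SOURCE_*, TARGET_* and REDIS_URI override individual fields.
    val config = RestructureConfig.load("restructure.yml")
        .withEnv()
        .also { it.validate() }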
@@ -77,21 +85,38 @@ data class RestructureConfig(

        private val logger = LoggerFactory.getLogger(RestructureConfig::class.java)
        internal const val RESTRUCTURE_CONFIG_FILE_NAME = "restructure.yml"
+
+         inline fun <T> T.copyEnv(key: String, doCopy: T.(String) -> T): T = copyOnChange<T, String?>(
+             null,
+             modification = { System.getenv(key) },
+             doCopy = { doCopy(requireNotNull(it)) }
+         )
+
+         inline fun <T, V> T.copyOnChange(original: V, modification: (V) -> V, doCopy: T.(V) -> T): T {
+             val newValue = modification(original)
+             return if (newValue != original) {
+                 doCopy(newValue)
+             } else this
+         }
    }
}

/** Redis configuration. */
data class RedisConfig(
-         /**
-          * Full Redis URI. The protocol should be redis for plain text and rediss for TLS. It
-          * should contain at least a hostname and port, but it may also include username and
-          * password.
-          */
-         val uri: URI = URI.create("redis://localhost:6379"),
-         /**
-          * Prefix to use for creating a lock of a topic.
-          */
-         val lockPrefix: String = "radar-output/lock")
+     /**
+      * Full Redis URI. The protocol should be redis for plain text and rediss for TLS. It
+      * should contain at least a hostname and port, but it may also include username and
+      * password.
+      */
+     val uri: URI = URI.create("redis://localhost:6379"),
+     /**
+      * Prefix to use for creating a lock of a topic.
+      */
+     val lockPrefix: String = "radar-output/lock",
+ ) {
+     fun withEnv(): RedisConfig = this
+         .copyEnv("REDIS_URI") { copy(uri = URI.create(it)) }
+ }

data class ServiceConfig(
        /** Whether to enable the service mode of this application. */
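The copyOnChange helper introduced above only calls doCopy when the recomputed value differs from the original, so an unchanged field keeps the existing instance; copyEnv builds on it so that an unset environment variable is a no-op. A self-contained sketch with a hypothetical Example class, not taken from the repository:

    import org.radarbase.output.config.RestructureConfig.Companion.copyEnv

    // Hypothetical class to illustrate the helper semantics.
    data class Example(val endpoint: String = "http://localhost") {
        fun withEnv(): Example = this
            .copyEnv("EXAMPLE_ENDPOINT") { copy(endpoint = it) }
    }
    // If EXAMPLE_ENDPOINT is unset, withEnv() returns this same instance;
    // otherwise it returns a copy with the endpoint overridden.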
@@ -207,35 +232,36 @@ private inline fun <reified T: Plugin> String.toPluginInstance(properties: Map<S

data class TopicConfig(
        /** Topic-specific deduplication handling. */
-         val deduplication: DeduplicationConfig? = null,
+         val deduplication: DeduplicationConfig = DeduplicationConfig(),
        /** Whether to exclude the topic from being processed. */
        val exclude: Boolean = false,
        /**
         * Whether to exclude the topic from being deleted, if this configuration has been set
         * in the service.
         */
        val excludeFromDelete: Boolean = false) {
-     fun deduplication(deduplicationDefault: DeduplicationConfig): DeduplicationConfig {
-         return deduplication
-                 ?.run { if (enable == null) copy(enable = deduplicationDefault.enable) else this }
-                 ?.run { if (distinctFields == null) copy(distinctFields = deduplicationDefault.distinctFields) else this }
-                 ?.run { if (ignoreFields == null) copy(distinctFields = deduplicationDefault.ignoreFields) else this }
-                 ?: deduplicationDefault
-     }
+     fun deduplication(deduplicationDefault: DeduplicationConfig): DeduplicationConfig = deduplication
+         .withDefaults(deduplicationDefault)
}

data class DeduplicationConfig(
-         /** Whether to enable deduplication. */
-         val enable: Boolean? = null,
-         /**
-          * Only deduplicate using given fields. Fields not specified here are ignored
-          * for determining duplication.
-          */
-         val distinctFields: Set<String>? = null,
-         /**
-          * Ignore given fields for determining whether a row is identical to another.
-          */
-         val ignoreFields: Set<String>? = null)
+     /** Whether to enable deduplication. */
+     val enable: Boolean? = null,
+     /**
+      * Only deduplicate using given fields. Fields not specified here are ignored
+      * for determining duplication.
+      */
+     val distinctFields: Set<String>? = null,
+     /**
+      * Ignore given fields for determining whether a row is identical to another.
+      */
+     val ignoreFields: Set<String>? = null,
+ ) {
+     fun withDefaults(deduplicationDefaults: DeduplicationConfig): DeduplicationConfig = deduplicationDefaults
+         .copyOnChange<DeduplicationConfig, Boolean?>(null, { enable }) { copy(enable = it) }
+         .copyOnChange<DeduplicationConfig, Set<String>?>(null, { distinctFields }) { copy(distinctFields = it) }
+         .copyOnChange<DeduplicationConfig, Set<String>?>(null, { ignoreFields }) { copy(ignoreFields = it) }
+ }

data class HdfsConfig(
        /** HDFS name nodes to use. */
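The new withDefaults starts from the service-wide defaults and overrides only the fields a topic sets explicitly, replacing the old null-chaining (which also copied the defaults' ignoreFields into distinctFields). A sketch of the expected merge behaviour, with hypothetical values:

    // Hypothetical values: the topic overrides only 'enable'; other fields fall back to the defaults.
    val defaults = DeduplicationConfig(enable = true, distinctFields = setOf("key", "time"))
    val topic = TopicConfig(deduplication = DeduplicationConfig(enable = false))
    val effective = topic.deduplication(defaults)
    // effective == DeduplicationConfig(enable = false, distinctFields = setOf("key", "time"), ignoreFields = null)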
@@ -290,6 +316,13 @@ data class ResourceConfig(
            ResourceType.AZURE -> checkNotNull(azure) { "No Azure configuration provided." }
        }
    }
+
+     fun withEnv(prefix: String): ResourceConfig = when (sourceType) {
+         ResourceType.S3 -> copyOnChange(s3, { it?.withEnv(prefix) }) { copy(s3 = it) }
+         ResourceType.HDFS -> this
+         ResourceType.LOCAL -> this
+         ResourceType.AZURE -> copyOnChange(azure, { it?.withEnv(prefix) }) { copy(azure = it) }
+     }
}

enum class ResourceType {
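ResourceConfig.withEnv consults only the block matching the active sourceType; HDFS and local resources are returned unchanged. A hypothetical source configuration to illustrate (values invented):

    // Only the s3 block reacts to SOURCE_* variables here; azure and hdfs settings are untouched.
    val source = ResourceConfig(
        "s3",
        s3 = S3Config(endpoint = "http://minio:9000", accessToken = "key", secretKey = "secret", bucket = "radar")
    ).withEnv("SOURCE_")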
@@ -305,46 +338,53 @@ fun String.toResourceType() = when(toLowerCase()) {
}

data class LocalConfig(
-         /** User ID (uid) to write data as. Only valid on Unix-based filesystems. */
-         val userId: Int = -1,
-         /** Group ID (gid) to write data as. Only valid on Unix-based filesystems. */
-         val groupId: Int = -1)
+     /** User ID (uid) to write data as. Only valid on Unix-based filesystems. */
+     val userId: Int = -1,
+     /** Group ID (gid) to write data as. Only valid on Unix-based filesystems. */
+     val groupId: Int = -1,
+ )

data class S3Config(
-         /** URL to reach object store at. */
-         val endpoint: String,
-         /** Access token for writing data with. */
-         val accessToken: String,
-         /** Secret key belonging to access token. */
-         val secretKey: String,
-         /** Bucket name. */
-         val bucket: String,
-         /** If no endOffset is in the filename, read it from object tags. */
-         val endOffsetFromTags: Boolean = false
+     /** URL to reach object store at. */
+     val endpoint: String,
+     /** Access token for writing data with. */
+     val accessToken: String,
+     /** Secret key belonging to access token. */
+     val secretKey: String,
+     /** Bucket name. */
+     val bucket: String,
+     /** If no endOffset is in the filename, read it from object tags. */
+     val endOffsetFromTags: Boolean = false,
) {
    fun createS3Client(): MinioClient = MinioClient.Builder()
            .endpoint(endpoint)
            .credentials(accessToken, secretKey)
            .build()
+
+     fun withEnv(prefix: String): S3Config = this
+         .copyEnv("${prefix}S3_ACCESS_TOKEN") { copy(accessToken = it) }
+         .copyEnv("${prefix}S3_SECRET_KEY") { copy(secretKey = it) }
+         .copyEnv("${prefix}S3_BUCKET") { copy(bucket = it) }
+         .copyEnv("${prefix}S3_ENDPOINT") { copy(endpoint = it) }
}

data class AzureConfig(
-         /** URL to reach object store at. */
-         val endpoint: String,
-         /** Name of the Azure Blob Storage container. */
-         val container: String,
-         /** If no endOffset is in the filename, read it from object metadata. */
-         val endOffsetFromMetadata: Boolean = false,
-         /** Azure username. */
-         val username: String?,
-         /** Azure password. */
-         val password: String?,
-         /** Shared Azure Blob Storage account name. */
-         val accountName: String?,
-         /** Shared Azure Blob Storage account key. */
-         val accountKey: String?,
-         /** Azure SAS token for a configured service. */
-         val sasToken: String?
+     /** URL to reach object store at. */
+     val endpoint: String,
+     /** Name of the Azure Blob Storage container. */
+     val container: String,
+     /** If no endOffset is in the filename, read it from object metadata. */
+     val endOffsetFromMetadata: Boolean = false,
+     /** Azure username. */
+     val username: String?,
+     /** Azure password. */
+     val password: String?,
+     /** Shared Azure Blob Storage account name. */
+     val accountName: String?,
+     /** Shared Azure Blob Storage account key. */
+     val accountKey: String?,
+     /** Azure SAS token for a configured service. */
+     val sasToken: String?,
) {
    fun createAzureClient(): BlobServiceClient = BlobServiceClientBuilder().apply {
        endpoint(endpoint)
@@ -356,6 +396,13 @@ data class AzureConfig(
        }
    }.buildClient()

+     fun withEnv(prefix: String): AzureConfig = this
+         .copyEnv("${prefix}AZURE_USERNAME") { copy(username = it) }
+         .copyEnv("${prefix}AZURE_PASSWORD") { copy(password = it) }
+         .copyEnv("${prefix}AZURE_ACCOUNT_NAME") { copy(accountName = it) }
+         .copyEnv("${prefix}AZURE_ACCOUNT_KEY") { copy(accountKey = it) }
+         .copyEnv("${prefix}AZURE_SAS_TOKEN") { copy(sasToken = it) }
+
    companion object {
        private val logger = LoggerFactory.getLogger(AzureConfig::class.java)
    }
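Taken together, these overrides let a deployment keep credentials out of restructure.yml. An illustrative summary of the variable names implied by the prefixes above (derived here, not quoted from documentation):

    // With an S3 source and target: SOURCE_S3_ENDPOINT, SOURCE_S3_ACCESS_TOKEN,
    // SOURCE_S3_SECRET_KEY, SOURCE_S3_BUCKET and the TARGET_S3_* equivalents.
    // With an Azure target: TARGET_AZURE_USERNAME, TARGET_AZURE_PASSWORD,
    // TARGET_AZURE_ACCOUNT_NAME, TARGET_AZURE_ACCOUNT_KEY, TARGET_AZURE_SAS_TOKEN.
    // Redis: REDIS_URI. Unset variables leave the corresponding YAML values untouched.
    val config = RestructureConfig.load(null).withEnv()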