@@ -15,7 +15,7 @@ import org.radarbase.output.config.RestructureConfig.Companion.copyEnv
15
15
import org.radarbase.output.config.RestructureConfig.Companion.copyOnChange
16
16
import org.radarbase.output.format.FormatFactory
17
17
import org.radarbase.output.format.RecordConverterFactory
18
- import org.radarbase.output.path.ObservationKeyPathFactory
18
+ import org.radarbase.output.path.FormattedPathFactory
19
19
import org.radarbase.output.path.RecordPathFactory
20
20
import org.slf4j.LoggerFactory
21
21
import java.net.URI
@@ -119,58 +119,58 @@ data class RedisConfig(
119
119
}
120
120
121
121
data class ServiceConfig (
122
- /* * Whether to enable the service mode of this application. */
123
- val enable : Boolean ,
124
- /* * Polling interval in seconds. */
125
- val interval : Long = 300L ,
126
- /* * Age in days after an avro file can be removed. Ignored if not strictly positive. */
127
- val deleteAfterDays : Int = -1 ) {
128
-
122
+ /* * Whether to enable the service mode of this application. */
123
+ val enable : Boolean ,
124
+ /* * Polling interval in seconds. */
125
+ val interval : Long = 300L ,
126
+ /* * Age in days after an avro file can be removed. Ignored if not strictly positive. */
127
+ val deleteAfterDays : Int = -1 ,
128
+ ) {
129
129
fun validate () {
130
130
check(interval > 0 ) { " Cleaner interval must be strictly positive" }
131
131
}
132
132
}
133
133
134
134
data class CleanerConfig (
135
- /* * Whether to enable the cleaner. */
136
- val enable : Boolean = false ,
137
- /* * How often to run the cleaner in seconds. */
138
- val interval : Long = 1260L ,
139
- /* * Age in days after an avro file can be removed. Must be strictly positive. */
140
- val age : Int = 7 ) {
141
-
135
+ /* * Whether to enable the cleaner. */
136
+ val enable : Boolean = false ,
137
+ /* * How often to run the cleaner in seconds. */
138
+ val interval : Long = 1260L ,
139
+ /* * Age in days after an avro file can be removed. Must be strictly positive. */
140
+ val age : Int = 7 ,
141
+ ) {
142
142
fun validate () {
143
143
check(age > 0 ) { " Cleaner file age must be strictly positive" }
144
144
check(interval > 0 ) { " Cleaner interval must be strictly positive" }
145
145
}
146
146
}
147
147
148
148
data class WorkerConfig (
149
- /* * Whether to enable restructuring */
150
- val enable : Boolean = true ,
151
- /* * Number of threads to use for processing files. */
152
- val numThreads : Int = 1 ,
153
- /* *
154
- * Maximum number of files to process for a given topic. Limit this to ensure that a single
155
- * processing iteration including lock takes a limited amount of time.
156
- */
157
- val maxFilesPerTopic : Int? = null ,
158
- /* *
159
- * Number of files to simultaneously keep in cache, including open writer. A higher size will
160
- * decrease overhead but increase memory usage and open file descriptors.
161
- */
162
- val cacheSize : Int = CACHE_SIZE_DEFAULT ,
163
- /* *
164
- * Number of offsets to simultaneously keep in cache. A higher size will
165
- * decrease overhead but increase memory usage.
166
- */
167
- val cacheOffsetsSize : Long = 500_000 ,
168
- /* *
169
- * Minimum time since the file was last modified in seconds. Avoids
170
- * synchronization issues that may occur in a source file that is being
171
- * appended to.
172
- */
173
- val minimumFileAge : Long = 60
149
+ /* * Whether to enable restructuring */
150
+ val enable : Boolean = true ,
151
+ /* * Number of threads to use for processing files. */
152
+ val numThreads : Int = 1 ,
153
+ /* *
154
+ * Maximum number of files to process for a given topic. Limit this to ensure that a single
155
+ * processing iteration including lock takes a limited amount of time.
156
+ */
157
+ val maxFilesPerTopic : Int? = null ,
158
+ /* *
159
+ * Number of files to simultaneously keep in cache, including open writer. A higher size will
160
+ * decrease overhead but increase memory usage and open file descriptors.
161
+ */
162
+ val cacheSize : Int = CACHE_SIZE_DEFAULT ,
163
+ /* *
164
+ * Number of offsets to simultaneously keep in cache. A higher size will
165
+ * decrease overhead but increase memory usage.
166
+ */
167
+ val cacheOffsetsSize : Long = 500_000 ,
168
+ /* *
169
+ * Minimum time since the file was last modified in seconds. Avoids
170
+ * synchronization issues that may occur in a source file that is being
171
+ * appended to.
172
+ */
173
+ val minimumFileAge : Long = 60 ,
174
174
) {
175
175
init {
176
176
check(cacheSize >= 1 ) { " Maximum files per topic must be strictly positive" }
@@ -187,23 +187,23 @@ interface PluginConfig {
187
187
}
188
188
189
189
data class PathConfig (
190
- override val factory : String = ObservationKeyPathFactory : :class.qualifiedName!!,
191
- override val properties : Map <String , String > = emptyMap(),
192
- /* * Input paths referencing the source resource. */
193
- val inputs : List <Path > = emptyList(),
194
- /* * Temporary directory for processing output files before uploading. */
195
- val temp : Path = Files .createTempDirectory("radar-output-restructure"),
196
- /* * Output path on the target resource. */
197
- val output : Path = Paths .get("output")
190
+ override val factory : String = FormattedPathFactory : :class.qualifiedName!!,
191
+ override val properties : Map <String , String > = emptyMap(),
192
+ /* * Input paths referencing the source resource. */
193
+ val inputs : List <Path > = emptyList(),
194
+ /* * Temporary directory for processing output files before uploading. */
195
+ val temp : Path = Files .createTempDirectory("radar-output-restructure"),
196
+ /* * Output path on the target resource. */
197
+ val output : Path = Paths .get("output"),
198
198
) : PluginConfig {
199
199
fun createFactory (): RecordPathFactory = factory.toPluginInstance(properties)
200
200
}
201
201
202
202
data class CompressionConfig (
203
- override val factory : String = CompressionFactory : :class.qualifiedName!!,
204
- override val properties : Map <String , String > = emptyMap(),
205
- /* * Compression type. Currently one of gzip, zip or none. */
206
- val type : String = " none"
203
+ override val factory : String = CompressionFactory : :class.qualifiedName!!,
204
+ override val properties : Map <String , String > = emptyMap(),
205
+ /* * Compression type. Currently one of gzip, zip or none. */
206
+ val type : String = " none" ,
207
207
) : PluginConfig {
208
208
fun createFactory (): CompressionFactory = factory.toPluginInstance(properties)
209
209
fun createCompression (): Compression = createFactory()[type]
@@ -231,15 +231,16 @@ private inline fun <reified T: Plugin> String.toPluginInstance(properties: Map<S
231
231
}
232
232
233
233
data class TopicConfig (
234
- /* * Topic-specific deduplication handling. */
235
- val deduplication : DeduplicationConfig = DeduplicationConfig (),
236
- /* * Whether to exclude the topic from being processed. */
237
- val exclude : Boolean = false ,
238
- /* *
239
- * Whether to exclude the topic from being deleted, if this configuration has been set
240
- * in the service.
241
- */
242
- val excludeFromDelete : Boolean = false ) {
234
+ /* * Topic-specific deduplication handling. */
235
+ val deduplication : DeduplicationConfig = DeduplicationConfig (),
236
+ /* * Whether to exclude the topic from being processed. */
237
+ val exclude : Boolean = false ,
238
+ /* *
239
+ * Whether to exclude the topic from being deleted, if this configuration has been set
240
+ * in the service.
241
+ */
242
+ val excludeFromDelete : Boolean = false ,
243
+ ) {
243
244
fun deduplication (deduplicationDefault : DeduplicationConfig ): DeduplicationConfig = deduplication
244
245
.withDefaults(deduplicationDefault)
245
246
}
@@ -264,10 +265,11 @@ data class DeduplicationConfig(
264
265
}
265
266
266
267
data class HdfsConfig (
267
- /* * HDFS name nodes to use. */
268
- val nameNodes : List <String > = emptyList(),
269
- /* * Additional HDFS configuration parameters. */
270
- val properties : Map <String , String > = emptyMap()) {
268
+ /* * HDFS name nodes to use. */
269
+ val nameNodes : List <String > = emptyList(),
270
+ /* * Additional HDFS configuration parameters. */
271
+ val properties : Map <String , String > = emptyMap(),
272
+ ) {
271
273
272
274
val configuration: Configuration = Configuration ()
273
275
@@ -296,20 +298,20 @@ data class HdfsConfig(
296
298
}
297
299
298
300
data class ResourceConfig (
299
- /* * Resource type. One of s3, hdfs or local. */
300
- val type : String ,
301
- val s3 : S3Config ? = null ,
302
- val hdfs : HdfsConfig ? = null ,
303
- val local : LocalConfig ? = null ,
304
- val azure : AzureConfig ? = null ) {
305
-
301
+ /* * Resource type. One of s3, hdfs or local. */
302
+ val type : String ,
303
+ val s3 : S3Config ? = null ,
304
+ val hdfs : HdfsConfig ? = null ,
305
+ val local : LocalConfig ? = null ,
306
+ val azure : AzureConfig ? = null ,
307
+ ) {
306
308
@JsonIgnore
307
309
lateinit var sourceType: ResourceType
308
310
309
311
fun validate () {
310
312
sourceType = type.toResourceType()
311
313
312
- when (sourceType) {
314
+ when (sourceType) {
313
315
ResourceType .S3 -> checkNotNull(s3) { " No S3 configuration provided." }
314
316
ResourceType .HDFS -> checkNotNull(hdfs) { " No HDFS configuration provided." }.also { it.validate() }
315
317
ResourceType .LOCAL -> checkNotNull(local) { " No local configuration provided." }
@@ -357,9 +359,9 @@ data class S3Config(
357
359
val endOffsetFromTags : Boolean = false ,
358
360
) {
359
361
fun createS3Client (): MinioClient = MinioClient .Builder ()
360
- .endpoint(endpoint)
361
- .credentials(accessToken, secretKey)
362
- .build()
362
+ .endpoint(endpoint)
363
+ .credentials(accessToken, secretKey)
364
+ .build()
363
365
364
366
fun withEnv (prefix : String ): S3Config = this
365
367
.copyEnv(" ${prefix} S3_ACCESS_TOKEN" ) { copy(accessToken = it) }
0 commit comments