Skip to content

Commit f6ad635

Browse files
authored
Merge pull request #552 from RADAR-base/bugfix-pathconfig
Configuration bug
2 parents 54bd1cb + cbf509b commit f6ad635

File tree

4 files changed

+105
-4
lines changed

4 files changed

+105
-4
lines changed

src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,39 @@ import java.nio.charset.StandardCharsets.UTF_8
3131
import java.nio.file.Paths
3232

3333
class RestructureS3IntegrationTest {
34+
@Test
35+
fun configuration() = runTest {
36+
Timer.isEnabled = true
37+
val sourceConfig = S3Config(
38+
endpoint = "http://localhost:9000",
39+
accessToken = "minioadmin",
40+
secretKey = "minioadmin",
41+
bucket = "source",
42+
)
43+
val targetConfig = sourceConfig.copy(bucket = "target")
44+
val topicConfig = mapOf(
45+
"application_server_status" to TopicConfig(
46+
pathProperties = PathFormatterConfig(
47+
format = "\${projectId}/\${userId}/\${topic}/\${value:serverStatus}/\${filename}",
48+
),
49+
),
50+
)
51+
val config = RestructureConfig(
52+
source = ResourceConfig("s3", s3 = sourceConfig),
53+
target = ResourceConfig("s3", s3 = targetConfig),
54+
paths = PathConfig(
55+
inputs = listOf(Paths.get("in")),
56+
// These properties were added to verify that they are present later in PathConfig.createFactory()
57+
properties = mapOf("one" to "1", "two" to "2", "three" to "3"),
58+
),
59+
worker = WorkerConfig(minimumFileAge = 0L),
60+
topics = topicConfig,
61+
)
62+
val application = Application(config)
63+
64+
assertEquals(4, application.pathFactory.pathConfig.path.properties.count())
65+
}
66+
3467
@Test
3568
fun integration() = runTest {
3669
Timer.isEnabled = true
@@ -51,11 +84,18 @@ class RestructureS3IntegrationTest {
5184
val config = RestructureConfig(
5285
source = ResourceConfig("s3", s3 = sourceConfig),
5386
target = ResourceConfig("s3", s3 = targetConfig),
54-
paths = PathConfig(inputs = listOf(Paths.get("in"))),
87+
paths = PathConfig(
88+
inputs = listOf(Paths.get("in")),
89+
// These properties were added to verify that they are present later in PathConfig.createFactory()
90+
properties = mapOf("one" to "1", "two" to "2", "three" to "3"),
91+
),
5592
worker = WorkerConfig(minimumFileAge = 0L),
5693
topics = topicConfig,
5794
)
5895
val application = Application(config)
96+
97+
assertEquals(4, application.pathFactory.pathConfig.path.properties.count())
98+
5999
val sourceClient = sourceConfig.createS3Client()
60100
val sourceBucket = requireNotNull(sourceConfig.bucket)
61101
if (!sourceClient.bucketExists(BucketExistsArgs.builder().bucketBuild(sourceBucket))) {

src/main/java/org/radarbase/output/config/PathConfig.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,19 @@ data class PathConfig(
4545
else -> null
4646
}
4747

48+
// Pass any properties from the given PathConfig to the PathFormatterConfig for the factory.
49+
// Properties passed in the PathConfig.path.properties take precedent
50+
val pathProperties = buildMap {
51+
putAll(path.properties)
52+
putAll(properties)
53+
}
54+
55+
val pathFormatterConfig = path.copy(properties = pathProperties)
56+
val pathConfig = copy(bucket = bucketConfig, path = pathFormatterConfig)
57+
4858
pathFactory.init(
4959
extension = extension,
50-
config = copy(bucket = bucketConfig),
60+
config = pathConfig,
5161
topics = topics,
5262
)
5363

src/main/java/org/radarbase/output/path/RecordPathFactory.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecordBuilder
2424
import org.radarbase.output.config.PathConfig
2525
import org.radarbase.output.config.TopicConfig
2626
import org.radarbase.output.util.TimeUtil
27+
import org.slf4j.LoggerFactory
2728
import java.nio.file.Path
2829
import java.nio.file.Paths
2930
import java.util.regex.Pattern
@@ -44,12 +45,13 @@ abstract class RecordPathFactory {
4445
config.output
4546
},
4647
path = config.path.copy(
47-
properties = buildMap(config.path.properties.size + 1) {
48+
properties = buildMap {
4849
putAll(config.path.properties)
4950
putIfAbsent("extension", extension)
5051
},
5152
),
5253
)
54+
5355
this.addTopicConfiguration(topics)
5456
}
5557

@@ -67,7 +69,8 @@ abstract class RecordPathFactory {
6769
attempt: Int,
6870
): Path {
6971
val keyField = requireNotNull(record.get("key")) { "Failed to process $record; no key present" }
70-
val valueField = requireNotNull(record.get("value") as? GenericRecord) { "Failed to process $record; no value present" }
72+
val valueField =
73+
requireNotNull(record.get("value") as? GenericRecord) { "Failed to process $record; no value present" }
7174

7275
val keyRecord: GenericRecord = if (keyField is GenericRecord) {
7376
keyField
@@ -146,5 +149,7 @@ abstract class RecordPathFactory {
146149
?.let { get(it.pos()) }
147150
}
148151

152+
private val logger = LoggerFactory.getLogger(RecordPathFactory::class.java)
153+
149154
protected open fun addTopicConfiguration(topicConfig: Map<String, TopicConfig>) = Unit
150155
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package org.radarbase.output.path
2+
3+
import org.junit.jupiter.api.Assertions.assertEquals
4+
import org.junit.jupiter.api.Test
5+
import org.radarbase.output.config.PathConfig
6+
import org.radarbase.output.config.PathFormatterConfig
7+
import org.radarbase.output.config.ResourceConfig
8+
import org.radarbase.output.config.S3Config
9+
10+
internal class RecordPathFactoryTest {
11+
12+
@Test
13+
fun testInit() {
14+
var properties = mapOf("key1" to "value1", "key2" to "value2")
15+
16+
val pathConfig = PathConfig(
17+
factory = "org.radarbase.output.path.FormattedPathFactory",
18+
properties = properties,
19+
path = PathFormatterConfig(
20+
format = "\${topic}/\${projectId}/\${userId}/\${sourceId}/\${filename}",
21+
plugins = "fixed",
22+
),
23+
)
24+
25+
val targetConfig = S3Config(
26+
endpoint = "http://localhost:9000",
27+
accessToken = "minioadmin",
28+
secretKey = "minioadmin",
29+
bucket = "target",
30+
)
31+
32+
val factory = pathConfig.createFactory(
33+
ResourceConfig("s3", s3 = targetConfig),
34+
"test-extension",
35+
topics = mapOf(),
36+
)
37+
38+
properties = buildMap {
39+
putAll(properties)
40+
putIfAbsent("extension", "test-extension")
41+
}
42+
43+
assertEquals(properties, factory.pathConfig.path.properties)
44+
assertEquals(properties, factory.pathConfig.path.properties)
45+
}
46+
}

0 commit comments

Comments
 (0)