Skip to content

Commit 4d3b482

Browse files
committed
Tested FormattedPathFactory
1 parent 0edebe1 commit 4d3b482

File tree

4 files changed

+138
-9
lines changed

4 files changed

+138
-9
lines changed

build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ ext {
2828
jedisVersion = '3.4.0'
2929
slf4jVersion = '1.7.30'
3030
azureStorageVersion = '12.9.0'
31+
radarSchemasVersion = '0.5.15'
3132
}
3233

3334
repositories {
@@ -60,7 +61,7 @@ dependencies {
6061

6162
implementation "io.minio:minio:$minioVersion"
6263
implementation("com.azure:azure-storage-blob:$azureStorageVersion")
63-
implementation 'com.opencsv:opencsv:5.0'
64+
implementation 'com.opencsv:opencsv:5.4'
6465

6566
implementation group: 'org.apache.avro', name: 'avro-mapred', version: avroVersion
6667
implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: hadoopVersion
@@ -71,6 +72,7 @@ dependencies {
7172

7273
runtimeOnly group: 'org.slf4j', name: 'slf4j-log4j12', version: slf4jVersion
7374
runtimeOnly group: 'org.apache.hadoop', name: 'hadoop-hdfs-client', version: hadoopVersion
75+
testImplementation group: 'org.radarcns', name: 'radar-schemas-commons', version: radarSchemasVersion
7476
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: junitVersion
7577
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params', version: junitVersion
7678
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: junitVersion

src/main/java/org/radarbase/output/path/FormattedPathFactory.kt

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ open class FormattedPathFactory : RecordPathFactory() {
4040
val parameters = "\\$\\{([^}]*)}".toRegex()
4141
.findAll(format)
4242
.map { it.groupValues[1] }
43-
.toList()
43+
.toSet()
4444

4545
timeParameters = parameters
4646
.filter { it.startsWith("time:") }
@@ -60,10 +60,23 @@ open class FormattedPathFactory : RecordPathFactory() {
6060
" or the following: $knownParameters",
6161
)
6262
}
63+
if ("topic" !in parameters) {
64+
throw IllegalArgumentException("Path must include topic parameter.")
65+
}
66+
if ("filename" !in parameters && ("extension" !in parameters || "attempt" !in parameters)) {
67+
throw IllegalArgumentException(
68+
"Path must include filename parameter or extension and attempt parameters."
69+
)
70+
}
6371
}
6472

65-
override fun getRelativePath(topic: String, key: GenericRecord,
66-
value: GenericRecord, time: Instant?, attempt: Int): Path {
73+
override fun getRelativePath(
74+
topic: String,
75+
key: GenericRecord,
76+
value: GenericRecord,
77+
time: Instant?,
78+
attempt: Int,
79+
): Path {
6780
val attemptSuffix = if (attempt == 0) "" else "_$attempt"
6881

6982
val templatedParameters = mutableMapOf(
@@ -72,6 +85,8 @@ open class FormattedPathFactory : RecordPathFactory() {
7285
"sourceId" to sanitizeId(key.get("sourceId"), "unknown-source"),
7386
"topic" to topic,
7487
"filename" to getTimeBin(time) + attemptSuffix + extension,
88+
"attempt" to attemptSuffix,
89+
"extension" to extension,
7590
)
7691

7792
templatedParameters += if (time != null) {
@@ -100,7 +115,9 @@ open class FormattedPathFactory : RecordPathFactory() {
100115
"topic",
101116
"projectId",
102117
"userId",
103-
"sourceId"
118+
"sourceId",
119+
"attempt",
120+
"extension",
104121
)
105122

106123
private val logger = LoggerFactory.getLogger(FormattedPathFactory::class.java)

src/main/java/org/radarbase/output/path/RecordPathFactory.kt

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,11 @@ abstract class RecordPathFactory : Plugin {
5353
* paths already existed and are incompatible.
5454
* @return organization of given record
5555
*/
56-
open fun getRecordOrganization(topic: String,
57-
record: GenericRecord, attempt: Int): RecordOrganization {
56+
open fun getRecordOrganization(
57+
topic: String,
58+
record: GenericRecord,
59+
attempt: Int,
60+
): RecordOrganization {
5861
val keyField = record.get("key") as? GenericRecord
5962
val valueField = record.get("value") as? GenericRecord
6063

@@ -81,8 +84,13 @@ abstract class RecordPathFactory : Plugin {
8184
* paths already existed and are incompatible.
8285
* @return relative path corresponding to given parameters.
8386
*/
84-
abstract fun getRelativePath(topic: String, key: GenericRecord,
85-
value: GenericRecord, time: Instant?, attempt: Int): Path
87+
abstract fun getRelativePath(
88+
topic: String,
89+
key: GenericRecord,
90+
value: GenericRecord,
91+
time: Instant?,
92+
attempt: Int,
93+
): Path
8694

8795
/**
8896
* Get the category of a record, representing a partitioning for a given topic and user.
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package org.radarbase.output.path
2+
3+
import org.junit.jupiter.api.Assertions.assertEquals
4+
import org.junit.jupiter.api.Test
5+
import org.junit.jupiter.api.assertThrows
6+
import org.radarcns.kafka.ObservationKey
7+
import org.radarcns.passive.phone.PhoneLight
8+
import java.nio.file.Paths
9+
import java.time.Instant
10+
11+
internal class FormattedPathFactoryTest {
12+
@Test
13+
fun testFormat() {
14+
val factory = createFactory(
15+
format = "\${topic}/\${projectId}/\${userId}/\${sourceId}/\${time:yyyyMM}/\${time:dd}/\${filename}"
16+
)
17+
18+
val t = Instant.parse("2021-01-02T10:05:00Z")
19+
20+
val path = factory.getRelativePath(
21+
"t",
22+
ObservationKey(
23+
"p",
24+
"u",
25+
"s",
26+
),
27+
PhoneLight(
28+
t.epochSecond.toDouble(),
29+
t.epochSecond.toDouble(),
30+
1.0f,
31+
),
32+
t,
33+
0,
34+
)
35+
36+
assertEquals(Paths.get("t/p/u/s/202101/02/20210102_1000.csv.gz"), path)
37+
}
38+
39+
@Test
40+
fun unparameterized() {
41+
val factory = FormattedPathFactory().apply {
42+
init(emptyMap())
43+
extension = ".csv.gz"
44+
}
45+
val t = Instant.parse("2021-01-02T10:05:00Z")
46+
val path = factory.getRelativePath(
47+
"t",
48+
ObservationKey(
49+
"p",
50+
"u",
51+
"s",
52+
),
53+
PhoneLight(
54+
t.epochSecond.toDouble(),
55+
t.epochSecond.toDouble(),
56+
1.0f,
57+
),
58+
t,
59+
0,
60+
)
61+
assertEquals(Paths.get("p/u/t/20210102_1000.csv.gz"), path)
62+
}
63+
64+
@Test
65+
fun testMissingTopic() {
66+
assertThrows<IllegalArgumentException> {
67+
createFactory("\${projectId}/\${userId}/\${sourceId}/\${time:yyyyMM}/\${time:dd}/\${filename}")
68+
}
69+
}
70+
71+
@Test
72+
fun testMissingFilename() {
73+
assertThrows<IllegalArgumentException> {
74+
createFactory("\${topic}/\${projectId}/\${userId}/\${sourceId}/\${time:yyyyMM}/\${time:dd}")
75+
}
76+
}
77+
78+
@Test
79+
fun testUnknownParameter() {
80+
assertThrows<IllegalArgumentException> {
81+
createFactory("\${topic}/\${projectId}/\${userId}/\${sourceId}/\${time:yyyyMM}/\${time:dd}/\${filename}\${unknown}")
82+
}
83+
}
84+
85+
@Test
86+
fun testAttemptAndExtensionPresent() {
87+
createFactory("\${topic}/\${projectId}/\${userId}/\${sourceId}/\${time:yyyyMM}/\${time:dd}/\${attempt}\${extension}")
88+
assertThrows<IllegalArgumentException> {
89+
createFactory("\${topic}/\${projectId}/\${userId}/\${sourceId}/\${time:yyyyMM}/\${time:dd}/\${attempt}")
90+
}
91+
assertThrows<IllegalArgumentException> {
92+
createFactory("\${topic}/\${projectId}/\${userId}/\${sourceId}/\${time:yyyyMM}/\${time:dd}/\${extension}")
93+
}
94+
}
95+
96+
private fun createFactory(format: String): FormattedPathFactory = FormattedPathFactory().apply {
97+
init(mapOf(
98+
"format" to format,
99+
))
100+
extension = ".csv.gz"
101+
}
102+
}

0 commit comments

Comments
 (0)