Skip to content

Commit 0edebe1

Browse files
committed
Added formatted path formatter
1 parent b88b7ff commit 0edebe1

File tree

2 files changed

+109
-1
lines changed

2 files changed

+109
-1
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.radarbase.output.path
18+
19+
import org.apache.avro.generic.GenericRecord
20+
import org.slf4j.LoggerFactory
21+
import java.nio.file.Path
22+
import java.nio.file.Paths
23+
import java.time.Instant
24+
import java.time.ZoneOffset.UTC
25+
import java.time.format.DateTimeFormatter
26+
27+
open class FormattedPathFactory : RecordPathFactory() {
28+
lateinit var format: String
29+
lateinit var timeParameters: Map<String, DateTimeFormatter>
30+
31+
override fun init(properties: Map<String, String>) {
32+
super.init(properties)
33+
34+
format = properties["format"]
35+
?: run {
36+
logger.warn("Path format not provided, using {} instead", DEFAULT_FORMAT)
37+
DEFAULT_FORMAT
38+
}
39+
40+
val parameters = "\\$\\{([^}]*)}".toRegex()
41+
.findAll(format)
42+
.map { it.groupValues[1] }
43+
.toList()
44+
45+
timeParameters = parameters
46+
.filter { it.startsWith("time:") }
47+
.associateWith { p ->
48+
DateTimeFormatter
49+
.ofPattern(p.removePrefix("time:"))
50+
.withZone(UTC)
51+
}
52+
53+
val parameterNames = knownParameters + timeParameters.keys
54+
55+
val illegalParameters = parameters.filterNot { it in parameterNames }
56+
if (illegalParameters.isNotEmpty()) {
57+
throw IllegalArgumentException(
58+
"Cannot use path format $format: unknown parameters $illegalParameters." +
59+
" Legal parameter names are time formats (e.g., \${time:YYYYmmDD}" +
60+
" or the following: $knownParameters",
61+
)
62+
}
63+
}
64+
65+
override fun getRelativePath(topic: String, key: GenericRecord,
66+
value: GenericRecord, time: Instant?, attempt: Int): Path {
67+
val attemptSuffix = if (attempt == 0) "" else "_$attempt"
68+
69+
val templatedParameters = mutableMapOf(
70+
"projectId" to sanitizeId(key.get("projectId"), "unknown-project"),
71+
"userId" to sanitizeId(key.get("userId"), "unknown-user"),
72+
"sourceId" to sanitizeId(key.get("sourceId"), "unknown-source"),
73+
"topic" to topic,
74+
"filename" to getTimeBin(time) + attemptSuffix + extension,
75+
)
76+
77+
templatedParameters += if (time != null) {
78+
timeParameters.mapValues { (_, formatter) -> formatter.format(time) }
79+
} else {
80+
timeParameters.mapValues { "unknown-time" }
81+
}
82+
83+
val path = templatedParameters.asSequence()
84+
.fold(format) { p, (name, value) ->
85+
p.replace("\${$name}", value)
86+
}
87+
88+
return Paths.get(path)
89+
}
90+
91+
override fun getCategory(key: GenericRecord, value: GenericRecord): String {
92+
return sanitizeId(key.get("sourceId"), "unknown-source")
93+
}
94+
95+
companion object {
96+
private const val DEFAULT_FORMAT = "\${projectId}/\${userId}/\${topic}/\${filename}"
97+
98+
private val knownParameters = setOf(
99+
"filename",
100+
"topic",
101+
"projectId",
102+
"userId",
103+
"sourceId"
104+
)
105+
106+
private val logger = LoggerFactory.getLogger(FormattedPathFactory::class.java)
107+
}
108+
}

src/main/java/org/radarbase/output/path/RecordPathFactory.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ abstract class RecordPathFactory : Plugin {
4040
.ofPattern(it)
4141
.withZone(UTC)
4242
} catch (ex: IllegalArgumentException) {
43-
logger.error("Cannot use time bin format {}, using {} instad", it, timeBinFormat, ex)
43+
logger.error("Cannot use time bin format {}, using {} instead", it, timeBinFormat, ex)
4444
}
4545
}
4646
}

0 commit comments

Comments
 (0)