Skip to content

Commit f6dad31

Browse files
committed
Added docs and clarified code
1 parent b09dae5 commit f6dad31

File tree

8 files changed

+154
-52
lines changed

8 files changed

+154
-52
lines changed

README.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,52 @@ Secrets can be provided as environment variables as well:
151151

152152
Replace `SOURCE` with `TARGET` in the variables above to configure the target storage.
153153

154+
### Path format
155+
156+
The output path at the target storage is determined by the path format. The class that handles path
157+
output by default is the `org.radarbase.output.path.FormattedPathFactory`. The default format is
158+
```
159+
${projectId}/${userId}/${topic}/${filename}
160+
```
161+
Each format parameter is enclosed by a dollar sign with curly brackets.
162+
163+
The full set of parameters is listed here:
164+
```yaml
165+
paths:
166+
# Input directories in source storage
167+
inputs:
168+
- /testIn
169+
# Temporary directory for local file processing.
170+
temp: ./output/+tmp
171+
# Output directory in target storage
172+
output: /output
173+
# Output path construction factory
174+
factory: org.radarbase.output.path.FormattedPathFactory
175+
# Additional properties
176+
# properties:
177+
# format: ${projectId}/${userId}/${topic}/${time:mm}/${time:YYYYmmDD_HH'00'}${attempt}${extension}
178+
# plugins: fixed time key value org.example.plugin.MyPathPlugin
179+
```
180+
181+
The FormattedPathFactory can use multiple plugins to format paths based on a given record.
182+
The `fixed` plugin has a number of fixed parameters that can be used:
183+
184+
| Parameter | Description |
185+
|-----------|-------------------------------------------------------------------------|
186+
| projectId | record project ID |
187+
| userId | record user ID |
188+
| sourceId | record source ID |
189+
| topic | Kafka topic |
190+
| filename | default time binning with attempt suffix and file extension |
191+
| attempt | attempt suffix for if a file with an incompatible format already exists |
192+
| extension | file extension |
193+
194+
At least `filename` should be used, or a combination of `attempt` and `extension`.
195+
196+
Then there are also plugins that take their own format. The `time` plugin formats a parameter according to the record time. It takes parameters with format `time:<date format>` where `<date format>` should be replaced by a [Java date format](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/time/format/DateTimeFormatter.html), such as `YYYY-mm-dd`. The plugin tries to use the following time fields, in this order: a double `time` in the value struct, `timeStart` double or `start` long in the key struct, `dateTime` string in the value struct, `date` string in the value struct, `timeReceived` double in the value struct or `timeCompleted` double in the value struct. The first valid value used. If no valid time values are found, `unknown-date` is returned.
197+
198+
The `key` and `value` plugins read values from the key or value structs of a given record. For example, parameter `value:color.red` will attempt to read the value struct, finding first the `color` field and then the enclosed `red` field. If no such value exists, `unknown-value` will be used in the format.
199+
154200
### Cleaner
155201

156202
Source files can be automatically be removed by a cleaner process. This checks whether the file has already been extracted and is older than a configured age. This feature is not enabled by default. It can be configured in the `cleaner` configuration section:
@@ -199,3 +245,5 @@ To implement alternative storage paths, storage drivers or storage formats, put
199245
| `compression: factory: ...` | `org.radarbase.output.compression.CompressionFactory` | Factory class to use for data compression. | CompressionFactory |
200246

201247
The respective `<type>: properties: {}` configuration parameters can be used to provide custom configuration of the factory. This configuration will be passed to the `Plugin#init(Map<String, String>)` method.
248+
249+
By adding additional path format plugins to the classpath, the path format of FormattedPathFactory may be expanded with different parameters or lookup engines.

src/main/java/org/radarbase/output/path/FixedPathFormatterPlugin.kt

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,30 @@ package org.radarbase.output.path
33
import org.radarbase.output.path.RecordPathFactory.Companion.sanitizeId
44

55
class FixedPathFormatterPlugin : PathFormatterPlugin() {
6-
override val allowedFormats: String = lookupTable.keys.joinToString(separator = ", ")
6+
override val allowedFormats: String = allowedParamNames.joinToString(separator = ", ")
77

8-
override fun createLookupTable(
9-
parameterNames: Set<String>,
10-
): Map<String, PathFormatParameters.() -> String> = lookupTable.filterKeys { it in parameterNames }
8+
override fun lookup(parameterContents: String): PathFormatParameters.() -> String = when (parameterContents) {
9+
"projectId" -> ({ sanitizeId(key.get("projectId"), "unknown-project") })
10+
"userId" -> ({ sanitizeId(key.get("userId"), "unknown-user") })
11+
"sourceId" -> ({ sanitizeId(key.get("sourceId"), "unknown-source") })
12+
"topic" -> ({ topic })
13+
"filename" -> ({ timeBin + attempt.toAttemptSuffix() + extension })
14+
"attempt" -> ({ attempt.toAttemptSuffix() })
15+
"extension" -> ({ extension })
16+
else -> throw IllegalArgumentException("Unknown path parameter $parameterContents")
17+
}
18+
19+
override fun extractParamContents(paramName: String): String? = paramName.takeIf { it in allowedParamNames }
1120

1221
companion object {
13-
val lookupTable = mapOf<String, PathFormatParameters.() -> String>(
14-
"projectId" to { sanitizeId(key.get("projectId"), "unknown-project") },
15-
"userId" to { sanitizeId(key.get("userId"), "unknown-user") },
16-
"sourceId" to { sanitizeId(key.get("sourceId"), "unknown-source") },
17-
"topic" to { topic },
18-
"filename" to { timeBin + attempt.toAttemptSuffix() + extension },
19-
"attempt" to { attempt.toAttemptSuffix() },
20-
"extension" to { extension },
22+
val allowedParamNames = setOf(
23+
"projectId",
24+
"userId",
25+
"sourceId",
26+
"topic",
27+
"filename",
28+
"attempt",
29+
"extension",
2130
)
2231

2332
private fun Int.toAttemptSuffix() = if (this == 0) "" else "_$this"

src/main/java/org/radarbase/output/path/FormattedPathFactory.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@ open class FormattedPathFactory : RecordPathFactory() {
3535

3636
format = properties["format"]
3737
?: run {
38-
logger.warn("Path format not provided, using {} instead", DEFAULT_FORMAT)
38+
logger.warn("Path format not provided, using '{}' instead", DEFAULT_FORMAT)
3939
DEFAULT_FORMAT
4040
}
4141
val pluginClassNames = properties["plugins"]
4242
?: run {
43-
logger.warn("Path format plugins not provided, using {} instead", DEFAULT_FORMAT_PLUGINS)
43+
logger.warn("Path format plugins not provided, using '{}' instead", DEFAULT_FORMAT_PLUGINS)
4444
DEFAULT_FORMAT_PLUGINS
4545
}
4646

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
package org.radarbase.output.path
22

3+
import org.radarbase.output.path.RecordPathFactory.Companion.sanitizeId
34
import org.radarbase.output.path.ValuePathFormatterPlugin.Companion.lookup
45

56
class KeyPathFormatterPlugin : PathFormatterPlugin() {
7+
override val prefix: String = "key"
8+
69
override val allowedFormats: String = "key:my.key.index"
710

8-
override fun createLookupTable(
9-
parameterNames: Set<String>
10-
): Map<String, (PathFormatParameters) -> String> = parameterNames
11-
.filter { it.startsWith("key:") }
12-
.associateWith { name ->
13-
val index = name.removePrefix("key:").split('.')
14-
return@associateWith { params ->
15-
RecordPathFactory.sanitizeId(params.key.lookup(index), "unknown-key")
16-
}
11+
override fun lookup(parameterContents: String): PathFormatParameters.() -> String {
12+
val index = parameterContents.split('.')
13+
require(index.none { it.isBlank() }) { "Cannot format key record with index $parameterContents" }
14+
return {
15+
sanitizeId(key.lookup(index), "unknown-key")
1716
}
17+
}
1818
}

src/main/java/org/radarbase/output/path/PathFormatter.kt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.radarbase.output.path
1818

19+
import org.slf4j.LoggerFactory
1920
import java.nio.file.Path
2021
import java.nio.file.Paths
2122

@@ -33,7 +34,14 @@ class PathFormatter(
3334

3435
parameterLookups = buildMap {
3536
plugins.forEach { plugin ->
36-
putAll(plugin.createLookupTable(foundParameters))
37+
putAll(
38+
try {
39+
plugin.createLookupTable(foundParameters)
40+
} catch (ex: IllegalArgumentException) {
41+
logger.error("Cannot parse path format {}, illegal format parameter found by plugin {}", format, plugin.javaClass, ex)
42+
throw ex
43+
}
44+
)
3745
}
3846
}
3947
val unsupportedParameters = foundParameters - parameterLookups.keys
@@ -61,4 +69,8 @@ class PathFormatter(
6169

6270
return Paths.get(path)
6371
}
72+
73+
companion object {
74+
private val logger = LoggerFactory.getLogger(PathFormatter::class.java)
75+
}
6476
}

src/main/java/org/radarbase/output/path/PathFormatterPlugin.kt

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,47 @@ package org.radarbase.output.path
33
import org.radarbase.output.Plugin
44

55
abstract class PathFormatterPlugin : Plugin {
6+
/**
7+
* Prefix for parameter names covered by this plugin. If null, [extractParamContents] must be
8+
* overridden to cover only supported parameters.
9+
*/
10+
open val prefix: String? = null
11+
12+
/** Textual format of formats allowed to be represented. */
613
abstract val allowedFormats: String
714

8-
abstract fun createLookupTable(
9-
parameterNames: Set<String>
10-
): Map<String, PathFormatParameters.() -> String>
15+
/**
16+
* Create a lookup table from parameter names to
17+
* its value for a given record. Only parameter names supported by this plugin will be mapped.
18+
* @throws IllegalArgumentException if any of the parameter contents are invalid.
19+
*/
20+
fun createLookupTable(
21+
parameterNames: Collection<String>
22+
): Map<String, PathFormatParameters.() -> String> = buildMap {
23+
parameterNames.forEach { paramName ->
24+
val paramContents = extractParamContents(paramName)
25+
if (paramContents != null) {
26+
put(paramName, lookup(paramContents))
27+
}
28+
}
29+
}
30+
31+
/**
32+
* Validate a parameter name and extract its contents to use in the lookup.
33+
*
34+
* @return name to use in the lookup or null if the parameter is not supported by this plugin
35+
*/
36+
protected open fun extractParamContents(paramName: String): String? {
37+
val prefixString = prefix?.let { "$it:" } ?: return null
38+
if (!paramName.startsWith(prefixString)) return null
39+
val parameterContents = paramName.removePrefix(prefixString).trim()
40+
require(parameterContents.isNotEmpty()) { "Parameter contents of '$paramName' are empty" }
41+
return parameterContents
42+
}
43+
44+
/**
45+
* Create a lookup function from a record to formatted value, based on parameter contents.
46+
* @throws IllegalArgumentException if the parameter contents are invalid.
47+
*/
48+
protected abstract fun lookup(parameterContents: String): PathFormatParameters.() -> String
1149
}

src/main/java/org/radarbase/output/path/TimePathFormatterPlugin.kt

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,19 @@ import java.time.ZoneOffset
55
import java.time.format.DateTimeFormatter
66

77
class TimePathFormatterPlugin : PathFormatterPlugin() {
8+
override val prefix: String = "time"
9+
810
override val allowedFormats: String = "time:YYYY-mm-dd"
911

10-
override fun createLookupTable(
11-
parameterNames: Set<String>
12-
): Map<String, PathFormatParameters.() -> String> {
13-
return parameterNames
14-
.filter { it.startsWith("time:") }
15-
.associateWith { p ->
16-
val dateFormatter = DateTimeFormatter
17-
.ofPattern(p.removePrefix("time:"))
18-
.withZone(ZoneOffset.UTC)
19-
return@associateWith {
20-
sanitizeId(
21-
time?.let { dateFormatter.format(it) },
22-
"unknown-time",
23-
)
24-
}
25-
}
12+
override fun lookup(parameterContents: String): PathFormatParameters.() -> String {
13+
val dateFormatter = DateTimeFormatter
14+
.ofPattern(parameterContents)
15+
.withZone(ZoneOffset.UTC)
16+
return {
17+
sanitizeId(
18+
time?.let { dateFormatter.format(it) },
19+
"unknown-time",
20+
)
21+
}
2622
}
2723
}

src/main/java/org/radarbase/output/path/ValuePathFormatterPlugin.kt

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,17 @@ import org.apache.avro.generic.GenericRecord
44
import org.radarbase.output.path.RecordPathFactory.Companion.sanitizeId
55

66
class ValuePathFormatterPlugin : PathFormatterPlugin() {
7+
override val prefix: String = "value"
8+
79
override val allowedFormats: String = "value:my.value.index"
810

9-
override fun createLookupTable(
10-
parameterNames: Set<String>
11-
): Map<String, (PathFormatParameters) -> String> = parameterNames
12-
.filter { it.startsWith("value:") }
13-
.associateWith { name ->
14-
val index = name.removePrefix("value:").split('.')
15-
return@associateWith { params ->
16-
sanitizeId(params.value.lookup(index), "unknown-value")
17-
}
11+
override fun lookup(parameterContents: String): PathFormatParameters.() -> String {
12+
val index = parameterContents.split('.')
13+
require(index.none { it.isBlank() }) { "Cannot format value record with index $parameterContents" }
14+
return {
15+
sanitizeId(value.lookup(index), "unknown-value")
1816
}
17+
}
1918

2019
companion object {
2120
fun GenericRecord.lookup(index: List<String>): Any? =

0 commit comments

Comments
 (0)