Skip to content

Commit f09712c

Browse files
committed
Remove unused functionality
1 parent a66cf3b commit f09712c

File tree

9 files changed

+90
-197
lines changed

9 files changed

+90
-197
lines changed

src/main/java/org/radarbase/output/cleaner/TimestampExtractionCheck.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class TimestampExtractionCheck(
6161
var suffix = 0
6262

6363
do {
64-
val (path) = pathFactory.getRecordOrganization(
64+
val path = pathFactory.getRecordPath(
6565
topicFile.topic,
6666
record,
6767
suffix

src/main/java/org/radarbase/output/path/FixedPathFormatterPlugin.kt

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,50 @@
11
package org.radarbase.output.path
22

33
import org.radarbase.output.path.RecordPathFactory.Companion.sanitizeId
4+
import org.slf4j.LoggerFactory
5+
import java.time.ZoneOffset
6+
import java.time.format.DateTimeFormatter
47

58
class FixedPathFormatterPlugin : PathFormatterPlugin() {
9+
private lateinit var timeBinFormat: DateTimeFormatter
610
override val name: String = "fixed"
711

812
override val allowedFormats: String = allowedParamNames.joinToString(separator = ", ")
913

14+
override fun init(properties: Map<String, String>) {
15+
super.init(properties)
16+
timeBinFormat = createTimeBinFormatter(properties["timeBinFormat"])
17+
}
18+
19+
private fun createTimeBinFormatter(pattern: String?): DateTimeFormatter {
20+
pattern ?: return HOURLY_TIME_BIN_FORMAT
21+
22+
return try {
23+
DateTimeFormatter
24+
.ofPattern(pattern)
25+
.withZone(ZoneOffset.UTC)
26+
} catch (ex: IllegalArgumentException) {
27+
logger.error(
28+
"Cannot use time bin format {}, using {} instead",
29+
pattern,
30+
HOURLY_TIME_BIN_FORMAT,
31+
ex,
32+
)
33+
HOURLY_TIME_BIN_FORMAT
34+
}
35+
}
36+
1037
override fun lookup(parameterContents: String): PathFormatParameters.() -> String = when (parameterContents) {
1138
"projectId" -> ({ sanitizeId(key.get("projectId"), "unknown-project") })
1239
"userId" -> ({ sanitizeId(key.get("userId"), "unknown-user") })
1340
"sourceId" -> ({ sanitizeId(key.get("sourceId"), "unknown-source") })
1441
"topic" -> ({ topic })
15-
"filename" -> ({ timeBin + attempt.toAttemptSuffix() + extension })
42+
"filename" -> (
43+
{
44+
val timeBin = sanitizeId(time?.let { timeBinFormat.format(it) }, "unknown-time")
45+
timeBin + attempt.toAttemptSuffix() + extension
46+
}
47+
)
1648
"attempt" -> ({ attempt.toAttemptSuffix() })
1749
"extension" -> ({ extension })
1850
else -> throw IllegalArgumentException("Unknown path parameter $parameterContents")
@@ -31,6 +63,12 @@ class FixedPathFormatterPlugin : PathFormatterPlugin() {
3163
"extension",
3264
)
3365

66+
val HOURLY_TIME_BIN_FORMAT: DateTimeFormatter = DateTimeFormatter
67+
.ofPattern("yyyyMMdd_HH'00'")
68+
.withZone(ZoneOffset.UTC)
69+
3470
private fun Int.toAttemptSuffix() = if (this == 0) "" else "_$this"
71+
72+
private val logger = LoggerFactory.getLogger(FixedPathFormatterPlugin::class.java)
3573
}
3674
}

src/main/java/org/radarbase/output/path/FormattedPathFactory.kt

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,63 +25,46 @@ import kotlin.reflect.jvm.jvmName
2525

2626
open class FormattedPathFactory : RecordPathFactory() {
2727
private lateinit var formatter: PathFormatter
28-
private lateinit var properties: Map<String, String>
28+
private lateinit var config: PathFormatterConfig
2929
private var topicFormatters: Map<String, PathFormatter> = emptyMap()
3030

3131
override fun init(properties: Map<String, String>) {
3232
super.init(properties)
3333

34-
this.properties = DEFAULTS + properties
35-
formatter = createFormatter(this.properties)
34+
this.config = DEFAULTS.withValues(properties)
35+
formatter = config.toPathFormatter()
3636
logger.info("Formatting path with {}", formatter)
3737
}
3838

39-
private fun instantiatePlugins(
40-
pluginClassNames: String,
41-
properties: Map<String, String>,
42-
): List<PathFormatterPlugin> = pluginClassNames
43-
.trim()
44-
.split("\\s+".toRegex())
45-
.mapNotNull { it.toPathFormatterPlugin() }
46-
.onEach { it.init(properties) }
47-
4839
override fun addTopicConfiguration(topicConfig: Map<String, TopicConfig>) {
4940
topicFormatters = topicConfig
5041
.filter { (_, config) -> config.pathProperties.isNotEmpty() }
5142
.mapValues { (_, config) ->
52-
createFormatter(properties + config.pathProperties)
43+
this.config.withValues(config.pathProperties)
44+
.toPathFormatter()
5345
}
5446
.onEach { (topic, formatter) ->
5547
logger.info("Formatting path of topic {} with {}", topic, formatter)
5648
}
5749
}
5850

59-
private fun createFormatter(properties: Map<String, String>): PathFormatter {
60-
val format = checkNotNull(properties["format"])
61-
val pluginClassNames = checkNotNull(properties["plugins"])
62-
val plugins = instantiatePlugins(pluginClassNames, properties)
63-
64-
return PathFormatter(format, plugins)
65-
}
66-
6751
override fun getRelativePath(
6852
topic: String,
6953
key: GenericRecord,
7054
value: GenericRecord,
7155
time: Instant?,
7256
attempt: Int,
7357
): Path = (topicFormatters[topic] ?: formatter)
74-
.format(PathFormatParameters(topic, key, value, time, attempt, extension, this::getTimeBin))
75-
76-
override fun getCategory(
77-
key: GenericRecord,
78-
value: GenericRecord,
79-
): String = sanitizeId(key.get("sourceId"), "unknown-source")
58+
.format(PathFormatParameters(topic, key, value, time, attempt, extension))
8059

8160
companion object {
82-
internal val DEFAULTS = mapOf(
83-
"format" to "\${projectId}/\${userId}/\${topic}/\${filename}",
84-
"plugins" to "fixed time key value",
61+
private fun PathFormatterConfig.toPathFormatter(): PathFormatter {
62+
return PathFormatter(format, createPlugins())
63+
}
64+
65+
internal val DEFAULTS = PathFormatterConfig(
66+
format = "\${projectId}/\${userId}/\${topic}/\${filename}",
67+
pluginNames = "fixed time key value",
8568
)
8669
private val logger = LoggerFactory.getLogger(FormattedPathFactory::class.java)
8770

@@ -107,5 +90,27 @@ open class FormattedPathFactory : RecordPathFactory() {
10790
}
10891
}
10992
}
93+
94+
data class PathFormatterConfig(
95+
val format: String,
96+
val pluginNames: String,
97+
val properties: Map<String, String> = mapOf(),
98+
) {
99+
fun createPlugins(): List<PathFormatterPlugin> = pluginNames
100+
.trim()
101+
.split("\\s+".toRegex())
102+
.mapNotNull { it.toPathFormatterPlugin() }
103+
.onEach { it.init(properties) }
104+
105+
fun withValues(values: Map<String, String>): PathFormatterConfig {
106+
val newProperties = HashMap(properties).apply {
107+
putAll(values)
108+
}
109+
val format = newProperties.remove("format") ?: this.format
110+
val pluginNames = newProperties.remove("plugins") ?: this.pluginNames
111+
112+
return PathFormatterConfig(format, pluginNames, newProperties)
113+
}
114+
}
110115
}
111116
}

src/main/java/org/radarbase/output/path/ObservationKeyPathFactory.kt

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,4 @@
1616

1717
package org.radarbase.output.path
1818

19-
import org.apache.avro.generic.GenericRecord
20-
import java.nio.file.Path
21-
import java.nio.file.Paths
22-
import java.time.Instant
23-
24-
open class ObservationKeyPathFactory : RecordPathFactory() {
25-
override fun getRelativePath(
26-
topic: String,
27-
key: GenericRecord,
28-
value: GenericRecord,
29-
time: Instant?,
30-
attempt: Int,
31-
): Path {
32-
val projectId = sanitizeId(key.get("projectId"), "unknown-project")
33-
val userId = sanitizeId(key.get("userId"), "unknown-user")
34-
35-
val attemptSuffix = if (attempt == 0) "" else "_$attempt"
36-
val outputFileName = getTimeBin(time) + attemptSuffix + extension
37-
38-
return Paths.get(projectId, userId, topic, outputFileName)
39-
}
40-
41-
override fun getCategory(
42-
key: GenericRecord,
43-
value: GenericRecord,
44-
): String {
45-
return sanitizeId(key.get("sourceId"), "unknown-source")
46-
}
47-
}
19+
open class ObservationKeyPathFactory : FormattedPathFactory()

src/main/java/org/radarbase/output/path/PathFormatParameters.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,4 @@ data class PathFormatParameters(
1010
val time: Instant?,
1111
val attempt: Int,
1212
val extension: String,
13-
val computeTimeBin: (time: Instant?) -> String,
14-
) {
15-
val timeBin: String
16-
get() = computeTimeBin(time)
17-
}
13+
)

src/main/java/org/radarbase/output/path/RecordPathFactory.kt

Lines changed: 8 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -26,35 +26,13 @@ import org.radarbase.output.util.TimeUtil
2626
import org.slf4j.LoggerFactory
2727
import java.nio.file.Path
2828
import java.time.Instant
29-
import java.time.ZoneOffset.UTC
30-
import java.time.format.DateTimeFormatter
3129
import java.util.regex.Pattern
3230

3331
abstract class RecordPathFactory : Plugin {
3432
lateinit var root: Path
3533
lateinit var extension: String
3634
lateinit var fileStoreFactory: FileStoreFactory
3735

38-
protected open var timeBinFormat: DateTimeFormatter = HOURLY_TIME_BIN_FORMAT
39-
40-
override fun init(properties: Map<String, String>) {
41-
super.init(properties)
42-
properties["timeBinFormat"]?.let {
43-
try {
44-
timeBinFormat = DateTimeFormatter
45-
.ofPattern(it)
46-
.withZone(UTC)
47-
} catch (ex: IllegalArgumentException) {
48-
logger.error(
49-
"Cannot use time bin format {}, using {} instead",
50-
it,
51-
timeBinFormat,
52-
ex,
53-
)
54-
}
55-
}
56-
}
57-
5836
/**
5937
* Get the organization of given record in given topic.
6038
* @param topic Kafka topic name
@@ -63,11 +41,11 @@ abstract class RecordPathFactory : Plugin {
6341
* paths already existed and are incompatible.
6442
* @return organization of given record
6543
*/
66-
open fun getRecordOrganization(
44+
open fun getRecordPath(
6745
topic: String,
6846
record: GenericRecord,
6947
attempt: Int,
70-
): RecordOrganization {
48+
): Path {
7149
val keyField = record.get("key")
7250
val valueField = record.get("value") as? GenericRecord
7351

@@ -79,19 +57,17 @@ abstract class RecordPathFactory : Plugin {
7957
val keyRecord: GenericRecord = if (keyField is GenericRecord) {
8058
keyField
8159
} else {
82-
GenericRecordBuilder(observationKeySchema)
83-
.set("projectId", valueField.getOrNull("projectId"))
84-
.set("userId", keyField.toString())
85-
.set("sourceId", valueField.getOrNull("sourceId") ?: "unknown")
86-
.build()
60+
GenericRecordBuilder(observationKeySchema).apply {
61+
set("projectId", valueField.getOrNull("projectId"))
62+
set("userId", keyField.toString())
63+
set("sourceId", valueField.getOrNull("sourceId") ?: "unknown")
64+
}.build()
8765
}
8866

8967
val time = TimeUtil.getDate(keyRecord, valueField)
9068

9169
val relativePath = getRelativePath(topic, keyRecord, valueField, time, attempt)
92-
val outputPath = root.resolve(relativePath)
93-
val category = getCategory(keyRecord, valueField)
94-
return RecordOrganization(outputPath, category, time)
70+
return root.resolve(relativePath)
9571
}
9672

9773
/**
@@ -112,38 +88,10 @@ abstract class RecordPathFactory : Plugin {
11288
attempt: Int,
11389
): Path
11490

115-
/**
116-
* Get the category of a record, representing a partitioning for a given topic and user.
117-
* @param key record key
118-
* @param value record value
119-
* @return category name.
120-
*/
121-
abstract fun getCategory(key: GenericRecord, value: GenericRecord): String
122-
123-
open fun getTimeBin(time: Instant?): String = time
124-
?.let { timeBinFormat.format(time) }
125-
?: "unknown_date"
126-
127-
/**
128-
* Organization of a record.
129-
*/
130-
data class RecordOrganization(
131-
/** Path that the record should be stored in. */
132-
val path: Path,
133-
/** Category or partition that the record belongs to. */
134-
val category: String,
135-
/** Time contained in the record, if any. */
136-
val time: Instant?,
137-
)
138-
13991
companion object {
14092
private val logger = LoggerFactory.getLogger(RecordPathFactory::class.java)
14193
private val ILLEGAL_CHARACTER_PATTERN = Pattern.compile("[^a-zA-Z0-9_-]+")
14294

143-
val HOURLY_TIME_BIN_FORMAT: DateTimeFormatter = DateTimeFormatter
144-
.ofPattern("yyyyMMdd_HH'00'")
145-
.withZone(UTC)
146-
14795
fun sanitizeId(id: Any?, defaultValue: String): String = id
14896
?.let { ILLEGAL_CHARACTER_PATTERN.matcher(it.toString()).replaceAll("") }
14997
?.takeIf { it.isNotEmpty() }

src/main/java/org/radarbase/output/worker/RestructureWorker.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ internal class RestructureWorker(
167167
) {
168168
var currentSuffix = 0
169169
do {
170-
val (path) = pathFactory.getRecordOrganization(
170+
val path = pathFactory.getRecordPath(
171171
transaction.topicPartition.topic,
172172
record,
173173
attempt = currentSuffix

0 commit comments

Comments
 (0)