Skip to content

Commit e1a4608

Browse files
committed
Allow non-record keys
1 parent 178ebf0 commit e1a4608

File tree

1 file changed

+32
-4
lines changed

1 file changed

+32
-4
lines changed

src/main/java/org/radarbase/output/path/RecordPathFactory.kt

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package org.radarbase.output.path
1818

19+
import org.apache.avro.Schema
1920
import org.apache.avro.generic.GenericRecord
21+
import org.apache.avro.generic.GenericRecordBuilder
2022
import org.radarbase.output.Plugin
2123
import org.radarbase.output.util.TimeUtil
2224
import org.slf4j.LoggerFactory
@@ -55,19 +57,29 @@ abstract class RecordPathFactory : Plugin {
5557
*/
5658
open fun getRecordOrganization(topic: String,
5759
record: GenericRecord, attempt: Int): RecordOrganization {
58-
val keyField = record.get("key") as? GenericRecord
60+
val keyField = record.get("key")
5961
val valueField = record.get("value") as? GenericRecord
6062

6163
if (keyField == null || valueField == null) {
6264
logger.error("Failed to process {}", record)
6365
throw IllegalArgumentException("Failed to process $record; no key or value")
6466
}
6567

66-
val time = TimeUtil.getDate(keyField, valueField)
68+
val keyRecord: GenericRecord = if (keyField is GenericRecord) {
69+
keyField
70+
} else {
71+
GenericRecordBuilder(observationKeySchema)
72+
.set("projectId", valueField.getOrNull("projectId"))
73+
.set("userId", keyField.toString())
74+
.set("sourceId", valueField.getOrNull("sourceId") ?: "unknown")
75+
.build()
76+
}
77+
78+
val time = TimeUtil.getDate(keyRecord, valueField)
6779

68-
val relativePath = getRelativePath(topic, keyField, valueField, time, attempt)
80+
val relativePath = getRelativePath(topic, keyRecord, valueField, time, attempt)
6981
val outputPath = root.resolve(relativePath)
70-
val category = getCategory(keyField, valueField)
82+
val category = getCategory(keyRecord, valueField)
7183
return RecordOrganization(outputPath, category, time)
7284
}
7385

@@ -119,5 +131,21 @@ abstract class RecordPathFactory : Plugin {
119131
?.let { ILLEGAL_CHARACTER_PATTERN.matcher(it.toString()).replaceAll("") }
120132
?.takeIf { it.isNotEmpty() }
121133
?: defaultValue
134+
135+
private val observationKeySchema = Schema.Parser().parse("""
136+
{
137+
"namespace": "org.radarcns.kafka",
138+
"type": "record",
139+
"name": "ObservationKey",
140+
"doc": "Key of an observation.",
141+
"fields": [
142+
{"name": "projectId", "type": ["null", "string"], "doc": "Project identifier. Null if unknown or the user is not enrolled in a project.", "default": null},
143+
{"name": "userId", "type": "string", "doc": "User Identifier created during the enrolment."},
144+
{"name": "sourceId", "type": "string", "doc": "Unique identifier associated with the source."}
145+
]
146+
}
147+
""".trimIndent())
148+
149+
private fun GenericRecord.getOrNull(fieldName: String): Any? = if (hasField(fieldName)) get(fieldName) else null
122150
}
123151
}

0 commit comments

Comments
 (0)