Skip to content

Commit cebc5ed

Browse files
committed
Allow users to exclude fields from output
Excluding can be done per topic or for all topics
1 parent 40b1a85 commit cebc5ed

File tree

3 files changed

+16
-1
lines changed

3 files changed

+16
-1
lines changed

restructure.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ format:
9393
# factory: org.radarbase.hdfs.format.FormatFactory
9494
# Additional format properties
9595
# properties: {}
96+
# Do not write certain fields to file
97+
# excludeFields: []
9698

9799
# Worker settings. Each worker thread has its own cache and topic, so the
98100
# settings only apply to a single thread.
@@ -147,6 +149,11 @@ topics:
147149
enable: true
148150
# deduplicate this topic only using given fields.
149151
distinctFields: [key.sourceId, value.time]
152+
# Do not write certain fields to file
153+
# In this case, exclude user and project ID since they are always the same
154+
excludeFields:
155+
- key.userId
156+
- key.projectId
150157
# topic name
151158
connect_fitbit_source2:
152159
# deduplicate this topic, regardless of the format settings

src/main/java/org/radarbase/output/config/TopicConfig.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ data class TopicConfig(
1515
* [org.radarbase.output.path.FormattedPathFactory] format.
1616
*/
1717
val pathProperties: Map<String, String> = emptyMap(),
18+
/**
19+
* Exclude given fields from output files.
20+
*/
21+
val excludeFields: Set<String>? = null,
1822
) {
1923
fun deduplication(deduplicationDefault: DeduplicationConfig): DeduplicationConfig =
2024
deduplication

src/main/java/org/radarbase/output/worker/FileCache.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,16 @@ class FileCache(
6363
private var lastUse: Long = 0
6464
private val hasError: AtomicBoolean = AtomicBoolean(false)
6565
private val deduplicate: DeduplicationConfig
66+
private val excludeFields: Set<String>
6667

6768
init {
6869
val topicConfig = factory.config.topics[topic]
6970
val defaultDeduplicate = factory.config.format.deduplication
7071
deduplicate = topicConfig?.deduplication(defaultDeduplicate) ?: defaultDeduplicate
7172

73+
val defaultExclude = factory.config.format.excludeFields
74+
excludeFields = topicConfig?.excludeFields ?: defaultExclude
75+
7276
this.tmpPath = createTempFile(tmpDir, fileName, ".tmp" + compression.extension)
7377
}
7478

@@ -102,7 +106,7 @@ class FileCache(
102106

103107
this.recordConverter = try {
104108
inputStream.reader().useSuspended { reader ->
105-
converterFactory.converterFor(writer, record, fileIsNew, reader)
109+
converterFactory.converterFor(writer, record, fileIsNew, reader, excludeFields)
106110
}
107111
} catch (ex: IOException) {
108112
coroutineScope {

0 commit comments

Comments
 (0)