Skip to content

Commit 221b895

Browse files
committed
Allow excluding certain fields from output
This is especially useful for excluding projectId and userId, since those are already included in the directory structure.
1 parent: 0e681a4 · commit: 221b895

File tree

13 files changed

+221
-180
lines changed

13 files changed

+221
-180
lines changed

src/main/java/org/radarbase/output/config/FormatConfig.kt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ data class FormatConfig(
1414
distinctFields = emptySet(),
1515
ignoreFields = emptySet(),
1616
),
17+
val excludeFields: Set<String> = emptySet(),
1718
) : PluginConfig {
1819
fun createFactory(): FormatFactory = factory.toPluginInstance(properties)
1920
fun createConverter(): RecordConverterFactory = createFactory()[type]

src/main/java/org/radarbase/output/format/CsvAvroConverter.kt

Lines changed: 23 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -33,22 +33,37 @@ class CsvAvroConverter(
3333
writeHeader: Boolean,
3434
reader: Reader,
3535
recordHeader: Array<String>,
36+
excludeFields: Set<String>,
3637
) : RecordConverter {
3738

3839
private val csvWriter = CSVWriter(writer)
3940
private val converter: CsvAvroDataConverter
4041

4142
init {
42-
converter = if (writeHeader) {
43-
csvWriter.writeNext(recordHeader, false)
44-
CsvAvroDataConverter(recordHeader)
45-
} else {
46-
CsvAvroDataConverter(
47-
CSVReader(reader).use {
43+
val (header, excludedFromHeader) = when {
44+
!writeHeader -> {
45+
val readHeader = CSVReader(reader).use {
4846
requireNotNull(it.readNext()) { "No header found" }
4947
}
50-
)
48+
Pair(
49+
readHeader,
50+
excludeFields - readHeader.toHashSet(),
51+
)
52+
}
53+
excludeFields.isEmpty() -> Pair(recordHeader, excludeFields)
54+
else -> {
55+
val excludedHeaderSet = recordHeader.toHashSet()
56+
Pair(
57+
recordHeader.filter { it !in excludeFields }.toTypedArray(),
58+
excludeFields.filterTo(HashSet()) { it in excludedHeaderSet }
59+
)
60+
}
5161
}
62+
if (writeHeader) {
63+
csvWriter.writeNext(header, false)
64+
}
65+
66+
converter = CsvAvroDataConverter(header, excludedFromHeader)
5267
}
5368

5469
/**
@@ -60,8 +75,7 @@ class CsvAvroConverter(
6075
@Throws(IOException::class)
6176
override fun writeRecord(record: GenericRecord): Boolean {
6277
return try {
63-
val retValues = converter.convertRecordValues(record)
64-
csvWriter.writeNext(retValues.toTypedArray(), false)
78+
csvWriter.writeNext(converter.convertRecordValues(record), false)
6579
true
6680
} catch (ex: IllegalArgumentException) {
6781
false

src/main/java/org/radarbase/output/format/CsvAvroConverterFactory.kt

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -150,8 +150,8 @@ class CsvAvroConverterFactory : RecordConverterFactory {
150150
): Boolean = source.inputStream().use { input ->
151151
processLines(input, compression) { header, lines ->
152152
checkNotNull(header) { "Empty file found" }
153-
val converter = CsvAvroDataConverter(header)
154-
val recordValues = converter.convertRecordValues(record).toTypedArray()
153+
val converter = CsvAvroDataConverter(header, emptySet())
154+
val recordValues = converter.convertRecordValues(record)
155155
val indexes = fieldIndexes(header, usingFields, ignoreFields)
156156

157157
if (indexes == null) {
@@ -168,7 +168,8 @@ class CsvAvroConverterFactory : RecordConverterFactory {
168168
record: GenericRecord,
169169
writeHeader: Boolean,
170170
reader: Reader,
171-
): CsvAvroConverter = CsvAvroConverter(writer, writeHeader, reader, headerFor(record))
171+
excludeFields: Set<String>,
172+
): CsvAvroConverter = CsvAvroConverter(writer, writeHeader, reader, headerFor(record), excludeFields)
172173

173174
override val hasHeader: Boolean = true
174175

src/main/java/org/radarbase/output/format/CsvAvroDataConverter.kt

Lines changed: 63 additions & 68 deletions
Original file line number | Diff line number | Diff line change
@@ -9,93 +9,88 @@ import java.util.*
99

1010
internal class CsvAvroDataConverter(
1111
private val headers: Array<String>,
12+
private val excludeFields: Set<String>,
1213
) {
13-
private val values: MutableList<String> = ArrayList(this.headers.size)
14-
1514
fun convertRecord(record: GenericRecord): Map<String, Any?> {
16-
values.clear()
17-
val schema = record.schema
18-
for (field in schema.fields) {
19-
convertAvro(values, record.get(field.pos()), field.schema(), field.name())
20-
}
21-
val map = LinkedHashMap<String, Any>()
22-
for (i in headers.indices) {
23-
map[headers[i]] = values[i]
15+
val recordValues = convertRecordValues(record)
16+
return buildMap<String, Any> {
17+
for (i in headers.indices) {
18+
put(headers[i], recordValues[i])
19+
}
2420
}
25-
values.clear()
26-
return map
2721
}
2822

29-
fun convertRecordValues(record: GenericRecord): List<String> {
30-
values.clear()
23+
fun convertRecordValues(record: GenericRecord): Array<String> {
24+
val values = arrayOfNulls<String>(headers.size)
3125
val schema = record.schema
32-
for (field in schema.fields) {
33-
convertAvro(values, record.get(field.pos()), field.schema(), field.name())
26+
val endIndex = schema.fields.fold(0) { valueIndex, field ->
27+
convertAvro(values, valueIndex, record.get(field.pos()), field.schema(), field.name())
3428
}
35-
require(values.size >= headers.size) { "Values and headers do not match" }
36-
return values
29+
require(endIndex >= headers.size) { "Values and headers do not match" }
30+
@Suppress("UNCHECKED_CAST")
31+
return values as Array<String>
3732
}
3833

3934
private fun convertAvro(
40-
values: MutableList<String>,
35+
values: Array<String?>,
36+
startIndex: Int,
4137
data: Any?,
4238
schema: Schema,
4339
prefix: String,
44-
) {
45-
when (schema.type) {
46-
Schema.Type.RECORD -> {
47-
val record = data as GenericRecord
48-
val subSchema = record.schema
49-
for (field in subSchema.fields) {
50-
val subData = record.get(field.pos())
51-
convertAvro(
52-
values,
53-
subData,
54-
field.schema(),
55-
prefix + '.'.toString() + field.name(),
56-
)
57-
}
58-
}
59-
Schema.Type.MAP -> {
60-
val valueType = schema.valueType
61-
for ((key, value) in data as Map<*, *>) {
62-
val name = "$prefix.$key"
63-
convertAvro(values, value, valueType, name)
64-
}
65-
}
66-
Schema.Type.ARRAY -> {
67-
val itemType = schema.elementType
68-
for ((i, orig) in (data as List<*>).withIndex()) {
69-
convertAvro(values, orig, itemType, "$prefix.$i")
70-
}
71-
}
72-
Schema.Type.UNION -> {
73-
val type = GenericData().resolveUnion(schema, data)
74-
convertAvro(values, data, schema.types[type], prefix)
75-
}
76-
Schema.Type.BYTES -> {
77-
checkHeader(prefix, values.size)
78-
values.add(BASE64_ENCODER.encodeToString((data as ByteBuffer).array()))
40+
): Int = when (schema.type) {
41+
Schema.Type.RECORD -> {
42+
val record = data as GenericRecord
43+
val subSchema = record.schema
44+
subSchema.fields.fold(startIndex) { index, field ->
45+
val subData = record.get(field.pos())
46+
convertAvro(
47+
values,
48+
index,
49+
subData,
50+
field.schema(),
51+
prefix + '.'.toString() + field.name(),
52+
)
7953
}
80-
Schema.Type.FIXED -> {
81-
checkHeader(prefix, values.size)
82-
values.add(BASE64_ENCODER.encodeToString((data as GenericFixed).bytes()))
83-
}
84-
Schema.Type.STRING, Schema.Type.ENUM, Schema.Type.INT, Schema.Type.LONG,
85-
Schema.Type.DOUBLE, Schema.Type.FLOAT, Schema.Type.BOOLEAN -> {
86-
checkHeader(prefix, values.size)
87-
values.add(data.toString())
54+
}
55+
Schema.Type.MAP -> {
56+
val valueType = schema.valueType
57+
(data as Map<*, *>).entries.fold(startIndex) { index, (key, value) ->
58+
val name = "$prefix.$key"
59+
convertAvro(values, index, value, valueType, name)
8860
}
89-
Schema.Type.NULL -> {
90-
checkHeader(prefix, values.size)
91-
values.add("")
61+
}
62+
Schema.Type.ARRAY -> {
63+
val itemType = schema.elementType
64+
(data as List<*>).foldIndexed(startIndex) { i, index, orig ->
65+
convertAvro(values, index, orig, itemType, "$prefix.$i")
9266
}
93-
else -> throw IllegalArgumentException("Cannot parse field type " + schema.type)
9467
}
68+
Schema.Type.UNION -> {
69+
val type = GenericData().resolveUnion(schema, data)
70+
convertAvro(values, startIndex, data, schema.types[type], prefix)
71+
}
72+
Schema.Type.BYTES -> {
73+
addValue(prefix, values, startIndex, BASE64_ENCODER.encodeToString((data as ByteBuffer).array()))
74+
}
75+
Schema.Type.FIXED -> {
76+
addValue(prefix, values, startIndex, BASE64_ENCODER.encodeToString((data as GenericFixed).bytes()))
77+
}
78+
Schema.Type.STRING, Schema.Type.ENUM, Schema.Type.INT, Schema.Type.LONG,
79+
Schema.Type.DOUBLE, Schema.Type.FLOAT, Schema.Type.BOOLEAN -> {
80+
addValue(prefix, values, startIndex, data.toString())
81+
}
82+
Schema.Type.NULL -> {
83+
addValue(prefix, values, startIndex, "")
84+
}
85+
else -> throw IllegalArgumentException("Cannot parse field type " + schema.type)
9586
}
9687

97-
private fun checkHeader(prefix: String, size: Int) {
98-
require(prefix == headers[size]) { "Header $prefix does not match ${headers[size]}" }
88+
private fun addValue(prefix: String, values: Array<String?>, index: Int, value: String): Int {
89+
if (prefix in excludeFields) return index
90+
val header = headers[index]
91+
require(prefix == header) { "Header $prefix does not match $header" }
92+
values[index] = value
93+
return index + 1
9994
}
10095

10196
companion object {

src/main/java/org/radarbase/output/format/JsonAvroConverter.kt

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,8 +29,9 @@ import java.io.Writer
2929
*/
3030
class JsonAvroConverter(
3131
writer: Writer,
32-
private val converter: JsonAvroDataConverter,
32+
excludeFields: Set<String>,
3333
) : RecordConverter {
34+
private val converter = JsonAvroDataConverter(excludeFields)
3435
private val generator: JsonGenerator = JSON_FACTORY.createGenerator(writer)
3536
.setPrettyPrinter(MinimalPrettyPrinter("\n"))
3637

src/main/java/org/radarbase/output/format/JsonAvroConverterFactory.kt

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,15 +15,16 @@ class JsonAvroConverterFactory : RecordConverterFactory {
1515

1616
override val formats: Collection<String> = setOf("json")
1717

18-
private val converter = JsonAvroDataConverter()
18+
private val converter = JsonAvroDataConverter(setOf())
1919

2020
@Throws(IOException::class)
2121
override fun converterFor(
2222
writer: Writer,
2323
record: GenericRecord,
2424
writeHeader: Boolean,
2525
reader: Reader,
26-
): RecordConverter = JsonAvroConverter(writer, converter)
26+
excludeFields: Set<String>,
27+
): RecordConverter = JsonAvroConverter(writer, excludeFields)
2728

2829
override suspend fun readTimeSeconds(
2930
source: InputStream,

src/main/java/org/radarbase/output/format/JsonAvroDataConverter.kt

Lines changed: 40 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -5,46 +5,62 @@ import org.apache.avro.generic.GenericData
55
import org.apache.avro.generic.GenericFixed
66
import org.apache.avro.generic.GenericRecord
77
import java.nio.ByteBuffer
8+
import java.util.EnumSet
89

9-
class JsonAvroDataConverter {
10-
fun convertRecord(record: GenericRecord): Map<String, Any?> {
11-
val map = HashMap<String, Any?>()
10+
class JsonAvroDataConverter(
11+
private val excludeFields: Set<String>
12+
) {
13+
fun convertRecord(record: GenericRecord, prefix: String? = null): Map<String, Any?> {
1214
val schema = record.schema
13-
for (field in schema.fields) {
14-
map[field.name()] = convertAvro(record.get(field.pos()), field.schema())
15+
return buildMap {
16+
for (field in schema.fields) {
17+
val fieldPrefix = if (prefix == null) field.name() else "$prefix.${field.name()}"
18+
convertAvro(record.get(field.pos()), field.schema(), fieldPrefix)
19+
.ifNotExcluded { put(field.name(), it) }
20+
}
1521
}
16-
return map
1722
}
1823

19-
private fun convertAvro(data: Any?, schema: Schema): Any? {
20-
when (schema.type) {
21-
Schema.Type.RECORD -> return convertRecord(data as GenericRecord)
24+
private fun convertAvro(data: Any?, schema: Schema, prefix: String): Any? {
25+
if (schema.type !in compositeTypes && prefix in excludeFields) return EXCLUDE_FIELD
26+
return when (schema.type) {
27+
Schema.Type.RECORD -> convertRecord(data as GenericRecord)
2228
Schema.Type.MAP -> {
23-
val value = HashMap<String, Any?>()
2429
val valueType = schema.valueType
25-
for ((key, value1) in data as Map<*, *>) {
26-
value[key.toString()] = convertAvro(value1, valueType)
30+
buildMap {
31+
for ((key, value1) in data as Map<*, *>) {
32+
convertAvro(value1, valueType, "$prefix.$key")
33+
.ifNotExcluded { put(key.toString(), it) }
34+
}
2735
}
28-
return value
2936
}
3037
Schema.Type.ARRAY -> {
31-
val origList = data as List<*>
3238
val itemType = schema.elementType
33-
val list = ArrayList<Any?>(origList.size)
34-
for (orig in origList) {
35-
list.add(convertAvro(orig, itemType))
39+
buildList {
40+
(data as List<*>).forEachIndexed { i, orig ->
41+
convertAvro(orig, itemType, "$prefix.$i")
42+
.ifNotExcluded { add(it) }
43+
}
3644
}
37-
return list
3845
}
3946
Schema.Type.UNION -> {
40-
val type = GenericData().resolveUnion(schema, data)
41-
return convertAvro(data, schema.types[type])
47+
val typeIndex = GenericData().resolveUnion(schema, data)
48+
convertAvro(data, schema.types[typeIndex], prefix)
4249
}
43-
Schema.Type.BYTES -> return (data as ByteBuffer).array()
44-
Schema.Type.FIXED -> return (data as GenericFixed).bytes()
45-
Schema.Type.ENUM, Schema.Type.STRING -> return data.toString()
46-
Schema.Type.INT, Schema.Type.LONG, Schema.Type.DOUBLE, Schema.Type.FLOAT, Schema.Type.BOOLEAN, Schema.Type.NULL -> return data
50+
Schema.Type.BYTES -> (data as ByteBuffer).array()
51+
Schema.Type.FIXED -> (data as GenericFixed).bytes()
52+
Schema.Type.ENUM, Schema.Type.STRING -> data.toString()
53+
Schema.Type.INT, Schema.Type.LONG, Schema.Type.DOUBLE, Schema.Type.FLOAT, Schema.Type.BOOLEAN, Schema.Type.NULL -> data
4754
else -> throw IllegalArgumentException("Cannot parse field type " + schema.type)
4855
}
4956
}
57+
58+
companion object {
59+
private val compositeTypes = EnumSet.of(Schema.Type.RECORD, Schema.Type.MAP, Schema.Type.ARRAY, Schema.Type.UNION)
60+
private val EXCLUDE_FIELD = Any()
61+
62+
private fun Any?.ifNotExcluded(apply: (Any?) -> Unit) {
63+
if (this !== EXCLUDE_FIELD) apply(this)
64+
}
65+
}
5066
}

src/main/java/org/radarbase/output/format/RecordConverterFactory.kt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ interface RecordConverterFactory : Format {
4545
record: GenericRecord,
4646
writeHeader: Boolean,
4747
reader: Reader,
48+
excludeFields: Set<String> = emptySet(),
4849
): RecordConverter
4950

5051
val hasHeader: Boolean

0 commit comments

Comments
 (0)