Skip to content

Commit 8cf4302

Browse files
committed
Deduplication optimization
1 parent e48d3c3 commit 8cf4302

File tree

1 file changed

+58
-50
lines changed

1 file changed

+58
-50
lines changed

src/main/java/org/radarbase/hdfs/format/CsvAvroConverter.kt

Lines changed: 58 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,18 @@ package org.radarbase.hdfs.format
1919
import com.opencsv.CSVReader
2020
import com.opencsv.CSVWriter
2121
import java.nio.ByteBuffer
22-
import java.util.ArrayList
23-
import java.util.Base64
24-
import java.util.LinkedHashMap
2522
import org.apache.avro.Schema
23+
import org.apache.avro.Schema.Type.*
2624
import org.apache.avro.generic.GenericData
2725
import org.apache.avro.generic.GenericFixed
2826
import org.apache.avro.generic.GenericRecord
2927
import org.radarbase.hdfs.compression.Compression
30-
import org.radarbase.hdfs.format.CsvAvroConverter.Companion.removeDuplicates
3128
import java.io.*
3229
import java.nio.file.Files
3330
import java.nio.file.Path
31+
import java.util.*
32+
import java.util.regex.Pattern
33+
import kotlin.collections.HashMap
3434

3535
/**
3636
* Converts deep hierarchical Avro records into flat CSV format. It uses a simple dot syntax in the
@@ -53,7 +53,8 @@ class CsvAvroConverter(
5353
createHeaders(record)
5454
.also { csvWriter.writeNext(it.toTypedArray(), false) }
5555
} else {
56-
CSVReader(reader).readNext().toList()
56+
CSVReader(reader).use { requireNotNull(it.readNext()) { "No header found" } }
57+
.toList()
5758
}
5859

5960
values = ArrayList(headers.size)
@@ -108,48 +109,44 @@ class CsvAvroConverter(
108109

109110
private fun convertAvro(values: MutableList<String>, data: Any?, schema: Schema, prefix: String) {
110111
when (schema.type) {
111-
Schema.Type.RECORD -> {
112+
RECORD -> {
112113
val record = data as GenericRecord
113114
val subSchema = record.schema
114115
for (field in subSchema.fields) {
115116
val subData = record.get(field.pos())
116117
convertAvro(values, subData, field.schema(), prefix + '.'.toString() + field.name())
117118
}
118119
}
119-
Schema.Type.MAP -> {
120+
MAP -> {
120121
val valueType = schema.valueType
121122
for ((key, value) in data as Map<*, *>) {
122123
val name = "$prefix.$key"
123124
convertAvro(values, value, valueType, name)
124125
}
125126
}
126-
Schema.Type.ARRAY -> {
127+
ARRAY -> {
127128
val itemType = schema.elementType
128129
for ((i, orig) in (data as List<*>).withIndex()) {
129130
convertAvro(values, orig, itemType, "$prefix.$i")
130131
}
131132
}
132-
Schema.Type.UNION -> {
133+
UNION -> {
133134
val type = GenericData().resolveUnion(schema, data)
134135
convertAvro(values, data, schema.types[type], prefix)
135136
}
136-
Schema.Type.BYTES -> {
137+
BYTES -> {
137138
checkHeader(prefix, values.size)
138139
values.add(BASE64_ENCODER.encodeToString((data as ByteBuffer).array()))
139140
}
140-
Schema.Type.FIXED -> {
141+
FIXED -> {
141142
checkHeader(prefix, values.size)
142143
values.add(BASE64_ENCODER.encodeToString((data as GenericFixed).bytes()))
143144
}
144-
Schema.Type.STRING -> {
145+
STRING, ENUM, INT, LONG, DOUBLE, FLOAT, BOOLEAN -> {
145146
checkHeader(prefix, values.size)
146147
values.add(data.toString())
147148
}
148-
Schema.Type.ENUM, Schema.Type.INT, Schema.Type.LONG, Schema.Type.DOUBLE, Schema.Type.FLOAT, Schema.Type.BOOLEAN -> {
149-
checkHeader(prefix, values.size)
150-
values.add(data.toString())
151-
}
152-
Schema.Type.NULL -> {
149+
NULL -> {
153150
checkHeader(prefix, values.size)
154151
values.add("")
155152
}
@@ -184,7 +181,7 @@ class CsvAvroConverter(
184181
reader -> CSVReader(reader).use {
185182
csvReader ->
186183
val header = csvReader.readNext() ?: return
187-
val lines = generateSequence { csvReader.readNext() }.toMutableList()
184+
val lines = generateSequence { csvReader.readNext() }.toList()
188185
Pair(header, lines)
189186
} } } } }
190187

@@ -209,47 +206,43 @@ class CsvAvroConverter(
209206
override val hasHeader: Boolean = true
210207
}
211208

212-
private fun List<Array<String>>.removeDuplicates(header: Array<String>, usingFields: Set<String>, ignoreFields: Set<String>): List<Array<String>> {
209+
private fun List<Array<String>>.removeDuplicates(
210+
header: Array<String>,
211+
usingFields: Set<String>,
212+
ignoreFields: Set<String>
213+
): List<Array<String>> {
213214
if (usingFields.isNotEmpty()) {
214-
val fieldIndexes = usingFields
215-
.map { f -> header.indexOf(f) }
216-
.takeIf { indexes -> indexes.none { it == -1 } }
217-
?: emptyList()
215+
val fieldIndexes = usingFields.map { f -> header.indexOf(f) }.toIntArray()
218216

219-
if (fieldIndexes.isNotEmpty()) {
220-
return distinctByLast { line -> fieldIndexes.joinToString { line[it] } }
217+
if (fieldIndexes.none { it == -1 }) {
218+
return distinctByLast { line -> line.mapToArrayWrapper(fieldIndexes) }
221219
}
222220
}
223221

224222
if (ignoreFields.isNotEmpty()) {
225-
val ignoreIndexes = ignoreFields
226-
.map { f -> header.indexOf(f) }
227-
.takeIf { indexes -> indexes.none { it == -1 } }
228-
?.toSet()
229-
?: emptySet()
230-
231-
if (ignoreIndexes.isNotEmpty()) {
232-
return distinctByLast { line -> line.filterIndexed { i, _ -> i !in ignoreIndexes }.joinToString() }
223+
val ignoreIndexes = ignoreFields.map { f -> header.indexOf(f) }
224+
225+
if (ignoreIndexes.any { it != -1 }) {
226+
val fieldIndexes = (header.indices - ignoreIndexes).toIntArray()
227+
return distinctByLast { line -> line.mapToArrayWrapper(fieldIndexes) }
233228
}
234229
}
235230

236-
return distinctByLast { it.joinToString() }
231+
return distinctByLast { ArrayWrapper(it) }
237232
}
238233

239-
private fun <T, V> List<T>.distinctByLast(mapping: (T) -> V): List<T> {
234+
private inline fun <reified T> Array<T>.mapToArrayWrapper(indices: IntArray): ArrayWrapper<T> {
235+
return ArrayWrapper(Array(indices.size) { i -> this[indices[i]] })
236+
}
237+
238+
private inline fun <T, V> List<T>.distinctByLast(mapping: (T) -> V): List<T> {
240239
val map: MutableMap<V, Int> = HashMap()
241-
val mappings = ArrayList<V>(size)
242240
forEachIndexed { i, v ->
243-
val mapped = mapping(v)
244-
map[mapped] = i
245-
mappings.add(mapped)
246-
}
247-
val distinct = ArrayList<T>(map.size)
248-
mappings.forEachIndexed { i, mapped ->
249-
if (i == map[mapped])
250-
distinct.add(this[i])
241+
map[mapping(v)] = i
251242
}
252-
return distinct
243+
return map.values.toIntArray()
244+
.apply { sort() }
245+
.map { i -> this[i] }
253246
}
254247

255248
internal fun createHeaders(record: GenericRecord): List<String> {
@@ -263,34 +256,49 @@ class CsvAvroConverter(
263256

264257
private fun createHeader(headers: MutableList<String>, data: Any?, schema: Schema, prefix: String) {
265258
when (schema.type) {
266-
Schema.Type.RECORD -> {
259+
RECORD -> {
267260
val record = data as GenericRecord
268261
val subSchema = record.schema
269262
for (field in subSchema.fields) {
270263
val subData = record.get(field.pos())
271264
createHeader(headers, subData, field.schema(), prefix + '.'.toString() + field.name())
272265
}
273266
}
274-
Schema.Type.MAP -> {
267+
MAP -> {
275268
val valueType = schema.valueType
276269
for ((key, value) in data as Map<*, *>) {
277270
val name = "$prefix.$key"
278271
createHeader(headers, value, valueType, name)
279272
}
280273
}
281-
Schema.Type.ARRAY -> {
274+
ARRAY -> {
282275
val itemType = schema.elementType
283276
for ((i, orig) in (data as List<*>).withIndex()) {
284277
createHeader(headers, orig, itemType, "$prefix.$i")
285278
}
286279
}
287-
Schema.Type.UNION -> {
280+
UNION -> {
288281
val type = GenericData().resolveUnion(schema, data)
289282
createHeader(headers, data, schema.types[type], prefix)
290283
}
291-
Schema.Type.BYTES, Schema.Type.FIXED, Schema.Type.ENUM, Schema.Type.STRING, Schema.Type.INT, Schema.Type.LONG, Schema.Type.DOUBLE, Schema.Type.FLOAT, Schema.Type.BOOLEAN, Schema.Type.NULL -> headers.add(prefix)
284+
BYTES, FIXED, ENUM, STRING, INT, LONG, DOUBLE, FLOAT, BOOLEAN, NULL -> headers.add(prefix)
292285
else -> throw IllegalArgumentException("Cannot parse field type " + schema.type)
293286
}
294287
}
295288
}
289+
290+
private class ArrayWrapper<T>(val values: Array<T>) {
291+
private val hashCode = values.contentHashCode()
292+
293+
override fun equals(other: Any?): Boolean {
294+
if (this === other) return true
295+
if (javaClass != other?.javaClass) return false
296+
297+
other as ArrayWrapper<*>
298+
299+
return values.contentEquals(other.values)
300+
}
301+
302+
override fun hashCode(): Int = hashCode
303+
}
296304
}

0 commit comments

Comments
 (0)