Commit e124c13

memory-efficient deduplication if fields are chosen

1 parent 0bce651

File tree: 1 file changed, +50 -38 lines


src/main/java/org/radarbase/output/format/CsvAvroConverterFactory.kt

Lines changed: 50 additions & 38 deletions
@@ -8,6 +8,7 @@ import org.radarbase.output.util.TimeUtil.parseDate
 import org.radarbase.output.util.TimeUtil.parseDateTime
 import org.radarbase.output.util.TimeUtil.parseTime
 import org.radarbase.output.util.TimeUtil.toDouble
+import org.slf4j.LoggerFactory
 import java.io.*
 import java.nio.file.Files
 import java.nio.file.Path
@@ -19,24 +20,49 @@ class CsvAvroConverterFactory: RecordConverterFactory {

     @Throws(IOException::class)
     override fun deduplicate(fileName: String, source: Path, target: Path, compression: Compression, distinctFields: Set<String>, ignoreFields: Set<String>) {
-        val (header, lines) = Files.newInputStream(source).use {
-            processLines(it, compression) { header, lines ->
-                Pair(header, lines.toList())
+        val (header, lineIndexes) = Files.newInputStream(source).use { input ->
+            processLines(input, compression) { header, lines ->
+                if (header == null) return
+                val fields = fieldIndexes(header, distinctFields, ignoreFields)
+                var count = 0
+                val lineMap = lines
+                        .onEach { count += 1 }
+                        .mapIndexed { i, line -> Pair(ArrayWrapper(line.byIndex(fields)), i) }
+                        .toMap(HashMap())
+
+                if (lineMap.size == count) {
+                    logger.debug("File {} is already deduplicated. Skipping.", fileName)
+                    return
+                }
+
+                Pair(header, lineMap.values
+                        .toIntArray()
+                        .apply { sort() })
             }
         }

-        if (header == null) return
-
-        val indexes = fieldIndexes(header, distinctFields, ignoreFields)
-        val distinct = lines.distinctByLast { line -> ArrayWrapper(line.byIndex(indexes)) }
+        Files.newInputStream(source).use { input ->
+            processLines(input, compression) { _, lines ->
+                var indexIndex = 0
+                writeLines(target, fileName, compression, sequenceOf(header) + lines.filterIndexed { i, _ ->
+                    if (indexIndex < lineIndexes.size && lineIndexes[indexIndex] == i) {
+                        indexIndex += 1
+                        true
+                    } else false
+                })
+            }
+        }
+    }

+    private fun writeLines(target: Path, fileName: String, compression: Compression, lines: Sequence<Array<String>>) {
         Files.newOutputStream(target).use { fileOut ->
             BufferedOutputStream(fileOut).use { bufOut ->
                 compression.compress(fileName, bufOut).use { zipOut ->
                     OutputStreamWriter(zipOut).use { writer ->
                         CSVWriter(writer).use { csvWriter ->
-                            csvWriter.writeNext(header, false)
-                            csvWriter.writeAll(distinct, false)
+                            lines.forEach {
+                                csvWriter.writeNext(it, false)
+                            }
                         }
                     }
                 }
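The rewritten deduplicate() above streams the file twice instead of materializing it as a list: the first pass only records, for each distinct key (the selected fields), the index of that key's last occurrence; the second pass re-reads the source and writes out the lines at those indexes. A minimal stand-alone sketch of the same idea follows; dedupIndexes, filterByIndexes and the sample rows are illustrative names and data, not part of the patch.

// Sketch of the two-pass, index-based deduplication introduced in this commit.
// First pass: for every distinct key, remember the index of its last occurrence.
fun <T, K> dedupIndexes(lines: Sequence<T>, key: (T) -> K): IntArray {
    val lastIndex = HashMap<K, Int>()
    lines.forEachIndexed { i, line -> lastIndex[key(line)] = i }
    return lastIndex.values.toIntArray().apply { sort() }
}

// Second pass: keep only the lines whose index appears in the sorted index array.
fun <T> filterByIndexes(lines: Sequence<T>, indexes: IntArray): Sequence<T> {
    var next = 0
    return lines.filterIndexed { i, _ ->
        if (next < indexes.size && indexes[next] == i) {
            next += 1
            true
        } else false
    }
}

fun main() {
    val lines = listOf("a,1", "b,2", "a,3", "c,4", "a,5")
    // Deduplicate on the first column; the last occurrence of each key wins.
    val indexes = dedupIndexes(lines.asSequence()) { it.substringBefore(',') }
    println(filterByIndexes(lines.asSequence(), indexes).toList())  // [b,2, c,4, a,5]
}

Only the per-key map of last-occurrence indexes stays in memory between the passes, which is what makes field-based deduplication memory-efficient compared to the removed lines.toList() approach.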
@@ -45,15 +71,14 @@ class CsvAvroConverterFactory: RecordConverterFactory {
     }

     private val fieldTimeParsers = listOf(
-            Pair<String, (String) -> Double?>("value.time", { it.parseTime() }),
-            Pair<String, (String) -> Double?>("key.timeStart", { it.parseTime() }),
-            Pair<String, (String) -> Double?>("key.start", { it.parseTime() }),
-            Pair<String, (String) -> Double?>("value.dateTime", { it.parseDateTime()?.toDouble() }),
-            Pair<String, (String) -> Double?>("value.date", { it.parseDate()?.toDouble() }),
-            Pair<String, (String) -> Double?>("value.timeReceived", { it.parseTime() }),
-            Pair<String, (String) -> Double?>("value.timeReceived", { it.parseTime() }),
-            Pair<String, (String) -> Double?>("value.timeCompleted", { it.parseTime() })
-    )
+            fieldTimeParser("value.time") { it.parseTime() },
+            fieldTimeParser("key.timeStart") { it.parseTime() },
+            fieldTimeParser("key.start") { it.parseTime() },
+            fieldTimeParser("value.dateTime") { it.parseDateTime()?.toDouble() },
+            fieldTimeParser("value.date") { it.parseDate()?.toDouble() },
+            fieldTimeParser("value.timeReceived") { it.parseTime() },
+            fieldTimeParser("value.timeReceived") { it.parseTime() },
+            fieldTimeParser("value.timeCompleted") { it.parseTime() })

     override fun readTimeSeconds(source: InputStream, compression: Compression): Pair<Array<String>, List<Double>>? {
         return processLines(source, compression) { header, lines ->
@@ -107,6 +132,10 @@ class CsvAvroConverterFactory: RecordConverterFactory {
     override val hasHeader: Boolean = true

     companion object {
+        private val logger = LoggerFactory.getLogger(CsvAvroConverterFactory::class.java)
+
+        private fun fieldTimeParser(name: String, method: (String) -> Double?) = Pair(name, method)
+
         private inline fun <T> processLines(
                 input: InputStream,
                 compression: Compression,
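The fieldTimeParser helper added here exists only so the fieldTimeParsers entries above can drop the explicit Pair<String, (String) -> Double?> type arguments and use trailing-lambda syntax; the list's contents are unchanged. A rough stand-alone illustration, where toDoubleOrNull stands in for the real parseTime/parseDateTime parsers:

// The helper pins the Pair's type parameters, so call sites can use trailing lambdas.
fun fieldTimeParser(name: String, method: (String) -> Double?) = Pair(name, method)

// Before: Pair<String, (String) -> Double?>("value.time", { it.parseTime() })
// After:  fieldTimeParser("value.time") { it.parseTime() }
val parsers: List<Pair<String, (String) -> Double?>> = listOf(
        fieldTimeParser("value.time") { it.toDoubleOrNull() },
        fieldTimeParser("value.timeReceived") { it.toDoubleOrNull() })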
@@ -116,32 +145,15 @@ class CsvAvroConverterFactory: RecordConverterFactory {
             BufferedReader(inReader).use { bufReader ->
                 CSVReader(bufReader).use { csvReader ->
                     val header = csvReader.readNext()
-                    val lines = if (header == null) emptySequence() else generateSequence { csvReader.readNext() }
+                    val lines = if (header != null) {
+                        generateSequence { csvReader.readNext() }
+                    } else emptySequence()
                     process(header, lines)
                 }
             }
         }
     }

-
-        /**
-         * Make a copy of the current list that only contains distinct entries.
-         * For duplicate entries, the last entry in the list is selected. Being distinct is
-         * determined by the provided mapping.
-         * @param <T> type of data.
-         * @param <V> type that is mapped to for distinguishing. This type should have valid
-         * hashCode and equals implementations.
-         */
-        private inline fun <T, V> List<T>.distinctByLast(mapping: (T) -> V): List<T> {
-            val map: MutableMap<V, Int> = HashMap()
-            forEachIndexed { i, v ->
-                map[mapping(v)] = i
-            }
-            return map.values.toIntArray()
-                    .apply { sort() }
-                    .map { i -> this[i] }
-        }
-
         private fun fieldIndexes(
                 header: Array<String>,
                 usingFields: Set<String>,
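The removed distinctByLast extension had to hold every parsed line in a List before deduplicating; the new index-based passes keep its behaviour (the last duplicate wins, original order is preserved) without materializing the file. A small illustrative equivalence check; the rows are made-up data and the extension body is reproduced from the removed code:

// Removed helper, reproduced from the old code: last duplicate wins, order preserved.
fun <T, V> List<T>.distinctByLast(mapping: (T) -> V): List<T> {
    val map: MutableMap<V, Int> = HashMap()
    forEachIndexed { i, v -> map[mapping(v)] = i }
    return map.values.toIntArray()
            .apply { sort() }
            .map { i -> this[i] }
}

fun main() {
    val rows = listOf("a,1", "b,2", "a,3", "b,4", "c,5")
    val byList = rows.distinctByLast { it.substringBefore(',') }

    // Index-based variant, as in the new deduplicate(): keep only last-occurrence indexes.
    val last = HashMap<String, Int>()
    rows.forEachIndexed { i, r -> last[r.substringBefore(',')] = i }
    val keep = last.values.toIntArray().apply { sort() }.toSet()
    val byIndex = rows.filterIndexed { i, _ -> i in keep }

    check(byList == byIndex)  // both are [a,3, b,4, c,5]
}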
