Skip to content

Commit d647a0d

Browse files
authored
Add module to process decompression metrics (#5)
1 parent 7184b54 commit d647a0d

File tree

6 files changed

+346
-0
lines changed

6 files changed

+346
-0
lines changed

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ jreleaser = "org.jreleaser:org.jreleaser.gradle.plugin:1.20.0"
66
jspecify = "org.jspecify:jspecify:1.0.0"
77
junit = "org.junit.jupiter:junit-jupiter:5.14.0"
88
junit-launcher = "org.junit.platform:junit-platform-launcher:1.14.0"
9+
kotlin-logging-jvm = "io.github.oshai:kotlin-logging-jvm:7.0.13"
910
kotlinx-coroutines-core = "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2"
1011
slf4j = "org.slf4j:slf4j-api:2.0.17"
1112
logback-classic = "ch.qos.logback:logback-classic:1.5.21"
13+
trove4j-core = "net.sf.trove4j:core:3.1.0"
1214
zstd-jni = "com.github.luben:zstd-jni:1.5.7-6"
1315

1416
[bundles]
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// Build script for the live-metrics-processor module.
plugins {
    // Project-local convention plugin (declared elsewhere in this build) — TODO confirm what it configures.
    `java-conventions`
    kotlin("jvm") version "2.3.0"
}

dependencies {
    // Coroutines: log files are processed in parallel (see the processor's main).
    implementation(libs.kotlinx.coroutines.core)
    // Trove primitive collections (TIntArrayList/TLongArrayList) back DecompressionMetrics.
    implementation(libs.trove4j.core)
    // Logging facade and backend used for progress output.
    implementation(libs.kotlin.logging.jvm)
    implementation(libs.logback.classic)
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import kotlin.time.Duration
2+
3+
/**
 * One decompression event parsed from a binary log file.
 *
 * @property logIndex index of the log file within its shard
 * @property entryIndex position of this record within the log file
 * @property timeToDecompress wall time spent decompressing this record
 * @property compressedSize compressed payload size in bytes
 * @property decompressedSize decompressed payload size in bytes
 */
data class DecompressionEntry(
    val logIndex: UByte,
    val entryIndex: UInt,
    val timeToDecompress: Duration,
    val compressedSize: Int,
    val decompressedSize: Int,
)

/** Decompression throughput in bytes per microsecond (B/µs). */
val DecompressionEntry.throughput: Double
    get() {
        val nanos = timeToDecompress.inWholeNanoseconds.toDouble()
        return decompressedSize / nanos * 1000.0
    }
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import gnu.trove.list.array.TIntArrayList
2+
import gnu.trove.list.array.TLongArrayList
3+
import kotlin.time.Duration
4+
import kotlin.time.Duration.Companion.nanoseconds
5+
6+
/**
 * Aggregates statistics over a stream of [DecompressionEntry] values.
 *
 * [accept] is `@Synchronized` and may be called from several coroutines; call
 * [finish] exactly once after all entries were added so the median getters
 * operate on sorted samples.
 *
 * @param entryAmount expected number of entries, used to pre-size the backing
 *   primitive lists
 */
class DecompressionMetrics(private val entryAmount: Int) {
    var addedEntries = 0

    var totalCompressed: Long = 0
    var totalDecompressed: Long = 0
    var totalTimeToDecompress: Duration = Duration.ZERO

    lateinit var minDecompressTime: DecompressionEntry
    var averageDecompressTime: Duration = Duration.ZERO
    val decompressTimes = TLongArrayList(entryAmount)
    /**
     * Median decompress time. Only meaningful after [finish] has sorted the
     * samples; returns [Duration.ZERO] when no entries were added (previously
     * this indexed an empty list and threw).
     */
    val medianDecompressTime: Duration
        get() = if (decompressTimes.size() == 0) Duration.ZERO else decompressTimes[decompressTimes.size() / 2].nanoseconds
    lateinit var maxDecompressTime: DecompressionEntry

    // bytes per microsecond (same unit as DecompressionEntry.throughput)
    var minThroughput: Double = 0.0
    var averageThroughput: Double = 0.0
    var maxThroughput: Double = 0.0

    lateinit var minCompressedSize: DecompressionEntry
    var averageCompressedSize: Double = 0.0
    val compressedSizes = TIntArrayList(entryAmount)
    /** Median compressed size in bytes; only meaningful after [finish]. Returns 0 when no entries were added. */
    val medianCompressedSize: Int
        get() = if (compressedSizes.size() == 0) 0 else compressedSizes[compressedSizes.size() / 2]
    lateinit var maxCompressedSize: DecompressionEntry

    lateinit var minDecompressedSize: DecompressionEntry
    var averageDecompressedSize: Double = 0.0
    val decompressedSizes = TIntArrayList(entryAmount)
    /** Median decompressed size in bytes; only meaningful after [finish]. Returns 0 when no entries were added. */
    val medianDecompressedSize: Int
        get() = if (decompressedSizes.size() == 0) 0 else decompressedSizes[decompressedSizes.size() / 2]
    lateinit var maxDecompressedSize: DecompressionEntry

    /** Folds one entry into every aggregate. Thread-safe via `@Synchronized`. */
    @Synchronized
    fun accept(entry: DecompressionEntry) {
        val entryIndex = ++addedEntries

        totalCompressed += entry.compressedSize
        totalDecompressed += entry.decompressedSize
        totalTimeToDecompress += entry.timeToDecompress

        decompressTimes.add(entry.timeToDecompress.inWholeNanoseconds)
        decompressedSizes.add(entry.decompressedSize)
        compressedSizes.add(entry.compressedSize)

        // https://en.wikipedia.org/wiki/Moving_average#Cumulative_average
        fun newAverage(newEntryValue: Duration, currentAverage: Duration): Duration {
            return currentAverage + ((newEntryValue - currentAverage) / entryIndex)
        }

        fun newAverage(newEntryValue: Double, currentAverage: Double): Double {
            return currentAverage + ((newEntryValue - currentAverage) / entryIndex)
        }

        if (entryIndex == 1) {
            // First entry initializes every extremum and average (the lateinit
            // entry-valued extrema have no sensible zero value).
            minDecompressTime = entry
            maxDecompressTime = entry
            averageDecompressTime = entry.timeToDecompress

            minThroughput = entry.throughput
            averageThroughput = minThroughput
            maxThroughput = minThroughput

            minCompressedSize = entry
            maxCompressedSize = entry
            averageCompressedSize = entry.compressedSize.toDouble()

            minDecompressedSize = entry
            maxDecompressedSize = entry
            averageDecompressedSize = entry.decompressedSize.toDouble()
        } else {
            minDecompressTime = if (minDecompressTime.timeToDecompress > entry.timeToDecompress) entry else minDecompressTime
            maxDecompressTime = if (maxDecompressTime.timeToDecompress < entry.timeToDecompress) entry else maxDecompressTime
            averageDecompressTime = newAverage(entry.timeToDecompress, averageDecompressTime)

            val newThroughput = entry.throughput
            minThroughput = if (minThroughput > newThroughput) newThroughput else minThroughput
            maxThroughput = if (maxThroughput < newThroughput) newThroughput else maxThroughput
            averageThroughput = newAverage(newThroughput, averageThroughput)

            minCompressedSize = if (minCompressedSize.compressedSize > entry.compressedSize) entry else minCompressedSize
            maxCompressedSize = if (maxCompressedSize.compressedSize < entry.compressedSize) entry else maxCompressedSize
            averageCompressedSize = newAverage(entry.compressedSize.toDouble(), averageCompressedSize)

            minDecompressedSize = if (minDecompressedSize.decompressedSize > entry.decompressedSize) entry else minDecompressedSize
            maxDecompressedSize = if (maxDecompressedSize.decompressedSize < entry.decompressedSize) entry else maxDecompressedSize
            averageDecompressedSize = newAverage(entry.decompressedSize.toDouble(), averageDecompressedSize)
        }
    }

    /**
     * Sorts the sample lists so the median getters return correct values.
     * Call once, after every [accept] call has completed.
     */
    fun finish() {
        if (decompressTimes.size() != entryAmount) {
            System.err.println("Mismatched entry amount, found ${decompressTimes.size()}, expected $entryAmount")
        }

        decompressTimes.sort()
        decompressedSizes.sort()
        compressedSizes.sort()
    }

    override fun toString(): String {
        return "DecompressionMetrics(\n\taddedEntries=$addedEntries,\n\ttotalCompressed=$totalCompressed,\n\ttotalDecompressed=$totalDecompressed,\n\ttotalTimeToDecompress=$totalTimeToDecompress,\n\n\tminDecompressTime=$minDecompressTime,\n\taverageDecompressTime=$averageDecompressTime,\n\tmedianDecompressTime=$medianDecompressTime,\n\tmaxDecompressTime=$maxDecompressTime,\n\n\tminThroughput=$minThroughput,\n\taverageThroughput=$averageThroughput,\n\tmaxThroughput=$maxThroughput,\n\n\tminCompressedSize=$minCompressedSize,\n\taverageCompressedSize=$averageCompressedSize,\n\tmedianCompressedSize=$medianCompressedSize,\n\tmaxCompressedSize=$maxCompressedSize,\n\n\tminDecompressedSize=$minDecompressedSize,\n\taverageDecompressedSize=$averageDecompressedSize,\n\tmedianDecompressedSize=$medianDecompressedSize,\n\tmaxDecompressedSize=$maxDecompressedSize\n)"
    }
}
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
import io.github.oshai.kotlinlogging.KotlinLogging
2+
import kotlinx.coroutines.*
3+
import java.io.DataInputStream
4+
import java.nio.file.Files
5+
import java.nio.file.Path
6+
import kotlin.io.path.Path
7+
import kotlin.io.path.inputStream
8+
import kotlin.io.path.name
9+
import kotlin.io.path.walk
10+
import kotlin.time.Duration.Companion.nanoseconds
11+
12+
// Size in bytes of one binary log record: i64 decompress time (ns) +
// i32 compressed size + i32 decompressed size + 2 separator bytes.
private const val entrySize = 8 + 4 + 4 + 2

private val logger = KotlinLogging.logger { }

// Shard folders are named e.g. "shard-3-zstd" or "shard-3-zlib"; group 1 is the shard id.
private val shardFolderRegex = Regex("""shard-(\d+)-(?:zstd|zlib)""")
// Log files are named "log-<n>.bin"; the numeric part is a timestamp, not an index.
private val logFileRegex = Regex("""log-(\d+)\.bin""")

// Entries with a throughput (B/µs) at or below this value are discarded.
private const val minThroughput: Double = 100.0
21+
/**
 * Entry point. Reads binary decompression logs from the directory given as the
 * single CLI argument, aggregates per-codec metrics (zlib, zstd small buffer,
 * zstd big buffer) and prints a Markdown comparison table.
 */
suspend fun main(args: Array<String>) {
    require(args.size == 1) {
        "One argument must be present for the logs input directory (decompression-logs)"
    }

    val dispatcher = Dispatchers.IO.limitedParallelism(12)

    val zstdSmallShards = arrayListOf<List<Path>>()
    val zstdBigShards = arrayListOf<List<Path>>()
    val zlibShards = arrayListOf<List<Path>>()

    val logsDirectory = Path(args[0])
    withContext(Dispatchers.IO) {
        // Files.walk returns a Stream that must be closed to release the
        // underlying directory handle — previously it leaked.
        Files.walk(logsDirectory, 1).use { stream ->
            stream
                .filter { it.name.matches(shardFolderRegex) }
                .sorted(Comparator.comparingInt { shardFolder ->
                    shardFolderRegex.matchEntire(shardFolder.name)!!.groupValues[1].toInt()
                })
                .forEach { shardFolder ->
                    val shardId = shardFolderRegex.matchEntire(shardFolder.name)!!.groupValues[1].toInt()
                    // Shards are assigned round-robin to the three codec configurations.
                    val shards = when (shardId % 3) {
                        0 -> zstdSmallShards
                        1 -> zstdBigShards
                        2 -> zlibShards
                        else -> error("Unhandled shard $shardId")
                    }

                    shardFolder.walk()
                        .filter { it.name.matches(logFileRegex) }
                        .sortedBy { logFile ->
                            // Uses a timestamp not an index
                            logFileRegex.matchEntire(logFile.name)!!.groupValues[1].toLong()
                        }
                        .toList()
                        .also(shards::add)
                }
        }
    }

    // First pass over the logs: count the entries that survive the throughput
    // filter so DecompressionMetrics can pre-size its sample lists exactly.
    fun List<List<Path>>.computeEntryCount(): Int {
        return sumOf { shard ->
            shard.sumOf { logFile ->
                var retainedEntries = 0
                readLogThroughputs(logFile) { throughput ->
                    if (throughput > minThroughput) {
                        retainedEntries++
                    }
                }
                retainedEntries
            }
        }
    }

    val zstdSmallMetrics = DecompressionMetrics(zstdSmallShards.computeEntryCount())
    val zstdBigMetrics = DecompressionMetrics(zstdBigShards.computeEntryCount())
    val zlibMetrics = DecompressionMetrics(zlibShards.computeEntryCount())

    // Second pass: parse and aggregate, one codec per top-level coroutine.
    coroutineScope {
        launch(dispatcher) { processShard(zstdSmallShards, zstdSmallMetrics) }
        launch(dispatcher) { processShard(zstdBigShards, zstdBigMetrics) }
        launch(dispatcher) { processShard(zlibShards, zlibMetrics) }
    }

    // Sort the collected samples so the median getters become valid.
    coroutineScope {
        launch(dispatcher) { zstdSmallMetrics.finish() }
        launch(dispatcher) { zstdBigMetrics.finish() }
        launch(dispatcher) { zlibMetrics.finish() }
    }

    println("zstdSmall = $zstdSmallMetrics")
    println("zstdBig = $zstdBigMetrics")
    println("zlib = $zlibMetrics")

    // Markdown cell renderers for the min/max entry columns.
    fun DecompressionEntry.decompressTimeStats(): String = "$timeToDecompress<br>${compressedSize.prettySize()} -> ${decompressedSize.prettySize()}"
    fun DecompressionEntry.compressedStats(): String = "**${compressedSize.prettySize()}** -> ${decompressedSize.prettySize()}<br>$timeToDecompress"
    fun DecompressionEntry.decompressedStats(): String = "${compressedSize.prettySize()} -> **${decompressedSize.prettySize()}**<br>$timeToDecompress"

    println("""
        | Stat | Zlib | Zstd (8K buf) | Zstd (128K buf) |
        |------|------|---------------|-----------------|
        | Entries | ${zlibMetrics.addedEntries.pretty()} | ${zstdSmallMetrics.addedEntries.pretty()} | ${zstdBigMetrics.addedEntries.pretty()} |
        | Total compressed | ${zlibMetrics.totalCompressed.prettySize()} | ${zstdSmallMetrics.totalCompressed.prettySize()} | ${zstdBigMetrics.totalCompressed.prettySize()} |
        | Total decompressed | ${zlibMetrics.totalDecompressed.prettySize()} | ${zstdSmallMetrics.totalDecompressed.prettySize()} | ${zstdBigMetrics.totalDecompressed.prettySize()} |
        | Total time to decompress | ${zlibMetrics.totalTimeToDecompress} | ${zstdSmallMetrics.totalTimeToDecompress} | ${zstdBigMetrics.totalTimeToDecompress} |
        | Min decompress time | ${zlibMetrics.minDecompressTime.decompressTimeStats()} | ${zstdSmallMetrics.minDecompressTime.decompressTimeStats()} | ${zstdBigMetrics.minDecompressTime.decompressTimeStats()} |
        | Average decompress time | ${zlibMetrics.averageDecompressTime} | ${zstdSmallMetrics.averageDecompressTime} | ${zstdBigMetrics.averageDecompressTime} |
        | Median decompress time | ${zlibMetrics.medianDecompressTime} | ${zstdSmallMetrics.medianDecompressTime} | ${zstdBigMetrics.medianDecompressTime} |
        | Max decompress time | ${zlibMetrics.maxDecompressTime.decompressTimeStats()} | ${zstdSmallMetrics.maxDecompressTime.decompressTimeStats()} | ${zstdBigMetrics.maxDecompressTime.decompressTimeStats()} |
        | Min throughput (B/µs) | ${zlibMetrics.minThroughput} | ${zstdSmallMetrics.minThroughput} | ${zstdBigMetrics.minThroughput} |
        | Average throughput (B/µs) | ${zlibMetrics.averageThroughput} | ${zstdSmallMetrics.averageThroughput} | ${zstdBigMetrics.averageThroughput} |
        | Max throughput (B/µs) | ${zlibMetrics.maxThroughput} | ${zstdSmallMetrics.maxThroughput} | ${zstdBigMetrics.maxThroughput} |
        | Min compressed size | ${zlibMetrics.minCompressedSize.compressedStats()} | ${zstdSmallMetrics.minCompressedSize.compressedStats()} | ${zstdBigMetrics.minCompressedSize.compressedStats()} |
        | Average compressed size (B) | ${zlibMetrics.averageCompressedSize} | ${zstdSmallMetrics.averageCompressedSize} | ${zstdBigMetrics.averageCompressedSize} |
        | Median compressed size (B) | ${zlibMetrics.medianCompressedSize} | ${zstdSmallMetrics.medianCompressedSize} | ${zstdBigMetrics.medianCompressedSize} |
        | Max compressed size | ${zlibMetrics.maxCompressedSize.compressedStats()} | ${zstdSmallMetrics.maxCompressedSize.compressedStats()} | ${zstdBigMetrics.maxCompressedSize.compressedStats()} |
        | Min decompressed size | ${zlibMetrics.minDecompressedSize.decompressedStats()} | ${zstdSmallMetrics.minDecompressedSize.decompressedStats()} | ${zstdBigMetrics.minDecompressedSize.decompressedStats()} |
        | Average decompressed size (B) | ${zlibMetrics.averageDecompressedSize} | ${zstdSmallMetrics.averageDecompressedSize} | ${zstdBigMetrics.averageDecompressedSize} |
        | Median decompressed size (B) | ${zlibMetrics.medianDecompressedSize} | ${zstdSmallMetrics.medianDecompressedSize} | ${zstdBigMetrics.medianDecompressedSize} |
        | Max decompressed size | ${zlibMetrics.maxDecompressedSize.decompressedStats()} | ${zstdSmallMetrics.maxDecompressedSize.decompressedStats()} | ${zstdBigMetrics.maxDecompressedSize.decompressedStats()} |
    """.trimIndent())
}
121+
122+
/**
 * Launches one coroutine per log file of every shard in [shards], feeding each
 * entry that passes the throughput filter into [metrics].
 */
private fun CoroutineScope.processShard(shards: List<List<Path>>, metrics: DecompressionMetrics) {
    shards.forEachIndexed { shardId, shard ->
        for ((logIndex, logFile) in shard.withIndex()) {
            launch {
                logger.info { "Reading shard $shardId file $logIndex" }

                readLogFile(logFile, logIndex.toUByte()) { entry ->
                    // Keep only entries whose throughput exceeds 100 bytes per microsecond
                    if (entry.throughput > minThroughput) {
                        metrics.accept(entry)
                    }
                }
            }
        }
    }
}
141+
142+
/** Formats this integer with `_` as a thousands separator (e.g. 1234567 -> "1_234_567"). */
private fun Int.pretty(): String {
    return toLong().pretty()
}

/** Formats this integer with `_` as a thousands separator (e.g. 1234567 -> "1_234_567"). */
private fun Long.pretty(): String {
    val str = toString()
    // Split off the sign so it never participates in the digit grouping
    // (previously "-123" rendered as "-_123").
    val sign = if (str.startsWith('-')) "-" else ""
    val digits = str.removePrefix("-")
    if (digits.length <= 3) return str

    return sign + buildString {
        digits.reversed().forEachIndexed { index, ch ->
            // Insert a separator before every group of three digits (counting
            // from the least significant end).
            if (index != 0 && index % 3 == 0) {
                append('_')
            }
            append(ch)
        }
    }.reversed()
}
159+
160+
/** Formats this byte count, adding a binary KB/MB/GB rendering for sizes above 10_000 B. */
private fun Int.prettySize(): String {
    return toLong().prettySize()
}

/** Formats this byte count, adding a binary KB/MB/GB rendering for sizes above 10_000 B. */
private fun Long.prettySize(): String {
    if (this > 10000) {
        var prettySize = this.toDouble() / 1024.0
        var prettyUnit = "KB"
        if (prettySize > 1024.0) {
            prettySize /= 1024.0
            prettyUnit = "MB"
        }
        if (prettySize > 1024.0) {
            prettySize /= 1024.0
            prettyUnit = "GB"
        }
        // Locale.ROOT pins the decimal separator to '.' — the default-locale
        // overload would print "20,0" on comma-decimal locales.
        return "$this B (${"%.1f".format(java.util.Locale.ROOT, prettySize)} ${prettyUnit})"
    } else {
        return "$this B"
    }
}
181+
182+
/**
 * Parses every record of [logFile] and passes it to [entryConsumer].
 *
 * Record layout (big-endian, matching [entrySize]): i64 decompress time in ns,
 * i32 compressed size, i32 decompressed size, 2 separator bytes.
 *
 * @param logIndex index of this file within its shard, stamped onto each entry
 */
private fun readLogFile(logFile: Path, logIndex: UByte, entryConsumer: (DecompressionEntry) -> Unit) {
    logFile.inputStream().buffered().let(::DataInputStream).use { input ->
        var entryIndex = 0u
        var available = input.available()
        while (available > 0) {
            val timeToDecompress = input.readLong()
            val compressedSize = input.readInt()
            val decompressedSize = input.readInt()
            input.skipBytes(2) // Separator

            entryConsumer(DecompressionEntry(logIndex, entryIndex++, timeToDecompress.nanoseconds, compressedSize, decompressedSize))

            // available() is only an estimate and may under-report on a buffered
            // stream, so it is re-queried only once the previous estimate is
            // exhausted rather than on every record.
            available -= entrySize
            if (available <= 0) {
                available = input.available()
            }
        }
    }
}
201+
202+
/**
 * Streams only the throughput (B/µs) of every record in [logFile], skipping
 * full [DecompressionEntry] construction. Reads the same record layout as
 * [readLogFile] and applies the same formula as [DecompressionEntry.throughput];
 * used for the pre-sizing pass before the metrics lists are allocated.
 */
private fun readLogThroughputs(logFile: Path, consumer: (throughput: Double) -> Unit) {
    logFile.inputStream().buffered().let(::DataInputStream).use { input ->
        var available = input.available()
        while (available > 0) {
            val timeToDecompress = input.readLong()
            input.skipBytes(4) // Compressed size
            val decompressedSize = input.readInt()
            input.skipBytes(2) // Separator

            // decompressedSize bytes / (timeToDecompress ns / 1000) = bytes per µs
            consumer(decompressedSize / timeToDecompress.toDouble() * 1000.0)

            // available() is only an estimate; re-query it once exhausted.
            available -= entrySize
            if (available <= 0) {
                available = input.available()
            }
        }
    }
}

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ include(":api")
88
include(":test-data")
99
include(":test-data-generator")
1010
include(":benchmarks", ":benchmarks:results-converter")
11+
include(":live-metrics-processor")

0 commit comments

Comments
 (0)