Skip to content

Commit 8f93b64

Browse files
author
Jonathan Pearlin
authored
chore: refactor additional stat tracking (#69222)
1 parent 0bb3665 commit 8f93b64

30 files changed

+663
-490
lines changed

airbyte-cdk/bulk/changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## Version 0.1.68
2+
3+
**Load CDK**
4+
5+
* Refactor additional state statistic tracking.
6+
17
## Version 0.1.67
28

39
**Load CDK**

airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ abstract class BaseMockBasicFunctionalityIntegrationTest(
8080
} else {
8181
UnknownTypesBehavior.PASS_THROUGH
8282
},
83-
includesAdditionalStats = false,
8483
) {
8584
@Test
8685
override fun testBasicWrite() {

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/aggregate/AggregateStore.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ class AggregateStore(
2929
val (_, agg, counts, bytes, timeTrigger, countTrigger, bytesTrigger) = getOrCreate(key)
3030

3131
agg.accept(record)
32-
counts.increment(record.partitionKey, 1)
33-
bytes.increment(record.partitionKey, record.sizeBytes)
32+
counts.increment(record.partitionKey, 1.0)
33+
bytes.increment(record.partitionKey, record.sizeBytes.toDouble())
3434
countTrigger.increment(1)
3535
bytesTrigger.increment(record.sizeBytes)
3636
timeTrigger.update(record.emittedAtMs)

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/stages/ParseStage.kt

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,28 @@
44

55
package io.airbyte.cdk.load.dataflow.stages
66

7+
import io.airbyte.cdk.load.data.AirbyteValue
78
import io.airbyte.cdk.load.dataflow.pipeline.DataFlowStage
89
import io.airbyte.cdk.load.dataflow.pipeline.DataFlowStageIO
10+
import io.airbyte.cdk.load.dataflow.state.PartitionKey
911
import io.airbyte.cdk.load.dataflow.transform.RecordDTO
10-
import io.airbyte.cdk.load.dataflow.transform.RecordMunger
12+
import io.airbyte.cdk.load.dataflow.transform.medium.ConversionInput
13+
import io.airbyte.cdk.load.dataflow.transform.medium.JsonConverter
14+
import io.airbyte.cdk.load.dataflow.transform.medium.ProtobufConverter
15+
import io.airbyte.cdk.load.message.DestinationRecordProtobufSource
16+
import io.airbyte.cdk.load.message.DestinationRecordRaw
1117
import jakarta.inject.Named
1218
import jakarta.inject.Singleton
1319

1420
@Named("parse")
1521
@Singleton
1622
class ParseStage(
17-
private val munger: RecordMunger,
23+
private val jsonConverter: JsonConverter,
24+
private val protobufConverter: ProtobufConverter
1825
) : DataFlowStage {
1926
override suspend fun apply(input: DataFlowStageIO): DataFlowStageIO {
2027
val raw = input.raw!!
21-
val fields = munger.transformForDest(raw)
22-
28+
val fields = transform(raw, input.partitionKey!!)
2329
return input.apply {
2430
munged =
2531
RecordDTO(
@@ -30,4 +36,15 @@ class ParseStage(
3036
)
3137
}
3238
}
39+
40+
private fun transform(
41+
msg: DestinationRecordRaw,
42+
partitionKey: PartitionKey
43+
): Map<String, AirbyteValue> {
44+
val input = ConversionInput(msg = msg, partitionKey = partitionKey)
45+
return when (msg.rawData) {
46+
is DestinationRecordProtobufSource -> protobufConverter.convert(input)
47+
else -> jsonConverter.convert(input)
48+
}
49+
}
3350
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/state/StateHistogram.kt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,22 @@ data class PartitionKey(
2222
val id: String,
2323
)
2424

25-
open class Histogram<T>(private val map: ConcurrentMap<T, Long> = ConcurrentHashMap()) {
26-
fun increment(key: T, quantity: Long): Histogram<T> =
27-
this.apply { map.merge(key, quantity, Long::plus) }
25+
open class Histogram<T>(private val map: ConcurrentMap<T, Double> = ConcurrentHashMap()) {
26+
fun increment(key: T, quantity: Double): Histogram<T> =
27+
this.apply { map.merge(key, quantity, Double::plus) }
2828

2929
fun merge(other: Histogram<T>): Histogram<T> =
30-
this.apply { other.map.forEach { map.merge(it.key, it.value, Long::plus) } }
30+
this.apply { other.map.forEach { map.merge(it.key, it.value, Double::plus) } }
3131

32-
fun get(key: T): Long? = map[key]
32+
fun get(key: T): Double? = map[key]
3333

34-
fun remove(key: T): Long? = map.remove(key)
34+
fun remove(key: T): Double? = map.remove(key)
35+
36+
fun toMap(): Map<T, Double> = map.toMap()
3537
}
3638

39+
typealias AdditionalStatsHistogram = Histogram<String>
40+
3741
typealias StateHistogram = Histogram<StateKey>
3842

3943
typealias PartitionHistogram = Histogram<PartitionKey>

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/state/StateHistogramStore.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,29 @@ class StateHistogramStore {
1919
}
2020

2121
fun acceptExpectedCounts(key: StateKey, count: Long): StateHistogram {
22-
val inner = ConcurrentHashMap<StateKey, Long>()
23-
inner[key] = count
22+
val inner = ConcurrentHashMap<StateKey, Double>()
23+
inner[key] = count.toDouble()
2424

2525
return expected.merge(StateHistogram(inner))
2626
}
2727

2828
fun isComplete(key: StateKey): Boolean {
2929
val expectedCount = expected.get(key)
30-
val flushedCount = key.partitionKeys.sumOf { flushed.get(it) ?: 0 }
30+
val flushedCount = key.partitionKeys.sumOf { flushed.get(it) ?: 0.0 }
3131

3232
return expectedCount == flushedCount
3333
}
3434

3535
// mirrors isComplete. Purely for debugging purposes.
3636
fun whyIsStateIncomplete(key: StateKey): String {
3737
val expectedCount = expected.get(key)
38-
val partitionFlushCounts = key.partitionKeys.map { flushed.get(it) ?: 0 }
38+
val partitionFlushCounts = key.partitionKeys.map { flushed.get(it) ?: 0.0 }
3939
val flushedCount = partitionFlushCounts.sum()
4040
return "expectedCount $expectedCount does not equal flushedCount $flushedCount (by partition: $partitionFlushCounts)"
4141
}
4242

4343
fun remove(key: StateKey): Long? {
4444
key.partitionKeys.forEach { flushed.remove(it) }
45-
return expected.remove(key)
45+
return expected.remove(key)?.toLong()
4646
}
4747
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/state/stats/CommitedStatsStore.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ class CommittedStatsStore {
5151
internal fun removeLiveStats(s: DestinationStream.Descriptor, p: PartitionKey): EmissionStats? =
5252
liveStats[s]?.let {
5353
EmissionStats(
54-
count = it.counts.remove(p) ?: 0,
55-
bytes = it.bytes.remove(p) ?: 0,
54+
count = it.counts.remove(p)?.toLong() ?: 0,
55+
bytes = it.bytes.remove(p)?.toLong() ?: 0,
5656
)
5757
}
5858
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.dataflow.state.stats
6+
7+
import io.airbyte.cdk.load.command.DestinationStream
8+
import io.airbyte.cdk.load.dataflow.state.AdditionalStatsHistogram
9+
import io.airbyte.cdk.load.dataflow.state.Histogram
10+
import io.airbyte.cdk.load.dataflow.state.PartitionKey
11+
import jakarta.inject.Singleton
12+
import java.util.concurrent.ConcurrentHashMap
13+
14+
/**
15+
* A singleton class responsible for managing additional state statistics, specifically histograms
16+
* of metrics associated with partitions and destination streams. This class provides methods to add
17+
* metrics values, retrieve combined statistics, and maintain metrics completeness with default
18+
* values.
19+
*/
20+
@Singleton
21+
class StateAdditionalStatsStore {
22+
23+
private val store =
24+
ConcurrentHashMap<
25+
PartitionKey, ConcurrentHashMap<DestinationStream.Descriptor, AdditionalStatsHistogram>
26+
>()
27+
28+
/**
29+
* Adds a value to the histogram associated with the given partition and stream descriptor for
30+
* the specified metric.
31+
*
32+
* @param partitionKey The key representing the partition for which the histogram is maintained.
33+
* @param streamDescriptor The descriptor identifying the specific destination stream.
34+
* @param metric The metric whose value is to be incremented in the histogram.
35+
* @param value The value to increment for the specified metric.
36+
*/
37+
fun add(
38+
partitionKey: PartitionKey,
39+
streamDescriptor: DestinationStream.Descriptor,
40+
metric: ObservabilityMetrics,
41+
value: Double
42+
) {
43+
store.putIfAbsent(partitionKey, ConcurrentHashMap())
44+
store[partitionKey]?.let { partitionStats ->
45+
partitionStats.putIfAbsent(streamDescriptor, Histogram())
46+
partitionStats[streamDescriptor]?.increment(metric.metricName, value)
47+
}
48+
}
49+
50+
/**
51+
* Drains the accumulated statistics for the specified destination stream across the provided
52+
* partition keys, returning a combined histogram of all relevant metrics. For any metric values
53+
* not present, default values are added. This is to ensure that the metrics are always
54+
* published as part of the state message with a zero value if no value has been recorded since
55+
* the last state message publish event.
56+
*
57+
* @param partitionKeys The list of partition keys to process, each representing a distinct
58+
* partition.
59+
* @param stream The descriptor of the destination stream whose statistics are to be drained.
60+
* @return A `Histogram` containing the combined metrics for the specified destination stream.
61+
*/
62+
fun drain(
63+
partitionKeys: List<PartitionKey>,
64+
): Map<DestinationStream.Descriptor, AdditionalStatsHistogram> {
65+
val accumulator =
66+
mutableMapOf<DestinationStream.Descriptor, AdditionalStatsHistogram>().withDefault {
67+
populateWithDefaultValues(AdditionalStatsHistogram())
68+
}
69+
70+
return partitionKeys
71+
.mapNotNull { store.remove(it) }
72+
.fold(accumulator) { acc, perStreamStats ->
73+
perStreamStats.forEach { (stream, histogram) ->
74+
acc.merge(stream, histogram, AdditionalStatsHistogram::merge)
75+
}
76+
addDefaultValues(acc)
77+
}
78+
}
79+
80+
/**
81+
* Adds default values for metrics that are not present in the given histogram. This ensures
82+
* that all metrics defined in `ObservabilityMetrics` are represented in the histogram with a
83+
* default value of 0.0 if they are missing.
84+
*
85+
* @param histogram The histogram to which default values should be added,
86+
* ```
87+
* representing metric counts by their names.
88+
* @return
89+
* ```
90+
* The updated histogram with default values for missing metrics.
91+
*/
92+
private fun addDefaultValues(
93+
stats: MutableMap<DestinationStream.Descriptor, AdditionalStatsHistogram>
94+
): MutableMap<DestinationStream.Descriptor, AdditionalStatsHistogram> {
95+
stats.forEach { (_, histogram) -> populateWithDefaultValues(histogram) }
96+
return stats
97+
}
98+
99+
private fun populateWithDefaultValues(
100+
histogram: AdditionalStatsHistogram
101+
): AdditionalStatsHistogram {
102+
println("Populating...")
103+
ObservabilityMetrics.entries.forEach {
104+
if (histogram.get(it.metricName) == null) {
105+
histogram.increment(it.metricName, 0.0)
106+
}
107+
}
108+
return histogram
109+
}
110+
111+
/**
112+
* Enum representing the available observability metrics. Each metric is associated with a
113+
* specific metric name used for tracking system behavior and performance.
114+
*
115+
* @property metricName The name of the metric used in tracking.
116+
*/
117+
enum class ObservabilityMetrics(val metricName: String) {
118+
NULLED_VALUE_COUNT("nulledValueCount"),
119+
TRUNCATED_VALUE_COUNT("truncatedValueCount")
120+
}
121+
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/state/stats/StateStatsEnricher.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ package io.airbyte.cdk.load.dataflow.state.stats
77
import com.google.common.annotations.VisibleForTesting
88
import io.airbyte.cdk.load.command.DestinationStream
99
import io.airbyte.cdk.load.command.NamespaceMapper
10+
import io.airbyte.cdk.load.dataflow.state.PartitionKey
1011
import io.airbyte.cdk.load.dataflow.state.StateKey
11-
import io.airbyte.cdk.load.dataflow.stats.MetricTracker
1212
import io.airbyte.cdk.load.message.CheckpointMessage
1313
import io.airbyte.cdk.load.message.GlobalCheckpoint
1414
import io.airbyte.cdk.load.message.GlobalSnapshotCheckpoint
@@ -20,7 +20,7 @@ import jakarta.inject.Singleton
2020
class StateStatsEnricher(
2121
private val statsStore: CommittedStatsStore,
2222
private val namespaceMapper: NamespaceMapper,
23-
private val metricTracker: MetricTracker,
23+
private val stateAdditionalStatsStore: StateAdditionalStatsStore,
2424
) {
2525
// Enriches provided state message with stats associated with the given state key.
2626
fun enrich(msg: CheckpointMessage, key: StateKey): CheckpointMessage {
@@ -36,13 +36,15 @@ class StateStatsEnricher(
3636
fun enrichTopLevelDestinationStats(
3737
msg: CheckpointMessage,
3838
desc: DestinationStream.Descriptor,
39+
partitionKeys: List<PartitionKey>,
3940
count: Long
4041
): CheckpointMessage {
4142
// TODO: set this using the count above once we get to total rejected
4243
// records.
4344
msg.updateStats(
4445
destinationStats = msg.sourceStats,
45-
additionalStats = metricTracker.drain(desc)
46+
// Use getValue to ensure the default histogram is returned if the stream is no present
47+
additionalStats = stateAdditionalStatsStore.drain(partitionKeys).getValue(desc).toMap(),
4648
)
4749

4850
return msg
@@ -83,7 +85,7 @@ class StateStatsEnricher(
8385
)
8486
val (committed, cumulative) = statsStore.commitStats(desc, key)
8587

86-
enrichTopLevelDestinationStats(msg, desc, committed.count)
88+
enrichTopLevelDestinationStats(msg, desc, key.partitionKeys, committed.count)
8789
enrichTopLevelStats(msg, cumulative)
8890

8991
return msg

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/state/stats/socket/EmittedStatsStoreImpl.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ class EmittedStatsStoreImpl(
3232
count: Long,
3333
bytes: Long,
3434
) {
35-
readCounts.increment(s, count)
36-
readBytes.increment(s, bytes)
35+
readCounts.increment(s, count.toDouble())
36+
readBytes.increment(s, bytes.toDouble())
3737
}
3838

3939
override fun getStats(): List<AirbyteMessage> =
@@ -45,8 +45,8 @@ class EmittedStatsStoreImpl(
4545
@VisibleForTesting
4646
internal fun get(s: DestinationStream.Descriptor) =
4747
EmissionStats(
48-
count = readCounts.get(s) ?: 0,
49-
bytes = readBytes.get(s) ?: 0,
48+
count = readCounts.get(s)?.toLong() ?: 0,
49+
bytes = readBytes.get(s)?.toLong() ?: 0,
5050
)
5151

5252
@VisibleForTesting

0 commit comments

Comments
 (0)