Skip to content

Commit dc83e41

Browse files
author
Jonathan Pearlin
authored
refactor: separate validation result handling from coercer (#69113)
1 parent 81e0662 commit dc83e41

File tree

19 files changed

+543
-64
lines changed

19 files changed

+543
-64
lines changed

airbyte-cdk/bulk/changelog.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## Version 0.1.66
2+
3+
**Load CDK**
4+
5+
* Added: Support for reporting of additional stats in destination state messages.
6+
* Changed: Refactor coercer interface to separate out coercion and validation.
7+
18
## Version 0.1.65
29

310
extract cdk: fix bug when getting table metadata that causes a timeout

airbyte-cdk/bulk/core/base/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ dependencies {
7777
api("com.fasterxml.jackson.core:jackson-databind")
7878
api("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
7979
api("com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39")
80-
api("io.airbyte.airbyte-protocol:protocol-models:0.18.0") {
80+
api("io.airbyte.airbyte-protocol:protocol-models:0.19.0") {
8181
exclude(group="com.google.guava", module="guava")
8282
exclude(group="com.google.api-client")
8383
exclude(group="org.apache.logging.log4j")

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ class EnrichedAirbyteValue(
199199
* Creates a nullified version of this value with the specified reason.
200200
*
201201
* @param reason The [Reason] for nullification, defaults to DESTINATION_SERIALIZATION_ERROR
202+
*
203+
* @deprecated Use the new data flow pipeline instead
202204
*/
203205
fun nullify(reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR) {
204206
val nullChange = Meta.Change(field = name, change = Change.NULLED, reason = reason)
@@ -212,6 +214,8 @@ class EnrichedAirbyteValue(
212214
*
213215
* @param reason The [Reason] for truncation, defaults to DESTINATION_RECORD_SIZE_LIMITATION
214216
* @param newValue The new (truncated) value to use
217+
*
218+
* @deprecated Use the new data flow pipeline instead
215219
*/
216220
fun truncate(
217221
newValue: AirbyteValue,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/state/stats/StateStatsEnricher.kt

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
package io.airbyte.cdk.load.dataflow.state.stats
66

77
import com.google.common.annotations.VisibleForTesting
8+
import io.airbyte.cdk.load.command.DestinationStream
89
import io.airbyte.cdk.load.command.NamespaceMapper
910
import io.airbyte.cdk.load.dataflow.state.StateKey
11+
import io.airbyte.cdk.load.dataflow.stats.MetricTracker
1012
import io.airbyte.cdk.load.message.CheckpointMessage
1113
import io.airbyte.cdk.load.message.GlobalCheckpoint
1214
import io.airbyte.cdk.load.message.GlobalSnapshotCheckpoint
@@ -18,6 +20,7 @@ import jakarta.inject.Singleton
1820
class StateStatsEnricher(
1921
private val statsStore: CommittedStatsStore,
2022
private val namespaceMapper: NamespaceMapper,
23+
private val metricTracker: MetricTracker,
2124
) {
2225
// Enriches provided state message with stats associated with the given state key.
2326
fun enrich(msg: CheckpointMessage, key: StateKey): CheckpointMessage {
@@ -30,16 +33,34 @@ class StateStatsEnricher(
3033

3134
@VisibleForTesting
3235
@Suppress("UNUSED_PARAMETER")
33-
fun enrichTopLevelDestinationStats(msg: CheckpointMessage, count: Long): CheckpointMessage {
36+
fun enrichTopLevelDestinationStats(
37+
msg: CheckpointMessage,
38+
desc: DestinationStream.Descriptor,
39+
count: Long
40+
): CheckpointMessage {
3441
// TODO: set this using the count above once we get to total rejected
3542
// records.
3643
msg.updateStats(
3744
destinationStats = msg.sourceStats,
45+
additionalStats = metricTracker.drain(desc)
3846
)
3947

4048
return msg
4149
}
4250

51+
@VisibleForTesting
52+
@Suppress("UNUSED_PARAMETER")
53+
fun enrichTopLevelDestinationStatsGlobalState(
54+
msg: CheckpointMessage,
55+
count: Long
56+
): CheckpointMessage {
57+
// TODO: set this using the count above once we get to total rejected
58+
// records.
59+
msg.updateStats(destinationStats = msg.sourceStats)
60+
61+
return msg
62+
}
63+
4364
@VisibleForTesting
4465
fun enrichTopLevelStats(msg: CheckpointMessage, stats: EmissionStats): CheckpointMessage {
4566
msg.updateStats(
@@ -62,7 +83,7 @@ class StateStatsEnricher(
6283
)
6384
val (committed, cumulative) = statsStore.commitStats(desc, key)
6485

65-
enrichTopLevelDestinationStats(msg, committed.count)
86+
enrichTopLevelDestinationStats(msg, desc, committed.count)
6687
enrichTopLevelStats(msg, cumulative)
6788

6889
return msg
@@ -88,7 +109,7 @@ class StateStatsEnricher(
88109
}
89110
.fold(CommitStatsResult()) { acc, c -> acc.merge(c) }
90111

91-
enrichTopLevelDestinationStats(msg, committed.count)
112+
enrichTopLevelDestinationStatsGlobalState(msg, committed.count)
92113
enrichTopLevelStats(msg, cumulative)
93114

94115
return msg
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.dataflow.stats
6+
7+
import io.airbyte.cdk.load.command.DestinationStream
8+
import jakarta.inject.Singleton
9+
import java.util.concurrent.ConcurrentHashMap
10+
11+
/**
12+
* A thread-safe utility class designed to track and manage metrics for different destination
13+
* streams. Metrics are categorized by stream descriptors and identified by metric names.
14+
*
15+
* The class supports adding numeric metric values, retrieving metrics for specific streams, and
16+
* draining (retrieving and clearing) metrics for a stream. If metric values are missing for
17+
* predefined metric names, default values of 0.0 are used.
18+
*/
19+
@Singleton
20+
class MetricTracker {
21+
22+
private val metrics: MutableMap<DestinationStream.Descriptor, MutableMap<String, Double>> =
23+
ConcurrentHashMap()
24+
25+
private val lock: Any = Any()
26+
27+
/**
28+
* Adds a metric value to the specified stream descriptor and metric. If the metric does not
29+
* already exist for the given stream, it will be initialized with the given value. Later calls
30+
* will update the existing value by adding the provided value.
31+
*
32+
* @param stream the stream descriptor used to identify the metrics data.
33+
* @param metric the metric name and related metadata to be added or updated.
34+
* @param value the numeric value to add to the specified metric for the given stream.
35+
*/
36+
fun add(stream: DestinationStream.Descriptor, metric: ObservabilityMetrics, value: Double) {
37+
synchronized(lock) {
38+
metrics.putIfAbsent(stream, ConcurrentHashMap())
39+
val streamMetrics = metrics[stream]!!
40+
streamMetrics[metric.metricName] =
41+
streamMetrics.getOrDefault(metric.metricName, 0.0) + value
42+
}
43+
}
44+
45+
/**
46+
* Retrieves the metrics associated with the specified stream descriptor.
47+
*
48+
* @param stream the stream descriptor used to identify the metrics to retrieve.
49+
* @return a map containing the metrics data, where keys represent metric names
50+
* and values are their corresponding numeric values. If no metrics are
51+
* found for the provided stream, an empty map is returned.
54+
*/
55+
fun get(stream: DestinationStream.Descriptor): Map<String, Double> =
56+
synchronized(lock) { metrics[stream]?.toMap() ?: mutableMapOf() }
57+
58+
/**
59+
* Drains and returns the current metrics data for the specified stream descriptor. After
60+
* retrieval, the metrics for the provided stream are cleared.
61+
*
62+
* @param stream the stream descriptor associated with the metrics to be drained.
63+
* @return a map containing the drained metrics data, where keys are metric names and values are
64+
* their respective numeric values.
65+
*/
66+
fun drain(stream: DestinationStream.Descriptor): Map<String, Double> {
67+
synchronized(lock) {
68+
// Ensure that all metrics are present even if not explicitly set.
69+
val copy = addDefaultValues(get(stream))
70+
metrics[stream]?.clear()
71+
return copy
72+
}
73+
}
74+
75+
/**
76+
* Adds default values for any missing metrics in the provided map. If a metric defined in the
77+
* [ObservabilityMetrics] enum is absent in the given map, it will be initialized with a default
78+
* value of 0.0.
79+
*
80+
* @param metrics the map of metrics to be updated with default values. The keys
81+
* are metric names and the values are their respective numeric values.
82+
* @return a new map containing the provided metrics plus a default value of 0.0
83+
* for any missing entries.
89+
*/
90+
private fun addDefaultValues(metrics: Map<String, Double>): Map<String, Double> {
91+
val copy = metrics.toMutableMap()
92+
ObservabilityMetrics.entries.forEach { copy.putIfAbsent(it.metricName, 0.0) }
93+
return copy
94+
}
95+
}
96+
97+
/**
98+
* Enum representing the available observability metrics. Each metric is associated with a specific
99+
* metric name used for tracking system behavior and performance.
100+
*
101+
* @property metricName The name of the metric used in tracking.
102+
*/
103+
enum class ObservabilityMetrics(val metricName: String) {
104+
NULLED_VALUE_COUNT("nulledValueCount"),
105+
TRUNCATED_VALUE_COUNT("truncatedValueCount")
106+
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/transform/ValueCoercer.kt

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package io.airbyte.cdk.load.dataflow.transform
77
import io.airbyte.cdk.load.data.AirbyteType
88
import io.airbyte.cdk.load.data.AirbyteValue
99
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
10+
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
1011

1112
/**
1213
* Interface for destination-specific field coercion and type representation.
@@ -71,30 +72,43 @@ interface ValueCoercer {
7172
* nullify values that fail validation.
7273
*
7374
* @param value The enriched Airbyte value to validate
74-
* @return The validated EnrichedAirbyteValue, potentially nullified if validation fails
75+
* @return The [ValidationResult] indicating whether the value is valid or not, and if not, why
7576
*
7677
* @example
7778
* ```kotlin
78-
* override fun validate(value: EnrichedAirbyteValue): EnrichedAirbyteValue {
79+
* override fun validate(value: EnrichedAirbyteValue): ValidationResult =
7980
* when (val abValue = value.abValue) {
8081
* is StringValue -> {
8182
* if (abValue.value.length > MAX_STRING_LENGTH) {
82-
* value.nullify(
83-
* AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
84-
* )
83+
* ShouldNullify(AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION)
84+
* } else {
85+
* Valid
8586
* }
8687
* }
8788
* is IntegerValue -> {
8889
* if (abValue.value > MAX_INTEGER) {
89-
* value.nullify(
90-
* AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
91-
* )
90+
* ShouldNullify(AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION)
91+
* } else {
92+
* Valid
9293
* }
9394
* }
9495
* }
95-
* return value
96-
* }
9796
* ```
9897
*/
99-
fun validate(value: EnrichedAirbyteValue): EnrichedAirbyteValue
98+
fun validate(value: EnrichedAirbyteValue): ValidationResult
99+
}
100+
101+
/** Result of a value validation check via the [ValueCoercer.validate] method. */
102+
sealed interface ValidationResult {
103+
/** Value is valid, no action needed */
104+
data object Valid : ValidationResult
105+
106+
/** Value should be nullified with the given reason */
107+
data class ShouldNullify(val reason: AirbyteRecordMessageMetaChange.Reason) : ValidationResult
108+
109+
/** Value should be replaced with the new, truncated value and reason */
110+
data class ShouldTruncate(
111+
val truncatedValue: AirbyteValue,
112+
val reason: AirbyteRecordMessageMetaChange.Reason
113+
) : ValidationResult
100114
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.dataflow.transform.data
6+
7+
import io.airbyte.cdk.load.command.DestinationStream
8+
import io.airbyte.cdk.load.data.AirbyteValue
9+
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
10+
import io.airbyte.cdk.load.data.NullValue
11+
import io.airbyte.cdk.load.dataflow.stats.MetricTracker
12+
import io.airbyte.cdk.load.dataflow.stats.ObservabilityMetrics
13+
import io.airbyte.cdk.load.dataflow.transform.ValidationResult
14+
import io.airbyte.cdk.load.message.Meta
15+
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
16+
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
17+
import jakarta.inject.Singleton
18+
19+
@Singleton
20+
class ValidationResultHandler(private val metricTracker: MetricTracker) {
21+
22+
/**
23+
* Processes an `EnrichedAirbyteValue` based on its corresponding `ValidationResult`. The method
24+
* handles three cases:
25+
* - If the result requires truncation, the value is truncated using the `truncate` method.
26+
* - If the result requires nullification, the value is nullified using the `nullify` method.
27+
* - If the result is valid, the original value is returned.
28+
*
29+
* @param stream The descriptor of the destination stream where the value belongs.
30+
* @param result The validation result indicating how the value should be handled.
31+
* @param value The enriched Airbyte value to process based on the validation result.
32+
*/
33+
fun handle(
34+
stream: DestinationStream.Descriptor,
35+
result: ValidationResult,
36+
value: EnrichedAirbyteValue
37+
) =
38+
when (result) {
39+
is ValidationResult.ShouldTruncate ->
40+
truncate(
41+
stream = stream,
42+
value = value,
43+
truncatedValue = result.truncatedValue,
44+
reason = result.reason
45+
)
46+
is ValidationResult.ShouldNullify ->
47+
nullify(stream = stream, value = value, reason = result.reason)
48+
is ValidationResult.Valid -> value
49+
}
50+
51+
/**
52+
* Nullifies the given value in place, recording a NULLED change with the specified reason.
53+
*
54+
* @param stream The [DestinationStream.Descriptor] for the stream that this nulled value
55+
* belongs to.
56+
* @param value The [EnrichedAirbyteValue] to nullify
57+
* @param reason The [Reason] for nullification, defaults to DESTINATION_SERIALIZATION_ERROR
58+
*
59+
* @return The nullified [EnrichedAirbyteValue].
60+
*/
61+
fun nullify(
62+
stream: DestinationStream.Descriptor,
63+
value: EnrichedAirbyteValue,
64+
reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR
65+
): EnrichedAirbyteValue {
66+
val nullChange = Meta.Change(field = value.name, change = Change.NULLED, reason = reason)
67+
value.abValue = NullValue
68+
value.changes.add(nullChange)
69+
metricTracker.add(stream, ObservabilityMetrics.NULLED_VALUE_COUNT, 1.0)
70+
return value
71+
}
72+
73+
/**
74+
* Replaces the given value in place with the truncated value, recording the change and reason.
75+
*
76+
* @param stream The [DestinationStream.Descriptor] for the stream that this truncated value
77+
* belongs to.
78+
* @param value The original [EnrichedAirbyteValue] that is to be truncated
79+
* @param truncatedValue The new, truncated value to use
80+
* @param reason The [Reason] for truncation, defaults to DESTINATION_RECORD_SIZE_LIMITATION
81+
*
82+
* @return The truncated [EnrichedAirbyteValue].
83+
*/
84+
fun truncate(
85+
stream: DestinationStream.Descriptor,
86+
value: EnrichedAirbyteValue,
87+
truncatedValue: AirbyteValue,
88+
reason: Reason = Reason.DESTINATION_RECORD_SIZE_LIMITATION
89+
): EnrichedAirbyteValue {
90+
val truncateChange =
91+
Meta.Change(field = value.name, change = Change.TRUNCATED, reason = reason)
92+
value.abValue = truncatedValue
93+
value.changes.add(truncateChange)
94+
metricTracker.add(stream, ObservabilityMetrics.TRUNCATED_VALUE_COUNT, 1.0)
95+
return value
96+
}
97+
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/transform/defaults/NoOpCoercer.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.cdk.load.dataflow.transform.defaults
66

77
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
8+
import io.airbyte.cdk.load.dataflow.transform.ValidationResult
89
import io.airbyte.cdk.load.dataflow.transform.ValueCoercer
910
import io.micronaut.context.annotation.Secondary
1011
import jakarta.inject.Singleton
@@ -18,5 +19,5 @@ import jakarta.inject.Singleton
1819
class NoOpCoercer : ValueCoercer {
1920
override fun map(value: EnrichedAirbyteValue): EnrichedAirbyteValue = value
2021

21-
override fun validate(value: EnrichedAirbyteValue): EnrichedAirbyteValue = value
22+
override fun validate(value: EnrichedAirbyteValue): ValidationResult = ValidationResult.Valid
2223
}

0 commit comments

Comments
 (0)