Skip to content

Commit b0cd0c6

Browse files
frifriSF59 and edgao authored
[Snowflake] Truncate vs nullify on out of range for float and integer (#69342)
Co-authored-by: Edward Gao <[email protected]>
1 parent c669993 commit b0cd0c6

File tree

6 files changed

+175
-189
lines changed

6 files changed

+175
-189
lines changed

airbyte-integrations/connectors/destination-snowflake/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ data:
66
connectorSubtype: database
77
connectorType: destination
88
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
9-
dockerImageTag: 4.0.28
9+
dockerImageTag: 4.0.29
1010
dockerRepository: airbyte/destination-snowflake
1111
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
1212
githubIssueLabel: destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/write/transform/SnowflakeValueCoercer.kt

Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.airbyte.integrations.destination.snowflake.write.transform
66

77
import com.google.common.base.Utf8
8-
import io.airbyte.cdk.load.data.AirbyteValue
98
import io.airbyte.cdk.load.data.ArrayValue
109
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
1110
import io.airbyte.cdk.load.data.IntegerValue
@@ -28,13 +27,13 @@ import java.math.BigInteger
2827
*/
2928

3029
// https://docs.snowflake.com/en/sql-reference/data-types-numeric#number
31-
internal val INT_MAX = BigInteger("99999999999999999999999999999999999999") // 38 9s
32-
internal val INT_MIN = BigInteger("-99999999999999999999999999999999999999") // 38 9s
30+
val INT_MAX = BigInteger("99999999999999999999999999999999999999") // 38 9s
31+
val INT_MIN = BigInteger("-99999999999999999999999999999999999999") // 38 9s
3332
internal val INT_RANGE = INT_MIN..INT_MAX
3433

3534
// https://docs.snowflake.com/en/sql-reference/data-types-numeric#label-data-type-float
36-
internal val FLOAT_MAX = BigDecimal("9007199254740991")
37-
internal val FLOAT_MIN = BigDecimal("-9007199254740991")
35+
internal val FLOAT_MAX = BigDecimal.valueOf(Double.MAX_VALUE)
36+
internal val FLOAT_MIN = BigDecimal.valueOf(-Double.MAX_VALUE)
3837
internal val FLOAT_RANGE = FLOAT_MIN..FLOAT_MAX
3938

4039
// https://docs.snowflake.com/en/sql-reference/data-types-semistructured#characteristics-of-a-variant-value
@@ -46,17 +45,6 @@ internal const val VARCHAR_LIMIT_BYTES = 16 * 1024 * 1024
4645
internal const val MAX_UTF_8_VARIANT_LENGTH_UNDER_LIMIT = VARIANT_LIMIT_BYTES / 4 // (134217728 / 4)
4746
internal const val MAX_UTF_8_VARCHAR_LENGTH_UNDER_LIMIT = VARCHAR_LIMIT_BYTES / 4 // (16777216 / 4)
4847

49-
fun isValid(value: AirbyteValue): Boolean {
50-
return when (value) {
51-
is ArrayValue,
52-
is ObjectValue -> isVariantValid(value.toCsvValue().toString())
53-
is IntegerValue -> value.value in INT_RANGE
54-
is NumberValue -> value.value in FLOAT_RANGE
55-
is StringValue -> isVarcharValid(value.value)
56-
else -> true
57-
}
58-
}
59-
6048
fun isVariantValid(s: String): Boolean {
6149
// avoid expensive size calculation if we're safely under the limit
6250
if (s.length <= MAX_UTF_8_VARIANT_LENGTH_UNDER_LIMIT) return true
@@ -85,10 +73,56 @@ class SnowflakeValueCoercer : ValueCoercer {
8573
return value
8674
}
8775

88-
override fun validate(value: EnrichedAirbyteValue): ValidationResult =
89-
if (!isValid(value.abValue)) {
90-
ValidationResult.ShouldNullify(
91-
AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
92-
)
93-
} else ValidationResult.Valid
76+
override fun validate(value: EnrichedAirbyteValue): ValidationResult {
77+
return when (val abValue = value.abValue) {
78+
is NumberValue -> {
79+
if (abValue.value in FLOAT_RANGE) {
80+
val targetValue = BigDecimal.valueOf(abValue.value.toDouble())
81+
// This is done because BigDecimal is stupid and if we don't use compareTo, we
82+
// end up with 0 != 0.0
83+
if (targetValue.compareTo(abValue.value) == 0) {
84+
ValidationResult.Valid
85+
} else {
86+
ValidationResult.ShouldTruncate(
87+
NumberValue(targetValue),
88+
AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
89+
)
90+
}
91+
} else {
92+
ValidationResult.ShouldNullify(
93+
AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
94+
)
95+
}
96+
}
97+
is IntegerValue -> {
98+
if (abValue.value in INT_RANGE) {
99+
ValidationResult.Valid
100+
} else {
101+
ValidationResult.ShouldNullify(
102+
AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
103+
)
104+
}
105+
}
106+
is StringValue -> {
107+
if (!isVarcharValid(abValue.value)) {
108+
ValidationResult.ShouldNullify(
109+
AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
110+
)
111+
} else {
112+
ValidationResult.Valid
113+
}
114+
}
115+
is ArrayValue,
116+
is ObjectValue -> {
117+
if (!isVariantValid(abValue.toCsvValue().toString())) {
118+
ValidationResult.ShouldNullify(
119+
AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
120+
)
121+
} else {
122+
ValidationResult.Valid
123+
}
124+
}
125+
else -> ValidationResult.Valid
126+
}
127+
}
94128
}

airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/write/SnowflakeExpectedRawRecordMapper.kt

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@ import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
2020
import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
2121
import io.airbyte.cdk.load.test.util.OutputRecord
2222
import io.airbyte.integrations.destination.snowflake.write.SnowflakeExpectedRecordMapper.mapAirbyteMetadata
23-
import io.airbyte.integrations.destination.snowflake.write.transform.isValid
23+
import io.airbyte.integrations.destination.snowflake.write.transform.INT_MAX
24+
import io.airbyte.integrations.destination.snowflake.write.transform.INT_MIN
25+
import java.math.BigDecimal
26+
27+
val INT_MIN_NUMBER = INT_MIN.toBigDecimal()
28+
val INT_MAX_NUMBER = INT_MAX.toBigDecimal()
2429

2530
object SnowflakeExpectedRawRecordMapper : ExpectedRecordMapper {
2631
override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord {
@@ -52,14 +57,28 @@ object SnowflakeExpectedRawRecordMapper : ExpectedRecordMapper {
5257
* example, 1.0 is stored as 1. We perform the same conversion here.
5358
*/
5459
private fun mapValues(value: AirbyteValue): AirbyteValue =
55-
if (isValid(value)) {
56-
when (value) {
57-
is DateValue -> StringValue(value.value.toString())
58-
is TimeWithTimezoneValue, -> StringValue(value.value.toString())
59-
is TimeWithoutTimezoneValue -> StringValue(value.value.toString())
60-
is TimestampWithTimezoneValue -> StringValue(value.value.toString())
61-
is TimestampWithoutTimezoneValue -> StringValue(value.value.toString())
62-
is NumberValue ->
60+
when (value) {
61+
is DateValue -> StringValue(value.value.toString())
62+
is TimeWithTimezoneValue -> StringValue(value.value.toString())
63+
is TimeWithoutTimezoneValue -> StringValue(value.value.toString())
64+
is TimestampWithTimezoneValue -> StringValue(value.value.toString())
65+
is TimestampWithoutTimezoneValue -> StringValue(value.value.toString())
66+
is NumberValue ->
67+
// TODO This is a hack -
68+
// https://github.com/airbytehq/airbyte-internal-issues/issues/15359
69+
// If we're within the weird clamping range, then clamp to 9.999e38
70+
if (
71+
BigDecimal("-1.00000000000000001526e39") < value.value &&
72+
value.value < INT_MIN_NUMBER
73+
) {
74+
NumberValue(BigDecimal("-9.999999999999999e38"))
75+
} else if (
76+
INT_MAX_NUMBER < value.value &&
77+
value.value < BigDecimal("1.00000000000000001526e39")
78+
) {
79+
NumberValue(BigDecimal("9.999999999999999e38"))
80+
} else if (INT_MIN_NUMBER < value.value && value.value < INT_MAX_NUMBER) {
81+
// If we're within snowflake's NUMBER(38, 0) range, then translate to int
6382
try {
6483
// If the value is exactly an integer, turn it into an IntegerValue
6584
IntegerValue(value.value.toBigIntegerExact())
@@ -68,12 +87,13 @@ object SnowflakeExpectedRawRecordMapper : ExpectedRecordMapper {
6887
// So just return the original NumberValue.
6988
value
7089
}
71-
is ArrayValue -> ArrayValue(value.values.map { mapValues(it) })
72-
is ObjectValue ->
73-
ObjectValue(value.values.mapValuesTo(linkedMapOf()) { (_, v) -> mapValues(v) })
74-
else -> value
75-
}
76-
} else {
77-
NullValue
90+
} else {
91+
// otherwise, leave the value unchanged
92+
value
93+
}
94+
is ArrayValue -> ArrayValue(value.values.map { mapValues(it) })
95+
is ObjectValue ->
96+
ObjectValue(value.values.mapValuesTo(linkedMapOf()) { (_, v) -> mapValues(v) })
97+
else -> value
7898
}
7999
}

airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/write/SnowflakeExpectedRecordMapper.kt

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,18 @@ package io.airbyte.integrations.destination.snowflake.write
77
import io.airbyte.cdk.load.data.AirbyteType
88
import io.airbyte.cdk.load.data.AirbyteValue
99
import io.airbyte.cdk.load.data.ArrayValue
10+
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
1011
import io.airbyte.cdk.load.data.NullValue
1112
import io.airbyte.cdk.load.data.ObjectValue
1213
import io.airbyte.cdk.load.data.StringValue
1314
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
1415
import io.airbyte.cdk.load.data.json.toJson
16+
import io.airbyte.cdk.load.dataflow.transform.ValidationResult
1517
import io.airbyte.cdk.load.message.Meta
1618
import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
1719
import io.airbyte.cdk.load.test.util.OutputRecord
1820
import io.airbyte.integrations.destination.snowflake.db.toSnowflakeCompatibleName
19-
import io.airbyte.integrations.destination.snowflake.write.transform.isValid
21+
import io.airbyte.integrations.destination.snowflake.write.transform.SnowflakeValueCoercer
2022
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
2123
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
2224

@@ -40,15 +42,26 @@ object SnowflakeExpectedRecordMapper : ExpectedRecordMapper {
4042
}
4143

4244
private fun mapAirbyteValue(value: AirbyteValue): AirbyteValue {
43-
return if (isValid(value)) {
44-
when (value) {
45-
is TimeWithTimezoneValue -> StringValue(value.value.toString())
46-
is ArrayValue,
47-
is ObjectValue -> StringValue(value.toJson().toPrettyString())
48-
else -> value
49-
}
50-
} else {
51-
NullValue
45+
val validationResult =
46+
SnowflakeValueCoercer()
47+
.validate(
48+
EnrichedAirbyteValue(
49+
value,
50+
value.airbyteType,
51+
name = "unused",
52+
airbyteMetaField = null,
53+
)
54+
)
55+
return when (validationResult) {
56+
is ValidationResult.ShouldNullify -> NullValue
57+
is ValidationResult.ShouldTruncate -> validationResult.truncatedValue
58+
ValidationResult.Valid ->
59+
when (value) {
60+
is TimeWithTimezoneValue -> StringValue(value.value.toString())
61+
is ArrayValue,
62+
is ObjectValue -> StringValue(value.toJson().toPrettyString())
63+
else -> value
64+
}
5265
}
5366
}
5467

0 commit comments

Comments
 (0)