Skip to content

Commit 55bb5d8

Browse files
authored
Set lower bound included flag in snapshot partitions (#69177)
1 parent 46abc20 commit 55bb5d8

File tree

4 files changed

+33
-23
lines changed

4 files changed

+33
-23
lines changed

airbyte-integrations/connectors/source-mysql/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.51.2
12+
dockerImageTag: 3.51.3
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartition.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ class MySqlSourceJdbcSplittableCdcRfrSnapshotPartition(
255255
primaryKey: List<Field>,
256256
override val lowerBound: List<JsonNode>?,
257257
override val upperBound: List<JsonNode>?,
258+
override val isLowerBoundIncluded: Boolean,
258259
) : MySqlSourceJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) {
259260
override val completeState: OpaqueStateValue
260261
get() =
@@ -364,6 +365,7 @@ class MySqlSourceJdbcSplittableSnapshotWithCursorPartition(
364365
override val upperBound: List<JsonNode>?,
365366
cursor: Field,
366367
cursorUpperBound: JsonNode?,
368+
override val isLowerBoundIncluded: Boolean
367369
) :
368370
MySqlSourceJdbcCursorPartition(
369371
selectQueryGenerator,

airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceJdbcPartitionFactory.kt

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -466,18 +466,21 @@ class MySqlSourceJdbcPartitionFactory(
466466
true -> effectiveLowerBound
467467
false -> type.jsonDecoder.decode(lowerBound[0])
468468
}
469-
return calculateBoundaries(opaqueStateValues, lowerBound, upperBound)?.map { (l, u) ->
470-
MySqlSourceJdbcSplittableSnapshotWithCursorPartition(
471-
selectQueryGenerator,
472-
streamState,
473-
checkpointColumns,
474-
listOf(stateValueToJsonNode(checkpointColumns[0], l.toString())),
475-
u?.let { listOf(stateValueToJsonNode(checkpointColumns[0], u.toString())) },
476-
// listOf(stateValueToJsonNode(checkpointColumns[0], u.toString())),
477-
cursor,
478-
cursorUpperBound
479-
)
480-
}
469+
return calculateBoundaries(opaqueStateValues, lowerBound, upperBound)
470+
?.entries
471+
?.mapIndexed { index, (l, u) ->
472+
MySqlSourceJdbcSplittableSnapshotWithCursorPartition(
473+
selectQueryGenerator,
474+
streamState,
475+
checkpointColumns,
476+
listOf(stateValueToJsonNode(checkpointColumns[0], l.toString())),
477+
u?.let { listOf(stateValueToJsonNode(checkpointColumns[0], u.toString())) },
478+
cursor,
479+
cursorUpperBound,
480+
// The first partition includes the lower bound
481+
index == 0
482+
)
483+
}
481484
}
482485

483486
private fun MySqlSourceJdbcRfrSnapshotPartition.split(
@@ -538,15 +541,19 @@ class MySqlSourceJdbcPartitionFactory(
538541
false -> type.jsonDecoder.decode(lowerBound[0])
539542
}
540543

541-
return calculateBoundaries(opaqueStateValues, lowerBound, upperBound)?.map { (l, u) ->
542-
MySqlSourceJdbcSplittableCdcRfrSnapshotPartition(
543-
selectQueryGenerator,
544-
streamState,
545-
checkpointColumns,
546-
listOf(stateValueToJsonNode(checkpointColumns[0], l.toString())),
547-
u?.let { listOf(stateValueToJsonNode(checkpointColumns[0], u.toString())) },
548-
)
549-
}
544+
return calculateBoundaries(opaqueStateValues, lowerBound, upperBound)
545+
?.entries
546+
?.mapIndexed { index, (l, u) ->
547+
MySqlSourceJdbcSplittableCdcRfrSnapshotPartition(
548+
selectQueryGenerator,
549+
streamState,
550+
checkpointColumns,
551+
listOf(stateValueToJsonNode(checkpointColumns[0], l.toString())),
552+
u?.let { listOf(stateValueToJsonNode(checkpointColumns[0], u.toString())) },
553+
// The first partition includes the lower bound
554+
index == 0
555+
)
556+
}
550557
}
551558

552559
private fun <T> calculateBoundaries(

docs/integrations/sources/mysql.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,8 @@ Any database or table encoding combination of charset and collation is supported
230230

231231
| Version | Date | Pull Request | Subject |
232232
|:------------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
233-
| 3.51.2 | 2025-11-2 | [69104](https://github.com/airbytehq/airbyte/pull/69104) | Better partitioning for tables with GUID string primary key. |
233+
| 3.51.3 | 2025-11-05 | [69177](https://github.com/airbytehq/airbyte/pull/69177) | Fix a bug in CDC snapshot queries leading to omission of the first record in some cases. |
234+
| 3.51.2 | 2025-11-02 | [69104](https://github.com/airbytehq/airbyte/pull/69104) | Better partitioning for tables with GUID string primary key. |
234235
| 3.51.1 | 2025-10-24 | [68652](https://github.com/airbytehq/airbyte/pull/68652) | Bump CDK version to the latest to resolve issue with database Views in CDC mode. |
235236
| 3.51.0 | 2025-10-15 | [68094](https://github.com/airbytehq/airbyte/pull/66515) | Bump to the latest CDK for improved Protobuf encoding in socket mode. |
236237
| 3.50.9 | 2025-10-06 | [67151](https://github.com/airbytehq/airbyte/pull/66515) | Fix CDC decorating fields encoding to Protobuf |

0 commit comments

Comments
 (0)