Skip to content

Commit f88fce7

Browse files
authored
source-mssql: Fix primary key discovery and clustered index handling (#69194)
1 parent 4b367de commit f88fce7

File tree

9 files changed

+832
-144
lines changed

9 files changed

+832
-144
lines changed

airbyte-integrations/connectors/source-mssql/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 4.3.0-rc.3
12+
dockerImageTag: 4.3.0-rc.4
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

airbyte-integrations/connectors/source-mssql/src/main/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerJdbcPartition.kt

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ fun stateValueToJsonNode(field: Field, stateValue: String?): JsonNode {
152152

153153
sealed class MsSqlServerJdbcPartition(
154154
val selectQueryGenerator: SelectQueryGenerator,
155-
streamState: DefaultJdbcStreamState,
155+
override val streamState: DefaultJdbcStreamState,
156156
) : JdbcPartition<DefaultJdbcStreamState> {
157157
val stream: Stream = streamState.stream
158158
val from = From(stream.name, stream.namespace)
@@ -176,15 +176,15 @@ sealed class MsSqlServerJdbcPartition(
176176

177177
class MsSqlServerJdbcNonResumableSnapshotPartition(
178178
selectQueryGenerator: SelectQueryGenerator,
179-
override val streamState: DefaultJdbcStreamState,
179+
streamState: DefaultJdbcStreamState,
180180
) : MsSqlServerJdbcPartition(selectQueryGenerator, streamState) {
181181

182182
override val completeState: OpaqueStateValue = MsSqlServerJdbcStreamStateValue.snapshotCompleted
183183
}
184184

185185
class MsSqlServerJdbcNonResumableSnapshotWithCursorPartition(
186186
selectQueryGenerator: SelectQueryGenerator,
187-
override val streamState: DefaultJdbcStreamState,
187+
streamState: DefaultJdbcStreamState,
188188
val cursor: Field,
189189
val cursorCutoffTime: JsonNode? = null,
190190
) :
@@ -322,7 +322,7 @@ sealed class MsSqlServerJdbcResumablePartition(
322322
/** RFR for cursor based read. */
323323
class MsSqlServerJdbcRfrSnapshotPartition(
324324
selectQueryGenerator: SelectQueryGenerator,
325-
override val streamState: DefaultJdbcStreamState,
325+
streamState: DefaultJdbcStreamState,
326326
primaryKey: List<Field>,
327327
override val lowerBound: List<JsonNode>?,
328328
override val upperBound: List<JsonNode>?,
@@ -351,7 +351,7 @@ class MsSqlServerJdbcRfrSnapshotPartition(
351351
/** RFR for CDC. */
352352
class MsSqlServerJdbcCdcRfrSnapshotPartition(
353353
selectQueryGenerator: SelectQueryGenerator,
354-
override val streamState: DefaultJdbcStreamState,
354+
streamState: DefaultJdbcStreamState,
355355
primaryKey: List<Field>,
356356
override val lowerBound: List<JsonNode>?,
357357
override val upperBound: List<JsonNode>?,
@@ -380,9 +380,9 @@ class MsSqlServerJdbcCdcRfrSnapshotPartition(
380380
*/
381381
class MsSqlServerJdbcCdcSnapshotPartition(
382382
selectQueryGenerator: SelectQueryGenerator,
383-
override val streamState: DefaultJdbcStreamState,
383+
streamState: DefaultJdbcStreamState,
384384
primaryKey: List<Field>,
385-
override val lowerBound: List<JsonNode>?
385+
override val lowerBound: List<JsonNode>?,
386386
) : MsSqlServerJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) {
387387
override val upperBound: List<JsonNode>? = null
388388
override val completeState: OpaqueStateValue
@@ -414,7 +414,7 @@ sealed class MsSqlServerJdbcCursorPartition(
414414

415415
val cursorUpperBoundQuerySpec: SelectQuerySpec
416416
get() =
417-
if (cursorCutoffTime != null && checkpointColumns.contains(cursor)) {
417+
if (cursorCutoffTime != null) {
418418
// When excluding today's data, apply cutoff constraint to upper bound query too
419419
SelectQuerySpec(
420420
SelectColumnMaxValue(cursor),
@@ -425,9 +425,25 @@ sealed class MsSqlServerJdbcCursorPartition(
425425
SelectQuerySpec(SelectColumnMaxValue(cursor), from)
426426
}
427427

428+
// Override samplingQuery to avoid TABLESAMPLE for cursor-based operations
429+
// TABLESAMPLE fails on views and isn't needed for cursor-based incremental reads
430+
// which are typically small (only new/changed data)
431+
override fun samplingQuery(sampleRateInvPow2: Int): SelectQuery {
432+
val sampleSize: Int = streamState.sharedState.maxSampleSize
433+
val querySpec =
434+
SelectQuerySpec(
435+
SelectColumns(stream.fields + checkpointColumns),
436+
from,
437+
NoWhere,
438+
OrderBy(checkpointColumns),
439+
Limit(sampleSize.toLong())
440+
)
441+
return selectQueryGenerator.generate(querySpec.optimize())
442+
}
443+
428444
override val additionalWhereClause: WhereClauseNode?
429445
get() =
430-
if (cursorCutoffTime != null && checkpointColumns.contains(cursor)) {
446+
if (cursorCutoffTime != null) {
431447
// Add an additional constraint for the cutoff time
432448
Lesser(cursor, cursorCutoffTime)
433449
} else {
@@ -437,7 +453,7 @@ sealed class MsSqlServerJdbcCursorPartition(
437453

438454
class MsSqlServerJdbcSnapshotWithCursorPartition(
439455
selectQueryGenerator: SelectQueryGenerator,
440-
override val streamState: DefaultJdbcStreamState,
456+
streamState: DefaultJdbcStreamState,
441457
primaryKey: List<Field>,
442458
override val lowerBound: List<JsonNode>?,
443459
cursor: Field,
@@ -472,7 +488,7 @@ class MsSqlServerJdbcSnapshotWithCursorPartition(
472488

473489
class MsSqlServerJdbcSplittableSnapshotWithCursorPartition(
474490
selectQueryGenerator: SelectQueryGenerator,
475-
override val streamState: DefaultJdbcStreamState,
491+
streamState: DefaultJdbcStreamState,
476492
primaryKey: List<Field>,
477493
override val lowerBound: List<JsonNode>?,
478494
override val upperBound: List<JsonNode>?,
@@ -522,7 +538,7 @@ class MsSqlServerJdbcSplittableSnapshotWithCursorPartition(
522538
*/
523539
class MsSqlServerJdbcCursorIncrementalPartition(
524540
selectQueryGenerator: SelectQueryGenerator,
525-
override val streamState: DefaultJdbcStreamState,
541+
streamState: DefaultJdbcStreamState,
526542
cursor: Field,
527543
val cursorLowerBound: JsonNode,
528544
override val isLowerBoundIncluded: Boolean,

0 commit comments

Comments
 (0)