Skip to content

Commit b8faac7

Browse files
authored
[source-mysql] do not check binlog if we have gtid validated (#48688)
1 parent f4cfb4b commit b8faac7

File tree

2 files changed

+12
-5
lines changed

2 files changed

+12
-5
lines changed

airbyte-integrations/connectors/source-mysql/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.9.0-rc.20
12+
dockerImageTag: 3.9.0-rc.21
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,19 @@ class MySqlDebeziumOperations(
146146
return abortCdcSync()
147147
}
148148
}
149+
if (!savedGtidSet.isEmpty) {
150+
// If the connector has saved GTID set, we will use that to validate and skip
151+
// binlog validation. GTID and binlog works in an independent way to ensure data
152+
// integrity where GTID is for storing transactions and binlog is for storing changes
153+
// in DB.
154+
return CdcStateValidateResult.VALID
155+
}
149156
val existingLogFiles: List<String> = getBinaryLogFileNames()
150157
val found = existingLogFiles.contains(savedStateOffset.position.fileName)
151158
if (!found) {
152159
log.info {
153-
"Connector last known binlog file ${savedStateOffset.position.fileName} is not found in the server"
160+
"Connector last known binlog file ${savedStateOffset.position.fileName} is " +
161+
"not found in the server. Server has $existingLogFiles"
154162
}
155163
return abortCdcSync()
156164
}
@@ -277,13 +285,12 @@ class MySqlDebeziumOperations(
277285
}
278286

279287
private fun getBinaryLogFileNames(): List<String> {
280-
val logNameField = Field("Log_name", StringFieldType)
288+
// Very old Mysql version (4.x) has different output of SHOW BINARY LOGS output.
281289
return jdbcConnectionFactory.get().use { connection: Connection ->
282290
connection.createStatement().use { stmt: Statement ->
283291
val sql = "SHOW BINARY LOGS"
284292
stmt.executeQuery(sql).use { rs: ResultSet ->
285-
generateSequence { if (rs.next()) rs.getString(logNameField.id) else null }
286-
.toList()
293+
generateSequence { if (rs.next()) rs.getString(1) else null }.toList()
287294
}
288295
}
289296
}

0 commit comments

Comments
 (0)