Skip to content

Commit 6b0db42

Browse files
authored
Query table metadata per schema (#69184)
1 parent 1992c65 commit 6b0db42

File tree

3 files changed

+52
-35
lines changed

3 files changed

+52
-35
lines changed

airbyte-cdk/bulk/changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## Version 0.1.65
2+
3+
extract cdk: fix bug when getting table metadata that cause timeout
4+
15
## Version 0.1.64
26

37
extract cdk: add table filtering to jdbc connectors

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import java.sql.ResultSet
2323
import java.sql.ResultSetMetaData
2424
import java.sql.SQLException
2525
import java.sql.Statement
26+
import kotlin.collections.isNotEmpty
2627

2728
/** Default implementation of [MetadataQuerier]. */
2829
class JdbcMetadataQuerier(
@@ -62,7 +63,10 @@ class JdbcMetadataQuerier(
6263
return null
6364
}
6465

65-
val tableFilters = config.tableFilters
66+
val tableFiltersBySchema: Map<String, List<String>> =
67+
config.tableFilters
68+
.groupBy { it.schemaName }
69+
.mapValues { (_, filters) -> filters.flatMap { it.patterns } }
6670

6771
val memoizedTableNames: List<TableName> by lazy {
6872
log.info { "Querying table names for catalog discovery." }
@@ -93,21 +97,13 @@ class JdbcMetadataQuerier(
9397
NamespaceKind.CATALOG_AND_SCHEMA -> namespace to namespace
9498
}
9599

96-
if (tableFilters.isEmpty()) {
97-
addTablesFromQuery(catalog, schema, null)
98-
} else {
99-
val filtersForSchema =
100-
tableFilters.filter { it.schemaName.equals(schema, ignoreCase = true) }
101-
102-
if (filtersForSchema.isEmpty()) {
103-
addTablesFromQuery(catalog, schema, null)
104-
} else {
105-
for (filter in filtersForSchema) {
106-
for (pattern in filter.patterns) {
107-
addTablesFromQuery(catalog, filter.schemaName, pattern)
108-
}
109-
}
100+
val patterns = tableFiltersBySchema[namespace]
101+
if (patterns != null && patterns.isNotEmpty()) {
102+
for (pattern in patterns) {
103+
addTablesFromQuery(catalog, schema, pattern)
110104
}
105+
} else {
106+
addTablesFromQuery(catalog, schema, null)
111107
}
112108
}
113109
log.info { "Discovered ${allTables.size} table(s) in namespaces ${config.namespaces}." }
@@ -127,28 +123,45 @@ class JdbcMetadataQuerier(
127123
log.info { "Querying column names for catalog discovery." }
128124
try {
129125
val dbmd: DatabaseMetaData = conn.metaData
130-
memoizedTableNames
131-
.filter { it.namespace() != null }
132-
.forEach { table ->
133-
dbmd.getPseudoColumns(table.catalog, table.schema, table.name, null).use {
134-
rs: ResultSet ->
135-
while (rs.next()) {
136-
val (tableName: TableName, metadata: ColumnMetadata) =
137-
columnMetadataFromResultSet(rs, isPseudoColumn = true)
138-
val joinedTableName: TableName = joinMap[tableName] ?: continue
139-
results.add(joinedTableName to metadata)
140-
}
126+
127+
fun addColumnsFromQuery(
128+
catalog: String?,
129+
schema: String?,
130+
tablePattern: String?,
131+
isPseudoColumn: Boolean
132+
) {
133+
val rsMethod = if (isPseudoColumn) dbmd::getPseudoColumns else dbmd::getColumns
134+
rsMethod(catalog, schema, tablePattern, null).use { rs: ResultSet ->
135+
while (rs.next()) {
136+
val (tableName: TableName, metadata: ColumnMetadata) =
137+
columnMetadataFromResultSet(rs, isPseudoColumn)
138+
val joinedTableName: TableName = joinMap[tableName] ?: continue
139+
results.add(joinedTableName to metadata)
141140
}
142-
dbmd.getColumns(table.catalog, table.schema, table.name, null).use {
143-
rs: ResultSet ->
144-
while (rs.next()) {
145-
val (tableName: TableName, metadata: ColumnMetadata) =
146-
columnMetadataFromResultSet(rs, isPseudoColumn = false)
147-
val joinedTableName: TableName = joinMap[tableName] ?: continue
148-
results.add(joinedTableName to metadata)
149-
}
141+
}
142+
}
143+
// Query columns using the same pattern as table discovery:
144+
// - If schema has filters, query per filter pattern
145+
// - If no filters, query entire schema at once
146+
for (namespace in config.namespaces + config.namespaces.map { it.uppercase() }) {
147+
val (catalog: String?, schema: String?) =
148+
when (constants.namespaceKind) {
149+
NamespaceKind.CATALOG -> namespace to null
150+
NamespaceKind.SCHEMA -> null to namespace
151+
NamespaceKind.CATALOG_AND_SCHEMA -> namespace to namespace
150152
}
153+
154+
val patterns = tableFiltersBySchema[namespace]
155+
if (patterns != null && patterns.isNotEmpty()) {
156+
for (pattern in patterns) {
157+
addColumnsFromQuery(catalog, schema, pattern, isPseudoColumn = true)
158+
addColumnsFromQuery(catalog, schema, pattern, isPseudoColumn = false)
159+
}
160+
} else {
161+
addColumnsFromQuery(catalog, schema, null, isPseudoColumn = true)
162+
addColumnsFromQuery(catalog, schema, null, isPseudoColumn = false)
151163
}
164+
}
152165
log.info { "Discovered ${results.size} column(s) and pseudo-column(s)." }
153166
} catch (e: Exception) {
154167
throw RuntimeException("Column name discovery query failed: ${e.message}", e)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.1.64
1+
version=0.1.65

0 commit comments

Comments
 (0)