Skip to content

Commit c761e11

Browse files
Adding table filtering to JDBC (#69094)
Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 975934c commit c761e11

File tree

10 files changed

+445
-16
lines changed

10 files changed

+445
-16
lines changed

airbyte-cdk/bulk/changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## Version 0.1.64
2+
3+
extract cdk: add table filtering to jdbc connectors
4+
15
## Version 0.1.63
26

37
introduce extract-trigger toolkit for trigger-based CDC

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/command/JdbcSourceConfiguration.kt

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
22
package io.airbyte.cdk.command
33

4+
import io.airbyte.cdk.ConfigErrorException
45
import io.micronaut.context.annotation.Factory
56
import jakarta.inject.Singleton
67

@@ -22,6 +23,10 @@ interface JdbcSourceConfiguration : SourceConfiguration {
2223
val checkPrivileges: Boolean
2324
get() = true
2425

26+
/** Optional table filtering configuration for fine-grained table selection. */
27+
val tableFilters: List<TableFilter>
28+
get() = emptyList()
29+
2530
/**
2631
* Micronaut factory which glues [ConfigurationSpecificationSupplier] and
2732
* [SourceConfigurationFactory] together to produce a [JdbcSourceConfiguration] singleton.
@@ -34,4 +39,30 @@ interface JdbcSourceConfiguration : SourceConfiguration {
3439
factory: SourceConfigurationFactory<I, out JdbcSourceConfiguration>,
3540
): JdbcSourceConfiguration = factory.make(pojoSupplier.get())
3641
}
42+
43+
companion object {
44+
/**
45+
* Validates that all schemas referenced in table filters are present in the configured
46+
* schemas list.
47+
*
* Schema names are compared case-insensitively: both the configured schemas and the filter
* schema names are uppercased before the comparison. Validation is skipped entirely when
* either argument is empty.
*
48+
* @param configuredSchemas The set of schemas configured for the connector
49+
* @param tableFilters The list of table filters to validate
50+
* @throws ConfigErrorException if any filter references a schema not in configuredSchemas
51+
*/
52+
fun validateTableFilters(configuredSchemas: Set<String>, tableFilters: List<TableFilter>) {
53+
// No filters: nothing to validate.
if (tableFilters.isEmpty()) return
54+
// An empty configured set skips the check instead of rejecting every filter.
if (configuredSchemas.isEmpty()) return
55+
56+
// Uppercase both sides so that the set difference below ignores case.
val configuredSchemasUpper = configuredSchemas.map { it.uppercase() }.toSet()
57+
val filterSchemas = tableFilters.map { it.schemaName.uppercase() }.toSet()
58+
// Filter schemas with no case-insensitive match among the configured schemas.
val invalidSchemas = filterSchemas - configuredSchemasUpper
59+
60+
if (invalidSchemas.isNotEmpty()) {
61+
// The offending names are reported in their uppercased form, whereas the configured
// schemas are printed exactly as they were supplied.
throw ConfigErrorException(
62+
"Table filters reference schemas not in configured schemas list: $invalidSchemas. " +
63+
"Configured schemas: $configuredSchemas"
64+
)
65+
}
66+
}
67+
}
3768
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
2+
package io.airbyte.cdk.command
3+
4+
import com.fasterxml.jackson.annotation.JsonProperty
5+
import com.fasterxml.jackson.annotation.JsonPropertyDescription
6+
import com.fasterxml.jackson.annotation.JsonPropertyOrder
7+
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaDefault
8+
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaDescription
9+
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
10+
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
11+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
12+
13+
/**
14+
* Base ConfigurationSpecification for JDBC sources with common properties.
15+
*
16+
* Connector-specific implementations should extend this class and add their unique properties (like
17+
* replication methods, SSL modes, etc.).
*
* Each property carries its JSON name via `@JsonProperty` and its schema title, description and
* extra schema attributes via the `@JsonSchema*` / `@JsonPropertyDescription` annotations.
*
* NOTE(review): the injected "order" values repeat across properties (username and
* table_filters both use 2, password and jdbc_url_params both use 3, database and
* check_privileges both use 4). Presumably ordering is resolved per "group" — confirm.
18+
*/
19+
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
20+
abstract class JdbcSourceConfigurationSpecification : ConfigurationSpecification() {
21+
22+
// Non-null `lateinit` property: it has no default and must be assigned before it is read.
@JsonProperty("host")
23+
@JsonSchemaTitle("Host")
24+
@JsonPropertyDescription("Hostname of the database.")
25+
@JsonSchemaInject(json = """{"order":0,"always_show":true}""")
26+
lateinit var host: String
27+
28+
// Non-null `lateinit` property: it has no default and must be assigned before it is read.
@JsonProperty("username")
29+
@JsonSchemaTitle("Username")
30+
@JsonPropertyDescription("The username which is used to access the database.")
31+
@JsonSchemaInject(json = """{"order":2,"always_show":true}""")
32+
lateinit var username: String
33+
34+
// Nullable, defaults to null. Flagged with "airbyte_secret" in the injected schema JSON.
@JsonProperty("password")
35+
@JsonSchemaTitle("Password")
36+
@JsonPropertyDescription("The password associated with the username.")
37+
@JsonSchemaInject(json = """{"order":3,"always_show":true,"airbyte_secret":true}""")
38+
var password: String? = null
39+
40+
// Non-null `lateinit` property: it has no default and must be assigned before it is read.
@JsonProperty("database")
41+
@JsonSchemaTitle("Database")
42+
@JsonPropertyDescription("Name of the database.")
43+
@JsonSchemaInject(json = """{"order":4,"always_show":true}""")
44+
lateinit var database: String
45+
46+
// Nullable, defaults to null (no schema list specified).
@JsonProperty("schemas")
47+
@JsonSchemaTitle("Schemas")
48+
@JsonPropertyDescription(
49+
"The list of schemas to sync from. " +
50+
"If not specified, all accessible schemas will be synced. " +
51+
"The exact meaning depends on the database (schema names, database names, etc.)."
52+
)
53+
@JsonSchemaInject(json = """{"order":1,"always_show":true,"group":"optional"}""")
54+
var schemas: List<String>? = null
55+
56+
// Declared nullable but initialized to an empty list, so it is null only when explicitly
// assigned null.
@JsonProperty("table_filters")
57+
@JsonSchemaTitle("Table Filters")
58+
@JsonPropertyDescription(
59+
"Optional filters to include only specific tables from specific schemas. " +
60+
"Works in combination with the 'Schemas' config above."
61+
)
62+
@JsonSchemaInject(json = """{"order":2,"always_show":true,"group":"optional"}""")
63+
var tableFilters: List<TableFilter>? = emptyList()
64+
65+
// Nullable, defaults to null. Unlike the properties above, the injected JSON has no
// "always_show" entry.
@JsonProperty("jdbc_url_params")
66+
@JsonSchemaTitle("JDBC URL Params")
67+
@JsonPropertyDescription(
68+
"Additional properties to pass to the JDBC URL string when connecting to the database " +
69+
"formatted as 'key=value' pairs separated by the symbol '&'. " +
70+
"(example: key1=value1&key2=value2&key3=value3)."
71+
)
72+
@JsonSchemaInject(json = """{"order":3,"group":"optional"}""")
73+
var jdbcUrlParams: String? = null
74+
75+
// Declared nullable but initialized to true; the schema default is likewise declared as
// "true" through @JsonSchemaDefault.
@JsonProperty("check_privileges")
76+
@JsonSchemaTitle("Check Table and Column Access Privileges")
77+
@JsonSchemaInject(json = """{"order":4,"group":"optional"}""")
78+
@JsonSchemaDefault("true")
79+
@JsonPropertyDescription(
80+
"When this feature is enabled, during schema discovery the connector " +
81+
"will query each table or view individually to check access privileges " +
82+
"and inaccessible tables, views, or columns therein will be removed. " +
83+
"In large schemas, this might cause schema discovery to take too long, " +
84+
"in which case it might be advisable to disable this feature.",
85+
)
86+
var checkPrivileges: Boolean? = true
87+
}
88+
89+
/**
 * A single inclusion filter: one schema name together with the table name patterns to include
 * from that schema.
 *
 * Both properties are non-null `lateinit` vars, so each must be assigned before it is read;
 * presumably they are populated during configuration deserialization — confirm.
 */
@JsonSchemaTitle("Table Filter")
90+
@JsonSchemaDescription("Inclusion filter configuration for table selection per schema.")
91+
@JsonPropertyOrder("schema_name", "table_name_patterns")
92+
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
93+
class TableFilter {
94+
// JSON name is "schema_name"; marked required on the JSON property.
@JsonProperty("schema_name", required = true)
95+
@JsonSchemaTitle("Schema Name")
96+
@JsonPropertyDescription(
97+
"The name of the schema to apply this filter to. " +
98+
"Should match a schema defined in \"Schemas\" field above."
99+
)
100+
@JsonSchemaInject(json = """{"order":1,"always_show":true}""")
101+
lateinit var schemaName: String
102+
103+
// JSON name is "table_name_patterns" although the Kotlin property is called `patterns`.
// The injected schema JSON sets "minItems" to 1.
@JsonProperty("table_name_patterns", required = true)
104+
@JsonSchemaTitle("Table Filter Patterns")
105+
@JsonPropertyDescription(
106+
"List of table name patterns to include from this schema. " +
107+
"Should be a SQL LIKE pattern."
108+
)
109+
@JsonSchemaInject(json = """{"order":2,"always_show":true,"minItems":1}""")
110+
lateinit var patterns: List<String>
111+
}

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -62,31 +62,54 @@ class JdbcMetadataQuerier(
6262
return null
6363
}
6464

65+
val tableFilters = config.tableFilters
66+
6567
val memoizedTableNames: List<TableName> by lazy {
6668
log.info { "Querying table names for catalog discovery." }
6769
try {
6870
val allTables = mutableSetOf<TableName>()
6971
val dbmd: DatabaseMetaData = conn.metaData
70-
for (namespace in config.namespaces + config.namespaces.map { it.uppercase() }) {
71-
val (catalog: String?, schema: String?) =
72-
when (constants.namespaceKind) {
73-
NamespaceKind.CATALOG -> namespace to null
74-
NamespaceKind.SCHEMA -> null to namespace
75-
NamespaceKind.CATALOG_AND_SCHEMA -> namespace to namespace
76-
}
77-
dbmd.getTables(catalog, schema, null, null).use { rs: ResultSet ->
72+
73+
fun addTablesFromQuery(catalog: String?, schema: String?, pattern: String?) {
74+
dbmd.getTables(catalog, schema, pattern, null).use { rs: ResultSet ->
7875
while (rs.next()) {
7976
allTables.add(
8077
TableName(
8178
catalog = rs.getString("TABLE_CAT"),
8279
schema = rs.getString("TABLE_SCHEM"),
8380
name = rs.getString("TABLE_NAME"),
8481
type = rs.getString("TABLE_TYPE") ?: "",
85-
),
82+
)
8683
)
8784
}
8885
}
8986
}
87+
88+
for (namespace in config.namespaces + config.namespaces.map { it.uppercase() }) {
89+
val (catalog: String?, schema: String?) =
90+
when (constants.namespaceKind) {
91+
NamespaceKind.CATALOG -> namespace to null
92+
NamespaceKind.SCHEMA -> null to namespace
93+
NamespaceKind.CATALOG_AND_SCHEMA -> namespace to namespace
94+
}
95+
96+
if (tableFilters.isEmpty()) {
97+
addTablesFromQuery(catalog, schema, null)
98+
} else {
99+
val filtersForSchema =
100+
tableFilters.filter { it.schemaName.equals(schema, ignoreCase = true) }
101+
102+
if (filtersForSchema.isEmpty()) {
103+
addTablesFromQuery(catalog, schema, null)
104+
} else {
105+
for (filter in filtersForSchema) {
106+
for (pattern in filter.patterns) {
107+
addTablesFromQuery(catalog, filter.schemaName, pattern)
108+
}
109+
}
110+
}
111+
}
112+
}
90113
log.info { "Discovered ${allTables.size} table(s) in namespaces ${config.namespaces}." }
91114
return@lazy allTables.toList().sortedBy { "${it.namespace()}.${it.name}.${it.type}" }
92115
} catch (e: Exception) {
@@ -106,18 +129,18 @@ class JdbcMetadataQuerier(
106129
val dbmd: DatabaseMetaData = conn.metaData
107130
memoizedTableNames
108131
.filter { it.namespace() != null }
109-
.map { it.catalog to it.schema }
110-
.distinct()
111-
.forEach { (catalog: String?, schema: String?) ->
112-
dbmd.getPseudoColumns(catalog, schema, null, null).use { rs: ResultSet ->
132+
.forEach { table ->
133+
dbmd.getPseudoColumns(table.catalog, table.schema, table.name, null).use {
134+
rs: ResultSet ->
113135
while (rs.next()) {
114136
val (tableName: TableName, metadata: ColumnMetadata) =
115137
columnMetadataFromResultSet(rs, isPseudoColumn = true)
116138
val joinedTableName: TableName = joinMap[tableName] ?: continue
117139
results.add(joinedTableName to metadata)
118140
}
119141
}
120-
dbmd.getColumns(catalog, schema, null, null).use { rs: ResultSet ->
142+
dbmd.getColumns(table.catalog, table.schema, table.name, null).use {
143+
rs: ResultSet ->
121144
while (rs.next()) {
122145
val (tableName: TableName, metadata: ColumnMetadata) =
123146
columnMetadataFromResultSet(rs, isPseudoColumn = false)

0 commit comments

Comments
 (0)