
Commit b598054

Jiabao-Sun authored and 1996fanrui committed
[FLINK-34215] FLIP-377: Support fine-grained configuration to control filter push down for JDBC Connector
1 parent cc5e292 · commit b598054

File tree: 10 files changed, +144 −20 lines changed

docs/content.zh/docs/connectors/table/jdbc.md

Lines changed: 12 additions & 0 deletions

@@ -258,6 +258,18 @@ ON myTopic.key = MyUserTable.id;
       <td>Integer</td>
       <td>查询数据库失败的最大重试次数。</td>
     </tr>
+    <tr>
+      <td><h5>filter.handling.policy</h5></td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">always</td>
+      <td>枚举值,可选项: always, never</td>
+      <td>过滤器下推策略,支持的策略有:
+        <ul>
+          <li><code>always</code>: 始终将支持的过滤器下推到数据库。</li>
+          <li><code>never</code>: 不将任何过滤器下推到数据库。</li>
+        </ul>
+      </td>
+    </tr>
     <tr>
       <td><h5>sink.buffer-flush.max-rows</h5></td>
       <td>可选</td>

docs/content/docs/connectors/table/jdbc.md

Lines changed: 15 additions & 1 deletion

@@ -274,6 +274,20 @@ Connector Options
       <td>Integer</td>
       <td>The max retry times if lookup database failed.</td>
     </tr>
+    <tr>
+      <td><h5>filter.handling.policy</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">always</td>
+      <td>Enum. Possible values: always, never</td>
+      <td>Fine-grained configuration to control filter push down.
+        Supported policies are:
+        <ul>
+          <li><code>always</code>: Always push the supported filters to the database.</li>
+          <li><code>never</code>: Never push any filters to the database.</li>
+        </ul>
+      </td>
+    </tr>
     <tr>
       <td><h5>sink.buffer-flush.max-rows</h5></td>
       <td>optional</td>

@@ -305,7 +319,7 @@ Connector Options
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
-      </tr>
+    </tr>
     </tbody>
</table>

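For context, the option is set like any other connector option in DDL. A minimal sketch follows; only 'filter.handling.policy' comes from this commit, while the schema, URL, and table name are invented for illustration:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FilterPolicyDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical schema and connection settings; 'filter.handling.policy'
        // is the option added by this commit ('always' is the default).
        tEnv.executeSql(
                "CREATE TABLE MyUserTable ("
                        + "  id BIGINT,"
                        + "  name STRING"
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:mysql://localhost:3306/mydb',"
                        + "  'table-name' = 'users',"
                        + "  'filter.handling.policy' = 'never'"
                        + ")");
    }
}

With 'never', predicates such as WHERE id = 1 stay in the Flink plan instead of being appended to the generated SELECT, which helps when a database evaluates a pushed filter differently than Flink would.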
flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/FilterHandlingPolicy.java

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+package org.apache.flink.connector.jdbc.core.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Fine-grained configuration to control filter push down for the JDBC Table/SQL source. */
+@PublicEvolving
+public enum FilterHandlingPolicy implements DescribedEnum {
+    ALWAYS("always", text("Always push the supported filters to database.")),
+
+    NEVER("never", text("Never push any filters to database."));
+
+    private final String name;
+    private final InlineElement description;
+
+    FilterHandlingPolicy(String name, InlineElement description) {
+        this.name = name;
+        this.description = description;
+    }
+
+    @Override
+    public InlineElement getDescription() {
+        return description;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}
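Implementing DescribedEnum lets Flink's config documentation generator render the per-value descriptions above, and overriding toString() makes the lowercase name the user-facing value. A quick round-trip sketch (plain Java, class name illustrative):

import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy;

public class PolicyNameSketch {
    public static void main(String[] args) {
        // toString() yields the DDL-facing lowercase name...
        System.out.println(FilterHandlingPolicy.NEVER);            // never
        // ...while valueOf(...) still keys on the Java constant name.
        System.out.println(FilterHandlingPolicy.valueOf("NEVER")); // never
    }
}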

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java

Lines changed: 7 additions & 0 deletions

@@ -179,5 +179,12 @@ public class JdbcConnectorOptions {
             .defaultValue(3)
             .withDescription("The max retry times if writing records to database failed.");

+    public static final ConfigOption<FilterHandlingPolicy> FILTER_HANDLING_POLICY =
+            ConfigOptions.key("filter.handling.policy")
+                    .enumType(FilterHandlingPolicy.class)
+                    .defaultValue(FilterHandlingPolicy.ALWAYS)
+                    .withDescription(
+                            "Fine-grained configuration to control filter push down for jdbc Table/SQL source.");
+
     protected JdbcConnectorOptions() {}
 }
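Not part of the commit, but for illustration: the new ConfigOption resolves like any other typed option, falling back to ALWAYS when unset. A minimal sketch, assuming flink-core and the JDBC connector are on the classpath:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy;
import org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions;

public class FilterPolicyOptionSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Unset: falls back to the declared default, ALWAYS.
        System.out.println(config.get(JdbcConnectorOptions.FILTER_HANDLING_POLICY)); // always

        // Set as a string, the way a WITH-clause value arrives; Flink
        // converts it to the enum when the option is read.
        config.setString("filter.handling.policy", "never");
        System.out.println(config.get(JdbcConnectorOptions.FILTER_HANDLING_POLICY)); // never
    }
}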

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java

Lines changed: 3 additions & 0 deletions

@@ -54,6 +54,7 @@

 import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.COMPATIBLE_MODE;
 import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.DRIVER;
+import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.FILTER_HANDLING_POLICY;
 import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
 import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_MISSING_KEY;
 import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_TTL;

@@ -128,6 +129,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 getJdbcReadOptions(helper.getOptions()),
                 helper.getOptions().get(LookupOptions.MAX_RETRIES),
                 getLookupCache(config),
+                helper.getOptions().get(FILTER_HANDLING_POLICY),
                 context.getPhysicalRowDataType());
     }

@@ -262,6 +264,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         optionalOptions.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
         optionalOptions.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
         optionalOptions.add(LookupOptions.MAX_RETRIES);
+        optionalOptions.add(FILTER_HANDLING_POLICY);
         return optionalOptions;
     }
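Registering the option in optionalOptions() matters because FactoryUtil validation rejects WITH-clause keys that no factory declares. A hedged sketch of the registration shape (standalone class, not the connector's code):

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions;

public class OptionalOptionsSketch {
    // Mirrors the shape of DynamicTableFactory#optionalOptions(); a real
    // factory overrides that interface method instead.
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(JdbcConnectorOptions.FILTER_HANDLING_POLICY);
        return optionalOptions;
    }
}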

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java

Lines changed: 35 additions & 16 deletions

@@ -20,6 +20,7 @@

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy;
 import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
 import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;

@@ -53,6 +54,7 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;

@@ -72,6 +74,7 @@ public class JdbcDynamicTableSource
     private final JdbcReadOptions readOptions;
     private final int lookupMaxRetryTimes;
     @Nullable private final LookupCache cache;
+    private final FilterHandlingPolicy filterHandlingPolicy;
     private DataType physicalRowDataType;
     private final String dialectName;
     private long limit = -1;

@@ -83,11 +86,13 @@ public JdbcDynamicTableSource(
             JdbcReadOptions readOptions,
             int lookupMaxRetryTimes,
             @Nullable LookupCache cache,
+            FilterHandlingPolicy filterHandlingPolicy,
             DataType physicalRowDataType) {
         this.options = options;
         this.readOptions = readOptions;
         this.lookupMaxRetryTimes = lookupMaxRetryTimes;
         this.cache = cache;
+        this.filterHandlingPolicy = filterHandlingPolicy;
         this.physicalRowDataType = physicalRowDataType;
         this.dialectName = options.getDialect().dialectName();
     }

@@ -208,7 +213,12 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)
     public DynamicTableSource copy() {
         JdbcDynamicTableSource newSource =
                 new JdbcDynamicTableSource(
-                        options, readOptions, lookupMaxRetryTimes, cache, physicalRowDataType);
+                        options,
+                        readOptions,
+                        lookupMaxRetryTimes,
+                        cache,
+                        filterHandlingPolicy,
+                        physicalRowDataType);
         newSource.resolvedPredicates = new ArrayList<>(this.resolvedPredicates);
         newSource.pushdownParams = Arrays.copyOf(this.pushdownParams, this.pushdownParams.length);
         return newSource;

@@ -231,6 +241,7 @@ public boolean equals(Object o) {
         return Objects.equals(options, that.options)
                 && Objects.equals(readOptions, that.readOptions)
                 && Objects.equals(lookupMaxRetryTimes, that.lookupMaxRetryTimes)
+                && Objects.equals(filterHandlingPolicy, that.filterHandlingPolicy)
                 && Objects.equals(cache, that.cache)
                 && Objects.equals(physicalRowDataType, that.physicalRowDataType)
                 && Objects.equals(dialectName, that.dialectName)

@@ -246,6 +257,7 @@ public int hashCode() {
                 readOptions,
                 lookupMaxRetryTimes,
                 cache,
+                filterHandlingPolicy,
                 physicalRowDataType,
                 dialectName,
                 limit,

@@ -260,22 +272,29 @@ public void applyLimit(long limit) {

     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
-        List<ResolvedExpression> acceptedFilters = new ArrayList<>();
-        List<ResolvedExpression> remainingFilters = new ArrayList<>();
-
-        for (ResolvedExpression filter : filters) {
-            Optional<ParameterizedPredicate> simplePredicate = parseFilterToPredicate(filter);
-            if (simplePredicate.isPresent()) {
-                acceptedFilters.add(filter);
-                ParameterizedPredicate pred = simplePredicate.get();
-                this.pushdownParams = ArrayUtils.addAll(this.pushdownParams, pred.getParameters());
-                this.resolvedPredicates.add(pred.getPredicate());
-            } else {
-                remainingFilters.add(filter);
-            }
+        switch (filterHandlingPolicy) {
+            case NEVER:
+                return Result.of(Collections.emptyList(), filters);
+            case ALWAYS:
+            default:
+                List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+                List<ResolvedExpression> remainingFilters = new ArrayList<>();
+
+                for (ResolvedExpression filter : filters) {
+                    Optional<ParameterizedPredicate> simplePredicate =
+                            parseFilterToPredicate(filter);
+                    if (simplePredicate.isPresent()) {
+                        acceptedFilters.add(filter);
+                        ParameterizedPredicate pred = simplePredicate.get();
+                        this.pushdownParams =
+                                ArrayUtils.addAll(this.pushdownParams, pred.getParameters());
+                        this.resolvedPredicates.add(pred.getPredicate());
+                    } else {
+                        remainingFilters.add(filter);
+                    }
+                }
+                return Result.of(acceptedFilters, remainingFilters);
         }
-
-        return Result.of(acceptedFilters, remainingFilters);
     }

     private Optional<ParameterizedPredicate> parseFilterToPredicate(ResolvedExpression filter) {
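The behavioral core is the switch in applyFilters: Result.of(accepted, remaining) tells the planner which predicates the source consumes and which must stay in the plan, so under NEVER nothing is accepted and every filter remains. A self-contained sketch of the same dispatch with Flink types swapped for strings; every name here is illustrative, not the connector's API:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Stand-in for the applyFilters dispatch; not the connector's code. */
public class FilterDispatchSketch {

    enum Policy { ALWAYS, NEVER }

    static final class Result {
        final List<String> accepted;
        final List<String> remaining;

        Result(List<String> accepted, List<String> remaining) {
            this.accepted = accepted;
            this.remaining = remaining;
        }
    }

    // A "supported" filter is one the dialect can render into SQL; the real
    // connector decides this in parseFilterToPredicate.
    static boolean supported(String filter) {
        return !filter.contains("LIKE"); // hypothetical limitation
    }

    static Result applyFilters(Policy policy, List<String> filters) {
        if (policy == Policy.NEVER) {
            // NEVER: accept nothing; the planner keeps every filter.
            return new Result(Collections.emptyList(), filters);
        }
        // ALWAYS: accept what the dialect supports, leave the rest.
        List<String> accepted = new ArrayList<>();
        List<String> remaining = new ArrayList<>();
        for (String f : filters) {
            (supported(f) ? accepted : remaining).add(f);
        }
        return new Result(accepted, remaining);
    }

    public static void main(String[] args) {
        List<String> filters = List.of("id = 900001", "name LIKE 'a%'");
        Result never = applyFilters(Policy.NEVER, filters);
        System.out.println(never.accepted + " | " + never.remaining);   // [] | both filters
        Result always = applyFilters(Policy.ALWAYS, filters);
        System.out.println(always.accepted + " | " + always.remaining); // [id = 900001] | [name LIKE 'a%']
    }
}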

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java

Lines changed: 6 additions & 0 deletions

@@ -71,6 +71,7 @@ void testJdbcCommonProperties() {
         properties.put("username", "user");
         properties.put("password", "pass");
         properties.put("connection.max-retry-timeout", "120s");
+        properties.put("filter.handling.policy", "never");

         // validation for source
         DynamicTableSource actualSource = createTableSource(SCHEMA, properties);

@@ -89,6 +90,7 @@ void testJdbcCommonProperties() {
                         JdbcReadOptions.builder().build(),
                         LookupOptions.MAX_RETRIES.defaultValue(),
                         null,
+                        FilterHandlingPolicy.NEVER,
                         SCHEMA.toPhysicalRowDataType());
         assertThat(actualSource).isEqualTo(expectedSource);

@@ -146,6 +148,7 @@ void testJdbcReadProperties() {
                         readOptions,
                         LookupOptions.MAX_RETRIES.defaultValue(),
                         null,
+                        JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
                         SCHEMA.toPhysicalRowDataType());

         assertThat(actual).isEqualTo(expected);

@@ -174,6 +177,7 @@ void testJdbcLookupProperties() {
                         JdbcReadOptions.builder().build(),
                         10,
                         DefaultLookupCache.fromConfig(Configuration.fromMap(properties)),
+                        JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
                         SCHEMA.toPhysicalRowDataType());

         assertThat(actual).isEqualTo(expected);

@@ -202,6 +206,7 @@ void testJdbcLookupPropertiesWithLegacyOptions() {
                                 .maximumSize(1000L)
                                 .expireAfterWrite(Duration.ofSeconds(10))
                                 .build(),
+                        JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
                         SCHEMA.toPhysicalRowDataType());

         assertThat(actual).isEqualTo(expected);

@@ -387,6 +392,7 @@ void testJdbcLookupPropertiesWithExcludeEmptyResult() {
                                 .maximumSize(1000L)
                                 .expireAfterWrite(Duration.ofSeconds(10))
                                 .build(),
+                        JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
                         SCHEMA.toPhysicalRowDataType());

         assertThat(actual).isEqualTo(expected);

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java

Lines changed: 9 additions & 0 deletions

@@ -52,6 +52,9 @@ void setup(TestInfo testInfo) {
                                 + " 'url'='jdbc:derby:memory:test',"
                                 + " 'table-name'='test_table'"
                                 + ")");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE jdbc_never_pushdown WITH ('filter.handling.policy' = 'never') LIKE jdbc;");
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE d ( "

@@ -98,6 +101,12 @@ void testFilterPushdown() {
                 "SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23");
     }

+    @Test
+    void testNeverFilterPushdown() {
+        util.verifyExecPlan(
+                "SELECT id, time_col, real_col FROM jdbc_never_pushdown WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23");
+    }
+
     /**
      * Note the join condition is not present in the optimized plan, see FLINK-34170, as it is
      * handled in the JDBC java code, where it adds the join conditions to the select statement

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java

Lines changed: 6 additions & 3 deletions

@@ -19,6 +19,7 @@
 package org.apache.flink.connector.jdbc.core.table.source;

 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy;
 import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;

@@ -195,8 +196,9 @@ public void testLimit() {
                 .containsAll(collected);
     }

-    @Test
-    public void testFilter() {
+    @ParameterizedTest
+    @EnumSource(FilterHandlingPolicy.class)
+    void testFilter(FilterHandlingPolicy filterHandlingPolicy) {
         String testTable = "testTable";
         tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable));

@@ -210,7 +212,8 @@ public void testFilter() {
                 "'scan.partition.column'='id'",
                 "'scan.partition.num'='1'",
                 "'scan.partition.lower-bound'='1'",
-                "'scan.partition.upper-bound'='1'")));
+                "'scan.partition.upper-bound'='1'",
+                "'filter.handling.policy'='" + filterHandlingPolicy.name() + "'")));

         // we create a VIEW here to test column remapping, ie. would filter push down work if we
         // create a view that depends on our source table
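The switch from @Test to a parameterized test runs the same assertions once per policy value, encoding the invariant that query results must not depend on where filters are evaluated. The generic JUnit 5 pattern, for reference (illustrative class and enum, not from the commit):

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class EnumSourceSketch {
    enum Policy { ALWAYS, NEVER }

    @ParameterizedTest
    @EnumSource(Policy.class)
    void runsOncePerConstant(Policy policy) {
        // Identical assertions for each enum value; in the ITCase only the
        // table's 'filter.handling.policy' option differs between runs.
        Assertions.assertNotNull(policy);
    }
}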

flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.xml

Lines changed: 18 additions & 0 deletions

@@ -65,6 +65,24 @@ LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
     <Resource name="optimized exec plan">
       <![CDATA[
 TableSourceScan(table=[[default_catalog, default_database, jdbc, filter=[and(OR(=(id, 900001:BIGINT), >=(double_col, -1000.23:DECIMAL(6, 2))), OR(<>(time_col, 11:11:11), >=(double_col, -1000.23:DECIMAL(6, 2))))], project=[id, time_col, real_col]]], fields=[id, time_col, real_col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNeverFilterPushdown">
+    <Resource name="sql">
+      <![CDATA[SELECT id, time_col, real_col FROM jdbc_never_pushdown WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($3, 11:11:11)), >=($5, -1000.23:DECIMAL(6, 2)))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, jdbc_never_pushdown]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[id, time_col, real_col], where=[(((id = 900001) OR (double_col >= -1000.23)) AND ((time_col <> 11:11:11) OR (double_col >= -1000.23)))])
++- TableSourceScan(table=[[default_catalog, default_database, jdbc_never_pushdown, filter=[], project=[id, time_col, real_col, double_col]]], fields=[id, time_col, real_col, double_col])
 ]]>
   </Resource>
 </TestCase>
