Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.

Commit e9b6071

Browse files
yzeng1618zengyi
andauthored
[Feature][connector-kudu] Support regex and whole-database table_name for source (apache#10180)
Co-authored-by: zengyi <zengyi@chinatelecom.cn>
1 parent b29ef8a commit e9b6071

File tree

10 files changed

+504
-19
lines changed

10 files changed

+504
-19
lines changed

docs/en/connector-v2/source/Kudu.md

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ The tested kudu version is 1.11.1.
5757
| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all zeta nodes require have this file. |
5858
| scan_token_query_timeout | Long | No | 30000 | The timeout for connecting scan token. If not set, it will be the same as operationTimeout. |
5959
| scan_token_batch_size_bytes | Int | No | 1024 * 1024 | Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB. |
60+
| use_regex | Bool | No | false | Control regular expression matching for `table_name`. When set to `true`, the `table_name` will be treated as a regular expression pattern and can match multiple tables. When set to `false` or not specified, the `table_name` will be treated as an exact table name (no regex matching). |
6061
| filter | String | No | - | Kudu scan filter expressions,example id > 100 AND id < 200. |
6162
| schema | Map | No | 1024 * 1024 | SeaTunnel Schema. |
62-
| table_list | Array | No | - | The list of tables to be read. you can use this configuration instead of `table_path` example: ```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ``` |
63+
| table_list | Array | No | - | The list of tables to be read. you can use this configuration instead of `table_name`, for example: ```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ```. You can also configure `use_regex = true` inside each entry to enable regex matching for `table_name`. |
6364
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
6465

6566
## Task Example
@@ -143,6 +144,76 @@ sink {
143144
}
144145
```
145146

147+
### Table Matching With Regex
148+
149+
The Kudu Source supports using regular expressions on `table_name` to match multiple tables (including whole-database style synchronization, since Kudu tables are in a single logical database).
150+
151+
#### Exact Table Name
152+
153+
Use `table_name` to specify a single Kudu table with an exact name:
154+
155+
```hocon
156+
source {
157+
kudu {
158+
kudu_masters = "kudu-master:7051"
159+
table_name = "kudu_source_table_1"
160+
}
161+
}
162+
```
163+
164+
#### Regex Matching
165+
166+
Use `table_name` as a regex pattern and enable `use_regex` to read multiple tables with one configuration:
167+
168+
```hocon
169+
source {
170+
kudu {
171+
kudu_masters = "kudu-master:7051"
172+
# Match tables like kudu_source_table_1, kudu_source_table_2, etc.
173+
table_name = "kudu_source_table_\\d+"
174+
use_regex = true
175+
}
176+
}
177+
```
178+
179+
You can also combine regex entries in `table_list`:
180+
181+
```hocon
182+
source {
183+
kudu {
184+
kudu_masters = "kudu-master:7051"
185+
table_list = [
186+
{
187+
table_name = "kudu_source_table_1"
188+
},
189+
{
190+
table_name = "kudu_source_table_2"
191+
},
192+
{
193+
# Regex matching - any table whose name starts with prefix_ and ends with digits
194+
table_name = "prefix_\\d+"
195+
use_regex = true
196+
}
197+
]
198+
}
199+
}
200+
```
201+
202+
#### Whole-Database Matching
203+
204+
You can also synchronize all tables in the current Kudu cluster (or all business tables in the current instance, if there are no system tables) by using a catch-all regex:
205+
206+
```hocon
207+
source {
208+
kudu {
209+
kudu_masters = "kudu-master:7051"
210+
# Match all tables in the current Kudu cluster
211+
table_name = ".*"
212+
use_regex = true
213+
}
214+
}
215+
```
216+
146217
## Changelog
147218

148219
<ChangeLog />

docs/zh/connector-v2/source/Kudu.md

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ import ChangeLog from '../changelog/connector-kudu.md';
5757
| kerberos_krb5conf | String || - | Kerberos krb5 conf。注意所有 zeta 节点都需要有此文件。 |
5858
| scan_token_query_timeout | Long || 30000 | 连接扫描令牌的超时时间。如果未设置,将与 operationTimeout 相同。 |
5959
| scan_token_batch_size_bytes | Int || 1024 * 1024 | Kudu 扫描字节数。一次读取的最大字节数,默认为 1MB。 |
60+
| use_regex | Bool || false | 控制 `table_name` 的正则匹配。当设置为 `true` 时,`table_name` 将被视为正则表达式模式,可以匹配多张表。当设置为 `false` 或未指定时,`table_name` 将被视为精确表名(不进行正则匹配)。 |
6061
| filter | String || - | Kudu 扫描过滤表达式,例如 id > 100 AND id < 200。 |
6162
| schema | Map || 1024 * 1024 | SeaTunnel Schema。 |
62-
| table_list | Array || - | 要读取的表列表。您可以使用此配置代替 `table_path`,例如:```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ``` |
63+
| table_list | Array || - | 要读取的表列表。您可以使用此配置代替 `table_name`,例如:```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ```。也可以在每个 entry 中配置 `use_regex = true` 来对 `table_name` 启用正则匹配。 |
6364
| common-options | || - | 源插件通用参数,请参考 [源通用选项](../source-common-options.md) 详见。 |
6465

6566
## 任务示例
@@ -143,6 +144,76 @@ sink {
143144
}
144145
```
145146

147+
### 使用正则表达式匹配表
148+
149+
Kudu Source 支持在 `table_name` 上使用正则表达式来匹配多张表(由于 Kudu 逻辑上只有一个 database,因此也可以用来实现“整库表”同步)。
150+
151+
#### 精确表名
152+
153+
使用 `table_name` 指定单个 Kudu 表的精确名称:
154+
155+
```hocon
156+
source {
157+
kudu {
158+
kudu_masters = "kudu-master:7051"
159+
table_name = "kudu_source_table_1"
160+
}
161+
}
162+
```
163+
164+
#### 正则匹配
165+
166+
`table_name` 视为正则表达式,并开启 `use_regex`,即可用一条配置匹配多张表:
167+
168+
```hocon
169+
source {
170+
kudu {
171+
kudu_masters = "kudu-master:7051"
172+
# 匹配 kudu_source_table_1、kudu_source_table_2 等
173+
table_name = "kudu_source_table_\\d+"
174+
use_regex = true
175+
}
176+
}
177+
```
178+
179+
也可以在 `table_list` 中组合精确表和正则表:
180+
181+
```hocon
182+
source {
183+
kudu {
184+
kudu_masters = "kudu-master:7051"
185+
table_list = [
186+
{
187+
table_name = "kudu_source_table_1"
188+
},
189+
{
190+
table_name = "kudu_source_table_2"
191+
},
192+
{
193+
# 使用正则匹配,以 prefix_ 开头、以数字结尾的所有表
194+
table_name = "prefix_\\d+"
195+
use_regex = true
196+
}
197+
]
198+
}
199+
}
200+
```
201+
202+
#### 整库匹配
203+
204+
如果当前 Kudu 实例中只有业务表,或者你希望“一次性同步所有表”,可以使用一个全匹配的正则:
205+
206+
```hocon
207+
source {
208+
kudu {
209+
kudu_masters = "kudu-master:7051"
210+
# 匹配当前 Kudu 实例中的所有表
211+
table_name = ".*"
212+
use_regex = true
213+
}
214+
}
215+
```
216+
146217
## 变更日志
147218

148219
<ChangeLog />

seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ public class KuduSourceOptions extends KuduBaseOptions {
3838
.withDescription(
3939
"Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB");
4040

41+
public static final Option<Boolean> USE_REGEX =
42+
Options.key("use_regex")
43+
.booleanType()
44+
.defaultValue(false)
45+
.withDescription(
46+
"Control regular expression matching for table_name. When set to true, "
47+
+ "the table_name will be treated as a regular expression pattern. "
48+
+ "When set to false or not specified, the table_name will be treated "
49+
+ "as an exact table name (no regex matching).");
50+
4151
public static final Option<String> FILTER =
4252
Options.key("filter")
4353
.stringType()

seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.kudu.config;
1919

20-
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
21-
2220
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2321
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
2422
import org.apache.seatunnel.api.table.catalog.Catalog;
@@ -32,8 +30,10 @@
3230
import lombok.Getter;
3331

3432
import java.io.Serializable;
33+
import java.util.ArrayList;
3534
import java.util.List;
3635
import java.util.Optional;
36+
import java.util.regex.Pattern;
3737
import java.util.stream.Collectors;
3838

3939
@Getter
@@ -61,15 +61,28 @@ public static List<KuduSourceTableConfig> of(ReadonlyConfig config) {
6161

6262
try (KuduCatalog kuduCatalog = (KuduCatalog) optionalCatalog.get()) {
6363
kuduCatalog.open();
64+
65+
List<ReadonlyConfig> tableConfigs = new ArrayList<>();
6466
if (config.getOptional(ConnectorCommonOptions.TABLE_LIST).isPresent()) {
65-
return config.get(ConnectorCommonOptions.TABLE_LIST).stream()
66-
.map(ReadonlyConfig::fromMap)
67-
.map(readonlyConfig -> parseKuduSourceConfig(readonlyConfig, kuduCatalog))
68-
.collect(Collectors.toList());
67+
tableConfigs =
68+
config.get(ConnectorCommonOptions.TABLE_LIST).stream()
69+
.map(ReadonlyConfig::fromMap)
70+
.collect(Collectors.toList());
71+
} else {
72+
tableConfigs.add(config);
73+
}
74+
75+
List<KuduSourceTableConfig> result = new ArrayList<>();
76+
for (ReadonlyConfig tableConfig : tableConfigs) {
77+
Boolean useRegex = tableConfig.get(KuduSourceOptions.USE_REGEX);
78+
if (useRegex != null && useRegex) {
79+
result.addAll(parseKuduSourceConfigWithRegex(tableConfig, kuduCatalog));
80+
} else {
81+
result.add(parseKuduSourceConfig(tableConfig, kuduCatalog));
82+
}
6983
}
70-
KuduSourceTableConfig kuduSourceTableConfig =
71-
parseKuduSourceConfig(config, kuduCatalog);
72-
return Lists.newArrayList(kuduSourceTableConfig);
84+
85+
return result;
7386
}
7487
}
7588

@@ -86,4 +99,30 @@ public static KuduSourceTableConfig parseKuduSourceConfig(
8699
return new KuduSourceTableConfig(
87100
tableName, catalogTable, config.get(KuduSourceOptions.FILTER));
88101
}
102+
103+
static List<KuduSourceTableConfig> parseKuduSourceConfigWithRegex(
104+
ReadonlyConfig config, KuduCatalog kuduCatalog) {
105+
String patternString = config.get(KuduBaseOptions.TABLE_NAME);
106+
if (patternString == null) {
107+
throw new IllegalArgumentException(
108+
"When `use_regex` is enabled, `table_name` must be configured");
109+
}
110+
111+
Pattern pattern = Pattern.compile(patternString);
112+
113+
List<String> allTables =
114+
kuduCatalog.listTables(kuduCatalog.getDefaultDatabase()).stream()
115+
.filter(tableName -> pattern.matcher(tableName).matches())
116+
.collect(Collectors.toList());
117+
118+
List<KuduSourceTableConfig> result = new ArrayList<>();
119+
for (String tableName : allTables) {
120+
CatalogTable catalogTable = kuduCatalog.getTable(TablePath.of(tableName));
121+
result.add(
122+
new KuduSourceTableConfig(
123+
tableName, catalogTable, config.get(KuduSourceOptions.FILTER)));
124+
}
125+
126+
return result;
127+
}
89128
}

seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,16 @@ public OptionRule optionRule() {
4646
return OptionRule.builder()
4747
.required(KuduSourceOptions.MASTER)
4848
.optional(KuduSourceOptions.SCHEMA)
49-
.optional(KuduSourceOptions.WORKER_COUNT)
50-
.optional(KuduSourceOptions.OPERATION_TIMEOUT)
51-
.optional(KuduSourceOptions.ADMIN_OPERATION_TIMEOUT)
52-
.optional(KuduSourceOptions.QUERY_TIMEOUT)
53-
.optional(KuduSourceOptions.SCAN_BATCH_SIZE_BYTES)
54-
.optional(KuduSourceOptions.FILTER)
55-
.optional(KuduSourceOptions.ENABLE_KERBEROS)
56-
.optional(KuduSourceOptions.KERBEROS_KRB5_CONF)
49+
.optional(
50+
KuduSourceOptions.WORKER_COUNT,
51+
KuduSourceOptions.OPERATION_TIMEOUT,
52+
KuduSourceOptions.ADMIN_OPERATION_TIMEOUT,
53+
KuduSourceOptions.QUERY_TIMEOUT,
54+
KuduSourceOptions.SCAN_BATCH_SIZE_BYTES,
55+
KuduSourceOptions.FILTER,
56+
KuduSourceOptions.USE_REGEX,
57+
KuduSourceOptions.ENABLE_KERBEROS,
58+
KuduSourceOptions.KERBEROS_KRB5_CONF)
5759
.exclusive(KuduSourceOptions.TABLE_NAME, ConnectorCommonOptions.TABLE_LIST)
5860
.conditional(
5961
KuduSourceOptions.ENABLE_KERBEROS,

0 commit comments

Comments
 (0)