This repository was archived by the owner on Dec 28, 2025. It is now read-only.
Merged
73 changes: 72 additions & 1 deletion docs/en/connector-v2/source/Kudu.md
@@ -57,9 +57,10 @@ The tested kudu version is 1.11.1.
| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all Zeta nodes must have this file. |
| scan_token_query_timeout | Long | No | 30000 | The timeout for connecting scan token. If not set, it will be the same as operationTimeout. |
| scan_token_batch_size_bytes | Int | No | 1024 * 1024 | Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB. |
| use_regex | Bool | No | false | Control regular expression matching for `table_name`. When set to `true`, the `table_name` will be treated as a regular expression pattern and can match multiple tables. When set to `false` or not specified, the `table_name` will be treated as an exact table name (no regex matching). |
| filter | String | No | - | Kudu scan filter expressions, for example `id > 100 AND id < 200`. |
| schema | Map | No | 1024 * 1024 | SeaTunnel Schema. |
| table_list | Array | No | - | The list of tables to be read. you can use this configuration instead of `table_path` example: ```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ``` |
| table_list | Array | No | - | The list of tables to be read. You can use this configuration instead of `table_name`, for example: ```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ```. You can also set `use_regex = true` inside each entry to enable regex matching for that entry's `table_name`. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |

## Task Example
@@ -143,6 +144,76 @@ sink {
}
```

### Table Matching With Regex

The Kudu Source supports using regular expressions on `table_name` to match multiple tables (including whole-database style synchronization, since Kudu tables are in a single logical database).

#### Exact Table Name

Use `table_name` to specify a single Kudu table with an exact name:

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table_1"
  }
}
```

#### Regex Matching

Use `table_name` as a regex pattern and enable `use_regex` to read multiple tables with one configuration:

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    # Match tables like kudu_source_table_1, kudu_source_table_2, etc.
    table_name = "kudu_source_table_\\d+"
    use_regex = true
  }
}
```
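
The matching behavior can be illustrated with a minimal sketch, assuming names are filtered with `Pattern.matcher(name).matches()` as in the `parseKuduSourceConfigWithRegex` change in this PR. The class name, the helper, and the table names are hypothetical and exist only for illustration. Because `matches()` requires the whole name to match, a name such as `kudu_source_table_1_bak` is not selected by the pattern above. In a HOCON quoted string, as in a Java string literal, `\\d` denotes the regex `\d`.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the connector: filters table names by full regex match.
final class RegexFullMatchSketch {

    static List<String> matchTables(String regex, List<String> tableNames) {
        Pattern pattern = Pattern.compile(regex);
        return tableNames.stream()
                // matches() succeeds only if the entire name matches, unlike find()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up table names standing in for the tables of a Kudu cluster
        List<String> tables = List.of(
                "kudu_source_table_1",
                "kudu_source_table_22",
                "kudu_source_table_1_bak",
                "other_table");
        // Prints [kudu_source_table_1, kudu_source_table_22]
        System.out.println(matchTables("kudu_source_table_\\d+", tables));
    }
}
```

If substring matching is wanted, the pattern itself has to allow it, for example by adding `.*` on either side.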

You can also mix exact table names and regex entries in `table_list`:

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_list = [
      {
        table_name = "kudu_source_table_1"
      },
      {
        table_name = "kudu_source_table_2"
      },
      {
        # Regex matching: any table named prefix_ followed by one or more digits
        table_name = "prefix_\\d+"
        use_regex = true
      }
    ]
  }
}
```
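
How such a mixed list might expand into concrete table names can be sketched as follows. This is an illustration modeled on the per-entry loop in this PR's `KuduSourceTableConfig.of` change, not the connector code itself: the `Entry` class, the `expand` helper, and the in-memory list of cluster tables are hypothetical stand-ins for `ReadonlyConfig` and `KuduCatalog`. Like that loop, the sketch appends results without deduplicating, so a table named exactly and also matched by a regex entry would appear twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of expanding table_list entries into table names.
final class TableListExpansionSketch {

    // Stand-in for one table_list entry: a name or pattern plus the use_regex flag.
    static final class Entry {
        final String tableName;
        final boolean useRegex;

        Entry(String tableName, boolean useRegex) {
            this.tableName = tableName;
            this.useRegex = useRegex;
        }
    }

    static List<String> expand(List<Entry> entries, List<String> clusterTables) {
        List<String> result = new ArrayList<>();
        for (Entry entry : entries) {
            if (entry.useRegex) {
                // Regex entry: keep every cluster table whose full name matches
                Pattern pattern = Pattern.compile(entry.tableName);
                for (String name : clusterTables) {
                    if (pattern.matcher(name).matches()) {
                        result.add(name);
                    }
                }
            } else {
                // Exact entry: taken as-is (this sketch does not check that it exists)
                result.add(entry.tableName);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> cluster = List.of(
                "kudu_source_table_1", "kudu_source_table_2", "prefix_1", "prefix_20", "prefix_x");
        List<Entry> entries = List.of(
                new Entry("kudu_source_table_1", false),
                new Entry("kudu_source_table_2", false),
                new Entry("prefix_\\d+", true));
        // Prints [kudu_source_table_1, kudu_source_table_2, prefix_1, prefix_20]
        System.out.println(expand(entries, cluster));
    }
}
```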

#### Whole-Database Matching

You can also synchronize all tables in the current Kudu cluster (or all business tables in the current instance, if there are no system tables) by using a catch-all regex:

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    # Match all tables in the current Kudu cluster
    table_name = ".*"
    use_regex = true
  }
}
```
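
If the cluster also contains tables that should not be synchronized, a narrower pattern may work better than `.*`. The sketch below is an assumption based on standard `java.util.regex` behavior (the PR compiles `table_name` with `Pattern.compile` and applies `matches()`); the `tmp_` prefix, the class name, and the table names are hypothetical. It compares the catch-all pattern with a negative lookahead that skips one prefix.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch: catch-all pattern versus a catch-all that excludes one prefix.
final class CatchAllExclusionSketch {

    static List<String> matchTables(String regex, List<String> tableNames) {
        Pattern pattern = Pattern.compile(regex);
        return tableNames.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up table names; tmp_orders plays the role of a table to skip
        List<String> tables = List.of("orders", "customers", "tmp_orders");

        // ".*" matches every name: prints [orders, customers, tmp_orders]
        System.out.println(matchTables(".*", tables));

        // "(?!tmp_).*" rejects names starting with tmp_: prints [orders, customers]
        System.out.println(matchTables("(?!tmp_).*", tables));
    }
}
```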

## Changelog

<ChangeLog />
73 changes: 72 additions & 1 deletion docs/zh/connector-v2/source/Kudu.md
@@ -57,9 +57,10 @@ import ChangeLog from '../changelog/connector-kudu.md';
| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all Zeta nodes must have this file. |
| scan_token_query_timeout | Long | No | 30000 | The timeout for connecting scan token. If not set, it is the same as operationTimeout. |
| scan_token_batch_size_bytes | Int | No | 1024 * 1024 | Kudu scan bytes. The maximum number of bytes read at a time; the default is 1MB. |
| use_regex | Bool | No | false | Controls regex matching for `table_name`. When set to `true`, `table_name` is treated as a regular expression pattern and can match multiple tables. When set to `false` or not specified, `table_name` is treated as an exact table name (no regex matching). |
| filter | String | No | - | Kudu scan filter expressions, for example id > 100 AND id < 200. |
| schema | Map | No | 1024 * 1024 | SeaTunnel Schema. |
| table_list | Array | No | - | The list of tables to be read. You can use this configuration instead of `table_path`, for example: ```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ``` |
| table_list | Array | No | - | The list of tables to be read. You can use this configuration instead of `table_name`, for example: ```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ```. You can also set `use_regex = true` in each entry to enable regex matching for `table_name`. |
| common-options | | No | - | Source plugin common parameters; see [Source Common Options](../source-common-options.md) for details. |

## Task Example
@@ -143,6 +144,76 @@ sink {
}
```

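### Table Matching With Regex

The Kudu Source supports using a regular expression in `table_name` to match multiple tables (because Kudu logically has only one database, this can also be used for "whole-database" synchronization).

#### Exact Table Name

Use `table_name` to specify the exact name of a single Kudu table: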

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table_1"
  }
}
```

#### Regex Matching

Treat `table_name` as a regular expression and enable `use_regex` to match multiple tables with a single configuration:

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    # Match kudu_source_table_1, kudu_source_table_2, etc.
    table_name = "kudu_source_table_\\d+"
    use_regex = true
  }
}
```

You can also combine exact table names and regex entries in `table_list`:

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_list = [
      {
        table_name = "kudu_source_table_1"
      },
      {
        table_name = "kudu_source_table_2"
      },
      {
        # Regex matching: all tables named prefix_ followed by one or more digits
        table_name = "prefix_\\d+"
        use_regex = true
      }
    ]
  }
}
```

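#### Whole-Database Matching

If the current Kudu instance contains only business tables, or you want to synchronize all tables at once, you can use a catch-all regex: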

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    # Match all tables in the current Kudu instance
    table_name = ".*"
    use_regex = true
  }
}
```

## Changelog

<ChangeLog />
@@ -38,6 +38,16 @@ public class KuduSourceOptions extends KuduBaseOptions {
                    .withDescription(
                            "Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB");

    public static final Option<Boolean> USE_REGEX =
            Options.key("use_regex")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Control regular expression matching for table_name. When set to true, "
                                    + "the table_name will be treated as a regular expression pattern. "
                                    + "When set to false or not specified, the table_name will be treated "
                                    + "as an exact table name (no regex matching).");

    public static final Option<String> FILTER =
            Options.key("filter")
                    .stringType()
@@ -17,8 +17,6 @@

package org.apache.seatunnel.connectors.seatunnel.kudu.config;

import org.apache.seatunnel.shade.com.google.common.collect.Lists;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.table.catalog.Catalog;
@@ -32,8 +30,10 @@
import lombok.Getter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@Getter
@@ -61,15 +61,28 @@ public static List<KuduSourceTableConfig> of(ReadonlyConfig config) {

try (KuduCatalog kuduCatalog = (KuduCatalog) optionalCatalog.get()) {
kuduCatalog.open();

List<ReadonlyConfig> tableConfigs = new ArrayList<>();
if (config.getOptional(ConnectorCommonOptions.TABLE_LIST).isPresent()) {
return config.get(ConnectorCommonOptions.TABLE_LIST).stream()
.map(ReadonlyConfig::fromMap)
.map(readonlyConfig -> parseKuduSourceConfig(readonlyConfig, kuduCatalog))
.collect(Collectors.toList());
tableConfigs =
config.get(ConnectorCommonOptions.TABLE_LIST).stream()
.map(ReadonlyConfig::fromMap)
.collect(Collectors.toList());
} else {
tableConfigs.add(config);
}

List<KuduSourceTableConfig> result = new ArrayList<>();
for (ReadonlyConfig tableConfig : tableConfigs) {
Boolean useRegex = tableConfig.get(KuduSourceOptions.USE_REGEX);
if (useRegex != null && useRegex) {
result.addAll(parseKuduSourceConfigWithRegex(tableConfig, kuduCatalog));
} else {
result.add(parseKuduSourceConfig(tableConfig, kuduCatalog));
}
}
KuduSourceTableConfig kuduSourceTableConfig =
parseKuduSourceConfig(config, kuduCatalog);
return Lists.newArrayList(kuduSourceTableConfig);

return result;
}
}

@@ -86,4 +99,30 @@ public static KuduSourceTableConfig parseKuduSourceConfig(
return new KuduSourceTableConfig(
tableName, catalogTable, config.get(KuduSourceOptions.FILTER));
}

    static List<KuduSourceTableConfig> parseKuduSourceConfigWithRegex(
            ReadonlyConfig config, KuduCatalog kuduCatalog) {
        String patternString = config.get(KuduBaseOptions.TABLE_NAME);
        if (patternString == null) {
            throw new IllegalArgumentException(
                    "When `use_regex` is enabled, `table_name` must be configured");
        }

        Pattern pattern = Pattern.compile(patternString);

        List<String> allTables =
                kuduCatalog.listTables(kuduCatalog.getDefaultDatabase()).stream()
                        .filter(tableName -> pattern.matcher(tableName).matches())
                        .collect(Collectors.toList());

        List<KuduSourceTableConfig> result = new ArrayList<>();
        for (String tableName : allTables) {
            CatalogTable catalogTable = kuduCatalog.getTable(TablePath.of(tableName));
            result.add(
                    new KuduSourceTableConfig(
                            tableName, catalogTable, config.get(KuduSourceOptions.FILTER)));
        }

        return result;
    }
}
@@ -46,14 +46,16 @@ public OptionRule optionRule() {
return OptionRule.builder()
.required(KuduSourceOptions.MASTER)
.optional(KuduSourceOptions.SCHEMA)
.optional(KuduSourceOptions.WORKER_COUNT)
.optional(KuduSourceOptions.OPERATION_TIMEOUT)
.optional(KuduSourceOptions.ADMIN_OPERATION_TIMEOUT)
.optional(KuduSourceOptions.QUERY_TIMEOUT)
.optional(KuduSourceOptions.SCAN_BATCH_SIZE_BYTES)
.optional(KuduSourceOptions.FILTER)
.optional(KuduSourceOptions.ENABLE_KERBEROS)
.optional(KuduSourceOptions.KERBEROS_KRB5_CONF)
.optional(
KuduSourceOptions.WORKER_COUNT,
KuduSourceOptions.OPERATION_TIMEOUT,
KuduSourceOptions.ADMIN_OPERATION_TIMEOUT,
KuduSourceOptions.QUERY_TIMEOUT,
KuduSourceOptions.SCAN_BATCH_SIZE_BYTES,
KuduSourceOptions.FILTER,
KuduSourceOptions.USE_REGEX,
KuduSourceOptions.ENABLE_KERBEROS,
KuduSourceOptions.KERBEROS_KRB5_CONF)
.exclusive(KuduSourceOptions.TABLE_NAME, ConnectorCommonOptions.TABLE_LIST)
.conditional(
KuduSourceOptions.ENABLE_KERBEROS,