70 changes: 70 additions & 0 deletions docs/en/connector-v2/sink/Kafka.md
@@ -13,6 +13,7 @@ import ChangeLog from '../changelog/connector-kafka.md';
## Key Features

- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)

> By default, we will use 2pc to guarantee the message is sent to kafka exactly once.
@@ -304,6 +305,75 @@ The input parameter requirements are as follows:
```
Note: key/value is of type `byte[]`.

### Multiple Table Write

Kafka Sink supports writing data from multiple tables to different Kafka topics. When the upstream source generates data from multiple tables, you can use the `${table_name}` placeholder in the `topic` configuration to dynamically route data to the corresponding topic based on the table name.

#### Configuration Example

```hocon
env {
parallelism = 2
job.mode = "BATCH"
}

source {
FakeSource {
tables_configs = [
{
row.num = 100
schema = {
table = "test_topic_1"
fields {
c_string = string
c_int = int
c_bigint = bigint
}
}
},
{
row.num = 200
schema = {
table = "test_topic_2"
fields {
c_string = string
c_double = double
c_timestamp = timestamp
}
}
}
]
}
}

sink {
Kafka {
bootstrap.servers = "localhost:9092"
topic = "${table_name}"
format = json
}
}
```

In this example:
- FakeSource generates data for two tables: `test_topic_1` (100 rows) and `test_topic_2` (200 rows)
- The `topic = "${table_name}"` setting dynamically routes each row to a Kafka topic named after its source table
- Rows from the `test_topic_1` table are written to the `test_topic_1` Kafka topic
- Rows from the `test_topic_2` table are written to the `test_topic_2` Kafka topic

You can also use `partition_key_fields` with multiple table write:

```hocon
sink {
Kafka {
bootstrap.servers = "localhost:9092"
topic = "${table_name}"
format = json
partition_key_fields = ["c_string"]
}
}
```
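Per the review discussion on this change, the `${database_name}` and `${schema_name}` placeholders are reported to work as well. A sketch combining them with `${table_name}` (the topic naming scheme here is illustrative, not prescribed):

```hocon
sink {
  Kafka {
    bootstrap.servers = "localhost:9092"
    # Route by database, schema, and table name; adjust to your own topic naming convention.
    topic = "${database_name}_${schema_name}_${table_name}"
    format = json
  }
}
```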

## Changelog

<ChangeLog />
2 changes: 1 addition & 1 deletion docs/zh/concept/connector-v2-features.md
@@ -74,7 +74,7 @@ Sink connectors have some common core features; each sink connector supports

If a sink connector supports writing row kinds (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) based on the primary key, we consider it to support cdc (change data capture).

### Support Multiple Table Read
### Support Multiple Table Write

Supports writing multiple tables in one SeaTunnel job; users can dynamically specify the table identifier via [config placeholders](./sink-options-placeholders.md).

70 changes: 70 additions & 0 deletions docs/zh/connector-v2/sink/Kafka.md
@@ -13,6 +13,7 @@ import ChangeLog from '../changelog/connector-kafka.md';
## Key Features

- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [support multiple table write](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)

> By default, we use 2pc to guarantee that messages are sent to Kafka exactly once.
@@ -306,6 +307,75 @@ sink {
```
Note: key/value must be of type `byte[]`.

### Multiple Table Write

Kafka Sink supports writing data from multiple tables to different Kafka topics. When the upstream source produces data from multiple tables, you can use the `${table_name}` placeholder in the `topic` configuration to dynamically route data to the corresponding topic based on the table name.

**Contributor:** `${database_name}` and `${schema_name}` should also be supported

**Contributor (Author):** The `${database_name}` and `${schema_name}` variables are already supported; I will update the document
#### Configuration Example

```hocon
env {
parallelism = 2
job.mode = "BATCH"
}

source {
FakeSource {
tables_configs = [
{
row.num = 100
schema = {
table = "test_topic_1"
fields {
c_string = string
c_int = int
c_bigint = bigint
}
}
},
{
row.num = 200
schema = {
table = "test_topic_2"
fields {
c_string = string
c_double = double
c_timestamp = timestamp
}
}
}
]
}
}

sink {
Kafka {
bootstrap.servers = "localhost:9092"
topic = "${table_name}"
format = json
}
}
```

In this example:
- FakeSource generates data for two tables: `test_topic_1` (100 rows) and `test_topic_2` (200 rows)
- The `topic = "${table_name}"` setting dynamically routes each row to a Kafka topic named after its source table
- Rows from the `test_topic_1` table are written to the `test_topic_1` Kafka topic
- Rows from the `test_topic_2` table are written to the `test_topic_2` Kafka topic

You can also use `partition_key_fields` together with multiple table write:

```hocon
sink {
Kafka {
bootstrap.servers = "localhost:9092"
topic = "${table_name}"
format = json
partition_key_fields = ["c_string"]
}
}
```

## Changelog

<ChangeLog />
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import org.apache.seatunnel.api.sink.MultiTableResourceManager;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;

/**
* Multi-table resource manager for Kafka sink.
*
* <p>Manages the shared KafkaProducerManager across multiple table writers.
*/
@AllArgsConstructor
@Slf4j
public class KafkaMultiTableResourceManager
implements MultiTableResourceManager<KafkaProducerManager> {

private final KafkaProducerManager producerManager;

@Override
public Optional<KafkaProducerManager> getSharedResource() {
return Optional.of(producerManager);
}

@Override
public void close() {
log.info("Closing Kafka multi-table resource manager");
if (producerManager != null) {
producerManager.close();
}
}
}
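The class above simply exposes one shared `KafkaProducerManager` behind the `MultiTableResourceManager` contract: every table writer obtains the same instance, and closing the manager closes the one shared resource. A self-contained sketch of that pattern (the class and field names are illustrative stand-ins, not the SeaTunnel API):

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for the shared resource (e.g. a producer manager).
class SharedResource {
    final AtomicBoolean closed = new AtomicBoolean(false);

    void close() {
        closed.set(true);
    }
}

// Minimal analogue of a multi-table resource manager: many writers, one resource.
class ResourceManagerSketch {
    private final SharedResource resource;

    ResourceManagerSketch(SharedResource resource) {
        this.resource = resource;
    }

    // Every writer gets the same underlying instance.
    Optional<SharedResource> getSharedResource() {
        return Optional.of(resource);
    }

    // Closing the manager closes the single shared resource.
    void close() {
        if (resource != null) {
            resource.close();
        }
    }
}

public class ResourceManagerDemo {
    public static void main(String[] args) {
        SharedResource res = new SharedResource();
        ResourceManagerSketch mgr = new ResourceManagerSketch(res);
        // Two "writers" observe the same instance.
        System.out.println(mgr.getSharedResource().get() == mgr.getSharedResource().get());
        mgr.close();
        System.out.println(res.closed.get());
    }
}
```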
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/**
* Manages shared KafkaProducer instances for multi-table sink.
*
* <p>For non-transaction mode: All writers share a single KafkaProducer instance.
*
* <p>For exactly-once mode: Each queue index has its own KafkaProducer with unique
* transactional.id.
*/
@Slf4j
@Getter
public class KafkaProducerManager {

private final Properties kafkaProperties;
private final boolean isExactlyOnce;

/** Shared producer for non-transaction mode */
private volatile KafkaProducer<byte[], byte[]> sharedProducer;

/** Transaction producers for exactly-once mode, keyed by queue index */
private final Map<Integer, KafkaProducer<byte[], byte[]>> transactionProducers;

public KafkaProducerManager(Properties kafkaProperties, boolean isExactlyOnce) {
this.kafkaProperties = kafkaProperties;
this.isExactlyOnce = isExactlyOnce;
this.transactionProducers = new ConcurrentHashMap<>();

if (!isExactlyOnce) {
log.info("Creating shared KafkaProducer for non-transaction mode");
this.sharedProducer = new KafkaProducer<>(kafkaProperties);
}
}

/**
* Get a KafkaProducer instance.
*
* @param queueIndex the queue index for transaction mode
* @param transactionPrefix the transaction id prefix for exactly-once mode
* @return KafkaProducer instance
*/
public KafkaProducer<byte[], byte[]> getProducer(int queueIndex, String transactionPrefix) {
if (!isExactlyOnce) {
return sharedProducer;
}

return transactionProducers.computeIfAbsent(
queueIndex,
idx -> {
Properties props = new Properties();
props.putAll(kafkaProperties);
String transactionalId = transactionPrefix + "-shared-" + idx;
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
log.info(
"Creating transactional KafkaProducer for queue index {} with transactional.id: {}",
idx,
transactionalId);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
producer.initTransactions();
return producer;
});
}
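The `computeIfAbsent` call above is the core of the exactly-once path: one producer per queue index, created lazily, each with a unique `transactional.id` of the form `<prefix>-shared-<index>`. A self-contained sketch of that caching scheme (the Kafka producer is stubbed out as a `String`; only the lazy caching and id naming mirror the code above):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerCacheSketch {
    // Counts how many "producers" were actually constructed.
    static final AtomicInteger created = new AtomicInteger();

    // Cache keyed by queue index, mirroring transactionProducers.
    static final Map<Integer, String> producers = new ConcurrentHashMap<>();

    // Lazily create one entry per queue index; repeated calls return the cached instance.
    static String getProducer(int queueIndex, String transactionPrefix) {
        return producers.computeIfAbsent(
                queueIndex,
                idx -> {
                    created.incrementAndGet();
                    // Stands in for building a KafkaProducer with this transactional.id.
                    return transactionPrefix + "-shared-" + idx;
                });
    }

    public static void main(String[] args) {
        System.out.println(getProducer(0, "job")); // job-shared-0, constructed now
        System.out.println(getProducer(0, "job")); // cached, no second construction
        System.out.println(getProducer(1, "job")); // job-shared-1
        System.out.println(created.get());         // 2
    }
}
```

The same scheme is why each queue index gets a stable, unique transactional id across retries: the id depends only on the prefix and the index, not on when the producer was created.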

/**
* Check if the producer for the given queue index exists.
*
* @param queueIndex the queue index
* @return true if the producer exists
*/
public boolean containsProducer(int queueIndex) {
if (!isExactlyOnce) {
return sharedProducer != null;
}
return transactionProducers.containsKey(queueIndex);
}

/** Close all KafkaProducer instances. */
public void close() {
if (sharedProducer != null) {
log.info("Closing shared KafkaProducer");
try {
sharedProducer.flush();
sharedProducer.close();
} catch (Exception e) {
log.warn("Failed to close shared KafkaProducer", e);
}
sharedProducer = null;
}

transactionProducers.forEach(
(idx, producer) -> {
log.info("Closing transactional KafkaProducer for queue index {}", idx);
try {
producer.flush();
producer.close();
} catch (Exception e) {
log.warn(
"Failed to close transactional KafkaProducer for queue index {}",
idx,
e);
}
});
transactionProducers.clear();
}
}