Skip to content

[Feature][Connectors-v2] kafka support multiple table sink#10154

Open
misi1987107 wants to merge 16 commits intoapache:devfrom
misi1987107:feature-kafkasink-multipletable
Open

[Feature][Connectors-v2] kafka support multiple table sink#10154
misi1987107 wants to merge 16 commits intoapache:devfrom
misi1987107:feature-kafkasink-multipletable

Conversation

@misi1987107
Copy link
Contributor

Purpose of this pull request

#5652

kafka support multiple table sink

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

davidzollo
davidzollo previously approved these changes Dec 10, 2025
Copy link
Contributor

@davidzollo davidzollo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
LGTM

@corgy-w
Copy link
Contributor

corgy-w commented Dec 10, 2025

@misi1987107 retry ci


@Slf4j
public class KafkaMultiTableSinkIT extends TestSuiteBase implements TestResource {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already KafkaIT in e2e, why add a new KafkaMultiTableSinkIT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok,I can merge it

### 多表写入

Kafka Sink 支持将多个表的数据写入到不同的 Kafka topic。当上游数据源产生多个表的数据时,可以在 `topic` 配置中使用 `${table_name}` 占位符,根据表名动态路由数据到对应的 topic。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

${database_name} and ${schema_name} should also be supported

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ${datasample_name} ${schema_name} variable is already supported, I will modify the document

Copy link
Collaborator

@chl-wxp chl-wxp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Collaborator

@LiJie20190102 LiJie20190102 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@misi1987107
Copy link
Contributor Author

misi1987107 commented Jan 13, 2026

@davidzollo Could you please review it, Thanks

# Conflicts:
#	docs/en/connectors/sink/Kafka.md
#	docs/zh/connectors/sink/Kafka.md
misi added 2 commits February 6, 2026 09:32
# Conflicts:
#	seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
Copy link
Collaborator

@chl-wxp chl-wxp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines 332 to +333
String jobStatus = container.getJobStatus(String.valueOf(jobId));
Assertions.assertEquals("CANCELED", jobStatus);
Assertions.assertEquals("CANCELING", jobStatus);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change the state of the original test class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It cannot be transitioned from CANCELING to CANCELED
related #9995

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2026-02-09T04:29:04.3693872Z Caused by: org.opentest4j.AssertionFailedError: expected: CANCELED but was: CANCELING
2026-02-09T04:29:04.3694221Z at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
2026-02-09T04:29:04.3694564Z at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
2026-02-09T04:29:04.3694780Z at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
2026-02-09T04:29:04.3695186Z at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
2026-02-09T04:29:04.3695447Z at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
2026-02-09T04:29:04.3695648Z at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
2026-02-09T04:29:04.3696146Z at org.apache.seatunnel.e2e.connector.kafka.KafkaKerberosIT.lambda$testNotKerberosConfig$4(KafkaKerberosIT.java:333)
2026-02-09T04:29:04.3696400Z at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
2026-02-09T04:29:04.3696814Z at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
2026-02-09T04:29:04.3697143Z at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
2026-02-09T04:29:04.3697305Z at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2026-02-09T04:29:04.3697659Z at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
2026-02-09T04:29:04.3697899Z at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
2026-02-09T04:29:04.3697999Z at java.lang.Thread.run(Thread.java:750)

davidzollo

This comment was marked as outdated.

@corgy-w corgy-w changed the title [feature] kafka support multiple table sink [Feature][Connectors-v2] kafka support multiple table sink Feb 6, 2026
@davidzollo davidzollo dismissed their stale review February 7, 2026 09:32

need dismiss

@github-actions github-actions bot removed the approved label Feb 7, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants