Skip to content
24 changes: 24 additions & 0 deletions docs/en/introduction/concepts/incompatible-changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,30 @@ You need to check this document before you upgrade to related version.

### API Changes

- **Breaking Change: Engine REST table metrics key format**
- **Affected component**: SeaTunnel Engine REST API (job metrics in `/job-info`)
- **Description**: To support multiple Sources/Sinks/Transforms processing the same table, the key format of table-level metrics has changed from `{tableName}` to `{VertexIdentifier}.{tableName}` (for example, `Sink[0].fake.user_table`).
- **Impact**: Existing Grafana dashboards, Prometheus alert rules, and custom monitoring integrations that reference the old keys must be updated.

**Before**
```json
{
"TableSinkWriteCount": {
"fake.user_table": "15"
}
}
```

**After**
```json
{
"TableSinkWriteCount": {
"Sink[0].fake.user_table": "10",
"Sink[1].fake.user_table": "5"
}
}
```

### Configuration Changes

### Connector Changes
Expand Down
24 changes: 24 additions & 0 deletions docs/zh/introduction/concepts/incompatible-changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,30 @@

### API 变更

- **破坏性变更:Engine REST 表级指标 key 格式变化**
- **影响范围**:SeaTunnel Engine REST API(`/job-info` 返回的 job metrics 中的表级指标)
- **变更说明**:为支持多个 Source/Sink/Transform 同时处理同一张表,表级指标的 key 格式从 `{tableName}` 变更为 `{VertexIdentifier}.{tableName}`(例如 `Sink[0].fake.user_table`)。
- **影响**:依赖旧 key 的 Grafana 仪表盘、Prometheus 告警规则以及自定义监控解析逻辑需要同步修改,否则升级后会出现指标查询/告警静默失效。

**变更前**
```json
{
"TableSinkWriteCount": {
"fake.user_table": "15"
}
}
```

**变更后**
```json
{
"TableSinkWriteCount": {
"Sink[0].fake.user_table": "10",
"Sink[1].fake.user_table": "5"
}
}
```

### 配置变更

### 连接器变更
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ public void testCommittedMetricsWithCheckpoint() throws Exception {
Assertions.assertTrue(committedBytesPerSec > 0);

String table1CommittedCount =
responseFinal.path("metrics.TableSinkCommittedCount.'fake.table1'");
responseFinal.path("metrics.TableSinkCommittedCount.'Sink[0].fake.table1'");
String table2CommittedCount =
responseFinal.path("metrics.TableSinkCommittedCount.'fake.public.table2'");
responseFinal.path("metrics.TableSinkCommittedCount.'Sink[1].fake.public.table2'");
Assertions.assertNotNull(table1CommittedCount);
Assertions.assertNotNull(table2CommittedCount);

Expand All @@ -225,9 +225,9 @@ public void testCommittedMetricsWithCheckpoint() throws Exception {
Assertions.assertEquals(finalCommitted, table1Committed + table2Committed);

String table1CommittedBytes =
responseFinal.path("metrics.TableSinkCommittedBytes.'fake.table1'");
responseFinal.path("metrics.TableSinkCommittedBytes.'Sink[0].fake.table1'");
String table2CommittedBytes =
responseFinal.path("metrics.TableSinkCommittedBytes.'fake.public.table2'");
responseFinal.path("metrics.TableSinkCommittedBytes.'Sink[1].fake.public.table2'");
Assertions.assertNotNull(table1CommittedBytes);
Assertions.assertNotNull(table2CommittedBytes);

Expand All @@ -236,21 +236,22 @@ public void testCommittedMetricsWithCheckpoint() throws Exception {

Double table1CommittedQPS =
Double.parseDouble(
responseFinal.path("metrics.TableSinkCommittedQPS.'fake.table1'"));
responseFinal.path("metrics.TableSinkCommittedQPS.'Sink[0].fake.table1'"));
Double table2CommittedQPS =
Double.parseDouble(
responseFinal.path("metrics.TableSinkCommittedQPS.'fake.public.table2'"));
responseFinal.path(
"metrics.TableSinkCommittedQPS.'Sink[1].fake.public.table2'"));
Assertions.assertTrue(table1CommittedQPS > 0);
Assertions.assertTrue(table2CommittedQPS > 0);

Double table1CommittedBytesPerSec =
Double.parseDouble(
responseFinal.path(
"metrics.TableSinkCommittedBytesPerSeconds.'fake.table1'"));
"metrics.TableSinkCommittedBytesPerSeconds.'Sink[0].fake.table1'"));
Double table2CommittedBytesPerSec =
Double.parseDouble(
responseFinal.path(
"metrics.TableSinkCommittedBytesPerSeconds.'fake.public.table2'"));
"metrics.TableSinkCommittedBytesPerSeconds.'Sink[1].fake.public.table2'"));
Assertions.assertTrue(table1CommittedBytesPerSec > 0);
Assertions.assertTrue(table2CommittedBytesPerSec > 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ public void multiTableMetrics() {
.body("metrics.SourceReceivedCount", equalTo("15"))
.body("metrics.SinkWriteCount", equalTo("15"))
.body(
"metrics.TableSourceReceivedCount.'fake.table1'",
"metrics.TableSourceReceivedCount.'Source[0].fake.table1'",
equalTo("10"))
.body(
"metrics.TableSourceReceivedCount.'fake.public.table2'",
"metrics.TableSourceReceivedCount.'Source[1].fake.public.table2'",
equalTo("5"))
.body(
"metrics.TableSinkWriteCount.'fake.table1'",
"metrics.TableSinkWriteCount.'Sink[0].fake.table1'",
equalTo("10"))
.body(
"metrics.TableSinkWriteCount.'fake.public.table2'",
"metrics.TableSinkWriteCount.'Sink[1].fake.public.table2'",
equalTo("5"))
.body(
"metrics.SourceReceivedBytes",
Expand All @@ -122,62 +122,62 @@ public void multiTableMetrics() {
"metrics.SinkWriteBytes",
equalTo(String.valueOf(dataSize * 15)))
.body(
"metrics.TableSourceReceivedBytes.'fake.table1'",
"metrics.TableSourceReceivedBytes.'Source[0].fake.table1'",
equalTo(String.valueOf(dataSize * 10)))
.body(
"metrics.TableSourceReceivedBytes.'fake.public.table2'",
"metrics.TableSourceReceivedBytes.'Source[1].fake.public.table2'",
equalTo(String.valueOf(dataSize * 5)))
.body(
"metrics.TableSinkWriteBytes.'fake.table1'",
"metrics.TableSinkWriteBytes.'Sink[0].fake.table1'",
equalTo(String.valueOf(dataSize * 10)))
.body(
"metrics.TableSinkWriteBytes.'fake.public.table2'",
"metrics.TableSinkWriteBytes.'Sink[1].fake.public.table2'",
equalTo(String.valueOf(dataSize * 5)));
Assertions.assertTrue(
Double.parseDouble(response.path("metrics.SourceReceivedQPS"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TableSourceReceivedQPS.'fake.table1'"))
"metrics.TableSourceReceivedQPS.'Source[0].fake.table1'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TableSourceReceivedQPS.'fake.public.table2'"))
"metrics.TableSourceReceivedQPS.'Source[1].fake.public.table2'"))
> 0
&& Double.parseDouble(
response.path("metrics.SinkWriteQPS"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TableSinkWriteQPS.'fake.table1'"))
"metrics.TableSinkWriteQPS.'Sink[0].fake.table1'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TableSinkWriteQPS.'fake.public.table2'"))
"metrics.TableSinkWriteQPS.'Sink[1].fake.public.table2'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.SourceReceivedBytesPerSeconds"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TableSourceReceivedBytesPerSeconds.'fake.table1'"))
"metrics.TableSourceReceivedBytesPerSeconds.'Source[0].fake.table1'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TableSourceReceivedBytesPerSeconds.'fake.public.table2'"))
"metrics.TableSourceReceivedBytesPerSeconds.'Source[1].fake.public.table2'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.SinkWriteBytesPerSeconds"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TableSinkWriteBytesPerSeconds.'fake.table1'"))
"metrics.TableSinkWriteBytesPerSeconds.'Sink[0].fake.table1'"))
> 0
&& Double.parseDouble(
response.path(
"metrics.TableSinkWriteBytesPerSeconds.'fake.public.table2'"))
"metrics.TableSinkWriteBytesPerSeconds.'Sink[1].fake.public.table2'"))
> 0);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,10 +568,10 @@ public void testGetJobInfoByJobId() {
"jobDag.vertexInfoMap[1].tablePaths[0]",
equalTo("fake"))
.body(
"metrics.TableSourceReceivedCount.fake",
"metrics.TableSourceReceivedCount.'Source[0].fake'",
equalTo("5"))
.body(
"metrics.TableSinkWriteCount.fake",
"metrics.TableSinkWriteCount.'Sink[0].fake'",
equalTo("5"))
.body("metrics.SinkWriteCount", equalTo("5"))
.body("metrics.SourceReceivedCount", equalTo("5"))
Expand Down Expand Up @@ -634,10 +634,10 @@ public void testGetJobInfoByJobId() {
"jobDag.vertexInfoMap[1].tablePaths[0]",
equalTo("fake"))
.body(
"metrics.TableSourceReceivedCount.fake",
"metrics.TableSourceReceivedCount.'Source[0].fake'",
equalTo("5"))
.body(
"metrics.TableSinkWriteCount.fake",
"metrics.TableSinkWriteCount.'Sink[0].fake'",
equalTo("5"))
.body("metrics.SinkWriteCount", equalTo("5"))
.body("metrics.SourceReceivedCount", equalTo("5"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ public JobConfigParser(
this.isStartWithSavePoint = isStartWithSavePoint;
}

static String createSourceActionName(int configIndex, String pluginName) {
public static String createSourceActionName(int configIndex, String pluginName) {
return String.format("Source[%s]-%s", configIndex, pluginName);
}

static String createSinkActionName(int configIndex, String pluginName, String table) {
public static String createSinkActionName(int configIndex, String pluginName, String table) {
return String.format("Sink[%s]-%s-%s", configIndex, pluginName, table);
}

static String createTransformActionName(int configIndex, String pluginName) {
public static String createTransformActionName(int configIndex, String pluginName) {
return String.format("Transform[%s]-%s", configIndex, pluginName);
Comment on lines +49 to 58
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The methods createSourceActionName, createSinkActionName, and createTransformActionName are changed from package-private to public visibility. While this change enables their use in tests, there is no documentation explaining their purpose, parameters, or expected usage. Consider adding JavaDoc comments to these newly public methods to help external callers understand how to use them correctly, especially since they are now part of the public API of the class.

Copilot uses AI. Check for mistakes.
}
}
Loading