Skip to content

Commit d556b6e

Browse files
branch-4.0: [Fix](StreamingJob) fix create table issues when create streaming job apache#59828 (apache#59853)
Cherry-picked from apache#59828 Co-authored-by: wudi <wudi@selectdb.com>
1 parent ff175d9 commit d556b6e

File tree

5 files changed

+272
-3
lines changed

5 files changed

+272
-3
lines changed

fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import lombok.Getter;
2929
import lombok.NoArgsConstructor;
3030
import lombok.Setter;
31+
import lombok.ToString;
3132
import org.apache.logging.log4j.LogManager;
3233
import org.apache.logging.log4j.Logger;
3334
import org.springframework.web.bind.annotation.RequestBody;
@@ -67,6 +68,7 @@ private Object updateOffset(CommitOffsetRequest offsetRequest) {
6768

6869
StreamingInsertJob streamingJob = (StreamingInsertJob) job;
6970
try {
71+
LOG.info("Committing offset with {}", offsetRequest.toString());
7072
streamingJob.commitOffset(offsetRequest);
7173
return ResponseEntityBuilder.ok("Offset committed successfully");
7274
} catch (Exception e) {
@@ -79,6 +81,7 @@ private Object updateOffset(CommitOffsetRequest offsetRequest) {
7981
@Getter
8082
@Setter
8183
@NoArgsConstructor
84+
@ToString
8285
public static class CommitOffsetRequest {
8386
public long jobId;
8487
public long taskId;

fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.io.IOException;
6060
import java.util.ArrayList;
6161
import java.util.Arrays;
62+
import java.util.Comparator;
6263
import java.util.HashMap;
6364
import java.util.LinkedHashMap;
6465
import java.util.List;
@@ -333,18 +334,42 @@ public static List<CreateTableCommand> generateCreateTableCmds(String targetDb,
333334
return createtblCmds;
334335
}
335336

336-
private static List<Column> getColumns(JdbcClient jdbcClient,
337+
public static List<Column> getColumns(JdbcClient jdbcClient,
337338
String database,
338339
String table,
339340
List<String> primaryKeys) {
340341
List<Column> columns = jdbcClient.getColumnsFromJdbc(database, table);
341342
columns.forEach(col -> {
343+
Preconditions.checkArgument(!col.getType().isUnsupported(),
344+
"Unsupported column type, table:[%s], column:[%s]", table, col.getName());
345+
if (col.getType().isVarchar()) {
346+
// The length of varchar needs to be multiplied by 3.
347+
int len = col.getType().getLength() * 3;
348+
if (len > ScalarType.MAX_VARCHAR_LENGTH) {
349+
col.setType(ScalarType.createStringType());
350+
} else {
351+
col.setType(ScalarType.createVarcharType(len));
352+
}
353+
}
354+
342355
// string can not to be key
343356
if (primaryKeys.contains(col.getName())
344357
&& col.getDataType() == PrimitiveType.STRING) {
345358
col.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH));
346359
}
347360
});
361+
362+
// sort columns for primary keys
363+
columns.sort(
364+
Comparator
365+
.comparing((Column col) -> !primaryKeys.contains(col.getName()))
366+
.thenComparing(
367+
col -> primaryKeys.contains(col.getName())
368+
? primaryKeys.indexOf(col.getName())
369+
: Integer.MAX_VALUE
370+
)
371+
);
372+
348373
return columns;
349374
}
350375

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.job.util;
19+
20+
import org.apache.doris.catalog.Column;
21+
import org.apache.doris.catalog.PrimitiveType;
22+
import org.apache.doris.catalog.ScalarType;
23+
import org.apache.doris.datasource.jdbc.client.JdbcClient;
24+
25+
import org.junit.Assert;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import org.mockito.ArgumentMatchers;
29+
import org.mockito.Mock;
30+
import org.mockito.Mockito;
31+
import org.mockito.MockitoAnnotations;
32+
33+
import java.util.ArrayList;
34+
import java.util.Arrays;
35+
import java.util.List;
36+
37+
public class StreamingJobUtilsTest {
38+
39+
@Mock
40+
private JdbcClient jdbcClient;
41+
42+
@Before
43+
public void setUp() {
44+
MockitoAnnotations.initMocks(this);
45+
}
46+
47+
@Test
48+
public void testGetColumnsWithPrimaryKeySorting() throws Exception {
49+
// Prepare test data
50+
String database = "test_db";
51+
String table = "test_table";
52+
List<String> primaryKeys = Arrays.asList("id", "name");
53+
54+
// Create mock columns in random order
55+
List<Column> mockColumns = new ArrayList<>();
56+
mockColumns.add(new Column("age", ScalarType.createType(PrimitiveType.INT)));
57+
mockColumns.add(new Column("id", ScalarType.createType(PrimitiveType.BIGINT)));
58+
mockColumns.add(new Column("email", ScalarType.createVarcharType(100)));
59+
mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
60+
mockColumns.add(new Column("address", ScalarType.createVarcharType(200)));
61+
62+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns);
63+
List<Column> result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys);
64+
65+
// Verify primary keys are at the front in correct order
66+
Assert.assertEquals(5, result.size());
67+
Assert.assertEquals("id", result.get(0).getName());
68+
Assert.assertEquals("name", result.get(1).getName());
69+
// Verify varchar primary key columns have their length multiplied by 3
70+
Column nameColumn = result.get(1);
71+
Assert.assertEquals(150, nameColumn.getType().getLength()); // 50 * 3
72+
// Verify non-primary key columns follow
73+
Assert.assertEquals("age", result.get(2).getName());
74+
Assert.assertEquals("email", result.get(3).getName());
75+
Assert.assertEquals("address", result.get(4).getName());
76+
// Verify non-primary key varchar columns also have their length multiplied by 3
77+
Column emailColumn = result.get(3);
78+
Assert.assertEquals(300, emailColumn.getType().getLength()); // 100 * 3
79+
Column addressColumn = result.get(4);
80+
Assert.assertEquals(600, addressColumn.getType().getLength()); // 200 * 3
81+
}
82+
83+
@Test
84+
public void testGetColumnsWithVarcharTypeConversion() throws Exception {
85+
String database = "test_db";
86+
String table = "test_table";
87+
List<String> primaryKeys = Arrays.asList("id");
88+
89+
List<Column> mockColumns = new ArrayList<>();
90+
mockColumns.add(new Column("id", ScalarType.createType(PrimitiveType.INT)));
91+
mockColumns.add(new Column("short_name", ScalarType.createVarcharType(50)));
92+
mockColumns.add(new Column("long_name", ScalarType.createVarcharType(20000)));
93+
94+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns);
95+
List<Column> result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys);
96+
97+
// Verify varchar length multiplication by 3
98+
Column shortName = result.stream()
99+
.filter(col -> col.getName().equals("short_name"))
100+
.findFirst()
101+
.orElse(null);
102+
Assert.assertNotNull(shortName);
103+
Assert.assertEquals(150, shortName.getType().getLength()); // 50 * 3
104+
105+
// Verify long varchar becomes STRING type
106+
Column longName = result.stream()
107+
.filter(col -> col.getName().equals("long_name"))
108+
.findFirst()
109+
.orElse(null);
110+
Assert.assertNotNull(longName);
111+
Assert.assertTrue(longName.getType().isStringType());
112+
}
113+
114+
@Test
115+
public void testGetColumnsWithStringTypeAsPrimaryKey() throws Exception {
116+
String database = "test_db";
117+
String table = "test_table";
118+
List<String> primaryKeys = Arrays.asList("id");
119+
120+
List<Column> mockColumns = new ArrayList<>();
121+
mockColumns.add(new Column("id", ScalarType.createStringType()));
122+
mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
123+
124+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns);
125+
List<Column> result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys);
126+
127+
// Verify string type primary key is converted to varchar
128+
Column idColumn = result.stream()
129+
.filter(col -> col.getName().equals("id"))
130+
.findFirst()
131+
.orElse(null);
132+
Assert.assertNotNull(idColumn);
133+
Assert.assertTrue(idColumn.getType().isVarchar());
134+
Assert.assertEquals(ScalarType.MAX_VARCHAR_LENGTH, idColumn.getType().getLength());
135+
}
136+
137+
@Test
138+
public void testGetColumnsWithEmptyPrimaryKeys() throws Exception {
139+
String database = "test_db";
140+
String table = "test_table";
141+
List<String> primaryKeys = new ArrayList<>();
142+
143+
List<Column> mockColumns = new ArrayList<>();
144+
mockColumns.add(new Column("col1", ScalarType.createType(PrimitiveType.INT)));
145+
mockColumns.add(new Column("col2", ScalarType.createVarcharType(100)));
146+
mockColumns.add(new Column("col3", ScalarType.createType(PrimitiveType.BIGINT)));
147+
148+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns);
149+
List<Column> result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys);
150+
151+
// Verify columns maintain original order when no primary keys
152+
Assert.assertEquals(3, result.size());
153+
Assert.assertEquals("col1", result.get(0).getName());
154+
Assert.assertEquals("col2", result.get(1).getName());
155+
Assert.assertEquals("col3", result.get(2).getName());
156+
}
157+
158+
@Test
159+
public void testGetColumnsWithMultiplePrimaryKeys() throws Exception {
160+
String database = "test_db";
161+
String table = "test_table";
162+
List<String> primaryKeys = Arrays.asList("pk3", "pk1", "pk2");
163+
164+
List<Column> mockColumns = new ArrayList<>();
165+
mockColumns.add(new Column("data1", ScalarType.createType(PrimitiveType.INT)));
166+
mockColumns.add(new Column("pk1", ScalarType.createType(PrimitiveType.INT)));
167+
mockColumns.add(new Column("data2", ScalarType.createVarcharType(100)));
168+
mockColumns.add(new Column("pk2", ScalarType.createType(PrimitiveType.BIGINT)));
169+
mockColumns.add(new Column("pk3", ScalarType.createType(PrimitiveType.INT)));
170+
mockColumns.add(new Column("data3", ScalarType.createVarcharType(50)));
171+
172+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns);
173+
List<Column> result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys);
174+
175+
// Verify primary keys are sorted in the order defined in primaryKeys list
176+
Assert.assertEquals(6, result.size());
177+
Assert.assertEquals("pk3", result.get(0).getName());
178+
Assert.assertEquals("pk1", result.get(1).getName());
179+
Assert.assertEquals("pk2", result.get(2).getName());
180+
// Verify non-primary keys follow
181+
Assert.assertEquals("data1", result.get(3).getName());
182+
Assert.assertEquals("data2", result.get(4).getName());
183+
Assert.assertEquals("data3", result.get(5).getName());
184+
}
185+
186+
@Test
187+
public void testGetColumnsWithUnsupportedColumnType() throws Exception {
188+
String database = "test_db";
189+
String table = "test_table";
190+
List<String> primaryKeys = Arrays.asList("id");
191+
192+
List<Column> mockColumns = new ArrayList<>();
193+
mockColumns.add(new Column("id", ScalarType.createType(PrimitiveType.INT)));
194+
mockColumns.add(new Column("unsupported_col", new ScalarType(PrimitiveType.UNSUPPORTED)));
195+
196+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns);
197+
// This should throw IllegalArgumentException due to unsupported column type
198+
try {
199+
StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys);
200+
Assert.fail("Expected IllegalArgumentException to be thrown");
201+
} catch (IllegalArgumentException e) {
202+
// Verify the exception message contains expected information
203+
String message = e.getMessage();
204+
Assert.assertTrue(message.contains("Unsupported column type"));
205+
Assert.assertTrue(message.contains("test_table"));
206+
Assert.assertTrue(message.contains("unsupported_col"));
207+
}
208+
}
209+
210+
@Test
211+
public void testGetColumnsWithVarcharPrimaryKeyLengthMultiplication() throws Exception {
212+
String database = "test_db";
213+
String table = "test_table";
214+
List<String> primaryKeys = Arrays.asList("pk_varchar", "pk_int");
215+
216+
List<Column> mockColumns = new ArrayList<>();
217+
mockColumns.add(new Column("pk_int", ScalarType.createType(PrimitiveType.INT)));
218+
mockColumns.add(new Column("pk_varchar", ScalarType.createVarcharType(100)));
219+
mockColumns.add(new Column("normal_varchar", ScalarType.createVarcharType(50)));
220+
221+
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockColumns);
222+
List<Column> result = StreamingJobUtils.getColumns(jdbcClient, database, table, primaryKeys);
223+
224+
// Verify varchar primary key column has length multiplied by 3
225+
Column pkVarcharColumn = result.stream()
226+
.filter(col -> col.getName().equals("pk_varchar"))
227+
.findFirst()
228+
.orElse(null);
229+
Assert.assertNotNull(pkVarcharColumn);
230+
Assert.assertEquals(300, pkVarcharColumn.getType().getLength()); // 100 * 3
231+
232+
// Verify normal varchar column also has length multiplied by 3
233+
Column normalVarcharColumn = result.stream()
234+
.filter(col -> col.getName().equals("normal_varchar"))
235+
.findFirst()
236+
.orElse(null);
237+
Assert.assertNotNull(normalVarcharColumn);
238+
Assert.assertEquals(150, normalVarcharColumn.getType().getLength()); // 50 * 3
239+
}
240+
}

regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ timestamp1 datetime Yes false \N NONE
3535
timestamp2 datetime(3) Yes false \N NONE
3636
timestamp3 datetime(6) Yes false \N NONE
3737
char char(5) Yes false \N NONE
38-
varchar varchar(10) Yes false \N NONE
38+
varchar varchar(30) Yes false \N NONE
3939
text text Yes false \N NONE
4040
blob text Yes false \N NONE
4141
json text Yes false \N NONE

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
5858
sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
5959
sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
6060
sql """DROP TABLE IF EXISTS ${mysqlDb}.${table2}"""
61+
sql """DROP TABLE IF EXISTS ${mysqlDb}.${table3}"""
6162
sql """CREATE TABLE ${mysqlDb}.${table1} (
6263
`name` varchar(200) NOT NULL,
6364
`age` int DEFAULT NULL,
@@ -109,7 +110,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
109110
// check table schema correct
110111
def showTbl1 = sql """show create table ${currentDb}.${table1}"""
111112
def createTalInfo = showTbl1[0][1];
112-
assert createTalInfo.contains("`name` varchar(200)");
113+
assert createTalInfo.contains("`name` varchar(600)");
113114
assert createTalInfo.contains("`age` int");
114115
assert createTalInfo.contains("UNIQUE KEY(`name`)");
115116
assert createTalInfo.contains("DISTRIBUTED BY HASH(`name`) BUCKETS AUTO");

0 commit comments

Comments
 (0)