-
Couldn't load subscription status.
- Fork 2.1k
[FLINK-38247] Handle BIGINT UNSIGNED overflow in PreparedStatement #4117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,6 +56,7 @@ | |
| import org.apache.flink.table.catalog.ObjectPath; | ||
| import org.apache.flink.table.catalog.ResolvedSchema; | ||
| import org.apache.flink.table.catalog.UniqueConstraint; | ||
| import org.apache.flink.table.data.DecimalData; | ||
| import org.apache.flink.table.data.RowData; | ||
| import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; | ||
| import org.apache.flink.table.types.DataType; | ||
|
|
@@ -1447,6 +1448,98 @@ private boolean hasNextData(final CloseableIterator<?> iterator) | |
| } | ||
| } | ||
|
|
||
| @Test | ||
| void testUnsignedBigintPrimaryKeyChunking() throws Exception { | ||
| customDatabase.createAndInitialize(); | ||
|
|
||
| String db = customDatabase.getDatabaseName(); | ||
| String table = "unsigned_bigint_pk"; | ||
| try (MySqlConnection connection = getConnection()) { | ||
| connection.setAutoCommit(false); | ||
| String createSql = | ||
| String.format( | ||
| "CREATE TABLE %s.%s (\n" | ||
| + " `order_id` BIGINT UNSIGNED NOT NULL,\n" | ||
| + " `desc` VARCHAR(512) NOT NULL,\n" | ||
| + " PRIMARY KEY (`order_id`)\n" | ||
| + ") ENGINE=InnoDB DEFAULT CHARSET=utf8;", | ||
| StatementUtils.quote(db), StatementUtils.quote(table)); | ||
| // Insert sample data including values near UNSIGNED BIGINT max | ||
| String insertSql = | ||
| String.format( | ||
| "INSERT INTO %s.%s (`order_id`, `desc`) VALUES " | ||
| + "(1, 'flink'),(2, 'flink'),(3, 'flink'),(4, 'flink'),(5, 'flink')," | ||
| + "(6, 'flink'),(7, 'flink'),(8, 'flink'),(9, 'flink'),(10, 'flink')," | ||
| + "(11, 'flink'),(12, 'flink')," | ||
| + "(18446744073709551604, 'flink'),(18446744073709551605, 'flink')," | ||
| + "(18446744073709551606, 'flink'),(18446744073709551607, 'flink')," | ||
| + "(18446744073709551608, 'flink'),(18446744073709551609, 'flink')," | ||
| + "(18446744073709551610, 'flink'),(18446744073709551611, 'flink')," | ||
| + "(18446744073709551612, 'flink'),(18446744073709551613, 'flink')," | ||
| + "(18446744073709551614, 'flink'),(18446744073709551615, 'flink');", | ||
| StatementUtils.quote(db), StatementUtils.quote(table)); | ||
| // Drop if exists to be idempotent across runs, then create and insert | ||
| connection.execute( | ||
| String.format( | ||
| "DROP TABLE IF EXISTS %s.%s;", | ||
| StatementUtils.quote(db), StatementUtils.quote(table)), | ||
| createSql, | ||
| insertSql); | ||
| connection.commit(); | ||
| } | ||
|
|
||
| // Build a source reading only the unsigned_bigint_pk table | ||
| DataType dataType = | ||
| DataTypes.ROW( | ||
| DataTypes.FIELD("order_id", DataTypes.DECIMAL(20, 0)), | ||
| DataTypes.FIELD("desc", DataTypes.STRING())); | ||
| LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType); | ||
| InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType); | ||
| RowDataDebeziumDeserializeSchema deserializer = | ||
| RowDataDebeziumDeserializeSchema.newBuilder() | ||
| .setPhysicalRowType((RowType) dataType.getLogicalType()) | ||
| .setResultTypeInfo(typeInfo) | ||
| .build(); | ||
|
|
||
| MySqlSource<RowData> source = | ||
| MySqlSource.<RowData>builder() | ||
| .hostname(MYSQL_CONTAINER.getHost()) | ||
| .port(MYSQL_CONTAINER.getDatabasePort()) | ||
| .username(customDatabase.getUsername()) | ||
| .password(customDatabase.getPassword()) | ||
| .serverTimeZone("UTC") | ||
| .databaseList(db) | ||
| .tableList(db + "." + table) | ||
| .deserializer(deserializer) | ||
| .startupOptions(StartupOptions.initial()) | ||
| .chunkKeyColumn(new ObjectPath(db, table), "order_id") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You add Which means that this problem was not resolved. |
||
| .build(); | ||
|
|
||
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | ||
| env.setParallelism(1); | ||
| try (CloseableIterator<RowData> it = | ||
| env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source") | ||
| .executeAndCollect()) { | ||
| // Expect 24 records as inserted above | ||
| List<String> result = fetchRowData(it, 24, this::stringifyUnsignedPkRow); | ||
| // Validate a couple of boundary values exist to ensure chunking across unsigned range | ||
| // works | ||
| assertThat(result) | ||
| .contains( | ||
| "+I[1, flink]", | ||
| "+I[12, flink]", | ||
| "+I[18446744073709551604, flink]", | ||
| "+I[18446744073709551615, flink]"); | ||
| } | ||
| } | ||
|
|
||
| private String stringifyUnsignedPkRow(RowData row) { | ||
| DecimalData decimal = row.getDecimal(0, 20, 0); | ||
| String orderId = decimal.toBigDecimal().toPlainString(); | ||
| String desc = row.getString(1).toString(); | ||
| return "+I[" + orderId + ", " + desc + "]"; | ||
| } | ||
|
|
||
| /** | ||
| * A {@link DebeziumDeserializationSchema} implementation which sleep given milliseconds after | ||
| * deserialize per record, this class is designed for test. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.flink.cdc.connectors.mysql.source.utils; | ||
|
|
||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import java.lang.reflect.Proxy; | ||
| import java.math.BigInteger; | ||
| import java.sql.PreparedStatement; | ||
| import java.sql.SQLException; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
|
|
||
| /** Unit test for {@link org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils}. */ | ||
| class StatementUtilsTest { | ||
|
|
||
| @Test | ||
| void testSetSafeObjectCorrectlyHandlesOverflow() throws SQLException { | ||
| Map<String, Object> invocationDetails = new HashMap<>(); | ||
| PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails); | ||
|
|
||
| long overflowValue = Long.MAX_VALUE + 1L; | ||
| BigInteger expectedBigInteger = new BigInteger(Long.toUnsignedString(overflowValue)); | ||
|
|
||
| // Use the safe method | ||
| StatementUtils.setSafeObject(psProxy, 1, overflowValue); | ||
|
|
||
| // Assert that it correctly used setObject with the converted BigInteger value | ||
| assertThat(invocationDetails.get("methodName")).isEqualTo("setObject"); | ||
| assertThat(invocationDetails.get("value")).isInstanceOf(BigInteger.class); | ||
| assertThat(invocationDetails.get("value")).isEqualTo(expectedBigInteger); | ||
| } | ||
|
|
||
| @Test | ||
| void testDirectSetObjectFailsOnOverflow() throws SQLException { | ||
| Map<String, Object> invocationDetails = new HashMap<>(); | ||
| PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails); | ||
|
|
||
| long overflowValue = Long.MAX_VALUE + 1L; | ||
|
|
||
| // Directly call the unsafe method on the proxy | ||
| psProxy.setObject(1, overflowValue); | ||
|
|
||
| // Assert that it incorrectly used setObject, preserving the wrong negative long value | ||
| assertThat(invocationDetails.get("methodName")).isEqualTo("setObject"); | ||
| assertThat(invocationDetails.get("value")).isInstanceOf(Long.class); | ||
| assertThat(invocationDetails.get("value")).isEqualTo(Long.MIN_VALUE); | ||
| } | ||
|
|
||
| @Test | ||
| void testSetSafeObjectHandlesRegularValues() throws SQLException { | ||
| Map<String, Object> invocationDetails = new HashMap<>(); | ||
| PreparedStatement psProxy = createPreparedStatementProxy(invocationDetails); | ||
|
|
||
| // Test with a common Long | ||
| StatementUtils.setSafeObject(psProxy, 1, 123L); | ||
| assertThat(invocationDetails.get("methodName")).isEqualTo("setObject"); | ||
| assertThat(invocationDetails.get("value")).isEqualTo(123L); | ||
| invocationDetails.clear(); | ||
|
|
||
| // Test with a String | ||
| StatementUtils.setSafeObject(psProxy, 2, "test"); | ||
| assertThat(invocationDetails.get("methodName")).isEqualTo("setObject"); | ||
| assertThat(invocationDetails.get("value")).isEqualTo("test"); | ||
| invocationDetails.clear(); | ||
|
|
||
| // Test with null | ||
| StatementUtils.setSafeObject(psProxy, 3, null); | ||
| assertThat(invocationDetails.get("methodName")).isEqualTo("setObject"); | ||
| assertThat(invocationDetails.get("value")).isNull(); | ||
| invocationDetails.clear(); | ||
| } | ||
|
|
||
| private PreparedStatement createPreparedStatementProxy(Map<String, Object> invocationDetails) { | ||
| return (PreparedStatement) | ||
| Proxy.newProxyInstance( | ||
| StatementUtilsTest.class.getClassLoader(), | ||
| new Class<?>[] {PreparedStatement.class}, | ||
| (proxy, method, args) -> { | ||
| String methodName = method.getName(); | ||
| if (methodName.equals("setObject")) { | ||
| invocationDetails.put("methodName", methodName); | ||
| invocationDetails.put("parameterIndex", args[0]); | ||
| invocationDetails.put("value", args[1]); | ||
| } | ||
| return null; | ||
| }); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be BigInteger type, you can verify this in itcase.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Example of sql lists for possible test case:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lvyanquan
Hello! I added itcase in MySqlSourceITCase.
I think JDBC may surface BIGINT UNSIGNED as Long (values > Long.MAX_VALUE appear negative due to two’s complement)
So In this code, 1) detect negative Longs 2) bind them as BigInteger, ensuring values near 2^64−1 are handled correctly.
I Added an IT in MySqlSourceITCase that creates unsigned_bigint_pk and verifies boundary values. And I saw that the test worked well.