Commit bd2aa0a

yuxiqian and MOBIN-F authored
[FLINK-35985][transform] Correct the substring function in transform rule
This closes #3702. Co-authored-by: MOBIN <[email protected]>
1 parent 908949b commit bd2aa0a

5 files changed, +86 -13 lines changed

docs/content.zh/docs/core-concept/transform.md

Lines changed: 2 additions & 1 deletion
@@ -126,7 +126,8 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
 | LOWER(string) | lower(string) | Returns string in lowercase. |
 | TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. |
 | REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". |
-| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). |
+| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). |
+| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). |
 | CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. |
 
 ## Temporal Functions

docs/content/docs/core-concept/transform.md

Lines changed: 10 additions & 9 deletions
@@ -118,16 +118,17 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
 
 ## String Functions
 
-| Function | Janino Code | Description |
-| -------------------- | ------------------------ | ------------------------------------------------- |
-| string1 &#124;&#124; string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. |
-| CHAR_LENGTH(string) | charLength(string) | Returns the number of characters in STRING. |
-| UPPER(string) | upper(string) | Returns string in uppercase. |
-| LOWER(string) | lower(string) | Returns string in lowercase. |
-| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. |
+| Function | Janino Code | Description |
+| -------------------- | ------------------------ |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| string1 &#124;&#124; string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. |
+| CHAR_LENGTH(string) | charLength(string) | Returns the number of characters in STRING. |
+| UPPER(string) | upper(string) | Returns string in uppercase. |
+| LOWER(string) | lower(string) | Returns string in lowercase. |
+| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. |
 | REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". |
-| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). |
-| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. |
+| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). |
+| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). |
+| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. |
 
 ## Temporal Functions
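
The table rows above describe the position argument only briefly. As a hedged sketch of how the start position appears to be resolved, judging from the `substring` implementation and tests in this same commit: positions are 1-based, `0` is treated like `1`, and a negative position counts back from the end of the string. The names `StartPositionSketch` and `resolveStart` are hypothetical and not part of Flink CDC, and the `-1` sentinel is an illustration choice (the commit's code returns an empty string directly in that case).

```java
public class StartPositionSketch {

    /**
     * Maps a SQL-style position to a 0-based Java index.
     * Returns -1 when the position falls outside the string
     * (hypothetical sentinel; the commit's code returns "" instead).
     */
    static int resolveStart(int strLength, int position) {
        if (position > 0) {
            int start = position - 1;          // 1-based -> 0-based
            return start >= strLength ? -1 : start;
        } else if (position < 0) {
            int start = strLength + position;  // count back from the end
            return start < 0 ? -1 : start;
        }
        return 0;                              // position 0 behaves like position 1
    }

    public static void main(String[] args) {
        int len = "ABC".length();
        System.out.println(resolveStart(len, 1));   // 0
        System.out.println(resolveStart(len, 0));   // 0
        System.out.println(resolveStart(len, 2));   // 1
        System.out.println(resolveStart(len, -1));  // 2
        System.out.println(resolveStart(len, -2));  // 1
        System.out.println(resolveStart(len, 4));   // -1
        System.out.println(resolveStart(len, -4));  // -1
    }
}
```

This is why `SUBSTR('ABC', 0)` and `SUBSTR('ABC', 1)` both yield `'ABC'` in the commit's tests, while `SUBSTR('ABC', -1)` yields `'C'`.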

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java

Lines changed: 55 additions & 2 deletions
@@ -361,11 +361,64 @@ public static boolean notLike(String str, String regex) {
     }
 
     public static String substr(String str, int beginIndex) {
-        return str.substring(beginIndex);
+        return substring(str, beginIndex);
     }
 
     public static String substr(String str, int beginIndex, int length) {
-        return str.substring(beginIndex, beginIndex + length);
+        return substring(str, beginIndex, length);
+    }
+
+    public static String substring(String str, int beginIndex) {
+        return substring(str, beginIndex, Integer.MAX_VALUE);
+    }
+
+    public static String substring(String str, int beginIndex, int length) {
+        if (length < 0) {
+            LOG.error(
+                    "length of 'substring(str, beginIndex, length)' must be >= 0 and Int type, but length = {}",
+                    length);
+            throw new RuntimeException(
+                    "length of 'substring(str, beginIndex, length)' must be >= 0 and Int type, but length = "
+                            + length);
+        }
+        if (length > Integer.MAX_VALUE || beginIndex > Integer.MAX_VALUE) {
+            LOG.error(
+                    "length or start of 'substring(str, beginIndex, length)' must be Int type, but length = {}, beginIndex = {}",
+                    length,
+                    beginIndex);
+            throw new RuntimeException(
+                    "length or start of 'substring(str, beginIndex, length)' must be Int type, but length = "
+                            + length
+                            + ", beginIndex = "
+                            + beginIndex);
+        }
+        if (str.isEmpty()) {
+            return "";
+        }
+
+        int startPos;
+        int endPos;
+
+        if (beginIndex > 0) {
+            startPos = beginIndex - 1;
+            if (startPos >= str.length()) {
+                return "";
+            }
+        } else if (beginIndex < 0) {
+            startPos = str.length() + beginIndex;
+            if (startPos < 0) {
+                return "";
+            }
+        } else {
+            startPos = 0;
+        }
+
+        if ((str.length() - startPos) < length) {
+            endPos = str.length();
+        } else {
+            endPos = startPos + length;
+        }
+        return str.substring(startPos, endPos);
     }
 
     public static String upper(String str) {
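
One detail of the new `substring` worth illustrating is how the end position is computed. The two-argument overload passes `Integer.MAX_VALUE` as the length, so evaluating `startPos + length` directly could overflow; comparing the remaining character count against `length` first avoids that. The following is a minimal sketch of that single step in isolation, assuming `startPos` has already been validated to lie within the string and `length` is non-negative. `EndPositionSketch` and `clampEnd` are hypothetical names used only for illustration.

```java
public class EndPositionSketch {

    /** Returns the exclusive end index, clamped to the string length. */
    static int clampEnd(int strLength, int startPos, int length) {
        // Compare against the remaining characters first, so that
        // startPos + length is only evaluated when it cannot overflow.
        if ((strLength - startPos) < length) {
            return strLength;
        }
        return startPos + length;
    }

    public static void main(String[] args) {
        String s = "ABC";
        // Naive addition wraps around to a negative number.
        System.out.println(1 + Integer.MAX_VALUE);                        // -2147483648
        System.out.println(clampEnd(s.length(), 1, Integer.MAX_VALUE));   // 3
        System.out.println(clampEnd(s.length(), 1, 100));                 // 3
        System.out.println(clampEnd(s.length(), 1, 2));                   // 3
        System.out.println(clampEnd(s.length(), 0, 1));                   // 1
        System.out.println(s.substring(1, clampEnd(s.length(), 1, 100))); // BC
    }
}
```

The last line corresponds to the test case `SUBSTR('ABC', 2, 100) = 'BC'`: an over-long length is clamped to the end of the string rather than raising an exception.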

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java

Lines changed: 1 addition & 0 deletions
@@ -191,6 +191,7 @@ public void lookupOperatorOverloads(
                                     SqlTypeFamily.INTEGER,
                                     SqlTypeFamily.INTEGER)),
                     SqlFunctionCategory.STRING);
+    public static final SqlFunction SUBSTRING = SqlStdOperatorTable.SUBSTRING;
 
     // ------------------
     // Temporal Functions

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java

Lines changed: 18 additions & 1 deletion
@@ -1499,7 +1499,24 @@ void testBuildInFunctionTransform() throws Exception {
         testExpressionConditionTransform("concat('123', 'abc') = '123abc'");
         testExpressionConditionTransform("upper('abc') = 'ABC'");
         testExpressionConditionTransform("lower('ABC') = 'abc'");
-        testExpressionConditionTransform("SUBSTR('ABC', 1, 1) = 'B'");
+        testExpressionConditionTransform("SUBSTR('ABC', -1) = 'C'");
+        testExpressionConditionTransform("SUBSTR('ABC', -2, 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTR('ABC', 0) = 'ABC'");
+        testExpressionConditionTransform("SUBSTR('ABC', 1) = 'ABC'");
+        testExpressionConditionTransform("SUBSTR('ABC', 2, 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTR('ABC', 2, 100) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC', -1) = 'C'");
+        testExpressionConditionTransform("SUBSTRING('ABC', -2, 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC', 0) = 'ABC'");
+        testExpressionConditionTransform("SUBSTRING('ABC', 1) = 'ABC'");
+        testExpressionConditionTransform("SUBSTRING('ABC', 2, 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC', 2, 100) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM -1) = 'C'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM -2 FOR 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM 0) = 'ABC'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM 1) = 'ABC'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM 2 FOR 2) = 'BC'");
+        testExpressionConditionTransform("SUBSTRING('ABC' FROM 2 FOR 100) = 'BC'");
         testExpressionConditionTransform("'ABC' like '^[a-zA-Z]'");
         testExpressionConditionTransform("'123' not like '^[a-zA-Z]'");
         testExpressionConditionTransform("abs(2) = 2");
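
To make the changed expectations concrete: the removed assertion `SUBSTR('ABC', 1, 1) = 'B'` reflected the previous implementation, which (per the removed lines in `SystemFunctionUtils`) called `str.substring(beginIndex, beginIndex + length)` and so inherited Java's 0-based indexing. The sketch below replays that old expression against a few of the new test inputs using only `java.lang.String`. `OldSubstrSketch`, `oldSubstr`, and `tryOldSubstr` are hypothetical names, and the `<out of bounds>` marker is purely for display.

```java
public class OldSubstrSketch {

    /** The expression used by the removed three-argument substr. */
    static String oldSubstr(String str, int beginIndex, int length) {
        return str.substring(beginIndex, beginIndex + length);
    }

    /** Runs oldSubstr and reports an out-of-bounds failure as text instead of throwing. */
    static String tryOldSubstr(String str, int beginIndex, int length) {
        try {
            return oldSubstr(str, beginIndex, length);
        } catch (StringIndexOutOfBoundsException e) {
            return "<out of bounds>";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryOldSubstr("ABC", 1, 1));   // B (0-based start)
        System.out.println(tryOldSubstr("ABC", 2, 2));   // <out of bounds>
        System.out.println(tryOldSubstr("ABC", 2, 100)); // <out of bounds>
        System.out.println(tryOldSubstr("ABC", -2, 2));  // <out of bounds>
    }
}
```

Under the new implementation, the tests in this commit expect `'BC'` for each of the last three inputs.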
