diff --git a/flink-core-api/src/main/java/org/apache/flink/configuration/MemorySize.java b/flink-core-api/src/main/java/org/apache/flink/configuration/MemorySize.java index d860c6e30fa43..2784291f508c5 100644 --- a/flink-core-api/src/main/java/org/apache/flink/configuration/MemorySize.java +++ b/flink-core-api/src/main/java/org/apache/flink/configuration/MemorySize.java @@ -27,6 +27,8 @@ import java.util.Locale; import java.util.Objects; import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.IntStream; import static org.apache.flink.configuration.MemorySize.MemoryUnit.BYTES; @@ -287,6 +289,16 @@ public static long parseBytes(String text) throws IllegalArgumentException { throw new NumberFormatException("text does not start with a number"); } + String unitsRegex = MemoryUnit.getRegularExpressionForAllUnits(); + Pattern regexPattern = Pattern.compile("^(\\d+)\\s*" + unitsRegex + "?$"); + Matcher matcher = regexPattern.matcher(trimmed.toLowerCase(Locale.US)); + + if (!matcher.matches()) { + throw new IllegalArgumentException( + "Memory size must be an integer value optionally followed by a unit. Found: " + + text); + } + final long value; try { value = Long.parseLong(number); // this throws a NumberFormatException on overflow @@ -388,6 +400,21 @@ public static String getAllUnits() { TERA_BYTES.getUnits()); } + public static String getRegularExpressionForAllUnits() { + String delimiter = "|"; + String units = + String.join( + delimiter, + new String[] { + String.join(delimiter, BYTES.getUnits()), + String.join(delimiter, KILO_BYTES.getUnits()), + String.join(delimiter, MEGA_BYTES.getUnits()), + String.join(delimiter, GIGA_BYTES.getUnits()), + String.join(delimiter, TERA_BYTES.getUnits()), + }); + return "(" + units + ")"; + } + public static boolean hasUnit(String text) { Objects.requireNonNull(text, "text cannot be null"); diff --git a/flink-core-api/src/test/java/org/apache/flink/configuration/MemorySizeTest.java b/flink-core-api/src/test/java/org/apache/flink/configuration/MemorySizeTest.java index a619f9f00470d..2d2c58e254034 100644 --- a/flink-core-api/src/test/java/org/apache/flink/configuration/MemorySizeTest.java +++ b/flink-core-api/src/test/java/org/apache/flink/configuration/MemorySizeTest.java @@ -179,6 +179,10 @@ void testParseInvalid() { // negative number assertThatThrownBy(() -> MemorySize.parseBytes("-100 bytes")) .isInstanceOf(IllegalArgumentException.class); + + // fractional number + assertThatThrownBy(() -> MemorySize.parseBytes("1.5g")) + .isInstanceOf(IllegalArgumentException.class); } @Test