From 2fda65147e20066ce2b7e31e32214b0a278cb6bb Mon Sep 17 00:00:00 2001 From: majialong Date: Sat, 4 Oct 2025 23:48:03 +0800 Subject: [PATCH 1/3] MINOR: Add plugin.path config non-empty check in connect-plugin-path.sh --- .../apache/kafka/tools/ConnectPluginPath.java | 22 +++++++ .../kafka/tools/ConnectPluginPathTest.java | 57 +++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java index a3da1d3a9c6b5..db352e7e4c235 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java @@ -52,6 +52,7 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -68,6 +69,8 @@ public class ConnectPluginPath { }; public static final String NO_ALIAS = "N/A"; + private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*"); + public static void main(String[] args) { Exit.exit(mainNoExit(args, System.out, System.err)); } @@ -162,6 +165,9 @@ private static Set parseLocations(ArgumentParser parser, Namespace namespa if (rawLocations.isEmpty() && rawPluginPaths.isEmpty() && rawWorkerConfigs.isEmpty()) { throw new ArgumentParserException("Must specify at least one --plugin-location, --plugin-path, or --worker-config", parser); } + for (String pluginPath : rawPluginPaths) { + validatePluginPath(pluginPath, "--plugin-path"); + } Set pluginLocations = new LinkedHashSet<>(); for (String rawWorkerConfig : rawWorkerConfigs) { Properties properties; @@ -172,6 +178,7 @@ private static Set parseLocations(ArgumentParser parser, Namespace namespa } String pluginPath = properties.getProperty(WorkerConfig.PLUGIN_PATH_CONFIG); if (pluginPath != null) { + validatePluginPath(pluginPath, WorkerConfig.PLUGIN_PATH_CONFIG); rawPluginPaths.add(pluginPath); } } @@ -192,6 +199,21 @@ private static Set parseLocations(ArgumentParser parser, Namespace namespa return pluginLocations; } + private static void validatePluginPath(String pluginPath, String configName) throws TerseException { + String trimmed = pluginPath.trim(); + if (trimmed.isEmpty()) { + throw new TerseException("'" + configName + "' must not be empty."); + } + + String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(trimmed, -1); + + for (String path : pluginPathElements) { + if (path.isEmpty()) { + throw new TerseException("'" + configName + "' values must not be empty."); + } + } + } + enum Command { LIST, SYNC_MANIFESTS } diff --git a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java index d10cf7b2e4595..192161cc90377 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java @@ -333,6 +333,63 @@ public void testSyncManifestsKeepNotFound(PluginLocationType type) { assertBadPackagingPluginsStatus(table, false); } + @Test + public void testListEmptyPluginPathArg() { + CommandResult res = runCommand( + "list", + "--plugin-path", + "" + ); + assertNotEquals(0, res.returnCode); + assertTrue("'--plugin-path' must not be empty.\n".equals(res.err)); + } + + @Test + public void testListEmptyPluginPathElementArg() { + CommandResult res = runCommand( + "list", + "--plugin-path", + "location-a,,location-b" + ); + assertNotEquals(0, res.returnCode); + assertTrue("'--plugin-path' values must not be empty.\n".equals(res.err)); + } + + @Test + public void testListEmptyPluginPathInWorkerConfig() { + Path configPath = workspace.resolve("worker-empty.properties"); + try { + Files.writeString(configPath, "plugin.path=", StandardCharsets.UTF_8); + } catch (IOException e) { + fail("Failed to create test worker config: " + e.getMessage()); + } + + CommandResult res = runCommand( + "list", + "--worker-config", + configPath.toString() + ); + assertNotEquals(0, res.returnCode); + assertTrue("'plugin.path' must not be empty.\n".equals(res.err)); + } + + @Test + public void testListEmptyPluginPathElementInWorkerConfig() { + Path configPath = workspace.resolve("worker-empty-element.properties"); + try { + Files.writeString(configPath, "plugin.path=location-a,,location-b", StandardCharsets.UTF_8); + } catch (IOException e) { + fail("Failed to create test worker config: " + e.getMessage()); + } + + CommandResult res = runCommand( + "list", + "--worker-config", + configPath.toString() + ); + assertNotEquals(0, res.returnCode); + assertTrue("'plugin.path' values must not be empty.\n".equals(res.err)); + } private static Map> assertListSuccess(CommandResult result) { assertEquals(0, result.returnCode); From 2210830a8bdee7a5b909e64a722a4862847fc9ef Mon Sep 17 00:00:00 2001 From: majialong Date: Sun, 5 Oct 2025 00:45:58 +0800 Subject: [PATCH 2/3] Remove plugin.path empty warning log in PluginUtils --- .../apache/kafka/connect/runtime/isolation/PluginUtils.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index 729074d508e75..ac7deaa4b35ee 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -209,9 +209,6 @@ public static Set pluginLocations(String pluginPath, boolean failFast) { for (String path : pluginPathElements) { try { Path pluginPathElement = Paths.get(path).toAbsolutePath(); - if (pluginPath.isEmpty()) { - log.warn("Plugin path element is empty, evaluating to {}.", pluginPathElement); - } if (!Files.exists(pluginPathElement)) { throw new FileNotFoundException(pluginPathElement.toString()); } From f22501b7bda1d9df7bfbe9db1ae7a75ff42dd02c Mon Sep 17 00:00:00 2001 From: majialong Date: Sun, 5 Oct 2025 11:59:18 +0800 Subject: [PATCH 3/3] Throws ConfigException when validation fails --- .../java/org/apache/kafka/tools/ConnectPluginPath.java | 9 +++++---- .../org/apache/kafka/tools/ConnectPluginPathTest.java | 8 ++++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java index db352e7e4c235..73987b6746c32 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.tools; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -85,7 +86,7 @@ public static int mainNoExit(String[] args, PrintStream out, PrintStream err) { } catch (ArgumentParserException e) { parser.handleError(e); return 1; - } catch (TerseException e) { + } catch (TerseException | ConfigException e) { err.println(e.getMessage()); return 2; } catch (Throwable e) { @@ -199,17 +200,17 @@ private static Set parseLocations(ArgumentParser parser, Namespace namespa return pluginLocations; } - private static void validatePluginPath(String pluginPath, String configName) throws TerseException { + private static void validatePluginPath(String pluginPath, String configName) throws ConfigException { String trimmed = pluginPath.trim(); if (trimmed.isEmpty()) { - throw new TerseException("'" + configName + "' must not be empty."); + throw new ConfigException("'" + configName + "' must not be empty."); } String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(trimmed, -1); for (String path : pluginPathElements) { if (path.isEmpty()) { - throw new TerseException("'" + configName + "' values must not be empty."); + throw new ConfigException("'" + configName + "' values must not be empty."); } } } diff --git a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java index 192161cc90377..aff3f734ba748 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java @@ -341,7 +341,7 @@ public void testListEmptyPluginPathArg() { "" ); assertNotEquals(0, res.returnCode); - assertTrue("'--plugin-path' must not be empty.\n".equals(res.err)); + assertEquals("'--plugin-path' must not be empty.\n", res.err); } @Test @@ -352,7 +352,7 @@ public void testListEmptyPluginPathElementArg() { "location-a,,location-b" ); assertNotEquals(0, res.returnCode); - assertTrue("'--plugin-path' values must not be empty.\n".equals(res.err)); + assertEquals("'--plugin-path' values must not be empty.\n", res.err); } @Test @@ -370,7 +370,7 @@ public void testListEmptyPluginPathInWorkerConfig() { configPath.toString() ); assertNotEquals(0, res.returnCode); - assertTrue("'plugin.path' must not be empty.\n".equals(res.err)); + assertEquals("'plugin.path' must not be empty.\n", res.err); } @Test @@ -388,7 +388,7 @@ public void testListEmptyPluginPathElementInWorkerConfig() { configPath.toString() ); assertNotEquals(0, res.returnCode); - assertTrue("'plugin.path' values must not be empty.\n".equals(res.err)); + assertEquals("'plugin.path' values must not be empty.\n", res.err); } private static Map> assertListSuccess(CommandResult result) {