diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index 729074d508e75..ac7deaa4b35ee 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -209,9 +209,6 @@ public static Set pluginLocations(String pluginPath, boolean failFast) { for (String path : pluginPathElements) { try { Path pluginPathElement = Paths.get(path).toAbsolutePath(); - if (pluginPath.isEmpty()) { - log.warn("Plugin path element is empty, evaluating to {}.", pluginPathElement); - } if (!Files.exists(pluginPathElement)) { throw new FileNotFoundException(pluginPathElement.toString()); } diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java index a3da1d3a9c6b5..73987b6746c32 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.tools; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -52,6 +53,7 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -68,6 +70,8 @@ public class ConnectPluginPath { }; public static final String NO_ALIAS = "N/A"; + private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*"); + public static void main(String[] args) { Exit.exit(mainNoExit(args, System.out, System.err)); } @@ -82,7 +86,7 @@ public static int mainNoExit(String[] args, PrintStream out, PrintStream err) { } catch (ArgumentParserException e) { parser.handleError(e); return 1; - } catch (TerseException e) { + } catch (TerseException | ConfigException e) { err.println(e.getMessage()); return 2; } catch (Throwable e) { @@ -162,6 +166,9 @@ private static Set parseLocations(ArgumentParser parser, Namespace namespa if (rawLocations.isEmpty() && rawPluginPaths.isEmpty() && rawWorkerConfigs.isEmpty()) { throw new ArgumentParserException("Must specify at least one --plugin-location, --plugin-path, or --worker-config", parser); } + for (String pluginPath : rawPluginPaths) { + validatePluginPath(pluginPath, "--plugin-path"); + } Set pluginLocations = new LinkedHashSet<>(); for (String rawWorkerConfig : rawWorkerConfigs) { Properties properties; @@ -172,6 +179,7 @@ private static Set parseLocations(ArgumentParser parser, Namespace namespa } String pluginPath = properties.getProperty(WorkerConfig.PLUGIN_PATH_CONFIG); if (pluginPath != null) { + validatePluginPath(pluginPath, WorkerConfig.PLUGIN_PATH_CONFIG); rawPluginPaths.add(pluginPath); } } @@ -192,6 +200,21 @@ private static Set parseLocations(ArgumentParser parser, Namespace namespa return pluginLocations; } + private static void validatePluginPath(String pluginPath, String configName) throws ConfigException { + String trimmed = pluginPath.trim(); + if (trimmed.isEmpty()) { + throw new ConfigException("'" + configName + "' must not be empty."); + } + + String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(trimmed, -1); + + for (String path : pluginPathElements) { + if (path.isEmpty()) { + throw new ConfigException("'" + configName + "' values must not be empty."); + } + } + } + enum Command { LIST, SYNC_MANIFESTS } diff --git a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java index d10cf7b2e4595..aff3f734ba748 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java @@ -333,6 +333,63 @@ public void testSyncManifestsKeepNotFound(PluginLocationType type) { assertBadPackagingPluginsStatus(table, false); } + @Test + public void testListEmptyPluginPathArg() { + CommandResult res = runCommand( + "list", + "--plugin-path", + "" + ); + assertNotEquals(0, res.returnCode); + assertEquals("'--plugin-path' must not be empty.\n", res.err); + } + + @Test + public void testListEmptyPluginPathElementArg() { + CommandResult res = runCommand( + "list", + "--plugin-path", + "location-a,,location-b" + ); + assertNotEquals(0, res.returnCode); + assertEquals("'--plugin-path' values must not be empty.\n", res.err); + } + + @Test + public void testListEmptyPluginPathInWorkerConfig() { + Path configPath = workspace.resolve("worker-empty.properties"); + try { + Files.writeString(configPath, "plugin.path=", StandardCharsets.UTF_8); + } catch (IOException e) { + fail("Failed to create test worker config: " + e.getMessage()); + } + + CommandResult res = runCommand( + "list", + "--worker-config", + configPath.toString() + ); + assertNotEquals(0, res.returnCode); + assertEquals("'plugin.path' must not be empty.\n", res.err); + } + + @Test + public void testListEmptyPluginPathElementInWorkerConfig() { + Path configPath = workspace.resolve("worker-empty-element.properties"); + try { + Files.writeString(configPath, "plugin.path=location-a,,location-b", StandardCharsets.UTF_8); + } catch (IOException e) { + fail("Failed to create test worker config: " + e.getMessage()); + } + + CommandResult res = runCommand( + "list", + "--worker-config", + configPath.toString() + ); + assertNotEquals(0, res.returnCode); + assertEquals("'plugin.path' values must not be empty.\n", res.err); + } private static Map> assertListSuccess(CommandResult result) { assertEquals(0, result.returnCode);