diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index d4d3faf83e3b0..caa2d2b5854e1 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -49,6 +49,7 @@ abstract class AbstractCommandBuilder { String master; String remote; protected String propertiesFile; + protected boolean loadSparkDefaults; final List appArgs; final List jars; final List files; @@ -362,21 +363,35 @@ Map getEffectiveConfig() throws IOException { } /** - * Loads the configuration file for the application, if it exists. This is either the - * user-specified properties file, or the spark-defaults.conf file under the Spark configuration - * directory. + * Load the configuration file(s) for the application - from the user-specified properties + * file, and/or the spark-defaults.conf file under the Spark configuration directory, if it exists. + * Configurations from the user-specified properties file take precedence over spark-defaults.conf. 
*/ private Properties loadPropertiesFile() throws IOException { Properties props = new Properties(); - File propsFile; if (propertiesFile != null) { - propsFile = new File(propertiesFile); + File propsFile = new File(propertiesFile); checkArgument(propsFile.isFile(), "Invalid properties file '%s'.", propertiesFile); - } else { - propsFile = new File(getConfDir(), DEFAULT_PROPERTIES_FILE); + props = loadPropertiesFile(propsFile); } - if (propsFile.isFile()) { + Properties defaultsProps = new Properties(); + if (propertiesFile == null || loadSparkDefaults) { + defaultsProps = loadPropertiesFile(new File(getConfDir(), DEFAULT_PROPERTIES_FILE)); + } + + for (Map.Entry entry : defaultsProps.entrySet()) { + if (!props.containsKey(entry.getKey())) { + props.put(entry.getKey(), entry.getValue()); + } + } + + return props; + } + + private Properties loadPropertiesFile(File propsFile) throws IOException { + Properties props = new Properties(); + if (propsFile != null && propsFile.isFile()) { try (InputStreamReader isr = new InputStreamReader( new FileInputStream(propsFile), StandardCharsets.UTF_8)) { props.load(isr); diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java index bdbb954dbe087..7b9f90ac7b7a6 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java @@ -251,6 +251,10 @@ List buildSparkSubmitArgs(boolean includeRemote) { args.add(propertiesFile); } + if (loadSparkDefaults) { + args.add(parser.LOAD_SPARK_DEFAULTS); + } + if (isExample) { jars.addAll(findExamplesJars()); } @@ -550,6 +554,7 @@ protected boolean handle(String opt, String value) { } case DEPLOY_MODE -> deployMode = value; case PROPERTIES_FILE -> propertiesFile = value; + case LOAD_SPARK_DEFAULTS -> loadSparkDefaults = true; case DRIVER_MEMORY -> 
conf.put(SparkLauncher.DRIVER_MEMORY, value); case DRIVER_JAVA_OPTIONS -> conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value); case DRIVER_LIBRARY_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value); diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java index f0a295fb4228e..d1dba85a534f2 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java @@ -38,6 +38,7 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite { private static File dummyPropsFile; private static File connectPropsFile; + private static File driverMemPropsFile; private static SparkSubmitOptionParser parser; @BeforeAll @@ -45,6 +46,9 @@ public static void setUp() throws Exception { dummyPropsFile = File.createTempFile("spark", "properties"); connectPropsFile = File.createTempFile("spark", "properties"); Files.writeString(connectPropsFile.toPath(), "spark.remote=sc://connect-server:15002"); + driverMemPropsFile = File.createTempFile("spark", "properties"); + Files.writeString(driverMemPropsFile.toPath(), + "spark.driver.memory=4g\nspark.driver.memoryOverhead=768m"); parser = new SparkSubmitOptionParser(); } @@ -52,6 +56,73 @@ public static void setUp() throws Exception { public static void cleanUp() throws Exception { dummyPropsFile.delete(); connectPropsFile.delete(); + driverMemPropsFile.delete(); + } + + @Test + public void testGetEffectiveConfig() throws Exception { + doTestGetEffectiveConfig(null, true, true); + doTestGetEffectiveConfig(null, true, false); + doTestGetEffectiveConfig(null, false, true); + doTestGetEffectiveConfig(null, false, false); + doTestGetEffectiveConfig(driverMemPropsFile, true, true); + doTestGetEffectiveConfig(driverMemPropsFile, true, false); + 
doTestGetEffectiveConfig(driverMemPropsFile, false, true); + doTestGetEffectiveConfig(driverMemPropsFile, false, false); + } + + private void doTestGetEffectiveConfig( + File propertiesFile, boolean loadSparkDefaults, boolean confDriverMemory) throws Exception { + SparkSubmitCommandBuilder launcher = + newCommandBuilder(Collections.emptyList()); + launcher.loadSparkDefaults = loadSparkDefaults; + launcher.conf.put("spark.foo", "bar"); + launcher.childEnv.put("SPARK_CONF_DIR", System.getProperty("spark.test.home") + + "/launcher/src/test/resources"); + + if (propertiesFile != null) { + launcher.setPropertiesFile(propertiesFile.getAbsolutePath()); + } + + if (confDriverMemory) { + launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "2g"); + } + + Map effectiveConfig = launcher.getEffectiveConfig(); + + assertEquals("bar", effectiveConfig.get("spark.foo")); + if (confDriverMemory) { + assertEquals("2g", effectiveConfig.get(SparkLauncher.DRIVER_MEMORY)); + } else if (propertiesFile != null) { + try (FileReader reader = new FileReader(propertiesFile, StandardCharsets.UTF_8)) { + Properties props = new Properties(); + props.load(reader); + if (props.containsKey(SparkLauncher.DRIVER_MEMORY)) { + assertEquals(props.getProperty(SparkLauncher.DRIVER_MEMORY), + effectiveConfig.get(SparkLauncher.DRIVER_MEMORY)); + } + } + } else { + assertEquals("1g", effectiveConfig.get(SparkLauncher.DRIVER_MEMORY)); + } + + if (propertiesFile != null) { + try (FileReader reader = new FileReader(propertiesFile, StandardCharsets.UTF_8)) { + Properties props = new Properties(); + props.load(reader); + if (props.containsKey("spark.driver.memoryOverhead")) { + assertEquals(props.getProperty("spark.driver.memoryOverhead"), + effectiveConfig.get("spark.driver.memoryOverhead")); + } + } + if (loadSparkDefaults) { + assertEquals("/driver", effectiveConfig.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH)); + } else { + assertFalse(effectiveConfig.containsKey(SparkLauncher.DRIVER_EXTRA_CLASSPATH)); + } + } else { 
+ assertEquals("/driver", effectiveConfig.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH)); + } } @Test