Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ abstract class AbstractCommandBuilder {
String master;
String remote;
protected String propertiesFile;
protected boolean loadSparkDefaults;
final List<String> appArgs;
final List<String> jars;
final List<String> files;
Expand Down Expand Up @@ -362,21 +363,35 @@ Map<String, String> getEffectiveConfig() throws IOException {
}

/**
* Loads the configuration file for the application, if it exists. This is either the
* user-specified properties file, or the spark-defaults.conf file under the Spark configuration
* directory.
* Load the configuration file(s) for the application - from the user-specified properties
* file, and/or the spark-defaults.conf file under the Spark configuration directory, if it
* exists. Configurations from the user-specified properties file take precedence over
* spark-defaults.conf.
*/
private Properties loadPropertiesFile() throws IOException {
Properties props = new Properties();
File propsFile;
if (propertiesFile != null) {
propsFile = new File(propertiesFile);
File propsFile = new File(propertiesFile);
checkArgument(propsFile.isFile(), "Invalid properties file '%s'.", propertiesFile);
} else {
propsFile = new File(getConfDir(), DEFAULT_PROPERTIES_FILE);
props = loadPropertiesFile(propsFile);
}

if (propsFile.isFile()) {
Properties defaultsProps = new Properties();
if (propertiesFile == null || loadSparkDefaults) {
defaultsProps = loadPropertiesFile(new File(getConfDir(), DEFAULT_PROPERTIES_FILE));
}

for (Map.Entry<Object, Object> entry : defaultsProps.entrySet()) {
if (!props.containsKey(entry.getKey())) {
props.put(entry.getKey(), entry.getValue());
}
}

return props;
}

private Properties loadPropertiesFile(File propsFile) throws IOException {
Properties props = new Properties();
if (propsFile != null && propsFile.isFile()) {
try (InputStreamReader isr = new InputStreamReader(
new FileInputStream(propsFile), StandardCharsets.UTF_8)) {
props.load(isr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ List<String> buildSparkSubmitArgs(boolean includeRemote) {
args.add(propertiesFile);
}

if (loadSparkDefaults) {
args.add(parser.LOAD_SPARK_DEFAULTS);
}

if (isExample) {
jars.addAll(findExamplesJars());
}
Expand Down Expand Up @@ -550,6 +554,7 @@ protected boolean handle(String opt, String value) {
}
case DEPLOY_MODE -> deployMode = value;
case PROPERTIES_FILE -> propertiesFile = value;
case LOAD_SPARK_DEFAULTS -> loadSparkDefaults = true;
case DRIVER_MEMORY -> conf.put(SparkLauncher.DRIVER_MEMORY, value);
case DRIVER_JAVA_OPTIONS -> conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
case DRIVER_LIBRARY_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,91 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {

private static File dummyPropsFile;
private static File connectPropsFile;
private static File driverMemPropsFile;
private static SparkSubmitOptionParser parser;

@BeforeAll
public static void setUp() throws Exception {
  // Shared fixtures: an option parser plus three temp properties files used across tests.
  parser = new SparkSubmitOptionParser();
  dummyPropsFile = File.createTempFile("spark", "properties");
  connectPropsFile = File.createTempFile("spark", "properties");
  driverMemPropsFile = File.createTempFile("spark", "properties");
  Files.writeString(connectPropsFile.toPath(), "spark.remote=sc://connect-server:15002");
  Files.writeString(driverMemPropsFile.toPath(),
    "spark.driver.memory=4g\nspark.driver.memoryOverhead=768m");
}

@AfterAll
public static void cleanUp() throws Exception {
  // Remove every temp properties file created in setUp.
  for (File f : new File[] {dummyPropsFile, connectPropsFile, driverMemPropsFile}) {
    f.delete();
  }
}

@Test
public void testGetEffectiveConfig() throws Exception {
  // Exercise every combination of (properties file, loadSparkDefaults, conf driver memory),
  // in the same order as enumerating them explicitly.
  for (File propsFile : new File[] {null, driverMemPropsFile}) {
    for (boolean loadDefaults : new boolean[] {true, false}) {
      for (boolean confDriverMemory : new boolean[] {true, false}) {
        doTestGetEffectiveConfig(propsFile, loadDefaults, confDriverMemory);
      }
    }
  }
}

private void doTestGetEffectiveConfig(
File propertiesFile, boolean loadSparkDefaults, boolean confDriverMemory) throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test use loadSparkDefaults to conditionally check something? Seems no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used at line 118

SparkSubmitCommandBuilder launcher =
newCommandBuilder(Collections.emptyList());
launcher.loadSparkDefaults = loadSparkDefaults;
launcher.conf.put("spark.foo", "bar");
launcher.childEnv.put("SPARK_CONF_DIR", System.getProperty("spark.test.home")
+ "/launcher/src/test/resources");

if (propertiesFile != null) {
launcher.setPropertiesFile(propertiesFile.getAbsolutePath());
}

if (confDriverMemory) {
launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "2g");
}

Map<String, String> effectiveConfig = launcher.getEffectiveConfig();

assertEquals("bar", effectiveConfig.get("spark.foo"));
if (confDriverMemory) {
assertEquals("2g", effectiveConfig.get(SparkLauncher.DRIVER_MEMORY));
} else if (propertiesFile != null) {
try (FileReader reader = new FileReader(propertiesFile, StandardCharsets.UTF_8)) {
Properties props = new Properties();
props.load(reader);
if (props.containsKey(SparkLauncher.DRIVER_MEMORY)) {
assertEquals(props.getProperty(SparkLauncher.DRIVER_MEMORY),
effectiveConfig.get(SparkLauncher.DRIVER_MEMORY));
}
}
} else {
assertEquals("1g", effectiveConfig.get(SparkLauncher.DRIVER_MEMORY));
}

if (propertiesFile != null) {
try (FileReader reader = new FileReader(propertiesFile, StandardCharsets.UTF_8)) {
Properties props = new Properties();
props.load(reader);
if (props.containsKey("spark.driver.memoryOverhead")) {
assertEquals(props.getProperty("spark.driver.memoryOverhead"),
effectiveConfig.get("spark.driver.memoryOverhead"));
}
}
if (loadSparkDefaults) {
assertEquals("/driver", effectiveConfig.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the default props contain DRIVER_EXTRA_CLASSPATH?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} else {
assertFalse(effectiveConfig.containsKey(SparkLauncher.DRIVER_EXTRA_CLASSPATH));
}
} else {
assertEquals("/driver", effectiveConfig.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this require loadSparkDefaults to be true?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's only present at spark-defaults.conf

}
}

@Test
Expand Down