Skip to content

Commit f846e1d

Browse files
committed
[FLINK-37266][python] Fix the issue that Python dependencies options doesn't work for PythonDriver
This closes #27176.
1 parent bc1563d commit f846e1d

File tree

5 files changed

+88
-3
lines changed

5 files changed

+88
-3
lines changed

flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public static void main(String[] args) throws Throwable {
7575
.getConfiguration()
7676
.toMap());
7777

78+
config.addAll(pythonDriverOptions.getPythonDependencyConfig());
79+
7880
// start gateway server
7981
GatewayServer gatewayServer = PythonEnvUtils.startGatewayServer();
8082
PythonEnvUtils.setGatewayServer(gatewayServer);

flink-python/src/main/java/org/apache/flink/client/python/PythonDriverOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.client.python;
2020

21+
import org.apache.flink.configuration.Configuration;
22+
2123
import javax.annotation.Nonnull;
2224
import javax.annotation.Nullable;
2325

@@ -29,12 +31,19 @@
2931
/** Options for the {@link PythonDriver}. */
3032
final class PythonDriverOptions {
3133

34+
private final Configuration pythonDependencyConfig;
35+
3236
@Nullable private final String entryPointModule;
3337

3438
@Nullable private final String entryPointScript;
3539

3640
@Nonnull private final List<String> programArgs;
3741

42+
@Nonnull
43+
Configuration getPythonDependencyConfig() {
44+
return pythonDependencyConfig;
45+
}
46+
3847
@Nullable
3948
String getEntryPointModule() {
4049
return entryPointModule;
@@ -50,9 +59,11 @@ List<String> getProgramArgs() {
5059
}
5160

5261
PythonDriverOptions(
62+
@Nonnull Configuration pythonDependencyConfig,
5363
@Nullable String entryPointModule,
5464
@Nullable String entryPointScript,
5565
List<String> programArgs) {
66+
this.pythonDependencyConfig = requireNonNull(pythonDependencyConfig);
5667
this.entryPointModule = entryPointModule;
5768
this.entryPointScript = entryPointScript;
5869
this.programArgs = requireNonNull(programArgs, "programArgs");

flink-python/src/main/java/org/apache/flink/client/python/PythonDriverOptionsParserFactory.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.client.python;
2020

21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.python.util.PythonDependencyUtils;
2123
import org.apache.flink.runtime.entrypoint.FlinkParseException;
2224
import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;
2325

@@ -26,7 +28,13 @@
2628

2729
import javax.annotation.Nonnull;
2830

31+
import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
32+
import static org.apache.flink.client.cli.CliFrontendParser.PYCLIENTEXEC_OPTION;
33+
import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
34+
import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
2935
import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION;
36+
import static org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
37+
import static org.apache.flink.client.cli.CliFrontendParser.PYTHON_PATH;
3038
import static org.apache.flink.client.cli.CliFrontendParser.PY_OPTION;
3139

3240
/**
@@ -40,6 +48,12 @@ public Options getOptions() {
4048
final Options options = new Options();
4149
options.addOption(PY_OPTION);
4250
options.addOption(PYMODULE_OPTION);
51+
options.addOption(PYFILES_OPTION);
52+
options.addOption(PYREQUIREMENTS_OPTION);
53+
options.addOption(PYARCHIVE_OPTION);
54+
options.addOption(PYEXEC_OPTION);
55+
options.addOption(PYCLIENTEXEC_OPTION);
56+
options.addOption(PYTHON_PATH);
4357
return options;
4458
}
4559

@@ -67,7 +81,13 @@ public PythonDriverOptions createResult(@Nonnull CommandLine commandLine)
6781
"The Python entry point has not been specified. It can be specified with options -py or -pym");
6882
}
6983

84+
Configuration pythonDependencyConfig =
85+
PythonDependencyUtils.parsePythonDependencyConfiguration(commandLine);
86+
7087
return new PythonDriverOptions(
71-
entryPointModule, entryPointScript, commandLine.getArgList());
88+
pythonDependencyConfig,
89+
entryPointModule,
90+
entryPointScript,
91+
commandLine.getArgList());
7292
}
7393
}

flink-python/src/test/java/org/apache/flink/client/python/PythonDriverOptionsParserFactoryTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,23 @@
1818

1919
package org.apache.flink.client.python;
2020

21+
import org.apache.flink.configuration.Configuration;
2122
import org.apache.flink.runtime.entrypoint.FlinkParseException;
2223
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
2324

2425
import org.junit.jupiter.api.Test;
2526

2627
import java.util.List;
2728

29+
import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES;
30+
import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE;
31+
import static org.apache.flink.python.PythonOptions.PYTHON_EXECUTABLE;
32+
import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
33+
import static org.apache.flink.python.PythonOptions.PYTHON_PATH;
34+
import static org.apache.flink.python.PythonOptions.PYTHON_REQUIREMENTS;
2835
import static org.assertj.core.api.Assertions.assertThat;
2936
import static org.assertj.core.api.Assertions.assertThatThrownBy;
37+
import static org.junit.jupiter.api.Assertions.assertTrue;
3038

3139
/** Tests for the {@link PythonDriverOptionsParserFactory}. */
3240
class PythonDriverOptionsParserFactoryTest {
@@ -66,6 +74,46 @@ void testEntrypointNotSpecified() {
6674
.isInstanceOf(FlinkParseException.class);
6775
}
6876

77+
@Test
78+
void testPythonDependencies() throws FlinkParseException {
79+
final String[] args = {
80+
"--python",
81+
"xxx.py",
82+
"-pyfs",
83+
"dep1.zip,dep2.zip",
84+
"-pyreq",
85+
"requirements.txt",
86+
"-pyarch",
87+
"venv.zip",
88+
"-pyexec",
89+
"python3.9",
90+
"-pyclientexec",
91+
"python3.9",
92+
"--pyPythonPath",
93+
"/python/lib64/python3.9",
94+
"--input",
95+
"in.txt",
96+
};
97+
98+
final PythonDriverOptions pythonCommandOptions = commandLineParser.parse(args);
99+
100+
assertTrue(pythonCommandOptions.getEntryPointScript().isPresent());
101+
assertThat(pythonCommandOptions.getEntryPointScript().get()).isEqualTo("xxx.py");
102+
103+
// verify the python program arguments
104+
final List<String> programArgs = pythonCommandOptions.getProgramArgs();
105+
assertThat(programArgs).containsExactly("--input", "in.txt");
106+
107+
// verify python dependencies
108+
Configuration configuration = pythonCommandOptions.getPythonDependencyConfig();
109+
assertThat(configuration.get(PYTHON_FILES)).isEqualTo("dep1.zip,dep2.zip");
110+
assertThat(configuration.get(PYTHON_REQUIREMENTS)).isEqualTo("requirements.txt");
111+
assertThat(configuration.get(PYTHON_ARCHIVES)).isEqualTo("venv.zip");
112+
assertThat(configuration.get(PYTHON_EXECUTABLE)).isEqualTo("python3.9");
113+
assertThat(configuration.get(PYTHON_CLIENT_EXECUTABLE)).isEqualTo("python3.9");
114+
assertThat(configuration.get(PYTHON_PATH)).isEqualTo("/python/lib64/python3.9");
115+
}
116+
69117
private void verifyPythonDriverOptionsParsing(final String[] args) throws FlinkParseException {
70118
final PythonDriverOptions pythonCommandOptions = commandLineParser.parse(args);
71119

flink-python/src/test/java/org/apache/flink/client/python/PythonDriverTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.client.python;
2020

21+
import org.apache.flink.configuration.Configuration;
22+
2123
import org.junit.jupiter.api.Test;
2224
import py4j.GatewayServer;
2325

@@ -50,7 +52,8 @@ void testConstructCommandsWithEntryPointModule() {
5052
args.add("--input");
5153
args.add("in.txt");
5254

53-
PythonDriverOptions pythonDriverOptions = new PythonDriverOptions("xxx", null, args);
55+
PythonDriverOptions pythonDriverOptions =
56+
new PythonDriverOptions(new Configuration(), "xxx", null, args);
5457
List<String> commands = PythonDriver.constructPythonCommands(pythonDriverOptions);
5558
// verify the generated commands
5659
assertThat(commands).containsExactly("-u", "-m", "xxx", "--input", "in.txt");
@@ -62,7 +65,8 @@ void testConstructCommandsWithEntryPointScript() {
6265
args.add("--input");
6366
args.add("in.txt");
6467

65-
PythonDriverOptions pythonDriverOptions = new PythonDriverOptions(null, "xxx.py", args);
68+
PythonDriverOptions pythonDriverOptions =
69+
new PythonDriverOptions(new Configuration(), null, "xxx.py", args);
6670
List<String> commands = PythonDriver.constructPythonCommands(pythonDriverOptions);
6771
assertThat(commands).containsExactly("-u", "xxx.py", "--input", "in.txt");
6872
}

0 commit comments

Comments
 (0)