Skip to content

Commit 40651e8

Browse files
committed
Add option to specify flink parallelism
1 parent 07dc74d commit 40651e8

File tree

2 files changed

+9
-0
lines changed

2 files changed

+9
-0
lines changed

mathosphere-core/src/main/java/com/formulasearchengine/mathosphere/mlp/FlinkMlpRelationFinder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ public static void run(FlinkMlpCommandConfig config) throws Exception {
5050
.writeAsText(config.getOutputDir(), WriteMode.OVERWRITE);
5151
//int cores = Runtime.getRuntime().availableProcessors();
5252
//env.setParallelism(1); // rounds down
53+
final int parallelism = config.getParallelism();
54+
if (parallelism > 0) {
55+
env.setParallelism(parallelism);
56+
}
5357
env.execute("Relation Finder");
5458
}
5559

mathosphere-core/src/main/java/com/formulasearchengine/mathosphere/mlp/cli/FlinkMlpCommandConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public class FlinkMlpCommandConfig extends BaseConfig implements Serializable {
1515
@Parameter(names = {"-out", "--outputDir"}, description = "path to output directory")
1616
protected String outputdir;
1717

18+
@Parameter(names = {"--treads"}, description = "how many parallel threads should be used")
19+
protected int parallelism = 0;
1820

1921
public FlinkMlpCommandConfig() {
2022
}
@@ -60,4 +62,7 @@ public String getOutputDir() {
6062
return outputdir;
6163
}
6264

65+
public int getParallelism() {
66+
return parallelism;
67+
}
6368
}

0 commit comments

Comments
 (0)