Skip to content

Commit a8d90b1

Browse files
authored
Add support for Flink Beam templates (#163)
* Add new FlinkBeam type and regenerate k8s models
* Add new template option to avoid matching if a specific field is present
* Add field map logic for trivial expressions + lazy evaluation of template functions
* Add template equality checks
* Add beam template, some testing, and some fixes
* Add fieldMap support for nested field references and add more thorough testing
* Hide magic strings
1 parent af27482 commit a8d90b1

File tree

54 files changed

+1530
-116
lines changed

Some content is hidden

Large commits have some of their content hidden by default. Use the search box below to find content that may be hidden.

54 files changed

+1530
-116
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ deploy-flink: deploy
5656
kubectl apply -f ./deploy/dev/flink-session-cluster.yaml
5757
kubectl apply -f ./deploy/dev/flink-sql-gateway.yaml
5858
kubectl apply -f ./deploy/samples/flink-template.yaml
59+
kubectl apply -f ./deploy/samples/flink-beam-template.yaml
5960

6061
undeploy-flink:
6162
kubectl delete flinksessionjobs.flink.apache.org --all || echo "skipping"

deploy/config/hoptimator-configmap.yaml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ data:
1313
game.properties: |
1414
enemy.types=aliens,monsters
1515
player.maximum-lives=5
16+
1617
user-interface.properties: |
1718
color.good=purple
1819
color.bad=yellow
19-
allow.textmode=true
20+
allow.textmode=true
21+
22+
flink.config: |
23+
flink.app.name=hoptimator-flink-runner
24+
flink.app.type=SQL
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
## This template adds Flink Beam support.
2+
3+
apiVersion: hoptimator.linkedin.com/v1alpha1
4+
kind: JobTemplate
5+
metadata:
6+
name: flink-beam-sqljob-template
7+
spec:
8+
yaml: |
9+
apiVersion: hoptimator.linkedin.com/v1alpha1
10+
kind: SqlJob
11+
metadata:
12+
name: {{name}}
13+
spec:
14+
dialect: FlinkBeam
15+
executionMode: Streaming
16+
sql:
17+
- PLACEHOLDER
18+
configs:
19+
{{flink.app.type==BEAM}}
20+
source.group.id: hoptimator-flink-beam-{{pipeline}}
21+
source.schemas: {{sourceSchemas}}
22+
source.tables: {{sourceTables}}
23+
sink.schema: {{schema}}
24+
sink.table: {{table}}
25+
fields: {{fieldMap}}
26+
{{flinkconfigs}}

deploy/samples/flink-template.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ spec:
1616
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
1717
args:
1818
- {{flinksql}}
19-
jarURI: file:///opt/hoptimator-flink-runner.jar
19+
jarURI: file:///opt/{{flink.app.name}}.jar
2020
parallelism: {{flink.parallelism:1}}
2121
upgradeMode: stateless
2222
state: running
23+
{{flink.app.type==SQL}}

hoptimator-api/src/main/java/com/linkedin/hoptimator/Job.java

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,90 @@
11
package com.linkedin.hoptimator;
22

3-
import java.util.function.Function;
3+
import java.util.Map;
4+
import java.util.Set;
45

56

7+
/**
8+
* Represents a data pipeline job with lazy-evaluated template functions.
9+
*
10+
* <p>A Job encapsulates the configuration and execution logic for a data pipeline,
11+
* including the destination sink and a collection of lazy-evaluated functions that
12+
* generate SQL scripts, field mappings, and other template values on demand.
13+
*/
614
public class Job implements Deployable {
715

816
private final String name;
17+
private final Set<Source> sources;
918
private final Sink sink;
10-
private final Function<SqlDialect, String> sql;
1119

12-
public Job(String name, Sink sink, Function<SqlDialect, String> sql) {
20+
/**
21+
* Lazy-evaluated template functions that generate various outputs for the job.
22+
*
23+
* <p>This map contains functions that are evaluated on-demand when specific
24+
* template values are needed. Each function takes a {@link SqlDialect} parameter
25+
* and returns a string representation of the requested output.
26+
*
27+
* @see ThrowingFunction
28+
* @see SqlDialect
29+
*/
30+
private final Map<String, ThrowingFunction<SqlDialect, String>> lazyEvals;
31+
32+
public Job(String name, Set<Source> sources, Sink sink, Map<String, ThrowingFunction<SqlDialect, String>> lazyEvals) {
1333
this.name = name;
34+
this.sources = sources;
1435
this.sink = sink;
15-
this.sql = sql;
36+
this.lazyEvals = lazyEvals;
1637
}
1738

1839
public String name() {
1940
return name;
2041
}
2142

43+
public Set<Source> sources() {
44+
return sources;
45+
}
46+
2247
public Sink sink() {
2348
return sink;
2449
}
2550

26-
public Function<SqlDialect, String> sql() {
27-
return sql;
51+
public ThrowingFunction<SqlDialect, String> sql() {
52+
return eval("sql");
53+
}
54+
55+
public ThrowingFunction<SqlDialect, String> query() {
56+
return eval("query");
57+
}
58+
59+
public ThrowingFunction<SqlDialect, String> fieldMap() {
60+
return eval("fieldMap");
61+
}
62+
63+
/**
64+
* Retrieves a lazy-evaluated template function by key.
65+
*
66+
* <p>This method provides access to the template functions stored in {@link #lazyEvals}.
67+
* The returned function can be called with a {@link SqlDialect} to generate the
68+
* corresponding output string.
69+
*
70+
* <p><strong>Available Keys:</strong>
71+
* <ul>
72+
* <li><code>"sql"</code> - Complete SQL script with INSERT INTO statements</li>
73+
* <li><code>"query"</code> - SELECT query portion only</li>
74+
* <li><code>"fieldMap"</code> - JSON mapping of source to destination fields</li>
75+
* </ul>
76+
*
77+
* @param key The template function key to retrieve
78+
* @return The lazy-evaluated function, or {@code null} if the key doesn't exist
79+
*
80+
* @see #lazyEvals
81+
* @see ThrowingFunction#apply(Object)
82+
*/
83+
private ThrowingFunction<SqlDialect, String> eval(String key) {
84+
if (!lazyEvals.containsKey(key)) {
85+
throw new IllegalArgumentException("Unknown eval key: " + key + ". Available keys: " + lazyEvals.keySet());
86+
}
87+
return lazyEvals.get(key);
2888
}
2989

3090
@Override

hoptimator-api/src/main/java/com/linkedin/hoptimator/MaterializedView.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
package com.linkedin.hoptimator;
22

33
import java.util.List;
4-
import java.util.function.Function;
54

65

76
public class MaterializedView extends View implements Deployable {
87

98
private final String database;
10-
private final Function<SqlDialect, String> pipelineSql;
9+
private final ThrowingFunction<SqlDialect, String> pipelineSql;
1110
private final Pipeline pipeline;
1211

13-
public MaterializedView(String database, List<String> path, String viewSql, Function<SqlDialect, String> pipelineSql,
12+
public MaterializedView(String database, List<String> path, String viewSql, ThrowingFunction<SqlDialect, String> pipelineSql,
1413
Pipeline pipeline) {
1514
super(path, viewSql);
1615
this.database = database;
@@ -22,7 +21,7 @@ public Pipeline pipeline() {
2221
return pipeline;
2322
}
2423

25-
public Function<SqlDialect, String> pipelineSql() {
24+
public ThrowingFunction<SqlDialect, String> pipelineSql() {
2625
return pipelineSql;
2726
}
2827

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.linkedin.hoptimator;
2+
3+
import java.sql.SQLException;
4+
5+
6+
/**
 * Variant of {@link java.util.function.Function} whose {@code apply} method is
 * permitted to throw a {@link SQLException}.
 *
 * <p>Useful for lazily-evaluated SQL-generating callbacks that must surface
 * database errors to the caller rather than wrapping them in unchecked exceptions.
 *
 * @param <T> the type of the input to the function
 * @param <R> the type of the result of the function
 */
@FunctionalInterface
public interface ThrowingFunction<T, R> {
  R apply(T t) throws SQLException;
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.linkedin.hoptimator;
2+
3+
import java.sql.SQLException;
4+
5+
6+
/**
 * Variant of {@link java.util.function.Supplier} whose {@code get} method is
 * permitted to throw a {@link SQLException}.
 *
 * <p>Counterpart to {@code ThrowingFunction} for zero-argument lazy evaluations
 * that may fail with a database error.
 *
 * @param <T> the type of the supplied result
 */
@FunctionalInterface
public interface ThrowingSupplier<T> {
  T get() throws SQLException;
}

hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ public void execute(String line, DispatchCallback dispatchCallback) {
127127
RelOptTable table = root.rel.getTable();
128128
if (table != null) {
129129
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, String.join(".", table.getQualifiedName()));
130+
} else if (create != null) {
131+
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, create.name.toString());
130132
}
131133
PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations(), connectionProperties);
132134
if (create != null) {
@@ -301,6 +303,8 @@ public void execute(String line, DispatchCallback dispatchCallback) {
301303
RelOptTable table = root.rel.getTable();
302304
if (table != null) {
303305
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, String.join(".", table.getQualifiedName()));
306+
} else if (create != null) {
307+
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, create.name.toString());
304308
}
305309
PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations(), connectionProperties);
306310
if (create != null) {

hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.Collections;
1818
import java.util.List;
1919
import java.util.Objects;
20+
import java.util.Properties;
2021
import java.util.stream.Collectors;
2122

2223
import org.apache.calcite.rel.RelRoot;
@@ -90,7 +91,10 @@ public void execute(Context context, boolean execute) throws Exception {
9091
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
9192
String[] parts = line.split(" ", 2);
9293
String pipelineName = parts.length == 2 ? parts[1] : "test";
93-
Pipeline pipeline = DeploymentService.plan(root, Collections.emptyList(), conn.connectionProperties())
94+
Properties properties = new Properties();
95+
properties.putAll(conn.connectionProperties());
96+
properties.put(DeploymentService.PIPELINE_OPTION, pipelineName);
97+
Pipeline pipeline = DeploymentService.plan(root, Collections.emptyList(), properties)
9498
.pipeline(pipelineName, conn);
9599
List<String> specs = new ArrayList<>();
96100
for (Source source : pipeline.sources()) {

0 commit comments

Comments
 (0)