Skip to content

Commit 089fdac

Browse files
authored
Add support for partial views (#92)
1 parent e2b5d2f commit 089fdac

File tree

12 files changed

+40
-16
lines changed

12 files changed

+40
-16
lines changed

hoptimator-api/src/main/java/com/linkedin/hoptimator/Job.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,20 @@
55

66
public class Job {
77

8+
private final String name;
89
private final Sink sink;
910
private final Function<SqlDialect, String> sql;
1011

11-
public Job(Sink sink, Function<SqlDialect, String> sql) {
12+
public Job(String name, Sink sink, Function<SqlDialect, String> sql) {
13+
this.name = name;
1214
this.sink = sink;
1315
this.sql = sql;
1416
}
1517

18+
public String name() {
19+
return name;
20+
}
21+
1622
public Sink sink() {
1723
return sink;
1824
}

hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
157157
CalciteConnection conn = (CalciteConnection) sqlline.getConnection();
158158
RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
159159
try {
160-
List<String> specs = DeploymentService.plan(root).pipeline().specify();
160+
List<String> specs = DeploymentService.plan(root).pipeline("sink").specify();
161161
specs.forEach(x -> sqlline.output(x + "\n\n---\n\n"));
162162
} catch (SQLException e) {
163163
sqlline.error(e);

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,20 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
164164
MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTableMacro);
165165
RelDataType rowType = materializedViewTable.getRowType(new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT));
166166

167+
// Suport "partial views", i.e. CREATE VIEW FOO$BAR, where the view name
168+
// is "foo-bar" and the sink is just FOO.
169+
String sinkName = viewName.split("\\$", 2)[0];
170+
List<String> sinkPath = new ArrayList<>();
171+
sinkPath.addAll(schemaPath);
172+
sinkPath.add(sinkName);
173+
167174
// Plan a pipeline to materialize the view.
168175
RelRoot root = HoptimatorDriver.convert(context, sql).root;
169176
PipelineRel.Implementor plan = DeploymentService.plan(root);
170-
plan.setSink(database, viewPath, rowType, Collections.emptyMap());
171-
Pipeline pipeline = plan.pipeline();
177+
plan.setSink(database, sinkPath, rowType, Collections.emptyMap());
178+
Pipeline pipeline = plan.pipeline(viewName);
172179

173-
MaterializedView hook = new MaterializedView(database, viewPath, sql, plan.sql(), plan.pipeline());
180+
MaterializedView hook = new MaterializedView(database, viewPath, sql, plan.sql(), plan.pipeline(viewName));
174181
// TODO support CREATE ... WITH (options...)
175182
ValidationService.validateOrThrow(hook, MaterializedView.class);
176183
pipeline.update();

hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,10 @@ public void execute(Context context, boolean execute) throws Exception {
7474
String sql = context.previousSqlCommand().sql;
7575
CalciteConnection conn = (CalciteConnection) connection;
7676
RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
77+
String []parts = line.split(" ", 2);
78+
String pipelineName = parts.length == 2 ? parts[1] : "test";
7779
String specs =
78-
DeploymentService.plan(root).pipeline().specify().stream().sorted()
80+
DeploymentService.plan(root).pipeline(pipelineName).specify().stream().sorted()
7981
.collect(Collectors.joining("---\n"));
8082
String[] lines = specs.replaceAll(";\n", "\n").split("\n");
8183
context.echo(Arrays.asList(lines));

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ class K8sJobDeployer extends K8sYamlDeployer<Job> {
2626
@Override
2727
public List<String> specify(Job job) throws SQLException {
2828
Function<SqlDialect, String> sql = job.sql();
29-
Template.Environment env = new Template.SimpleEnvironment().with("name",
30-
job.sink().database() + "-" + job.sink().table().toLowerCase(Locale.ROOT))
29+
String name = K8sUtils.canonicalizeName(job.sink().database(), job.name());
30+
Template.Environment env = new Template.SimpleEnvironment()
31+
.with("name", name)
3132
.with("database", job.sink().database())
3233
.with("schema", job.sink().schema())
3334
.with("table", job.sink().table())

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ class K8sSourceDeployer extends K8sYamlDeployer<Source> {
2323

2424
@Override
2525
public List<String> specify(Source source) throws SQLException {
26+
String name = K8sUtils.canonicalizeName(source.database(), source.table());
2627
Template.Environment env =
27-
new Template.SimpleEnvironment().with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
28+
new Template.SimpleEnvironment()
29+
.with("name", name)
2830
.with("database", source.database())
2931
.with("schema", source.schema())
3032
.with("table", source.table())

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sUtils.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.Collection;
44
import java.util.Locale;
5+
import java.util.stream.Stream;
56
import java.util.stream.Collectors;
67

78
import io.kubernetes.client.common.KubernetesType;
@@ -22,7 +23,12 @@ public static String canonicalizeName(Collection<String> parts) {
2223

2324
// TODO: Robust and reversible canonicalization
2425
public static String canonicalizeName(String name) {
25-
return name.toLowerCase(Locale.ROOT).replace("_", "");
26+
return name.toLowerCase(Locale.ROOT).replace("_", "").replace("$", "-");
27+
}
28+
29+
public static String canonicalizeName(String database, String name) {
30+
return Stream.of(database, name).filter(x -> x != null).map(x -> canonicalizeName(x))
31+
.collect(Collectors.joining("-"));
2632
}
2733

2834
// see:

hoptimator-kafka/src/test/resources/kafka-ddl.id

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,4 @@ spec:
5151
config:
5252
retention.ms: 7200000
5353
segment.bytes: 1073741824
54-
!specify
54+
!specify existing-topic-1

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void setQuery(RelNode query) {
126126
}
127127

128128
/** Combine Deployables into a Pipeline */
129-
public Pipeline pipeline() throws SQLException {
129+
public Pipeline pipeline(String name) throws SQLException {
130130
List<Deployable> deployables = new ArrayList<>();
131131
for (Source source : sources.keySet()) {
132132
deployables.addAll(DeploymentService.deployables(source, Source.class));
@@ -138,7 +138,7 @@ public Pipeline pipeline() throws SQLException {
138138
}
139139
Sink sink = new Sink(sinkDatabase, sinkPath, sinkOptions);
140140
ConnectionService.configure(sink, Sink.class);
141-
Job job = new Job(sink, sql());
141+
Job job = new Job(name, sink, sql());
142142
RelOptUtil.equal(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW);
143143
deployables.addAll(DeploymentService.deployables(sink, Sink.class));
144144
deployables.addAll(DeploymentService.deployables(job, Job.class));

hoptimator-venice/src/test/resources/venice-ddl-insert-all.id

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,4 @@ spec:
2121
parallelism: 1
2222
upgradeMode: stateless
2323
state: running
24-
!specify
24+
!specify test-store-1

0 commit comments

Comments
 (0)