Skip to content

Commit 6aafc90

Browse files
authored
feat: support multiple planner output dialects (#31)
* feat: support multiple planner output dialects * fix: minor
1 parent 36c397b commit 6aafc90

File tree

3 files changed

+17
-14
lines changed

3 files changed

+17
-14
lines changed

hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.hoptimator;
22

3+
import org.apache.calcite.sql.dialect.MysqlSqlDialect;
34
import sqlline.SqlLine;
45
import sqlline.CommandHandler;
56
import sqlline.DispatchCallback;
@@ -247,7 +248,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
247248
sqlline.output("SQL:");
248249
HopTable outputTable = new HopTable("PIPELINE", "SINK", plan.getRowType(),
249250
Collections.singletonMap("connector", "dummy"));
250-
sqlline.output(impl.insertInto(outputTable));
251+
sqlline.output(impl.insertInto(outputTable).sql(MysqlSqlDialect.DEFAULT));
251252
dispatchCallback.setToSuccess();
252253
} catch (Exception e) {
253254
sqlline.error(e.toString());
@@ -345,7 +346,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
345346
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, new Properties());
346347
PipelineRel plan = planner.pipeline(query);
347348
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
348-
String pipelineSql = impl.query();
349+
String pipelineSql = impl.query().sql(MysqlSqlDialect.DEFAULT);
349350
FlinkIterable iterable = new FlinkIterable(pipelineSql);
350351
Iterator<String> iter = iterable.<String>field(0, 1).iterator();
351352
switch(checkType) {
@@ -454,7 +455,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
454455
PipelineRel plan = planner.pipeline("SELECT " + query);
455456
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
456457
HopTable sink = planner.database(database).makeTable(table, impl.rowType());
457-
String pipelineSql = impl.insertInto(sink) + "\nSELECT 'SUCCESS';";
458+
String pipelineSql = impl.insertInto(sink).sql(MysqlSqlDialect.DEFAULT) + "\nSELECT 'SUCCESS';";
458459
FlinkIterable iterable = new FlinkIterable(pipelineSql);
459460
Iterator<String> iter = iterable.<String>field(0).iterator();
460461
if (iter.hasNext()) {

hoptimator-cli/src/main/java/com/linkedin/hoptimator/LocalFlinkRules.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.calcite.rel.convert.ConverterImpl;
2222
import org.apache.calcite.rel.convert.ConverterRule;
2323
import org.apache.calcite.runtime.Hook;
24+
import org.apache.calcite.sql.dialect.MysqlSqlDialect;
2425
import org.apache.calcite.util.BuiltInMethod;
2526

2627
import java.util.Collections;
@@ -64,7 +65,7 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
6465
BlockBuilder builder = new BlockBuilder();
6566
PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), rowType, pref.preferCustom());
6667
PipelineRel.Implementor impl = new PipelineRel.Implementor(getInput());
67-
String sql = impl.query();
68+
String sql = impl.query().sql(MysqlSqlDialect.DEFAULT);
6869
Hook.QUERY_PLAN.run(sql); // for script validation in tests
6970
HoptimatorHook.QUERY_PLAN.run(sql); // ditto
7071
Expression iter = builder.append("iter", Expressions.new_(FlinkIterable.class, Expressions.constant(sql),

hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@
66
import org.apache.calcite.rel.type.RelProtoDataType;
77
import org.apache.calcite.rel.type.RelDataTypeImpl;
88
import org.apache.calcite.sql.SqlDialect;
9-
import org.apache.calcite.sql.dialect.MysqlSqlDialect;
9+
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
1010

1111
import com.linkedin.hoptimator.catalog.Resource;
1212
import com.linkedin.hoptimator.catalog.HopTable;
1313
import com.linkedin.hoptimator.catalog.ScriptImplementor;
1414

1515
import java.util.ArrayList;
1616
import java.util.List;
17-
import java.util.Map;
1817

1918
/**
2019
* Calling convention which implements an SQL-based streaming data pipeline.
@@ -36,8 +35,6 @@
3635
public interface PipelineRel extends RelNode {
3736

3837
Convention CONVENTION = new Convention.Impl("PIPELINE", PipelineRel.class);
39-
SqlDialect OUTPUT_DIALECT = MysqlSqlDialect.DEFAULT; // closely resembles Flink SQL
40-
// TODO support alternative output dialects
4138

4239
void implement(Implementor implementor);
4340

@@ -72,14 +69,14 @@ private void visit(RelNode input) {
7269
}
7370

7471
/** Script ending in SELECT... */
75-
public String query() {
76-
return script.query(relNode).sql(OUTPUT_DIALECT);
72+
public ScriptImplementor query() {
73+
return script.query(relNode);
7774
}
7875

7976
/** Script ending in INSERT INTO ... */
80-
public String insertInto(HopTable sink) {
77+
public ScriptImplementor insertInto(HopTable sink) {
8178
return script.database(sink.database()).with(sink)
82-
.insert(sink.database(), sink.name(), relNode).sql(OUTPUT_DIALECT);
79+
.insert(sink.database(), sink.name(), relNode);
8380
}
8481

8582
/** Add any resources, SQL, DDL etc required to access the table. */
@@ -88,11 +85,15 @@ public void implement(HopTable table) {
8885
table.resources().forEach(x -> resource(x));
8986
}
9087

91-
/** Combine SQL and any Resources into a Pipeline */
9288
public Pipeline pipeline(HopTable sink) {
89+
return pipeline(sink, AnsiSqlDialect.DEFAULT);
90+
}
91+
92+
/** Combine SQL and any Resources into a Pipeline */
93+
public Pipeline pipeline(HopTable sink, SqlDialect sqlDialect) {
9394
List<Resource> resourcesAndJob = new ArrayList<>();
9495
resourcesAndJob.addAll(resources);
95-
resourcesAndJob.add(new SqlJob(insertInto(sink)));
96+
resourcesAndJob.add(new SqlJob(insertInto(sink).sql(sqlDialect)));
9697
return new Pipeline(resourcesAndJob, rowType());
9798
}
9899
}

0 commit comments

Comments
 (0)