
Commit e2b5d2f

Fix compound identifier bug (#91)
1 parent ffdba68 commit e2b5d2f

7 files changed: +55 -55 lines changed


hoptimator-kafka/src/test/resources/kafka-ddl.id

Lines changed: 5 additions & 3 deletions
@@ -12,9 +12,11 @@ spec:
   job:
     entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
     args:
-    - CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.format'='json')
-    - CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.format'='json')
-    - INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `existing-topic-2`
+    - CREATE DATABASE IF NOT EXISTS `KAFKA` WITH ()
+    - CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.format'='json')
+    - CREATE DATABASE IF NOT EXISTS `KAFKA` WITH ()
+    - CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.format'='json')
+    - INSERT INTO `KAFKA`.`existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2`
     jarURI: file:///opt/hoptimator-flink-runner.jar
     parallelism: 1
     upgradeMode: stateless

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java

Lines changed: 5 additions & 3 deletions
@@ -148,8 +148,9 @@ public Pipeline pipeline() throws SQLException {
   private ScriptImplementor script() throws SQLException {
     ScriptImplementor script = ScriptImplementor.empty();
     for (Map.Entry<Source, RelDataType> source : sources.entrySet()) {
+      script = script.database(source.getKey().schema());
       Map<String, String> configs = ConnectionService.configure(source.getKey(), Source.class);
-      script = script.connector(source.getKey().table(), source.getValue(), configs);
+      script = script.connector(source.getKey().schema(), source.getKey().table(), source.getValue(), configs);
     }
     return script;
   }
@@ -163,8 +164,9 @@ public Function<SqlDialect, String> sql() throws SQLException {
     }
     Sink sink = new Sink(sinkDatabase, sinkPath, sinkOptions);
     Map<String, String> sinkConfigs = ConnectionService.configure(sink, Sink.class);
-    script = script.connector(sink.table(), targetRowType, sinkConfigs);
-    script = script.insert(sink.table(), query, targetFields);
+    script = script.database(sink.schema());
+    script = script.connector(sink.schema(), sink.table(), targetRowType, sinkConfigs);
+    script = script.insert(sink.schema(), sink.table(), query, targetFields);
     RelOptUtil.equal(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW);
     return script.seal();
   }
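
For context, a minimal sketch of the per-source pattern this change introduces: each source (and the sink) now contributes a CREATE DATABASE for its schema before a schema-qualified connector. The helper class and the example schema/table values below are hypothetical; only the database()/connector() signatures come from this commit.

import java.util.Map;
import org.apache.calcite.rel.type.RelDataType;
import com.linkedin.hoptimator.util.planner.ScriptImplementor;

// Illustrative only: mirrors the loop body in PipelineRel.script() above.
final class SchemaAwareScriptSketch {
  static ScriptImplementor addSource(ScriptImplementor script, String schema, String table,
      RelDataType rowType, Map<String, String> configs) {
    return script
        .database(schema)                            // e.g. CREATE DATABASE IF NOT EXISTS `KAFKA` WITH ()
        .connector(schema, table, rowType, configs); // e.g. CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (...) WITH (...)
  }
}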

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java

Lines changed: 25 additions & 35 deletions
@@ -93,8 +93,8 @@ default ScriptImplementor query(RelNode relNode) {
   }

   /** Append a connector definition, e.g. `CREATE TABLE ... WITH (...)` */
-  default ScriptImplementor connector(String name, RelDataType rowType, Map<String, String> connectorConfig) {
-    return with(new ConnectorImplementor(name, rowType, connectorConfig));
+  default ScriptImplementor connector(String schema, String table, RelDataType rowType, Map<String, String> connectorConfig) {
+    return with(new ConnectorImplementor(schema, table, rowType, connectorConfig));
   }

   /** Append a database definition, e.g. `CREATE DATABASE ...` */
@@ -103,8 +103,8 @@ default ScriptImplementor database(String database) {
   }

   /** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */
-  default ScriptImplementor insert(String name, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
-    return with(new InsertImplementor(name, relNode, targetFields));
+  default ScriptImplementor insert(String schema, String table, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
+    return with(new InsertImplementor(schema, table, relNode, targetFields));
   }

   /** Render the script as DDL/SQL in the default dialect */
@@ -157,15 +157,9 @@ public void implement(SqlWriter w) {
   /** Implements an arbitrary RelNode as a query */
   class QueryImplementor implements ScriptImplementor {
     private final RelNode relNode;
-    private final boolean dropDatabaseName;

     public QueryImplementor(RelNode relNode) {
-      this(relNode, true);
-    }
-
-    public QueryImplementor(RelNode relNode, boolean dropDatabaseName) {
       this.relNode = relNode;
-      this.dropDatabaseName = dropDatabaseName;
     }

     @Override
@@ -198,32 +192,24 @@ public SqlNode visit(SqlCall call) {

     private static class UnflattenMemberAccess extends SqlShuttle {
       private final Set<String> sinkFieldList;
-      private final boolean dropDatabaseName;

       UnflattenMemberAccess(QueryImplementor outer) {
         this.sinkFieldList = outer.relNode.getRowType().getFieldList()
             .stream()
             .map(RelDataTypeField::getName)
             .collect(Collectors.toSet());
-        this.dropDatabaseName = outer.dropDatabaseName;
       }

       // SqlShuttle gets called for every field in SELECT and every table name in FROM alike
       // For fields in SELECT, we want to unflatten them as `FOO_BAR`, for tables `FOO.BAR`
-      // or just `BAR` if we need to drop the database name (For Flink)
       @Override
       public SqlNode visit(SqlIdentifier id) {
         if (id.names.size() == 1 && sinkFieldList.contains(id.names.get(0))) {
           id.assignNamesFrom(new SqlIdentifier(id.names.get(0).replaceAll("\\$", "_"), SqlParserPos.ZERO));
         } else {
-          SqlIdentifier replacement;
-          if (id.names.size() == 2 && this.dropDatabaseName) {
-            replacement = new SqlIdentifier(id.names.subList(1, id.names.size()), SqlParserPos.ZERO);
-          } else {
-            replacement = new SqlIdentifier(id.names.stream()
-                .flatMap(x -> Stream.of(x.split("\\$")))
-                .collect(Collectors.toList()), SqlParserPos.ZERO);
-          }
+          SqlIdentifier replacement = new SqlIdentifier(id.names.stream()
+              .flatMap(x -> Stream.of(x.split("\\$")))
+              .collect(Collectors.toList()), SqlParserPos.ZERO);
           id.assignNamesFrom(replacement);
         }
         return id;
@@ -239,20 +225,22 @@ public SqlNode visit(SqlIdentifier id) {
    * - NULL fields are promoted to BYTES
    */
   class ConnectorImplementor implements ScriptImplementor {
-    private final String name;
+    private final String schema;
+    private final String table;
     private final RelDataType rowType;
     private final Map<String, String> connectorConfig;

-    public ConnectorImplementor(String name, RelDataType rowType, Map<String, String> connectorConfig) {
-      this.name = name;
+    public ConnectorImplementor(String schema, String table, RelDataType rowType, Map<String, String> connectorConfig) {
+      this.schema = schema;
+      this.table = table;
       this.rowType = rowType;
       this.connectorConfig = connectorConfig;
     }

     @Override
     public void implement(SqlWriter w) {
       w.keyword("CREATE TABLE IF NOT EXISTS");
-      (new IdentifierImplementor(name)).implement(w);
+      (new CompoundIdentifierImplementor(schema, table)).implement(w);
       SqlWriter.Frame frame1 = w.startList("(", ")");
       (new RowTypeSpecImplementor(rowType)).implement(w);
       if (rowType.getField("PRIMARY_KEY", true, false) != null) {
@@ -296,20 +284,22 @@ public void implement(SqlWriter w) {
    *
    * */
   class InsertImplementor implements ScriptImplementor {
-    private final String name;
+    private final String schema;
+    private final String table;
     private final RelNode relNode;
     private final ImmutableList<Pair<Integer, String>> targetFields;

-    public InsertImplementor(String name, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
-      this.name = name;
+    public InsertImplementor(String schema, String table, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
+      this.schema = schema;
+      this.table = table;
       this.relNode = relNode;
       this.targetFields = targetFields;
     }

     @Override
     public void implement(SqlWriter w) {
       w.keyword("INSERT INTO");
-      (new IdentifierImplementor(name)).implement(w);
+      (new CompoundIdentifierImplementor(schema, table)).implement(w);
       RelNode project = dropFields(relNode, targetFields);
       (new ColumnListImplementor(project.getRowType())).implement(w);
       (new QueryImplementor(project)).implement(w);
@@ -394,17 +384,17 @@ public void implement(SqlWriter w) {

   /** Implements an identifier like TRACKING.'PageViewEvent' */
   class CompoundIdentifierImplementor implements ScriptImplementor {
-    private final String database;
-    private final String name;
+    private final String schema;
+    private final String table;

-    public CompoundIdentifierImplementor(String database, String name) {
-      this.database = database;
-      this.name = name;
+    public CompoundIdentifierImplementor(String schema, String table) {
+      this.schema = schema;
+      this.table = table;
     }

     @Override
     public void implement(SqlWriter w) {
-      SqlIdentifier identifier = new SqlIdentifier(Arrays.asList(new String[]{database, name}), SqlParserPos.ZERO);
+      SqlIdentifier identifier = new SqlIdentifier(Arrays.asList(new String[]{schema, table}), SqlParserPos.ZERO);
       identifier.unparse(w, 0, 0);
     }
   }
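
To see the new compound identifier end to end, here is a small hypothetical driver modeled on TestDataTypeUtils below; the schema/table names and the row type are placeholders, and the exact rendered column types may differ from the comment.

import java.util.Collections;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import com.linkedin.hoptimator.util.planner.ScriptImplementor;

public class CompoundIdentifierExample {
  public static void main(String[] args) {
    RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
    RelDataType rowType = typeFactory.builder()
        .add("KEY", SqlTypeName.VARCHAR)
        .add("VALUE", SqlTypeName.VARBINARY)
        .build();
    // ConnectorImplementor now takes (schema, table, ...) and renders `schema`.`table`:
    String ddl = new ScriptImplementor.ConnectorImplementor(
        "KAFKA", "existing-topic-1", rowType, Collections.emptyMap()).sql();
    // Expected shape, per the tests below:
    //   CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-1` (...) WITH ();
    System.out.println(ddl);
  }
}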

hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java

Lines changed: 5 additions & 5 deletions
@@ -41,13 +41,13 @@ public void flattenUnflatten() {
         flattenedNames);
     RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory);
     RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW);
-    String originalConnector = new ScriptImplementor.ConnectorImplementor("T1",
+    String originalConnector = new ScriptImplementor.ConnectorImplementor("S", "T1",
         rowType, Collections.emptyMap()).sql();
-    String unflattenedConnector = new ScriptImplementor.ConnectorImplementor("T1",
+    String unflattenedConnector = new ScriptImplementor.ConnectorImplementor("S", "T1",
         unflattenedType, Collections.emptyMap()).sql();
     Assertions.assertEquals(originalConnector, unflattenedConnector,
         "Flattening and unflattening data types should have no impact on connector");
-    Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `T1` (`FOO` ROW(`QUX` VARCHAR, "
+    Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` (`FOO` ROW(`QUX` VARCHAR, "
         + "`QIZ` VARCHAR), `BAR` ROW(`BAZ` VARCHAR)) WITH ();", unflattenedConnector,
         "Flattened-unflattened connector should be correct");
   }
@@ -73,9 +73,9 @@ public void flattenNestedArrays() {
         .collect(Collectors.toList());
     Assertions.assertIterableEquals(Arrays.asList(new String[]{"FOO", "BAR", "CAR"}),
         flattenedNames);
-    String flattenedConnector = new ScriptImplementor.ConnectorImplementor("T1",
+    String flattenedConnector = new ScriptImplementor.ConnectorImplementor("S", "T1",
         flattenedType, Collections.emptyMap()).sql();
-    Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `T1` (`FOO` ANY ARRAY, "
+    Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` (`FOO` ANY ARRAY, "
         + "`BAR` ANY ARRAY, `CAR` FLOAT ARRAY) WITH ();", flattenedConnector,
         "Flattened connector should have simplified arrays");
   }

hoptimator-venice/src/test/resources/venice-ddl-insert-all.id

Lines changed: 5 additions & 3 deletions
@@ -12,9 +12,11 @@ spec:
   job:
     entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
     args:
-    - CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
-    - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
-    - INSERT INTO `test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `test-store`
+    - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
+    - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
+    - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
+    - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
+    - INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
     jarURI: file:///opt/hoptimator-flink-runner.jar
     parallelism: 1
     upgradeMode: stateless

hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id

Lines changed: 5 additions & 3 deletions
@@ -12,9 +12,11 @@ spec:
   job:
     entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
     args:
-    - CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
-    - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
-    - INSERT INTO `test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `test-store`
+    - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
+    - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
+    - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
+    - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
+    - INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store`
     jarURI: file:///opt/hoptimator-flink-runner.jar
     parallelism: 1
     upgradeMode: stateless

hoptimator-venice/src/test/resources/venice-ddl-select.id

Lines changed: 5 additions & 3 deletions
@@ -12,9 +12,11 @@ spec:
   job:
     entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
     args:
-    - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
-    - CREATE TABLE IF NOT EXISTS `SINK` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ()
-    - INSERT INTO `SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `test-store-1`
+    - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
+    - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
+    - CREATE DATABASE IF NOT EXISTS `PIPELINE` WITH ()
+    - CREATE TABLE IF NOT EXISTS `PIPELINE`.`SINK` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ()
+    - INSERT INTO `PIPELINE`.`SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1`
     jarURI: file:///opt/hoptimator-flink-runner.jar
     parallelism: 1
     upgradeMode: stateless
