Commit 3f8e7d5

Fix field projection when all fields are specified (#190)
* Fix issue when all fields are specified but not SELECT *
* Make Quidem behave the same as the hoptimator CLI & fix tests
1 parent 695b3ed commit 3f8e7d5
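
A representative failing case, taken from the new k8s-ddl.id test below, is a materialized view whose query lists every sink column explicitly (in source order) instead of using SELECT *:

create or replace materialized view ads."PAGE_VIEWS$forward-field-order" as
  select CAMPAIGN_URN as PAGE_URN, MEMBER_URN as MEMBER_URN from ads.ad_clicks;

Because such a projection only renames columns without reordering them, the planner can collapse it into a bare table scan, after which the old name-based field matching could no longer map CAMPAIGN_URN onto the sink's PAGE_URN column.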

4 files changed (+123 −26 lines changed)


hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java

Lines changed: 37 additions & 7 deletions
@@ -1,5 +1,7 @@
 package com.linkedin.hoptimator.jdbc;

+import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView;
+import com.linkedin.hoptimator.util.planner.PipelineRel;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -20,7 +22,11 @@
 import java.util.Properties;
 import java.util.stream.Collectors;

+import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.dialect.CalciteSqlDialect;
 import org.junit.jupiter.api.Assertions;

 import net.hydromatic.quidem.AbstractCommand;
@@ -88,14 +94,38 @@ public void execute(Context context, boolean execute) throws Exception {
        }
       String sql = context.previousSqlCommand().sql;
       HoptimatorConnection conn = (HoptimatorConnection) connection;
-      RelRoot root = HoptimatorDriver.convert(conn, sql).root;
+
       String[] parts = line.split(" ", 2);
-      String pipelineName = parts.length == 2 ? parts[1] : "test";
-      Properties properties = new Properties();
-      properties.putAll(conn.connectionProperties());
-      properties.put(DeploymentService.PIPELINE_OPTION, pipelineName);
-      Pipeline pipeline = DeploymentService.plan(root, Collections.emptyList(), properties)
-          .pipeline(pipelineName, conn);
+      String viewName = parts.length == 2 ? parts[1] : "test";
+      String querySql = sql;
+      SqlCreateMaterializedView create = null;
+      SqlNode sqlNode = HoptimatorDriver.parseQuery(conn, sql);
+      if (sqlNode.getKind().belongsTo(SqlKind.DDL)) {
+        if (sqlNode instanceof SqlCreateMaterializedView) {
+          create = (SqlCreateMaterializedView) sqlNode;
+          final SqlNode q = HoptimatorDdlUtils.renameColumns(create.columnList, create.query);
+          querySql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
+          viewName = HoptimatorDdlUtils.viewName(create.name);
+        } else {
+          throw new RuntimeException("Unsupported DDL statement.");
+        }
+      }
+
+      RelRoot root = HoptimatorDriver.convert(conn, querySql).root;
+      Properties connectionProperties = conn.connectionProperties();
+      RelOptTable table = root.rel.getTable();
+      if (table != null) {
+        connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, String.join(".", table.getQualifiedName()));
+      } else if (create != null) {
+        connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, create.name.toString());
+      }
+
+      PipelineRel.Implementor plan = DeploymentService.plan(root, Collections.emptyList(), connectionProperties);
+      if (create != null) {
+        HoptimatorDdlUtils.snapshotAndSetSinkSchema(conn.createPrepareContext(),
+            new HoptimatorDriver.Prepare(conn), plan, create, querySql);
+      }
+      Pipeline pipeline = plan.pipeline(viewName, conn);
       List<String> specs = new ArrayList<>();
       for (Source source : pipeline.sources()) {
         specs.addAll(DeploymentService.specify(source, conn));
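
With this change, a Quidem script can hand a CREATE MATERIALIZED VIEW statement straight to !specify and follow the same planning path as the hoptimator CLI. A minimal usage sketch, modeled on the k8s-ddl.id additions below:

create or replace materialized view ads."PAGE_VIEWS$forward-field-order" as select CAMPAIGN_URN as PAGE_URN, MEMBER_URN as MEMBER_URN from ads.ad_clicks;
!specify PAGE_VIEWS

The harness parses the DDL, rewrites the view query with the declared column names, uses the root table's qualified name as the pipeline name when one is available (falling back to the view name), and snapshots the sink schema on the plan before building the pipeline.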

hoptimator-k8s/src/test/resources/k8s-ddl.id

Lines changed: 42 additions & 0 deletions
@@ -203,3 +203,45 @@ spec:
     upgradeMode: stateless
     state: running
 !specify AD_CLICKS
+
+create or replace materialized view ads."PAGE_VIEWS$forward-field-order" as select CAMPAIGN_URN as PAGE_URN, MEMBER_URN as MEMBER_URN from ads.ad_clicks;
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkSessionJob
+metadata:
+  name: ads-database-pageviews-forward-field-order
+spec:
+  deploymentName: basic-session-deployment
+  job:
+    entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
+    args:
+    - CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
+    - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10')
+    - CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
+    - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole')
+    - INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT * FROM `ADS`.`AD_CLICKS`
+    jarURI: file:///opt/hoptimator-flink-runner.jar
+    parallelism: 1
+    upgradeMode: stateless
+    state: running
+!specify PAGE_VIEWS
+
+create or replace materialized view ads."PAGE_VIEWS$reverse-field-order" as select MEMBER_URN as MEMBER_URN, CAMPAIGN_URN as PAGE_URN from ads.ad_clicks;
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkSessionJob
+metadata:
+  name: ads-database-pageviews-reverse-field-order
+spec:
+  deploymentName: basic-session-deployment
+  job:
+    entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
+    args:
+    - CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
+    - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10')
+    - CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
+    - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole')
+    - INSERT INTO `ADS`.`PAGE_VIEWS` (`MEMBER_URN`, `PAGE_URN`) SELECT `MEMBER_URN`, `CAMPAIGN_URN` AS `PAGE_URN` FROM `ADS`.`AD_CLICKS`
+    jarURI: file:///opt/hoptimator-flink-runner.jar
+    parallelism: 1
+    upgradeMode: stateless
+    state: running
+!specify PAGE_VIEWS

hoptimator-kafka/src/test/resources/kafka-ddl-beam.id

Lines changed: 2 additions & 2 deletions
@@ -12,13 +12,13 @@ spec:
   sql:
   - PLACEHOLDER
   configs:
-    source.group.id: hoptimator-flink-beam-existing-topic-1
+    source.group.id: hoptimator-flink-beam-KAFKA.existing-topic-1
     source.schemas: KAFKA
     source.tables: existing-topic-2
     sink.schema: KAFKA
     sink.table: existing-topic-1
     fields: '{"VALUE":"VALUE","KEY":"KEY"}'
-    pipeline: 'existing-topic-1'
+    pipeline: 'KAFKA.existing-topic-1'
     flink.app.type: 'BEAM'
     flink.app.name: 'hoptimator-flink-runner'
 ---

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java

Lines changed: 42 additions & 17 deletions
@@ -12,6 +12,7 @@

 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
 import org.apache.calcite.rel.rel2sql.SqlImplementor;
@@ -347,7 +348,16 @@ public void implement(SqlWriter w) {
     String effectiveTable = suffix != null ? table + suffix : table;
     (new CompoundIdentifierImplementor(catalog, schema, effectiveTable)).implement(w);
     RelNode project = dropFields(relNode, targetFields);
-    (new ColumnListImplementor(project.getRowType())).implement(w);
+
+    // If the relNode is a Project (or subclass), the field names should already match the sink.
+    // Otherwise, like in SELECT * situations, the relNode fields will match the source field names, so
+    // we need to directly use targetFields to map correctly.
+    if (relNode instanceof Project) {
+      (new ColumnListImplementor(project.getRowType().getFieldNames())).implement(w);
+    } else {
+      (new ColumnListImplementor(targetFields.rightList())).implement(w);
+    }
+
     (new QueryImplementor(project, tableNameReplacements)).implement(w);
     w.literal(";");
   }
@@ -356,15 +366,35 @@
   // Drops non-target columns, for use case: INSERT INTO (col1, col2) SELECT * FROM ...
   private static RelNode dropFields(RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
     List<Integer> cols = new ArrayList<>();
-    int i = 0;
     List<String> targetFieldNames = targetFields.rightList();
-    for (RelDataTypeField field : relNode.getRowType().getFieldList()) {
-      if (targetFieldNames.contains(field.getName())
-          && !field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
-        cols.add(i);
+    List<RelDataTypeField> sourceFields = relNode.getRowType().getFieldList();
+
+    // If the relNode is a Project (or subclass), the field names should already match the target
+    // because the projection explicitly renamed them. Use name-based matching.
+    if (relNode instanceof Project) {
+      for (int i = 0; i < sourceFields.size(); i++) {
+        RelDataTypeField field = sourceFields.get(i);
+        if (targetFieldNames.contains(field.getName())
+            && !field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
+          cols.add(i);
+        }
+      }
+
+      return createForceProject(relNode, cols);
+    }
+
+    // Otherwise (e.g., TableScan), the projection was optimized away.
+    // Use index-based matching from targetFields to select the right columns.
+    for (int i = 0; i < targetFields.size(); i++) {
+      int fieldIndex = targetFields.leftList().get(i);
+      if (fieldIndex < sourceFields.size()) {
+        RelDataTypeField field = sourceFields.get(fieldIndex);
+        if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
+          cols.add(fieldIndex);
+        }
       }
-      i++;
     }
+
     return createForceProject(relNode, cols);
   }
 }
@@ -605,25 +635,20 @@ private static SqlDataTypeSpec maybeNullable(RelDataType dataType, SqlDataTypeSp

 /** Implements column lists, e.g. `NAME, AGE` */
 class ColumnListImplementor implements ScriptImplementor {
-  private final List<RelDataTypeField> fields;
+  private final List<String> fieldNames;

-  public ColumnListImplementor(RelDataType dataType) {
-    this(dataType.getFieldList());
-  }
-
-  public ColumnListImplementor(List<RelDataTypeField> fields) {
-    this.fields = fields;
+  ColumnListImplementor(List<String> fieldNames) {
+    this.fieldNames = fieldNames;
   }

   @Override
   public void implement(SqlWriter w) {
     SqlWriter.Frame frame1 = w.startList("(", ")");
-    List<SqlIdentifier> fieldNames = fields.stream()
-        .map(RelDataTypeField::getName)
+    List<SqlIdentifier> identifiers = fieldNames.stream()
         .map(x -> x.replaceAll("\\$", "_")) // support FOO$BAR columns as FOO_BAR
        .map(x -> new SqlIdentifier(x, SqlParserPos.ZERO))
        .collect(Collectors.toList());
-    for (SqlIdentifier fieldName : fieldNames) {
+    for (SqlIdentifier fieldName : identifiers) {
       w.sep(",");
       fieldName.unparse(w, 0, 0);
     }
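
The two new k8s-ddl.id cases exercise both branches of this logic. When the projection collapses into a bare scan (forward field order), the column list comes from targetFields.rightList() and index-based matching yields a plain SELECT *; when the Project survives (reverse field order), the column list comes from the projected row type and name-based matching yields an explicit projection. The expected outputs above show both forms:

INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT * FROM `ADS`.`AD_CLICKS`
INSERT INTO `ADS`.`PAGE_VIEWS` (`MEMBER_URN`, `PAGE_URN`) SELECT `MEMBER_URN`, `CAMPAIGN_URN` AS `PAGE_URN` FROM `ADS`.`AD_CLICKS`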
