Skip to content

Commit 689300b

Browse files
authored
Fix ScriptImplementor projection when using filters and add many tests (#136)
* Add some missing error handling and tests around errors * Add tests for venice primitive keys * Pass through missing properties * Fix issues with top-level filter passing through wrong fields to ScriptImplementor
1 parent 7ae6ec7 commit 689300b

File tree

18 files changed

+208
-155
lines changed

18 files changed

+208
-155
lines changed

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ undeploy-kafka:
8383
# Deploys Venice cluster in docker and creates two stores in Venice. Stores are not managed via K8s for now.
8484
deploy-venice: deploy deploy-flink
8585
docker compose -f ./deploy/docker/venice/docker-compose-single-dc-setup.yaml up -d --wait
86-
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store schemas/keySchema.avsc schemas/valueSchema.avsc
87-
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store-1 schemas/keySchema.avsc schemas/valueSchema.avsc
86+
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store schemas/keySchemaRecord.avsc schemas/valueSchema.avsc
87+
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store-primitive schemas/keySchemaPrimitive.avsc schemas/valueSchema.avsc
8888
kubectl apply -f ./deploy/samples/venicedb.yaml
8989

9090
undeploy-venice:
@@ -101,7 +101,7 @@ integration-tests: deploy-dev-environment
101101
kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & echo $$! > port-forward.pid
102102
kubectl port-forward -n flink svc/flink-sql-gateway 8083 & echo $$! > port-forward-2.pid
103103
kubectl port-forward -n flink svc/basic-session-deployment-rest 8081 & echo $$! > port-forward-3.pid
104-
./gradlew intTest --no-parallel || kill `cat port-forward.pid port-forward-2.pid, port-forward-3.pid`
104+
./gradlew intTest --no-parallel || kill `cat port-forward.pid port-forward-2.pid port-forward-3.pid`
105105
kill `cat port-forward.pid`
106106
kill `cat port-forward-2.pid`
107107
kill `cat port-forward-3.pid`
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"int"
File renamed without changes.

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import com.linkedin.hoptimator.Pipeline;
6666
import com.linkedin.hoptimator.View;
6767
import com.linkedin.hoptimator.util.DeploymentService;
68+
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcTable;
6869
import com.linkedin.hoptimator.util.planner.PipelineRel;
6970

7071

@@ -99,7 +100,14 @@ public DdlExecutor getDdlExecutor() {
99100
@Override
100101
public void execute(SqlCreateView create, CalcitePrepare.Context context) {
101102
final Pair<CalciteSchema, String> pair = schema(context, true, create.name);
103+
if (pair.left == null) {
104+
throw new DdlException(create, "Schema for " + create.name + " not found.");
105+
}
102106
final SchemaPlus schemaPlus = pair.left.plus();
107+
if (schemaPlus.getTable(pair.right) instanceof HoptimatorJdbcTable) {
108+
throw new DdlException(create,
109+
"Cannot overwrite physical table " + pair.right + " with a view.");
110+
}
103111
for (Function function : schemaPlus.getFunctions(pair.right)) {
104112
if (function.getParameters().isEmpty()) {
105113
if (!create.getReplace()) {
@@ -139,17 +147,22 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
139147
public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context context) {
140148
final Pair<CalciteSchema, String> pair = schema(context, true, create.name);
141149
if (pair.left == null) {
142-
throw new DdlException(create, "Schema for " + create.name.getSimple() + " not found.");
150+
throw new DdlException(create, "Schema for " + create.name + " not found.");
143151
}
144-
if (pair.left.plus().getTable(pair.right) != null) {
152+
final SchemaPlus schemaPlus = pair.left.plus();
153+
if (schemaPlus.getTable(pair.right) != null) {
154+
if (schemaPlus.getTable(pair.right) instanceof HoptimatorJdbcTable) {
155+
throw new DdlException(create,
156+
"Cannot overwrite physical table " + pair.right + " with a view.");
157+
}
145158
// Materialized view exists.
146159
if (!create.ifNotExists && !create.getReplace()) {
147160
// They did not specify IF NOT EXISTS, so give error.
148161
throw new DdlException(create,
149162
"View " + pair.right + " already exists. Use CREATE OR REPLACE to update.");
150163
}
151164
if (create.getReplace()) {
152-
pair.left.plus().removeTable(pair.right);
165+
schemaPlus.removeTable(pair.right);
153166
} else {
154167
// nothing to do
155168
return;
@@ -160,7 +173,6 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
160173
final String sql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
161174
final List<String> schemaPath = pair.left.path(null);
162175

163-
SchemaPlus schemaPlus = pair.left.plus();
164176
String schemaName = schemaPlus.getName();
165177
String viewName = pair.right;
166178
List<String> viewPath = new ArrayList<>(schemaPath);
@@ -190,7 +202,7 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
190202
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, pipelineName);
191203
List<String> sinkPath = new ArrayList<>(schemaPath);
192204
sinkPath.add(sinkName);
193-
Table sink = pair.left.plus().getTable(sinkName);
205+
Table sink = schemaPlus.getTable(sinkName);
194206

195207
final RelDataType rowType;
196208
if (sink != null) {

hoptimator-jdbc/src/test/resources/validation.id

Lines changed: 0 additions & 17 deletions
This file was deleted.

hoptimator-jdbc/src/test/resources/views.id

Lines changed: 0 additions & 61 deletions
This file was deleted.

hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void execute(Context context, boolean execute) throws Exception {
8080
String sql = context.previousSqlCommand().sql;
8181
HoptimatorConnection conn = (HoptimatorConnection) connection;
8282
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
83-
String []parts = line.split(" ", 2);
83+
String[] parts = line.split(" ", 2);
8484
String pipelineName = parts.length == 2 ? parts[1] : "test";
8585
Pipeline pipeline = DeploymentService.plan(root, Collections.emptyList(), conn.connectionProperties())
8686
.pipeline(pipelineName, conn.connectionProperties());

hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,10 @@ public void k8sDdlScript() throws Exception {
1919
public void k8sDdlScriptFunction() throws Exception {
2020
run("k8s-ddl-function.id", "fun=mysql");
2121
}
22+
23+
@Test
24+
@Tag("integration")
25+
public void k8sValidationScript() throws Exception {
26+
run("k8s-validation.id");
27+
}
2228
}

hoptimator-k8s/src/test/resources/k8s-ddl.id

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,41 @@ select * from ads.target2;
8888

8989
!ok
9090

91+
create or replace materialized view ads."PAGE_VIEWS$filter" as select first_name as page_urn, last_name as member_urn from profile.members where first_name = 'Alice';
92+
(0 rows modified)
93+
94+
!update
95+
96+
select * from ads."PAGE_VIEWS$filter";
97+
+----------+------------+
98+
| PAGE_URN | MEMBER_URN |
99+
+----------+------------+
100+
| Alice | Addison |
101+
+----------+------------+
102+
(1 row)
103+
104+
!ok
105+
106+
create or replace materialized view ads."PAGE_VIEWS$filter" (member_urn) as select last_name as member_urn from profile.members where last_name = 'Addison';
107+
(0 rows modified)
108+
109+
!update
110+
111+
select * from ads."PAGE_VIEWS$filter";
112+
+------------+
113+
| MEMBER_URN |
114+
+------------+
115+
| Addison |
116+
+------------+
117+
(1 row)
118+
119+
!ok
120+
121+
drop materialized view ads."PAGE_VIEWS$filter";
122+
(0 rows modified)
123+
124+
!update
125+
91126
drop materialized view ads.target2;
92127
(0 rows modified)
93128

@@ -106,4 +141,25 @@ drop materialized view ads.audience2;
106141
drop view ads.audience;
107142
(0 rows modified)
108143

109-
!update
144+
!update
145+
146+
insert into ads.page_views select first_name as page_urn, last_name as member_urn from profile.members where first_name = 'Alice';
147+
apiVersion: flink.apache.org/v1beta1
148+
kind: FlinkSessionJob
149+
metadata:
150+
name: ads-database-pageviews
151+
spec:
152+
deploymentName: basic-session-deployment
153+
job:
154+
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
155+
args:
156+
- CREATE DATABASE IF NOT EXISTS `PROFILE` WITH ()
157+
- CREATE TABLE IF NOT EXISTS `PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10')
158+
- CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
159+
- CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole')
160+
- INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `FIRST_NAME` AS `PAGE_URN`, `LAST_NAME` AS `MEMBER_URN` FROM `PROFILE`.`MEMBERS` WHERE `FIRST_NAME` = 'Alice'
161+
jarURI: file:///opt/hoptimator-flink-runner.jar
162+
parallelism: 1
163+
upgradeMode: stateless
164+
state: running
165+
!specify PAGE_VIEWS
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
!set outputformat mysql
2+
!use k8s
3+
4+
create or replace materialized view ads.page_views as select first_name, last_name from profile.members;
5+
Cannot overwrite physical table PAGE_VIEWS with a view.
6+
!error
7+
8+
create or replace materialized view invalid.page_views as select first_name, last_name from profile.members;
9+
Schema for INVALID.PAGE_VIEWS not found.
10+
!error
11+
12+
create or replace materialized view ads.audience as select first_name, last_name from ads.page_views natural join profile.members;
13+
(0 rows modified)
14+
15+
!update
16+
17+
create materialized view ads.audience as select first_name, last_name from ads.page_views natural join profile.members;
18+
View AUDIENCE already exists. Use CREATE OR REPLACE to update.
19+
!error
20+
21+
drop materialized view ads.audience;
22+
(0 rows modified)
23+
24+
!update
25+
26+
create or replace materialized view ads."PAGE_VIEWS$myview" as select first_name from profile.members;
27+
Field FIRST_NAME not found in sink schema
28+
!error
29+
30+
create or replace materialized view ads."PAGE_VIEWS$myview" (page_urn, member_urn) as select first_name from profile.members;
31+
List of column aliases must have same degree as table; table has 1 columns ('FIRST_NAME'), whereas alias list has 2 columns
32+
!error
33+
34+
create or replace materialized view ads."PAGE_VIEWS$myview" (page_urn) as select first_name, last_name from profile.members;
35+
List of column aliases must have same degree as table; table has 2 columns ('FIRST_NAME', 'LAST_NAME'), whereas alias list has 1 columns
36+
!error

0 commit comments

Comments
 (0)