Skip to content

Commit 50e4f13

Browse files
Use TableTriggers As TableTemplates With Hint-able Expressions (#149)
* Use TableTriggers As TableTemplates With Hintable Expressions * Addressing Comments * Revert testing version change * Rebase Master * Revert testing version change * Do not use try with resources, otherwise, the resource is closed at the block end * Update integration test to include the new trigger linked via hints * Fix Integration testing cleanup and sorting --------- Co-authored-by: Shrinand Thakkar <[email protected]>
1 parent 89c8a9d commit 50e4f13

File tree

6 files changed

+105
-38
lines changed

6 files changed

+105
-38
lines changed

deploy/samples/demodb.yaml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,37 @@ spec:
4848
- Modify
4949
connector: |
5050
connector = blackhole
51+
52+
---
53+
54+
apiVersion: hoptimator.linkedin.com/v1alpha1
55+
kind: TableTemplate
56+
metadata:
57+
name: ads-source-trigger-template
58+
spec:
59+
databases:
60+
- ads-database
61+
methods:
62+
- Scan
63+
yaml: |
64+
apiVersion: hoptimator.linkedin.com/v1alpha1
65+
kind: TableTrigger
66+
metadata:
67+
name: {{name}}-trigger
68+
spec:
69+
schema: KAFKA
70+
table: {{offline.table.name}}
71+
yaml: |
72+
apiVersion: batch/v1
73+
kind: Job
74+
metadata:
75+
name: {{name}}-job
76+
spec:
77+
template:
78+
spec:
79+
containers:
80+
- name: hello
81+
image: alpine/k8s:1.33.0
82+
command: ["bash", "-c", "echo {{name}}-trigger fired at `date`"]
83+
restartPolicy: Never
84+
backoffLimit: 4

hoptimator

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ $BASEDIR/hoptimator-cli/build/install/hoptimator-cli/bin/hoptimator-cli \
88
sqlline.SqlLine \
99
-ac sqlline.HoptimatorAppConfig \
1010
--verbose \
11-
-u "jdbc:hoptimator://fun=mysql" -n "" -p "" -nn "Hoptimator" $@
11+
-u "jdbc:hoptimator://fun=mysql;hints=offline.table.name=ads_offline" -n "" -p "" -nn "Hoptimator" $@
1212

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,34 +6,39 @@
66
import java.util.stream.Collectors;
77

88
import com.linkedin.hoptimator.Source;
9+
import com.linkedin.hoptimator.jdbc.HoptimatorConnection;
910
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate;
1011
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList;
1112
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateSpec;
13+
import com.linkedin.hoptimator.util.DeploymentService;
1214
import com.linkedin.hoptimator.util.Template;
1315

14-
1516
/** Specifies an abstract Source with concrete YAML by applying TableTemplates. */
1617
class K8sSourceDeployer extends K8sYamlDeployer {
17-
18+
private final K8sContext context;
1819
private final Source source;
1920
private final K8sApi<V1alpha1TableTemplate, V1alpha1TableTemplateList> tableTemplateApi;
2021

2122
K8sSourceDeployer(Source source, K8sContext context) {
2223
super(context);
24+
this.context = context;
2325
this.source = source;
2426
this.tableTemplateApi = new K8sApi<>(context, K8sApiEndpoints.TABLE_TEMPLATES);
2527
}
2628

2729
@Override
2830
public List<String> specify() throws SQLException {
2931
String name = K8sUtils.canonicalizeName(source.database(), source.table());
32+
HoptimatorConnection connection = context.connection();
3033
Template.Environment env =
3134
new Template.SimpleEnvironment()
3235
.with("name", name)
3336
.with("database", source.database())
3437
.with("schema", source.schema())
3538
.with("table", source.table())
36-
.with(source.options());
39+
.with(source.options())
40+
.with(DeploymentService.parseHints(connection.connectionProperties()));
41+
3742
return tableTemplateApi.list()
3843
.stream()
3944
.map(V1alpha1TableTemplate::getSpec)
@@ -43,6 +48,7 @@ public List<String> specify() throws SQLException {
4348
.map(V1alpha1TableTemplateSpec::getYaml)
4449
.filter(Objects::nonNull)
4550
.map(x -> new Template.SimpleTemplate(x).render(env))
51+
.filter(Objects::nonNull) // Filter out null templates (which might have errored out due to missing hint expressions)
4652
.collect(Collectors.toList());
4753
}
4854
}

hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,6 @@ public void k8sValidationScript() throws Exception {
2929
@Test
3030
@Tag("integration")
3131
public void k8sMetadataTables() throws Exception {
32-
run("k8s-metadata.id");
32+
run("k8s-metadata.id", "hints=offline.table.name=ads_offline");
3333
}
3434
}

hoptimator-k8s/src/test/resources/k8s-metadata.id

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -61,49 +61,66 @@ select name, failed from "k8s".pipelines order by name;
6161
!ok
6262

6363
select name, failed from "k8s".pipeline_elements order by name;
64-
+----------------------------------------+--------+
65-
| NAME | FAILED |
66-
+----------------------------------------+--------+
67-
| FlinkSessionJob/ads-database-audience2 | false |
68-
| FlinkSessionJob/ads-database-pages | false |
69-
+----------------------------------------+--------+
70-
(2 rows)
64+
+---------------------------------------------+--------+
65+
| NAME | FAILED |
66+
+---------------------------------------------+--------+
67+
| FlinkSessionJob/ads-database-audience2 | false |
68+
| FlinkSessionJob/ads-database-pages | false |
69+
| TableTrigger/ads-database-pageviews-trigger | false |
70+
+---------------------------------------------+--------+
71+
(3 rows)
7172

7273
!ok
7374

74-
select * from "k8s".pipeline_element_map order by element_name;
75-
+----------------------------------------+---------------+
76-
| ELEMENT_NAME | PIPELINE_NAME |
77-
+----------------------------------------+---------------+
78-
| FlinkSessionJob/ads-database-audience2 | ads-audience2 |
79-
| FlinkSessionJob/ads-database-pages | ads-pages |
80-
+----------------------------------------+---------------+
81-
(2 rows)
75+
select * from "k8s".pipeline_element_map order by element_name, pipeline_name;
76+
+---------------------------------------------+---------------+
77+
| ELEMENT_NAME | PIPELINE_NAME |
78+
+---------------------------------------------+---------------+
79+
| FlinkSessionJob/ads-database-audience2 | ads-audience2 |
80+
| FlinkSessionJob/ads-database-pages | ads-pages |
81+
| TableTrigger/ads-database-pageviews-trigger | ads-audience2 |
82+
| TableTrigger/ads-database-pageviews-trigger | ads-pages |
83+
+---------------------------------------------+---------------+
84+
(4 rows)
8285

8386
!ok
8487

85-
select pl.name as pipeline_name, pe.element_name, pe.failed as element_failed from "k8s".pipelines pl inner join (select t2.element_name, t1.failed, t2.pipeline_name from "k8s".pipeline_elements t1 inner join "k8s".pipeline_element_map t2 on t1.name = t2.element_name) pe on pl.name = pe.pipeline_name order by pipeline_name;
86-
+---------------+----------------------------------------+----------------+
87-
| PIPELINE_NAME | ELEMENT_NAME | ELEMENT_FAILED |
88-
+---------------+----------------------------------------+----------------+
89-
| ads-audience2 | FlinkSessionJob/ads-database-audience2 | false |
90-
| ads-pages | FlinkSessionJob/ads-database-pages | false |
91-
+---------------+----------------------------------------+----------------+
92-
(2 rows)
88+
select pl.name as pipeline_name, pe.element_name, pe.failed as element_failed from "k8s".pipelines pl inner join (select t2.element_name, t1.failed, t2.pipeline_name from "k8s".pipeline_elements t1 inner join "k8s".pipeline_element_map t2 on t1.
89+
name = t2.element_name) pe on pl.name = pe.pipeline_name order by pipeline_name, element_name;
90+
+---------------+---------------------------------------------+----------------+
91+
| PIPELINE_NAME | ELEMENT_NAME | ELEMENT_FAILED |
92+
+---------------+---------------------------------------------+----------------+
93+
| ads-audience2 | FlinkSessionJob/ads-database-audience2 | false |
94+
| ads-audience2 | TableTrigger/ads-database-pageviews-trigger | false |
95+
| ads-pages | FlinkSessionJob/ads-database-pages | false |
96+
| ads-pages | TableTrigger/ads-database-pageviews-trigger | false |
97+
+---------------+---------------------------------------------+----------------+
98+
(4 rows)
9399

94100
!ok
95101

96102
select name, "SCHEMA", "TABLE" from "k8s".TABLE_TRIGGERS;
97-
+--------------------+--------+------------------+
98-
| NAME | SCHEMA | TABLE |
99-
+--------------------+--------+------------------+
100-
| test-table-trigger | KAFKA | existing-topic-1 |
101-
+--------------------+--------+------------------+
102-
(1 row)
103+
+--------------------------------+--------+------------------+
104+
| NAME | SCHEMA | TABLE |
105+
+--------------------------------+--------+------------------+
106+
| test-table-trigger | KAFKA | existing-topic-1 |
107+
| ads-database-pageviews-trigger | KAFKA | ads_offline |
108+
+--------------------------------+--------+------------------+
109+
(2 rows)
103110

104111
!ok
105112

106113
drop materialized view ads.pages;
107114
(0 rows modified)
108115

109116
!update
117+
118+
drop materialized view ads.audience;
119+
(0 rows modified)
120+
121+
!update
122+
123+
drop materialized view ads.audience2;
124+
(0 rows modified)
125+
126+
!update

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package com.linkedin.hoptimator.util;
22

3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
36
import java.util.LinkedHashMap;
47
import java.util.Locale;
58
import java.util.Map;
@@ -11,7 +14,7 @@
1114

1215
/** A convenient way to generate K8s YAML. */
1316
public interface Template {
14-
17+
Logger log = LoggerFactory.getLogger(Template.class);
1518
String render(Environment env);
1619

1720
/** Exposes environment variables to templates */
@@ -201,9 +204,16 @@ public String render(Environment env) {
201204
String key = m.group(2);
202205
String defaultValue = m.group(4);
203206
String transform = m.group(5);
204-
String value = env.getOrDefault(key, () -> defaultValue);
205-
if (value == null) {
206-
throw new IllegalArgumentException(template + " has no value for variable " + key + ".");
207+
String value;
208+
try {
209+
value = env.getOrDefault(key, () -> defaultValue);
210+
if (value == null) {
211+
log.warn("Template variable '{}' resolved to null. Skipping template.", key);
212+
return null;
213+
}
214+
} catch (IllegalArgumentException e) {
215+
log.warn("Missing template variable '{}' in environment: {}. Skipping template.", key, e.getMessage());
216+
return null;
207217
}
208218
String transformedValue = applyTransform(value, transform);
209219
String quotedPrefix = Matcher.quoteReplacement(prefix);

0 commit comments

Comments
 (0)