Skip to content

Commit c19c235

Browse files
authored
Add TableTriggers (#138)
* Add TableTriggers and controller
* Add TABLE_TRIGGERS metadata table
* Add table trigger unit tests
* Add table trigger metadata integration test
* Relax pipeline table integration tests
* Add sample cron trigger
* Bug fixes h/t srnand
1 parent 6658b36 commit c19c235

File tree

54 files changed

+1541
-60
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1541
-60
lines changed

.github/workflows/integration-tests.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ jobs:
5555
kubectl get flinkdeployments
5656
kubectl get subscriptions
5757
kubectl get pipelines
58+
kubectl get tabletemplates
59+
kubectl get jobtemplates
60+
kubectl get tabletriggers
61+
kubectl get cronjobs
62+
kubectl get jobs
5863
- name: Capture Flink Job Logs
5964
if: always()
6065
run: |

Makefile

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,13 @@ quickstart: build deploy
4040

4141
deploy-demo: deploy
4242
kubectl apply -f ./deploy/samples/demodb.yaml
43+
kubectl apply -f ./deploy/samples/tabletriggers.yaml
44+
kubectl apply -f ./deploy/samples/crontrigger.yaml
4345

4446
undeploy-demo: undeploy
4547
kubectl delete -f ./deploy/samples/demodb.yaml || echo "skipping"
48+
kubectl delete -f ./deploy/samples/tabletriggers.yaml || echo "skipping"
49+
kubectl delete -f ./deploy/samples/crontrigger.yaml || echo "skipping"
4650

4751
deploy-flink: deploy
4852
kubectl create namespace flink || echo "skipping"
@@ -65,9 +69,10 @@ undeploy-flink:
6569
deploy-kafka: deploy deploy-flink
6670
kubectl create namespace kafka || echo "skipping"
6771
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
72+
sleep 10 # avoid kubectl race condition
73+
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
6874
kubectl apply -f ./deploy/samples/kafkadb.yaml
6975
kubectl apply -f ./deploy/dev/kafka.yaml
70-
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
7176
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
7277
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
7378
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m

deploy/rbac.yaml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ metadata:
55
name: hoptimator-operator
66
rules:
77
- apiGroups: ["hoptimator.linkedin.com"]
8-
resources: ["acls", "databases", "engines", "jobtemplates", "kafkatopics", "pipelines", "sqljobs", "subscriptions", "tabletemplates", "views"]
8+
resources: ["acls", "databases", "engines", "jobtemplates", "kafkatopics", "pipelines", "sqljobs", "subscriptions", "tabletemplates", "tabletriggers", "views"]
99
verbs: ["get", "watch", "list", "update", "create"]
1010
- apiGroups: ["hoptimator.linkedin.com"]
11-
resources: ["acls/status", "kafkatopics/status", "pipelines/status", "sqljobs/status", "subscriptions/status"]
12-
verbs: ["get", "patch"]
11+
resources: ["acls/status", "kafkatopics/status", "pipelines/status", "sqljobs/status", "subscriptions/status", "tabletriggers/status"]
12+
verbs: ["get", "patch", "update", "create"]
1313
- apiGroups: ["flink.apache.org"]
1414
resources: ["flinkdeployments", "flinksessionjobs"]
1515
verbs: ["get", "update", "create"]
16+
- apiGroups: ["batch"]
17+
resources: ["jobs"]
18+
verbs: ["list", "create", "delete"]
1619

deploy/samples/crontrigger.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
apiVersion: batch/v1
2+
kind: CronJob
3+
metadata:
4+
name: cron-trigger
5+
spec:
6+
schedule: "* * * * *"
7+
jobTemplate:
8+
spec:
9+
ttlSecondsAfterFinished: 10
10+
template:
11+
spec:
12+
serviceAccountName: hoptimator-operator
13+
containers:
14+
- name: trigger
15+
image: alpine/k8s:1.33.0
16+
imagePullPolicy: IfNotPresent
17+
command:
18+
- /bin/bash
19+
- -c
20+
- >-
21+
kubectl patch tabletrigger test-table-trigger -p
22+
"{\"status\":{\"timestamp\": \"`date +%FT%TZ`\"}}"
23+
--subresource=status --type=merge
24+
restartPolicy: OnFailure

deploy/samples/tabletriggers.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
apiVersion: hoptimator.linkedin.com/v1alpha1
2+
kind: TableTrigger
3+
metadata:
4+
name: test-table-trigger
5+
spec:
6+
schema: KAFKA
7+
table: existing-topic-1
8+
yaml: |
9+
apiVersion: batch/v1
10+
kind: Job
11+
metadata:
12+
name: test-table-trigger-job
13+
spec:
14+
template:
15+
spec:
16+
containers:
17+
- name: hello
18+
image: alpine/k8s:1.33.0
19+
command: ["bash", "-c", "echo test-table-trigger fired at `date`"]
20+
restartPolicy: Never
21+
backoffLimit: 4
22+
ttlSecondsAfterFinished: 90
23+
24+

generate-models.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,6 @@ docker run \
1717
-u "$(pwd)/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml" \
1818
-u "$(pwd)/hoptimator-k8s/src/main/resources/subscriptions.crd.yaml" \
1919
-u "$(pwd)/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml" \
20+
-u "$(pwd)/hoptimator-k8s/src/main/resources/tabletriggers.crd.yaml" \
2021
-u "$(pwd)/hoptimator-k8s/src/main/resources/views.crd.yaml" \
2122
&& echo "done."

hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
public abstract class QuidemTestBase {
3636

3737
protected void run(String resourceName) throws IOException, URISyntaxException {
38-
run(Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource(resourceName)).toURI(), "");
38+
run(resourceName, "");
3939
}
4040

4141
protected void run(String resourceName, String jdbcProperties) throws IOException, URISyntaxException {
@@ -62,7 +62,12 @@ protected void run(URI resource, String jdbcProperties) throws IOException {
6262
for (String line : output) {
6363
System.out.println(line);
6464
}
65-
Assertions.assertIterableEquals(input, output);
65+
for (int i = 0; i < input.size(); i++) {
66+
String line = output.get(i);
67+
String expected = input.get(i);
68+
Assertions.assertEquals(expected, line,
69+
"Context:\n" + String.join("\n", output.subList(i, Math.min(i + 10, output.size() - 1))));
70+
}
6671
}
6772

6873
private static final class CustomCommandHandler implements CommandHandler {

hoptimator-k8s/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
plugins {
22
id 'java'
3+
id 'java-test-fixtures'
34
id 'maven-publish'
45
}
56

@@ -10,6 +11,10 @@ dependencies {
1011
implementation libs.calcite.server
1112
implementation libs.kubernetes.client
1213

14+
testFixturesImplementation libs.kubernetes.client
15+
testFixturesImplementation project(':hoptimator-api')
16+
testFixturesImplementation project(':hoptimator-util')
17+
1318
// These are included in case the respective Databases are installed
1419
testRuntimeOnly project(':hoptimator-demodb')
1520
testRuntimeOnly project(':hoptimator-kafka')

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
import io.kubernetes.client.common.KubernetesListObject;
1515
import io.kubernetes.client.common.KubernetesObject;
1616
import io.kubernetes.client.openapi.models.V1OwnerReference;
17+
import io.kubernetes.client.util.Yaml;
1718
import io.kubernetes.client.util.generic.GenericKubernetesApi;
1819
import io.kubernetes.client.util.generic.KubernetesApiResponse;
20+
import io.kubernetes.client.util.generic.options.DeleteOptions;
1921
import io.kubernetes.client.util.generic.options.ListOptions;
2022

2123
import com.linkedin.hoptimator.util.Api;
@@ -115,10 +117,20 @@ public void delete(T obj) throws SQLException {
115117
if (obj.getMetadata().getNamespace() == null && !endpoint.clusterScoped()) {
116118
obj.getMetadata().namespace(context.namespace());
117119
}
120+
delete(obj.getMetadata().getNamespace(), obj.getMetadata().getName());
121+
}
122+
123+
public void delete(String namespace, String name) throws SQLException {
124+
DeleteOptions options = new DeleteOptions();
125+
options.setPropagationPolicy("Background"); // ensure deletes cascade
118126
KubernetesApiResponse<T> resp =
119-
context.generic(endpoint).delete(obj.getMetadata().getNamespace(), obj.getMetadata().getName());
120-
K8sUtils.checkResponse("Error deleting " + obj.getMetadata().getName(), resp);
121-
log.info("Deleted K8s obj: {}:{}", obj.getKind(), obj.getMetadata().getName());
127+
context.generic(endpoint).delete(namespace, name, options);
128+
K8sUtils.checkResponse("Error deleting " + name, resp);
129+
log.info("Deleted K8s obj: {}:{}", endpoint.kind(), name);
130+
}
131+
132+
public void delete(String name) throws SQLException {
133+
delete(context.namespace(), name);
122134
}
123135

124136
@Override
@@ -170,7 +182,7 @@ public void updateStatus(T obj, Object status) throws SQLException {
170182
obj.getMetadata().namespace(context.namespace());
171183
}
172184
KubernetesApiResponse<T> resp = context.generic(endpoint).updateStatus(obj, x -> status);
173-
K8sUtils.checkResponse("Error updating status of " + obj.getMetadata().getName(), resp);
185+
K8sUtils.checkResponse(() -> "Error updating status of " + Yaml.dump(obj), resp);
174186
log.info("Updated K8s obj status: {}:{}", obj.getKind(), obj.getMetadata().getName());
175187
}
176188
}

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import io.kubernetes.client.openapi.models.V1ConfigMap;
44
import io.kubernetes.client.openapi.models.V1ConfigMapList;
5+
import io.kubernetes.client.openapi.models.V1Job;
6+
import io.kubernetes.client.openapi.models.V1JobList;
57
import io.kubernetes.client.openapi.models.V1Namespace;
68
import io.kubernetes.client.openapi.models.V1NamespaceList;
79
import io.kubernetes.client.openapi.models.V1Secret;
@@ -17,6 +19,8 @@
1719
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
1820
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate;
1921
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList;
22+
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger;
23+
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList;
2024
import com.linkedin.hoptimator.k8s.models.V1alpha1View;
2125
import com.linkedin.hoptimator.k8s.models.V1alpha1ViewList;
2226

@@ -31,6 +35,9 @@ public final class K8sApiEndpoints {
3135
public static final K8sApiEndpoint<V1ConfigMap, V1ConfigMapList> CONFIG_MAPS =
3236
new K8sApiEndpoint<>("ConfigMap", "", "v1", "configmaps", false,
3337
V1ConfigMap.class, V1ConfigMapList.class);
38+
public static final K8sApiEndpoint<V1Job, V1JobList> JOBS =
39+
new K8sApiEndpoint<>("Job", "batch", "v1", "jobs", false,
40+
V1Job.class, V1JobList.class);
3441

3542
// Hoptimator custom resources
3643
public static final K8sApiEndpoint<V1alpha1Database, V1alpha1DatabaseList> DATABASES =
@@ -51,6 +58,9 @@ public final class K8sApiEndpoints {
5158
public static final K8sApiEndpoint<V1alpha1JobTemplate, V1alpha1JobTemplateList> JOB_TEMPLATES =
5259
new K8sApiEndpoint<>("JobTemplate", "hoptimator.linkedin.com", "v1alpha1", "jobtemplates", false,
5360
V1alpha1JobTemplate.class, V1alpha1JobTemplateList.class);
61+
public static final K8sApiEndpoint<V1alpha1TableTrigger, V1alpha1TableTriggerList> TABLE_TRIGGERS =
62+
new K8sApiEndpoint<>("TableTrigger", "hoptimator.linkedin.com", "v1alpha1", "tabletriggers", false,
63+
V1alpha1TableTrigger.class, V1alpha1TableTriggerList.class);
5464

5565
private K8sApiEndpoints() {
5666
}

0 commit comments

Comments (0)