Commit a400db2

Add SqlJob and Flink Streaming impl (#69)
1 parent 701ce75 commit a400db2

31 files changed: +1300 -122 lines

Makefile

Lines changed: 2 additions & 1 deletion

@@ -34,7 +34,8 @@ deploy-dev-environment:
 deploy-samples: deploy
 	kubectl wait --for=condition=Established=True \
 		crds/subscriptions.hoptimator.linkedin.com \
-		crds/kafkatopics.hoptimator.linkedin.com
+		crds/kafkatopics.hoptimator.linkedin.com \
+		crds/sqljobs.hoptimator.linkedin.com
 	kubectl apply -f ./deploy/samples
 
 deploy-config:

deploy/rbac.yaml

Lines changed: 2 additions & 2 deletions

@@ -5,10 +5,10 @@ metadata:
   name: hoptimator-operator
 rules:
 - apiGroups: ["hoptimator.linkedin.com"]
-  resources: ["acls", "kafkatopics", "subscriptions"]
+  resources: ["acls", "kafkatopics", "subscriptions", "sqljobs"]
   verbs: ["get", "watch", "list", "update", "create"]
 - apiGroups: ["hoptimator.linkedin.com"]
-  resources: ["kafkatopics/status", "subscriptions/status", "acls/status"]
+  resources: ["kafkatopics/status", "subscriptions/status", "acls/status", "sqljobs/status"]
   verbs: ["get", "patch"]
 - apiGroups: ["flink.apache.org"]
   resources: ["flinkdeployments"]

deploy/samples/sqljobs.yaml

Lines changed: 10 additions & 0 deletions

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: SqlJob
metadata:
  name: hello-world
spec:
  dialect: Flink
  executionMode: Streaming
  sql:
  - create table bh (text varchar) with ('connector' = 'blackhole');
  - insert into bh values ('hello world');
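Once the new CRD is established, kubectl apply -f deploy/samples/sqljobs.yaml creates this job (the deploy-samples target above applies everything under ./deploy/samples). The script writes one row into a Flink blackhole table, which discards its input, so the sample exercises the whole pipeline without needing any external system.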

deploy/sqljobs.crd.yaml

Lines changed: 87 additions & 0 deletions

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: sqljobs.hoptimator.linkedin.com
spec:
  group: hoptimator.linkedin.com
  names:
    kind: SqlJob
    listKind: SqlJobList
    plural: sqljobs
    singular: sqljob
    shortNames:
    - sql
    - sj
  preserveUnknownFields: false
  scope: Namespaced
  versions:
  - name: v1alpha1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        description: Hoptimator generic SQL job
        type: object
        properties:
          apiVersion:
            type: string
          kind:
            type: string
          metadata:
            type: object
          spec:
            description: SQL job spec
            type: object
            properties:
              sql:
                description: SQL script the job should run.
                type: array
                items:
                  type: string
              dialect:
                description: Flink, etc.
                type: string
                enum:
                - Flink
                default: Flink
              executionMode:
                description: Streaming or Batch.
                type: string
                enum:
                - Streaming
                - Batch
                default: Streaming
            required:
            - sql
          status:
            description: Filled in by the operator.
            type: object
            properties:
              ready:
                description: Whether the SqlJob is running or completed.
                type: boolean
              failed:
                description: Whether the SqlJob has failed.
                type: boolean
              message:
                description: Error or success message, for information only.
                type: string
              sql:
                description: The SQL being implemented by this SqlJob.
                type: string
    subresources:
      status: {}
    additionalPrinterColumns:
    - name: DIALECT
      type: string
      description: SQL dialect.
      jsonPath: .spec.dialect
    - name: MODE
      type: string
      description: Execution mode.
      jsonPath: .spec.executionMode
    - name: STATUS
      type: string
      description: Job status.
      jsonPath: .status.message

hoptimator-flink-adapter/build.gradle

Lines changed: 1 addition & 0 deletions

@@ -6,6 +6,7 @@ plugins {
 
 dependencies {
   implementation project(':hoptimator-catalog')
+  implementation project(':hoptimator-models')
   implementation project(':hoptimator-operator')
   implementation libs.kubernetesClient
   implementation libs.kubernetesExtendedClient

hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/catalog/flink/FlinkStreamingSqlJob.java

Lines changed: 13 additions & 0 deletions

package com.linkedin.hoptimator.catalog.flink;

import com.linkedin.hoptimator.catalog.Resource;

public class FlinkStreamingSqlJob extends Resource {

  public FlinkStreamingSqlJob(String namespace, String name, String sql) {
    super("FlinkStreamingSqlJob");
    export("namespace", namespace);
    export("name", name);
    export("sql", sql);
  }
}
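The string passed to super(...) evidently identifies the resource template to render, and each export(...) binds a variable for that template; the {{namespace}}, {{name}}, and {{sql}} placeholders in the FlinkDeployment template later in this commit line up exactly with these exports.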

hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkControllerProvider.java

Lines changed: 16 additions & 2 deletions

@@ -1,9 +1,13 @@
 package com.linkedin.hoptimator.operator.flink;
 
+import com.linkedin.hoptimator.models.V1alpha1SqlJob;
+import com.linkedin.hoptimator.models.V1alpha1SqlJobList;
 import com.linkedin.hoptimator.operator.ControllerProvider;
 import com.linkedin.hoptimator.operator.Operator;
 
 import io.kubernetes.client.extended.controller.Controller;
+import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
+import io.kubernetes.client.extended.controller.reconciler.Reconciler;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -16,7 +20,17 @@ public Collection<Controller> controllers(Operator operator) {
     operator.registerApi("FlinkDeployment", "flinkdeployment", "flinkdeployments",
         "flink.apache.org", "v1beta1");
 
-    // We don't need a controller
-    return Collections.emptyList();
+    operator.registerApi("SqlJob", "sqljob", "sqljobs",
+        "hoptimator.linkedin.com", "v1alpha1", V1alpha1SqlJob.class, V1alpha1SqlJobList.class);
+
+    Reconciler reconciler = new FlinkStreamingSqlJobReconciler(operator);
+    Controller controller = ControllerBuilder.defaultBuilder(operator.informerFactory())
+        .withReconciler(reconciler)
+        .withName("flink-streaming-sql-job-controller")
+        .withWorkerCount(1)
+        .watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1SqlJob.class, x).build())
+        .build();
+
+    return Collections.singleton(controller);
   }
 }
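Where this adapter previously contributed no controllers, it now registers the SqlJob API and returns a controller that watches V1alpha1SqlJob objects through the operator's shared informer factory; with a worker count of 1, reconcile requests are processed one at a time.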

hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java

Lines changed: 109 additions & 0 deletions

package com.linkedin.hoptimator.operator.flink;

import com.linkedin.hoptimator.catalog.Resource;
import com.linkedin.hoptimator.catalog.flink.FlinkStreamingSqlJob;
import com.linkedin.hoptimator.operator.Operator;
import com.linkedin.hoptimator.models.V1alpha1SqlJob;
import com.linkedin.hoptimator.models.V1alpha1SqlJobSpec.DialectEnum;
import com.linkedin.hoptimator.models.V1alpha1SqlJobSpec.ExecutionModeEnum;
import com.linkedin.hoptimator.models.V1alpha1SqlJobStatus;

import io.kubernetes.client.extended.controller.reconciler.Reconciler;
import io.kubernetes.client.extended.controller.reconciler.Request;
import io.kubernetes.client.extended.controller.reconciler.Result;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Manifests streaming SqlJobs as Flink jobs.
 */
public class FlinkStreamingSqlJobReconciler implements Reconciler {
  private final static Logger log = LoggerFactory.getLogger(FlinkStreamingSqlJobReconciler.class);
  private final static String SQLJOB = "hoptimator.linkedin.com/v1alpha1/SqlJob";

  private final Operator operator;

  public FlinkStreamingSqlJobReconciler(Operator operator) {
    this.operator = operator;
  }

  @Override
  public Result reconcile(Request request) {
    log.info("Reconciling request {}", request);
    String name = request.getName();
    String namespace = request.getNamespace();

    // Requeue by default; downgraded to "done" below once the job is ready or failed.
    Result result = new Result(true, operator.pendingRetryDuration());

    try {
      V1alpha1SqlJob object = operator.<V1alpha1SqlJob>fetch(SQLJOB, namespace, name);

      if (object == null) {
        log.info("Object {}/{} deleted. Skipping.", namespace, name);
        return new Result(false);
      }

      V1alpha1SqlJobStatus status = object.getStatus();
      if (status == null) {
        status = new V1alpha1SqlJobStatus();
        object.setStatus(status);
      }

      List<String> sql = object.getSpec().getSql();
      String script = sql.stream().collect(Collectors.joining(";\n"));

      DialectEnum dialect = object.getSpec().getDialect();
      if (!DialectEnum.FLINK.equals(dialect)) {
        log.info("Not Flink SQL. Skipping.");
        return new Result(false);
      }

      ExecutionModeEnum mode = object.getSpec().getExecutionMode();
      if (!ExecutionModeEnum.STREAMING.equals(mode)) {
        log.info("Not a streaming job. Skipping.");
        return new Result(false);
      }

      Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(Resource.Environment.EMPTY);
      Resource sqlJob = new FlinkStreamingSqlJob(namespace, name, script);
      boolean allReady = true;
      boolean anyFailed = false;
      for (String yaml : sqlJob.render(templateFactory)) {
        operator.apply(yaml, object);
        if (!operator.isReady(yaml)) {
          allReady = false;
        }
        if (operator.isFailed(yaml)) {
          anyFailed = true;
          allReady = false;
        }
      }

      object.getStatus().setReady(allReady);
      object.getStatus().setFailed(anyFailed);

      if (allReady) {
        object.getStatus().setMessage("Ready.");
        result = new Result(false); // done
      }
      if (anyFailed) {
        object.getStatus().setMessage("Failed.");
        result = new Result(false); // done
      }

      operator.apiFor(SQLJOB).updateStatus(object, x -> object.getStatus())
          .onFailure((x, y) -> log.error("Failed to update status of SqlJob {}: {}.", name, y.getMessage()))
          .throwsApiException();

    } catch (Exception e) {
      log.error("Encountered exception while reconciling Flink streaming SqlJob {}/{}", namespace, name, e);
      return new Result(true, operator.failureRetryDuration());
    }
    return result;
  }
}

Lines changed: 29 additions & 0 deletions

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: {{namespace}}
  name: {{name}}-flink-job
spec:
  image: docker.io/library/hoptimator-flink-runner
  imagePullPolicy: Never
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 0.1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 0.1
  job:
    entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
    args:
    - {{sql}}
    jarURI: local:///opt/hoptimator-flink-runner.jar
    parallelism: 1
    upgradeMode: stateless
    state: running
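
To make the wiring concrete, here is a minimal sketch (not part of this commit) of the rendering path for the hello-world sample. It mirrors the calls in FlinkStreamingSqlJobReconciler; the RenderDemo class, the default namespace, and the de-semicoloned statements are illustrative assumptions.

import com.linkedin.hoptimator.catalog.Resource;
import com.linkedin.hoptimator.catalog.flink.FlinkStreamingSqlJob;

public class RenderDemo {
  public static void main(String[] args) {
    // Same joining the reconciler performs on spec.sql before rendering.
    String script = String.join(";\n",
        "create table bh (text varchar) with ('connector' = 'blackhole')",
        "insert into bh values ('hello world')");

    // {{namespace}}, {{name}}, and {{sql}} in the template are filled
    // from the values exported by FlinkStreamingSqlJob.
    Resource job = new FlinkStreamingSqlJob("default", "hello-world", script);
    Resource.TemplateFactory factory =
        new Resource.SimpleTemplateFactory(Resource.Environment.EMPTY);
    for (String yaml : job.render(factory)) {
      System.out.println(yaml); // a FlinkDeployment manifest like the one above
    }
  }
}

Each rendered string is the YAML above with the placeholders substituted; the operator then applies it to the cluster and tracks its readiness.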

hoptimator-models/generate-models.sh

Lines changed: 1 addition & 0 deletions

@@ -13,4 +13,5 @@ docker run \
   -u "$(pwd)/deploy/acls.crd.yaml" \
   -u "$(pwd)/deploy/kafkatopics.crd.yaml" \
   -u "$(pwd)/deploy/subscriptions.crd.yaml" \
+  -u "$(pwd)/deploy/sqljobs.crd.yaml" \
   && echo "done."
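Regenerating against the new CRD is what produces the V1alpha1SqlJob, V1alpha1SqlJobList, and V1alpha1SqlJobStatus classes in hoptimator-models that the adapter code above imports.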
