Skip to content

Commit 0dcd5ed

Browse files
authored
Make lots of checkstyle changes - plan to enable blocking PRs soon, still some more involved issues (#133)
* Make lots of checkstyle changes - plan to enable blocking PRs soon, still some more involved issues * Support changes in latest strimzi version * Fix kafka order
1 parent c2829e0 commit 0dcd5ed

File tree

57 files changed

+270
-274
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+270
-274
lines changed

Makefile

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ build:
1111
docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-runner -t hoptimator-flink-runner
1212
docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-operator -t hoptimator-flink-operator
1313

14-
bounce: build undeploy deploy deploy-samples deploy-config
14+
bounce: build undeploy deploy
1515

1616
clean:
1717
./gradlew clean
@@ -26,6 +26,10 @@ deploy: deploy-config
2626
kubectl apply -f ./hoptimator-k8s/src/main/resources/
2727
kubectl apply -f ./deploy
2828
kubectl apply -f ./deploy/dev/rbac.yaml
29+
kubectl wait --for=condition=Established=True \
30+
crds/subscriptions.hoptimator.linkedin.com \
31+
crds/kafkatopics.hoptimator.linkedin.com \
32+
crds/sqljobs.hoptimator.linkedin.com
2933

3034
undeploy: undeploy-config
3135
kubectl delete -f ./deploy/dev/rbac.yaml || echo "skipping"
@@ -45,9 +49,9 @@ deploy-flink: deploy
4549
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
4650
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.11.0/
4751
helm upgrade --install --atomic --set webhook.create=false,image.pullPolicy=Never,image.repository=docker.io/library/hoptimator-flink-operator,image.tag=latest --set-json='watchNamespaces=["default","flink"]' flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
48-
kubectl apply -f deploy/dev/flink-session-cluster.yaml
49-
kubectl apply -f deploy/dev/flink-sql-gateway.yaml
50-
kubectl apply -f deploy/samples/flink-template.yaml
52+
kubectl apply -f ./deploy/dev/flink-session-cluster.yaml
53+
kubectl apply -f ./deploy/dev/flink-sql-gateway.yaml
54+
kubectl apply -f ./deploy/samples/flink-template.yaml
5155

5256
undeploy-flink:
5357
kubectl delete flinksessionjobs.flink.apache.org --all || echo "skipping"
@@ -61,8 +65,12 @@ undeploy-flink:
6165
deploy-kafka: deploy deploy-flink
6266
kubectl create namespace kafka || echo "skipping"
6367
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
64-
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
6568
kubectl apply -f ./deploy/samples/kafkadb.yaml
69+
kubectl apply -f ./deploy/dev/kafka.yaml
70+
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
71+
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
72+
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
73+
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
6674

6775
undeploy-kafka:
6876
kubectl delete kafkatopic.kafka.strimzi.io --all || echo "skipping"
@@ -84,20 +92,12 @@ undeploy-venice:
8492
docker compose -f ./deploy/docker/venice/docker-compose-single-dc-setup.yaml down
8593

8694
deploy-dev-environment: deploy deploy-demo deploy-flink deploy-kafka deploy-venice
87-
kubectl wait --for=condition=Established=True \
88-
crds/subscriptions.hoptimator.linkedin.com \
89-
crds/kafkatopics.hoptimator.linkedin.com \
90-
crds/sqljobs.hoptimator.linkedin.com
91-
kubectl apply -f ./deploy/dev/
9295

9396
undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy-demo undeploy
9497
kubectl delete -f ./deploy/dev || echo "skipping"
9598

9699
# Integration test setup intended to be run locally
97100
integration-tests: deploy-dev-environment
98-
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
99-
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
100-
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
101101
kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 & echo $$! > port-forward.pid
102102
kubectl port-forward -n flink svc/flink-sql-gateway 8083 & echo $$! > port-forward-2.pid
103103
kubectl port-forward -n flink svc/basic-session-deployment-rest 8081 & echo $$! > port-forward-3.pid
@@ -108,9 +108,6 @@ integration-tests: deploy-dev-environment
108108

109109
# kind cluster used in github workflow needs to have different routing set up, avoiding the need to forward kafka ports
110110
integration-tests-kind: deploy-dev-environment
111-
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
112-
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
113-
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
114111
./gradlew intTest -i --no-parallel
115112

116113
generate-models:

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ Commands `deploy-kafka`, `deploy-venice`, `deploy-flink`, etc. exist in isolatio
7676
To produce/consume Kafka data, use the following commands:
7777

7878
```
79-
$ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1
80-
$ kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1 --from-beginning
79+
$ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.46.0-kafka-4.0.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1
80+
$ kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.46.0-kafka-4.0.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1 --from-beginning
8181
```
8282

8383
### Flink

deploy/dev/kafka.yaml

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,56 @@
1515
# Based on examples at:
1616
# https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/kafka
1717

18+
apiVersion: kafka.strimzi.io/v1beta2
19+
kind: KafkaNodePool
20+
metadata:
21+
name: controller
22+
namespace: kafka
23+
labels:
24+
strimzi.io/cluster: one
25+
spec:
26+
replicas: 3
27+
roles:
28+
- controller
29+
storage:
30+
type: jbod
31+
volumes:
32+
- id: 0
33+
type: ephemeral
34+
kraftMetadata: shared
35+
---
36+
37+
apiVersion: kafka.strimzi.io/v1beta2
38+
kind: KafkaNodePool
39+
metadata:
40+
name: broker
41+
namespace: kafka
42+
labels:
43+
strimzi.io/cluster: one
44+
spec:
45+
replicas: 3
46+
roles:
47+
- broker
48+
storage:
49+
type: jbod
50+
volumes:
51+
- id: 0
52+
type: ephemeral
53+
kraftMetadata: shared
54+
---
1855

1956
apiVersion: kafka.strimzi.io/v1beta2
2057
kind: Kafka
2158
metadata:
2259
name: one
2360
namespace: kafka
61+
annotations:
62+
strimzi.io/node-pools: enabled
63+
strimzi.io/kraft: enabled
2464
spec:
2565
kafka:
26-
version: 3.8.0
66+
version: 4.0.0
67+
metadataVersion: "4.0"
2768
replicas: 1
2869
listeners:
2970
- name: plain
@@ -54,16 +95,7 @@ spec:
5495
transaction.state.log.min.isr: 1
5596
default.replication.factor: 1
5697
min.insync.replicas: 1
57-
inter.broker.protocol.version: "3.8"
5898
allow.everyone.if.no.acl.found: true
59-
storage:
60-
type: ephemeral
61-
authorization:
62-
type: simple
63-
zookeeper:
64-
replicas: 3
65-
storage:
66-
type: ephemeral
6799
entityOperator:
68100
topicOperator:
69101
watchedNamespace: default

hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public List<String> path() {
3939
}
4040

4141
protected String pathString() {
42-
return path.stream().collect(Collectors.joining("."));
42+
return String.join(".", path);
4343
}
4444

4545
@Override

hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public boolean valid() {
105105
/** For convenience only, enabling try-with-resources */
106106
public void close() {
107107
closed = true;
108-
children.values().forEach(x -> x.checkClosed());
108+
children.values().forEach(Issues::checkClosed);
109109
}
110110

111111
private void emit(String message) {
@@ -130,7 +130,7 @@ private void checkClosed() {
130130
}
131131

132132
private boolean empty() {
133-
return issues.isEmpty() && children.values().stream().allMatch(x -> x.empty());
133+
return issues.isEmpty() && children.values().stream().allMatch(Issues::empty);
134134
}
135135

136136
private String fullPath() {

hoptimator-api/src/main/java/com/linkedin/hoptimator/View.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public List<String> path() {
3232
}
3333

3434
protected String pathString() {
35-
return path.stream().collect(Collectors.joining("."));
35+
return String.join(".", path);
3636
}
3737

3838
@Override

hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,17 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
3030
} else {
3131
switch (dataType.getSqlTypeName()) {
3232
case INTEGER:
33-
return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
3433
case SMALLINT:
3534
return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
3635
case BIGINT:
3736
return createAvroTypeWithNullability(Schema.Type.LONG, dataType.isNullable());
3837
case VARCHAR:
38+
case CHAR:
3939
return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
4040
case FLOAT:
4141
return createAvroTypeWithNullability(Schema.Type.FLOAT, dataType.isNullable());
4242
case DOUBLE:
4343
return createAvroTypeWithNullability(Schema.Type.DOUBLE, dataType.isNullable());
44-
case CHAR:
45-
return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
4644
case BOOLEAN:
4745
return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable());
4846
case ARRAY:

hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class AvroTableValidator implements Validator {
2424

2525
private final SchemaPlus schema;
2626

27-
public AvroTableValidator(SchemaPlus schema) {
27+
AvroTableValidator(SchemaPlus schema) {
2828
this.schema = schema;
2929
}
3030

hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroValidatorProvider.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
/** Provides AvroValidator. */
1313
public class AvroValidatorProvider implements ValidatorProvider {
1414

15-
@SuppressWarnings("unchecked")
1615
@Override
1716
public <T> Collection<Validator> validators(T obj) {
1817
if (obj instanceof SchemaPlus) {

hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ConfigProvider.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ static ConfigProvider from(Map<String, ?> configs) {
2222
if (configs == null) {
2323
return empty();
2424
} else {
25-
return x -> configs.entrySet().stream().collect(Collectors.toMap(y -> y.getKey(), y -> y.getValue().toString()));
25+
return x -> configs.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, y -> y.getValue().toString()));
2626
}
2727
}
2828

@@ -32,8 +32,7 @@ default ConfigProvider with(String key, Function<String, String> valueFunction)
3232
if (base.containsKey(key)) {
3333
throw new IllegalStateException("Key '" + key + "' previously defined.");
3434
}
35-
Map<String, String> combined = new HashMap<>();
36-
combined.putAll(base);
35+
Map<String, String> combined = new HashMap<>(base);
3736
combined.put(key, valueFunction.apply(x));
3837
return combined;
3938
};
@@ -45,8 +44,7 @@ default ConfigProvider with(Map<String, ?> configs) {
4544
}
4645
return x -> {
4746
Map<String, String> base = config(x);
48-
Map<String, String> combined = new HashMap<>();
49-
combined.putAll(base);
47+
Map<String, String> combined = new HashMap<>(base);
5048
configs.forEach((k, v) -> {
5149
if (base.containsKey(k)) {
5250
throw new IllegalStateException("Key '" + k + "' previously defined.");
@@ -66,6 +64,6 @@ default ConfigProvider with(String key, Integer value) {
6664
}
6765

6866
default ConfigProvider withPrefix(String prefix) {
69-
return x -> config(x).entrySet().stream().collect(Collectors.toMap(y -> prefix + y.getKey(), y -> y.getValue()));
67+
return x -> config(x).entrySet().stream().collect(Collectors.toMap(y -> prefix + y.getKey(), Map.Entry::getValue));
7068
}
7169
}

0 commit comments

Comments
 (0)