Skip to content

Commit f322dac

Browse files
authored
Fix NPE again (#44)
1 parent b7c6a9d commit f322dac

File tree

4 files changed

+25
-3
lines changed

4 files changed

+25
-3
lines changed

deploy/samples/subscriptions.yaml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,19 @@ apiVersion: hoptimator.linkedin.com/v1alpha1
33
kind: Subscription
44
metadata:
55
name: products
6+
spec:
7+
sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand"
8+
database: RAWKAFKA
9+
10+
---
11+
12+
apiVersion: hoptimator.linkedin.com/v1alpha1
13+
kind: Subscription
14+
metadata:
15+
name: products-with-hints
616
spec:
717
sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand"
818
database: RAWKAFKA
919
hints:
10-
numPartitions: "2"
20+
kafka.numPartitions: "7"
21+

etc/integration-tests.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
1616
-- MySQL CDC -> Kafka
1717
SELECT * FROM RAWKAFKA."products" LIMIT 1;
1818

19+
-- Same, but with hints:
20+
SELECT * FROM RAWKAFKA."products-with-hints" LIMIT 1;
21+
1922
-- test insert into command
2023
!insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
2124
SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;

hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ metadata:
55
namespace: {{pipeline.namespace}}
66
spec:
77
topicName: {{topicName}}
8-
numPartitions: {{numPartitions:null}}
8+
numPartitions: {{kafka.numPartitions:null}}
99
clientOverrides:
1010
{{clientOverrides}}

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public Result reconcile(Request request) {
9393

9494
// For sink resources, also expose hints.
9595
Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory(subEnv
96-
.orElse(new Resource.SimpleEnvironment(object.getSpec().getHints())));
96+
.orElse(new Resource.SimpleEnvironment(map(object.getSpec().getHints()))));
9797

9898
// Render resources related to all source tables.
9999
List<String> upstreamResources = pipeline.upstreamResources().stream()
@@ -284,5 +284,13 @@ public static Controller controller(Operator operator, HoptimatorPlanner.Factory
284284
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1Subscription.class, x).build())
285285
.build();
286286
}
287+
288+
private static Map<String, String> map(Map<String, String> m) {
289+
if (m == null) {
290+
return Collections.emptyMap();
291+
} else {
292+
return m;
293+
}
294+
}
287295
}
288296

0 commit comments

Comments
 (0)