Skip to content

Commit fe9978c

Browse files
authored
Prevent NPE that may occur if the KafkaServer CR did not have trust anchors or client certificates. (kroxylicious#2093)
Signed-off-by: Keith Wall <[email protected]>
1 parent 887281b commit fe9978c

File tree

6 files changed

+266
-21
lines changed

6 files changed

+266
-21
lines changed

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/KafkaProxyIngressReconciler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.time.Clock;
1010
import java.util.List;
1111
import java.util.Objects;
12+
import java.util.Optional;
1213

1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ public List<EventSource<?, KafkaProxyIngress>> prepareEventSources(EventSourceCo
5556
.withSecondaryToPrimaryMapper(proxy -> ResourcesUtil.findReferrers(context,
5657
proxy,
5758
KafkaProxyIngress.class,
58-
ingress -> ingress.getSpec().getProxyRef()))
59+
ingress -> Optional.of(ingress.getSpec().getProxyRef())))
5960
.build();
6061
return List.of(new InformerEventSource<>(configuration, context));
6162
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/KafkaServiceReconciler.java

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.time.Clock;
1010
import java.util.List;
1111
import java.util.Optional;
12+
import java.util.Set;
1213

1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
@@ -21,6 +22,8 @@
2122
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
2223
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
2324
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
25+
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
26+
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
2427
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
2528

2629
import io.kroxylicious.kubernetes.api.common.AnyLocalRef;
@@ -29,6 +32,7 @@
2932
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaServiceSpec;
3033
import io.kroxylicious.kubernetes.api.v1alpha1.kafkaservicespec.Tls;
3134
import io.kroxylicious.kubernetes.api.v1alpha1.kafkaservicespec.tls.TrustAnchorRef;
35+
import io.kroxylicious.proxy.tag.VisibleForTesting;
3236

3337
import edu.umd.cs.findbugs.annotations.Nullable;
3438

@@ -71,28 +75,59 @@ public List<EventSource<?, KafkaService>> prepareEventSources(EventSourceContext
7175
Secret.class,
7276
KafkaService.class)
7377
.withName(SECRETS_EVENT_SOURCE_NAME)
74-
.withPrimaryToSecondaryMapper((KafkaService cluster) -> ResourcesUtil.localRefAsResourceId(cluster, cluster.getSpec().getTls().getCertificateRef()))
75-
.withSecondaryToPrimaryMapper(secret -> ResourcesUtil.findReferrers(context,
76-
secret,
77-
KafkaService.class,
78-
service -> service.getSpec().getTls().getCertificateRef()))
78+
.withPrimaryToSecondaryMapper(kafkaServiceToSecret())
79+
.withSecondaryToPrimaryMapper(secretToKafkaService(context))
7980
.build();
8081
InformerEventSourceConfiguration<ConfigMap> serviceToConfigMap = InformerEventSourceConfiguration.from(
8182
ConfigMap.class,
8283
KafkaService.class)
8384
.withName(CONFIG_MAPS_EVENT_SOURCE_NAME)
84-
.withPrimaryToSecondaryMapper(
85-
(KafkaService cluster) -> ResourcesUtil.localRefAsResourceId(cluster, asRef(cluster.getSpec().getTls().getTrustAnchorRef())))
86-
.withSecondaryToPrimaryMapper(configMap -> ResourcesUtil.findReferrers(context,
87-
configMap,
88-
KafkaService.class,
89-
service -> asRef(service.getSpec().getTls().getTrustAnchorRef())))
85+
.withPrimaryToSecondaryMapper(kafkaServiceToConfigMap())
86+
.withSecondaryToPrimaryMapper(configMapToKafkaService(context))
9087
.build();
9188
return List.of(
9289
new InformerEventSource<>(serviceToSecret, context),
9390
new InformerEventSource<>(serviceToConfigMap, context));
9491
}
9592

93+
@VisibleForTesting
94+
static PrimaryToSecondaryMapper<KafkaService> kafkaServiceToSecret() {
95+
return (KafkaService cluster) -> Optional.ofNullable(cluster.getSpec())
96+
.map(KafkaServiceSpec::getTls)
97+
.map(Tls::getCertificateRef)
98+
.map(cr -> ResourcesUtil.localRefAsResourceId(cluster, cr)).orElse(Set.of());
99+
}
100+
101+
@VisibleForTesting
102+
static SecondaryToPrimaryMapper<Secret> secretToKafkaService(EventSourceContext<KafkaService> context) {
103+
return secret -> ResourcesUtil.findReferrers(context,
104+
secret,
105+
KafkaService.class,
106+
service -> Optional.ofNullable(service.getSpec())
107+
.map(KafkaServiceSpec::getTls)
108+
.map(Tls::getCertificateRef));
109+
}
110+
111+
@VisibleForTesting
112+
static SecondaryToPrimaryMapper<ConfigMap> configMapToKafkaService(EventSourceContext<KafkaService> context) {
113+
return configMap -> ResourcesUtil.findReferrers(context,
114+
configMap,
115+
KafkaService.class,
116+
service -> Optional.ofNullable(service.getSpec())
117+
.map(KafkaServiceSpec::getTls)
118+
.map(Tls::getTrustAnchorRef)
119+
.map(KafkaServiceReconciler::asRef));
120+
}
121+
122+
@VisibleForTesting
123+
static PrimaryToSecondaryMapper<KafkaService> kafkaServiceToConfigMap() {
124+
return (KafkaService cluster) -> Optional.ofNullable(cluster.getSpec())
125+
.map(KafkaServiceSpec::getTls)
126+
.map(Tls::getTrustAnchorRef)
127+
.map(tar -> ResourcesUtil.localRefAsResourceId(cluster, asRef(tar)))
128+
.orElse(Set.of());
129+
}
130+
96131
@Override
97132
public UpdateControl<KafkaService> reconcile(KafkaService service, Context<KafkaService> context) {
98133

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ResourcesUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,11 +257,11 @@ static <O extends HasMetadata, R extends HasMetadata> Set<ResourceID> localRefsA
257257
static <O extends HasMetadata, R extends HasMetadata> Set<ResourceID> findReferrers(EventSourceContext<?> context,
258258
R referent,
259259
Class<O> owner,
260-
Function<O, LocalRef<R>> refAccessor) {
260+
Function<O, Optional<LocalRef<R>>> refAccessor) {
261261
return ResourcesUtil.filteredResourceIdsInSameNamespace(context,
262262
referent,
263263
owner,
264-
primary -> ResourcesUtil.isReferent(refAccessor.apply(primary), referent));
264+
primary -> refAccessor.apply(primary).map(lr -> ResourcesUtil.isReferent(lr, referent)).orElse(false));
265265
}
266266

267267
/**

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/VirtualKafkaClusterReconciler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public List<EventSource<?, VirtualKafkaCluster>> prepareEventSources(EventSource
219219
.withSecondaryToPrimaryMapper(proxy -> ResourcesUtil.findReferrers(context,
220220
proxy,
221221
VirtualKafkaCluster.class,
222-
cluster -> cluster.getSpec().getProxyRef()))
222+
cluster -> Optional.of(cluster.getSpec().getProxyRef())))
223223
.build();
224224

225225
InformerEventSourceConfiguration<ConfigMap> clusterToProxyConfigState = InformerEventSourceConfiguration.from(
@@ -230,9 +230,9 @@ public List<EventSource<?, VirtualKafkaCluster>> prepareEventSources(EventSource
230230
.withSecondaryToPrimaryMapper(configMap -> ResourcesUtil.findReferrers(context,
231231
configMap,
232232
VirtualKafkaCluster.class,
233-
cluster -> new AnyLocalRefBuilder().withGroup("").withKind("ConfigMap")
233+
cluster -> Optional.of(new AnyLocalRefBuilder().withGroup("").withKind("ConfigMap")
234234
.withName(cluster.getSpec().getProxyRef().getName() + CONFIG_STATE_CONFIG_MAP_SUFFIX)
235-
.build()))
235+
.build())))
236236
.build();
237237

238238
InformerEventSourceConfiguration<KafkaService> clusterToService = InformerEventSourceConfiguration.from(
@@ -244,7 +244,7 @@ public List<EventSource<?, VirtualKafkaCluster>> prepareEventSources(EventSource
244244
.withSecondaryToPrimaryMapper(service -> ResourcesUtil.findReferrers(context,
245245
service,
246246
VirtualKafkaCluster.class,
247-
cluster -> cluster.getSpec().getTargetKafkaServiceRef()))
247+
cluster -> Optional.of(cluster.getSpec().getTargetKafkaServiceRef())))
248248
.build();
249249

250250
InformerEventSourceConfiguration<KafkaProxyIngress> clusterToIngresses = InformerEventSourceConfiguration.from(

kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/KafkaServiceReconcilerTest.java

Lines changed: 141 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.List;
1414
import java.util.Optional;
1515
import java.util.function.Consumer;
16+
import java.util.stream.Stream;
1617

1718
import org.junit.jupiter.api.BeforeEach;
1819
import org.junit.jupiter.api.Test;
@@ -24,13 +25,18 @@
2425

2526
import io.fabric8.kubernetes.api.model.ConfigMap;
2627
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
28+
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
2729
import io.fabric8.kubernetes.api.model.Secret;
2830
import io.fabric8.kubernetes.api.model.SecretBuilder;
2931
import io.fabric8.kubernetes.client.KubernetesClient;
32+
import io.fabric8.kubernetes.client.dsl.MixedOperation;
33+
import io.fabric8.kubernetes.client.dsl.Resource;
3034
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
3135
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
3236
import io.javaoperatorsdk.operator.api.reconciler.Context;
37+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
3338
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
39+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3440

3541
import io.kroxylicious.kubernetes.api.common.Condition;
3642
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService;
@@ -40,6 +46,7 @@
4046
import io.kroxylicious.kubernetes.operator.assertj.KafkaServiceStatusAssert;
4147

4248
import static org.assertj.core.api.Assertions.assertThat;
49+
import static org.mockito.ArgumentMatchers.any;
4350
import static org.mockito.Mockito.mock;
4451
import static org.mockito.Mockito.when;
4552

@@ -123,11 +130,11 @@ class KafkaServiceReconcilerTest {
123130

124131
// @formatter:on
125132

126-
private KafkaServiceReconciler kafkaProtocolFilterReconciler;
133+
private KafkaServiceReconciler kafkaServiceReconciler;
127134

128135
@BeforeEach
129136
void setUp() {
130-
kafkaProtocolFilterReconciler = new KafkaServiceReconciler(Clock.systemUTC());
137+
kafkaServiceReconciler = new KafkaServiceReconciler(Clock.systemUTC());
131138
}
132139

133140
@Test
@@ -372,7 +379,7 @@ private static void mockGetSecret(
372379
void shouldSetResolvedRefs(KafkaService kafkaService, Context<KafkaService> context, Consumer<ConditionListAssert> asserter) {
373380

374381
// When
375-
final UpdateControl<KafkaService> updateControl = kafkaProtocolFilterReconciler.reconcile(kafkaService, context);
382+
final UpdateControl<KafkaService> updateControl = kafkaServiceReconciler.reconcile(kafkaService, context);
376383

377384
// Then
378385
assertThat(updateControl).isNotNull();
@@ -382,4 +389,135 @@ void shouldSetResolvedRefs(KafkaService kafkaService, Context<KafkaService> cont
382389
.conditionList();
383390
asserter.accept(c);
384391
}
392+
393+
@Test
394+
void canMapFromKafkaServiceWithTrustAnchorToConfigMap() {
395+
// Given
396+
var mapper = KafkaServiceReconciler.kafkaServiceToConfigMap();
397+
398+
// When
399+
var secondaryResourceIDs = mapper.toSecondaryResourceIDs(SERVICE);
400+
401+
// Then
402+
assertThat(secondaryResourceIDs).containsExactly(ResourceID.fromResource(PEM_CONFIG_MAP));
403+
}
404+
405+
@Test
406+
void canMapFromKafkaServiceWithoutTrustAnchorToConfigMap() {
407+
// Given
408+
var mapper = KafkaServiceReconciler.kafkaServiceToConfigMap();
409+
var serviceNoTrustAnchor = new KafkaServiceBuilder(SERVICE).editSpec().editTls().withTrustAnchorRef(null).endTls().endSpec().build();
410+
411+
// When
412+
var secondaryResourceIDs = mapper.toSecondaryResourceIDs(serviceNoTrustAnchor);
413+
414+
// Then
415+
assertThat(secondaryResourceIDs).isEmpty();
416+
}
417+
418+
@Test
419+
void canMapFromConfigMapToKafkaService() {
420+
// Given
421+
EventSourceContext<KafkaService> eventSourceContext = mock();
422+
KubernetesClient client = mock();
423+
when(eventSourceContext.getClient()).thenReturn(client);
424+
425+
KubernetesResourceList<KafkaService> mockList = mockKafkaServiceListOperation(client);
426+
when(mockList.getItems()).thenReturn(List.of(SERVICE));
427+
428+
var mapper = KafkaServiceReconciler.configMapToKafkaService(eventSourceContext);
429+
430+
// When
431+
var primaryResourceIDs = mapper.toPrimaryResourceIDs(PEM_CONFIG_MAP);
432+
433+
// Then
434+
assertThat(primaryResourceIDs).containsExactly(ResourceID.fromResource(SERVICE));
435+
}
436+
437+
static Stream<Arguments> mappingToConfigMapToleratesKafkaServicesWithoutTls() {
438+
return Stream.of(
439+
Arguments.argumentSet("without tls", new KafkaServiceBuilder(SERVICE).editSpec().withTls(null).endSpec().build()),
440+
Arguments.argumentSet("with tls but without trust anchor",
441+
new KafkaServiceBuilder(SERVICE).editSpec().editTls().withTrustAnchorRef(null).endTls().endSpec().build()));
442+
}
443+
444+
@ParameterizedTest
445+
@MethodSource
446+
void mappingToConfigMapToleratesKafkaServicesWithoutTls(KafkaService service) {
447+
// Given
448+
EventSourceContext<KafkaService> eventSourceContext = mock();
449+
KubernetesClient client = mock();
450+
when(eventSourceContext.getClient()).thenReturn(client);
451+
452+
KubernetesResourceList<KafkaService> mockList = mockKafkaServiceListOperation(client);
453+
when(mockList.getItems()).thenReturn(List.of(service));
454+
455+
// When
456+
var mapper = KafkaServiceReconciler.configMapToKafkaService(eventSourceContext);
457+
458+
// Then
459+
var primaryResourceIDs = mapper.toPrimaryResourceIDs(new ConfigMapBuilder().withNewMetadata().withName("cm").endMetadata().build());
460+
assertThat(primaryResourceIDs).isEmpty();
461+
}
462+
463+
@Test
464+
void canMapFromKafkaServiceWithClientCertToSecret() {
465+
// Given
466+
var mapper = KafkaServiceReconciler.kafkaServiceToSecret();
467+
468+
// When
469+
var secondaryResourceIDs = mapper.toSecondaryResourceIDs(SERVICE);
470+
471+
// Then
472+
assertThat(secondaryResourceIDs).containsExactly(ResourceID.fromResource(TLS_SECRET));
473+
}
474+
475+
@Test
476+
void canMapFromKafkaServiceWithoutClientCertToSecret() {
477+
// Given
478+
var mapper = KafkaServiceReconciler.kafkaServiceToSecret();
479+
var serviceNoCert = new KafkaServiceBuilder(SERVICE).editSpec().editTls().withCertificateRef(null).endTls().endSpec().build();
480+
481+
// When
482+
var secondaryResourceIDs = mapper.toSecondaryResourceIDs(serviceNoCert);
483+
484+
// Then
485+
assertThat(secondaryResourceIDs).isEmpty();
486+
}
487+
488+
static Stream<Arguments> mappingToSecretToleratesKafkaServicesWithoutTls() {
489+
return Stream.of(
490+
Arguments.argumentSet("without tls", new KafkaServiceBuilder(SERVICE).editSpec().withTls(null).endSpec().build()),
491+
Arguments.argumentSet("with tls but without client cert",
492+
new KafkaServiceBuilder(SERVICE).editSpec().editTls().withCertificateRef(null).endTls().endSpec().build()));
493+
}
494+
495+
@ParameterizedTest
496+
@MethodSource
497+
void mappingToSecretToleratesKafkaServicesWithoutTls(KafkaService service) {
498+
// Given
499+
EventSourceContext<KafkaService> eventSourceContext = mock();
500+
KubernetesClient client = mock();
501+
when(eventSourceContext.getClient()).thenReturn(client);
502+
503+
KubernetesResourceList<KafkaService> mockList = mockKafkaServiceListOperation(client);
504+
when(mockList.getItems()).thenReturn(List.of(service));
505+
506+
// When
507+
var mapper = KafkaServiceReconciler.secretToKafkaService(eventSourceContext);
508+
509+
// Then
510+
var primaryResourceIDs = mapper.toPrimaryResourceIDs(new SecretBuilder().withNewMetadata().withName("secret").endMetadata().build());
511+
assertThat(primaryResourceIDs).isEmpty();
512+
}
513+
514+
private KubernetesResourceList<KafkaService> mockKafkaServiceListOperation(KubernetesClient client) {
515+
MixedOperation<KafkaService, KubernetesResourceList<KafkaService>, Resource<KafkaService>> mockOperation = mock();
516+
when(client.resources(KafkaService.class)).thenReturn(mockOperation);
517+
KubernetesResourceList<KafkaService> mockList = mock();
518+
when(mockOperation.list()).thenReturn(mockList);
519+
when(mockOperation.inNamespace(any())).thenReturn(mockOperation);
520+
return mockList;
521+
}
522+
385523
}

0 commit comments

Comments
 (0)