Skip to content

Commit ac2add6

Browse files
Add KafkaProtocolFilterReconciler (kroxylicious#1971)
* Add KafkaProtocolFilterReconciler
* Add the RBAC rules the new reconciler requires
* Serve /livez before operator start, so the operator can report healthy while starting
* Log if we're returning 400 from /livez

Signed-off-by: Tom Bentley <[email protected]> Co-authored-by: Robert Young <[email protected]>
1 parent bb47a43 commit ac2add6

14 files changed

+730
-28
lines changed

kroxylicious-operator/install/01.ClusterRole.kroxylicious-operator-watched.yaml

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ metadata:
1414
app.kubernetes.io/name: kroxylicious
1515
app.kubernetes.io/component: operator
1616
rules:
17-
- apiGroups:
17+
- # The operator needs to know about its own CRs
18+
apiGroups:
1819
- "kroxylicious.io"
1920
resources:
2021
- kafkaproxies
@@ -25,10 +26,8 @@ rules:
2526
- get
2627
- list
2728
- watch
28-
# - create
29-
# - patch
30-
# - update
31-
- apiGroups:
29+
- # The operator needs to update the status on its own CRs
30+
apiGroups:
3231
- "kroxylicious.io"
3332
resources:
3433
- kafkaproxies/status
@@ -37,4 +36,33 @@ rules:
3736
verbs:
3837
- get
3938
- patch
40-
- update
39+
- update
40+
- # The operator needs to know about its own CRs
41+
apiGroups:
42+
- "filter.kroxylicious.io"
43+
resources:
44+
- kafkaprotocolfilters
45+
verbs:
46+
- get
47+
- list
48+
- watch
49+
- # The operator needs to update the status on its own CRs
50+
apiGroups:
51+
- "filter.kroxylicious.io"
52+
resources:
53+
- kafkaprotocolfilters/status
54+
verbs:
55+
- get
56+
- patch
57+
- update
58+
- # The operator needs get/list/watch on these because they can be referenced
59+
# from our CRs, so the operator needs to be able to resolve them.
60+
apiGroups:
61+
- ""
62+
resources:
63+
- secrets
64+
- configmaps
65+
verbs:
66+
- get
67+
- list
68+
- watch

kroxylicious-operator/pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,9 @@
358358
<io.kroxylicious.kubernetes.api.v1alpha1.kafkaservicestatus.Conditions>
359359
io.kroxylicious.kubernetes.api.common.Condition
360360
</io.kroxylicious.kubernetes.api.v1alpha1.kafkaservicestatus.Conditions>
361+
<io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterstatus.Conditions>
362+
io.kroxylicious.kubernetes.api.common.Condition
363+
</io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterstatus.Conditions>
361364
<io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate>
362365
java.lang.Object
363366
</io.kroxylicious.kubernetes.filter.api.v1alpha1.kafkaprotocolfilterspec.ConfigTemplate>
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.kubernetes.operator;
8+
9+
import java.time.Clock;
10+
import java.time.ZoneId;
11+
import java.time.ZonedDateTime;
12+
import java.util.List;
13+
import java.util.Objects;
14+
import java.util.Optional;
15+
import java.util.Set;
16+
import java.util.TreeSet;
17+
import java.util.function.Function;
18+
import java.util.stream.Collectors;
19+
import java.util.stream.Stream;
20+
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
import io.fabric8.kubernetes.api.model.ConfigMap;
25+
import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
26+
import io.fabric8.kubernetes.api.model.HasMetadata;
27+
import io.fabric8.kubernetes.api.model.Secret;
28+
import io.fabric8.kubernetes.api.model.SecretVolumeSource;
29+
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
30+
import io.javaoperatorsdk.operator.api.reconciler.Context;
31+
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
32+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
33+
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
34+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
35+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
36+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
37+
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
38+
39+
import io.kroxylicious.kubernetes.api.common.Condition;
40+
import io.kroxylicious.kubernetes.api.common.ConditionBuilder;
41+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
42+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterBuilder;
43+
44+
import edu.umd.cs.findbugs.annotations.NonNull;
45+
46+
/**
47+
* Reconciles a {@link KafkaProtocolFilter} by checking whether the {@link Secret}s
48+
* and {@link ConfigMap}s refered to in interpolated expressions actually exist, setting a
49+
* {@link Condition.Type#ResolvedRefs} {@link Condition} accordingly.
50+
*/
51+
public class KafkaProtocolFilterReconciler implements
52+
Reconciler<KafkaProtocolFilter> {
53+
54+
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProtocolFilterReconciler.class);
55+
private final Clock clock;
56+
private final SecureConfigInterpolator secureConfigInterpolator;
57+
58+
KafkaProtocolFilterReconciler(Clock clock, SecureConfigInterpolator secureConfigInterpolator) {
59+
this.clock = Objects.requireNonNull(clock);
60+
this.secureConfigInterpolator = Objects.requireNonNull(secureConfigInterpolator);
61+
}
62+
63+
@Override
64+
public List<EventSource<?, KafkaProtocolFilter>> prepareEventSources(EventSourceContext<KafkaProtocolFilter> context) {
65+
return List.of(
66+
new InformerEventSource<>(templateResourceReferenceEventSourceConfig(context, Secret.class,
67+
interpolationResult -> interpolationResult.volumes().stream()
68+
.flatMap(volume -> Optional.ofNullable(volume.getSecret())
69+
.map(SecretVolumeSource::getSecretName)
70+
.stream())),
71+
context),
72+
new InformerEventSource<>(templateResourceReferenceEventSourceConfig(context, ConfigMap.class,
73+
interpolationResult -> interpolationResult.volumes().stream()
74+
.flatMap(volume -> Optional.ofNullable(volume.getConfigMap())
75+
.map(ConfigMapVolumeSource::getName)
76+
.stream())),
77+
context));
78+
}
79+
80+
/**
81+
* Returns a new event source config for getting the resource dependencies of a given type present in a filter's {@code spec.configTemplate}.
82+
* @param context The context.
83+
* @param secondaryClass The Java type of resource reference (e.g. Secret)
84+
* @param resourceNameExtractor A function which extracts the name of the resources from an interpolation result.
85+
* @param <R> The type of referenced resource
86+
* @return The event source configuration
87+
*/
88+
private <R extends HasMetadata> InformerEventSourceConfiguration<R> templateResourceReferenceEventSourceConfig(
89+
EventSourceContext<KafkaProtocolFilter> context,
90+
Class<R> secondaryClass,
91+
Function<SecureConfigInterpolator.InterpolationResult, Stream<String>> resourceNameExtractor) {
92+
return InformerEventSourceConfiguration.from(
93+
secondaryClass,
94+
KafkaProtocolFilter.class)
95+
.withPrimaryToSecondaryMapper((KafkaProtocolFilter filter) -> {
96+
Object configTemplate = filter.getSpec().getConfigTemplate();
97+
var interpolationResult = secureConfigInterpolator.interpolate(configTemplate);
98+
Set<ResourceID> resourceIds = resourceNameExtractor.apply(interpolationResult)
99+
.map(name -> new ResourceID(name, ResourcesUtil.namespace(filter)))
100+
.collect(Collectors.toSet());
101+
LOGGER.debug("Filter {} references {}(s) {}", ResourcesUtil.name(filter), secondaryClass.getName(), resourceIds);
102+
return resourceIds;
103+
})
104+
.withSecondaryToPrimaryMapper(secret -> {
105+
Set<ResourceID> resourceIds = ResourcesUtil.filteredResourceIdsInSameNamespace(
106+
context,
107+
secret,
108+
KafkaProtocolFilter.class,
109+
filter -> {
110+
Object configTemplate = filter.getSpec().getConfigTemplate();
111+
var interpolationResult = secureConfigInterpolator.interpolate(configTemplate);
112+
return resourceNameExtractor.apply(interpolationResult)
113+
.anyMatch(secretNameFromVolume -> secretNameFromVolume.equals(ResourcesUtil.name(secret)));
114+
});
115+
LOGGER.debug("{} {} referenced by Filters {}", secondaryClass.getName(), ResourcesUtil.name(secret), resourceIds);
116+
return resourceIds;
117+
})
118+
.build();
119+
}
120+
121+
@Override
122+
public UpdateControl<KafkaProtocolFilter> reconcile(
123+
KafkaProtocolFilter filter,
124+
Context<KafkaProtocolFilter> context) {
125+
126+
var now = ZonedDateTime.ofInstant(clock.instant(), ZoneId.of("Z"));
127+
128+
ConditionBuilder conditionBuilder = newResolvedRefsCondition(filter, now);
129+
130+
var existingSecrets = context.getSecondaryResourcesAsStream(Secret.class)
131+
.map(ResourcesUtil::name)
132+
.collect(Collectors.toSet());
133+
LOGGER.debug("Existing secrets: {}", existingSecrets);
134+
135+
var existingConfigMaps = context.getSecondaryResourcesAsStream(ConfigMap.class)
136+
.map(ResourcesUtil::name)
137+
.collect(Collectors.toSet());
138+
LOGGER.debug("Existing configmaps: {}", existingConfigMaps);
139+
140+
var interpolationResult = secureConfigInterpolator.interpolate(filter.getSpec().getConfigTemplate());
141+
var referencedSecrets = interpolationResult.volumes().stream().flatMap(volume -> Optional.ofNullable(volume.getSecret())
142+
.map(SecretVolumeSource::getSecretName)
143+
.stream())
144+
.collect(Collectors.toCollection(TreeSet::new));
145+
LOGGER.debug("Referenced secrets: {}", referencedSecrets);
146+
147+
var referencedConfigMaps = interpolationResult.volumes().stream().flatMap(volume -> Optional.ofNullable(volume.getConfigMap())
148+
.map(ConfigMapVolumeSource::getName)
149+
.stream())
150+
.collect(Collectors.toCollection(TreeSet::new));
151+
LOGGER.debug("Referenced configmaps: {}", referencedConfigMaps);
152+
153+
if (existingSecrets.containsAll(referencedSecrets)
154+
&& existingConfigMaps.containsAll(referencedConfigMaps)) {
155+
conditionBuilder.withStatus(Condition.Status.TRUE);
156+
}
157+
else {
158+
referencedSecrets.removeAll(existingSecrets);
159+
referencedConfigMaps.removeAll(existingConfigMaps);
160+
String message = "Referenced";
161+
if (!referencedSecrets.isEmpty()) {
162+
message += " Secrets [" + String.join(", ", referencedSecrets) + "]";
163+
}
164+
if (!referencedConfigMaps.isEmpty()) {
165+
message += " ConfigMaps [" + String.join(", ", referencedConfigMaps) + "]";
166+
}
167+
message += " not found";
168+
conditionBuilder.withStatus(Condition.Status.FALSE)
169+
.withReason("MissingInterpolationReferences")
170+
.withMessage(message);
171+
}
172+
173+
KafkaProtocolFilter newFilter = newFilterWithCondition(filter, conditionBuilder.build());
174+
LOGGER.debug("Patching with status {}", newFilter.getStatus());
175+
return UpdateControl.patchStatus(newFilter);
176+
}
177+
178+
@NonNull
179+
private static KafkaProtocolFilter newFilterWithCondition(KafkaProtocolFilter filter, Condition condition) {
180+
// @formatter:off
181+
return new KafkaProtocolFilterBuilder(filter)
182+
.withNewStatus()
183+
.withObservedGeneration(ResourcesUtil.generation(filter))
184+
.withConditions(condition) // overwrite any existing conditions
185+
.endStatus()
186+
.build();
187+
// @formatter:on
188+
}
189+
190+
private static ConditionBuilder newResolvedRefsCondition(KafkaProtocolFilter filter, ZonedDateTime now) {
191+
return new ConditionBuilder()
192+
.withType(Condition.Type.ResolvedRefs)
193+
.withLastTransitionTime(now)
194+
.withObservedGeneration(ResourcesUtil.generation(filter));
195+
}
196+
197+
@Override
198+
public ErrorStatusUpdateControl<KafkaProtocolFilter> updateErrorStatus(
199+
KafkaProtocolFilter filter,
200+
Context<KafkaProtocolFilter> context,
201+
Exception e) {
202+
var now = ZonedDateTime.ofInstant(clock.instant(), ZoneId.of("Z"));
203+
// ResolvedRefs to UNKNOWN
204+
Condition condition = newResolvedRefsCondition(filter, now)
205+
.withStatus(Condition.Status.UNKNOWN)
206+
.withReason(e.getClass().getName())
207+
.withMessage(e.getMessage())
208+
.build();
209+
return ErrorStatusUpdateControl.patchStatus(newFilterWithCondition(filter, condition));
210+
}
211+
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/KafkaProxyReconciler.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.javaoperatorsdk.operator.AggregatedOperatorException;
2424
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
2525
import io.javaoperatorsdk.operator.api.reconciler.Context;
26+
import io.javaoperatorsdk.operator.api.reconciler.ContextInitializer;
2627
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
2728
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
2829
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
@@ -74,13 +75,31 @@
7475
})
7576
// @formatter:on
7677
public class KafkaProxyReconciler implements
77-
Reconciler<KafkaProxy> {
78+
Reconciler<KafkaProxy>,
79+
ContextInitializer<KafkaProxy> {
7880

7981
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProxyReconciler.class);
8082

8183
public static final String CONFIG_DEP = "config";
8284
public static final String DEPLOYMENT_DEP = "deployment";
8385
public static final String CLUSTERS_DEP = "clusters";
86+
static final String SEC = "sec";
87+
private final SecureConfigInterpolator secureConfigInterpolator;
88+
89+
public KafkaProxyReconciler(SecureConfigInterpolator secureConfigInterpolator) {
90+
this.secureConfigInterpolator = secureConfigInterpolator;
91+
}
92+
93+
static SecureConfigInterpolator secureConfigInterpolator(Context<KafkaProxy> context) {
94+
return context.managedWorkflowAndDependentResourceContext().getMandatory(SEC, SecureConfigInterpolator.class);
95+
}
96+
97+
@Override
98+
public void initContext(
99+
KafkaProxy primary,
100+
Context<KafkaProxy> context) {
101+
context.managedWorkflowAndDependentResourceContext().put(SEC, secureConfigInterpolator);
102+
}
84103

85104
/**
86105
* The happy path, where all the dependent resources expressed a desired
@@ -426,5 +445,4 @@ private static InformerEventSource<KafkaProtocolFilter, KafkaProxy> eventSourceF
426445
private static @NonNull Predicate<KafkaProxyIngress> ingressReferences(HasMetadata primary) {
427446
return ingress -> ingress.getSpec().getProxyRef().getName().equals(name(primary));
428447
}
429-
430448
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/OperatorMain.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,14 @@ public static void main(String[] args) {
8080
*/
8181
void start() {
8282
operator.installShutdownHook(Duration.ofSeconds(10));
83-
operator.register(new KafkaProxyReconciler());
83+
operator.register(new KafkaProxyReconciler(SecureConfigInterpolator.DEFAULT_INTERPOLATOR));
8484
operator.register(new KafkaProxyIngressReconciler(Clock.systemUTC()));
8585
operator.register(new KafkaServiceReconciler(Clock.systemUTC()));
86+
operator.register(new KafkaProtocolFilterReconciler(Clock.systemUTC(), SecureConfigInterpolator.DEFAULT_INTERPOLATOR));
8687
addHttpGetHandler("/", () -> 404);
8788
managementServer.start();
88-
operator.start();
8989
addHttpGetHandler(HTTP_PATH_LIVEZ, this::livezStatusCode);
90+
operator.start();
9091
LOGGER.info("Operator started");
9192
}
9293

@@ -111,7 +112,7 @@ private int livezStatusCode() {
111112
sc = 400;
112113
LOGGER.error("Ignoring exception caught while getting operator health info", e);
113114
}
114-
LOGGER.trace("Responding {} to GET {}", sc, HTTP_PATH_LIVEZ);
115+
(sc != 200 ? LOGGER.atWarn() : LOGGER.atDebug()).log("Responding {} to GET {}", sc, HTTP_PATH_LIVEZ);
115116
return sc;
116117
}
117118

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,8 @@ private static String toYaml(Object filterDefs) {
7979
}
8080
}
8181

82-
private final SecureConfigInterpolator secureConfigInterpolator;
83-
8482
public ProxyConfigConfigMap() {
8583
super(ConfigMap.class);
86-
var providerMap = Map.<String, SecureConfigProvider> of(
87-
"secret", MountedResourceConfigProvider.SECRET_PROVIDER,
88-
"configmap", MountedResourceConfigProvider.CONFIGMAP_PROVIDER);
89-
secureConfigInterpolator = new SecureConfigInterpolator("/opt/kroxylicious/secure", providerMap);
9084
}
9185

9286
/**
@@ -207,7 +201,7 @@ private List<NamedFilterDefinition> filterDefinitions(Context<KafkaProxy> contex
207201
var filterCr = resolutionResult.filter(filterCrRef).orElseThrow();
208202
var spec = filterCr.getSpec();
209203
String type = spec.getType();
210-
SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(spec);
204+
SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(context, spec);
211205
ManagedWorkflowAndDependentResourceContext ctx = context.managedWorkflowAndDependentResourceContext();
212206
putOrMerged(ctx, SECURE_VOLUME_KEY, interpolationResult.volumes());
213207
putOrMerged(ctx, SECURE_VOLUME_MOUNT_KEY, interpolationResult.mounts());
@@ -226,7 +220,8 @@ private static <T> void putOrMerged(ManagedWorkflowAndDependentResourceContext c
226220
}
227221
}
228222

229-
private SecureConfigInterpolator.InterpolationResult interpolateConfig(KafkaProtocolFilterSpec spec) {
223+
private SecureConfigInterpolator.InterpolationResult interpolateConfig(Context<KafkaProxy> context, KafkaProtocolFilterSpec spec) {
224+
SecureConfigInterpolator secureConfigInterpolator = KafkaProxyReconciler.secureConfigInterpolator(context);
230225
Object configTemplate = Objects.requireNonNull(spec.getConfigTemplate(), "ConfigTemplate is required in the KafkaProtocolFilterSpec");
231226
return secureConfigInterpolator.interpolate(configTemplate);
232227
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/SecureConfigInterpolator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ private record InterpolatedValue(@Nullable Object interpolatedValue, @NonNull Li
4646

4747
private static final InterpolatedValue NULL_INTERPOLATED_VALUE = new InterpolatedValue(null, List.of());
4848

49+
static final SecureConfigInterpolator DEFAULT_INTERPOLATOR = new SecureConfigInterpolator("/opt/kroxylicious/secure", Map.<String, SecureConfigProvider> of(
50+
"secret", MountedResourceConfigProvider.SECRET_PROVIDER,
51+
"configmap", MountedResourceConfigProvider.CONFIGMAP_PROVIDER));
52+
4953
private final Map<String, SecureConfigProvider> providers;
5054
private final Path mountPathBase;
5155

0 commit comments

Comments
 (0)