Skip to content

Commit 5ceb08f

Browse files
authored
Rework error reporting (kroxylicious#2007)
* Remove the `KafkaProxy.status.clusters` conditions, using the `VirtualKafkaCluster.status.conditions` * Use `KafkaProxyContext` to manage the items in the context.managedWorkflowAndDependentResourceContext() (removing `SharedKafkaProxyContext` for more consistent naming). Initialize this in `KafkaProxyReconciler#initContext()` so that all the dependent resources see the same sets of referenced resources. * `ProxyConfigConfigMap` now writes a list of conditions for each cluster to the Proxy CM * VirtualKafkaClusterReconciler now has the proxy ConfigMap as a secondary resource, and uses the list of conditions written by the `ProxyConfigConfigMap` (during the KafkaProxyReconciliation) to update the `VirtualKafkaCluster.status.conditions` for one cluster. Also some other changes * Have `KafkaProxyReconciler`'s constructor accept a `Clock`, like the other reconcilers * Factor out `ProxyConfigData` for encapsulating the contents of the Proxy ConfigMap's data section. * Alloc ResourceUtil's `maybeAddOrUpdateCondition` to take a list of conditions, rather than just one. * ClusterCondition and InvalidClusterException gone, plus related tests. Signed-off-by: Tom Bentley <[email protected]>
1 parent a524f56 commit 5ceb08f

File tree

58 files changed

+756
-752
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+756
-752
lines changed

kroxylicious-operator/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@
126126
<artifactId>jackson-databind</artifactId>
127127
<scope>compile</scope>
128128
</dependency>
129+
<dependency>
130+
<groupId>com.fasterxml.jackson.datatype</groupId>
131+
<artifactId>jackson-datatype-jsr310</artifactId>
132+
</dependency>
129133

130134
<dependency>
131135
<groupId>org.slf4j</groupId>

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/api/common/Condition.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* A common Condition type, used in CR statuses.
1111
*/
1212
@com.fasterxml.jackson.annotation.JsonInclude(com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL)
13-
@com.fasterxml.jackson.annotation.JsonPropertyOrder({ "lastTransitionTime", "message", "observedGeneration", "reason", "status", "type" })
13+
@com.fasterxml.jackson.annotation.JsonPropertyOrder({ "observedGeneration", "type", "status", "lastTransitionTime", "reason", "message" })
1414
@com.fasterxml.jackson.databind.annotation.JsonDeserialize(using = com.fasterxml.jackson.databind.JsonDeserializer.None.class)
1515
@javax.annotation.processing.Generated("io.fabric8.java.generator.CRGeneratorRunner")
1616
@lombok.EqualsAndHashCode()

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ClusterCondition.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ClusterService.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
2020
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
21-
import io.kroxylicious.kubernetes.operator.model.ProxyModel;
22-
import io.kroxylicious.kubernetes.operator.model.ProxyModelBuilder;
2321
import io.kroxylicious.kubernetes.operator.model.ingress.ProxyIngressModel;
2422

2523
import static io.kroxylicious.kubernetes.operator.ResourcesUtil.toByNameMap;
@@ -50,10 +48,10 @@ static String serviceName(VirtualKafkaCluster cluster) {
5048
public Map<String, Service> desiredResources(
5149
KafkaProxy primary,
5250
Context<KafkaProxy> context) {
53-
ProxyModelBuilder proxyModelBuilder = ProxyModelBuilder.contextBuilder(context);
54-
ProxyModel model = proxyModelBuilder.build(primary, context);
51+
KafkaProxyContext kafkaProxyContext = KafkaProxyContext.proxyContext(context);
52+
var model = kafkaProxyContext.model();
5553
Stream<Service> serviceStream = model.clustersWithValidIngresses().stream()
56-
.filter(cluster -> !SharedKafkaProxyContext.isBroken(context, cluster))
54+
.filter(cluster -> !kafkaProxyContext.isBroken(cluster))
5755
.flatMap(cluster -> model.ingressModel().clusterIngressModel(cluster).map(ProxyIngressModel.VirtualClusterIngressModel::services).orElse(Stream.empty()));
5856
return serviceStream.collect(toByNameMap());
5957
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/InvalidClusterException.java

Lines changed: 0 additions & 23 deletions
This file was deleted.
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.kubernetes.operator;
8+
9+
import java.time.Clock;
10+
import java.util.stream.Collectors;
11+
12+
import io.javaoperatorsdk.operator.api.reconciler.Context;
13+
14+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
15+
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
16+
import io.kroxylicious.kubernetes.operator.model.ProxyModel;
17+
18+
public record KafkaProxyContext(Clock clock,
19+
SecureConfigInterpolator secureConfigInterpolator,
20+
ProxyModel model) {
21+
22+
private static final String KEY_CTX = KafkaProxyContext.class.getName();
23+
24+
static void init(Context<KafkaProxy> context,
25+
Clock clock,
26+
SecureConfigInterpolator secureConfigInterpolator,
27+
ProxyModel model) {
28+
var rc = context.managedWorkflowAndDependentResourceContext();
29+
30+
rc.put(KEY_CTX,
31+
new KafkaProxyContext(
32+
Clock.fixed(clock.instant(), clock.getZone()),
33+
secureConfigInterpolator,
34+
model));
35+
}
36+
37+
static KafkaProxyContext proxyContext(Context<KafkaProxy> context) {
38+
return context.managedWorkflowAndDependentResourceContext().getMandatory(KEY_CTX, KafkaProxyContext.class);
39+
}
40+
41+
boolean isBroken(VirtualKafkaCluster cluster) {
42+
var fullyResolved = model().resolutionResult().fullyResolvedClustersInNameOrder().stream().map(ResourcesUtil::name).collect(Collectors.toSet());
43+
return !fullyResolved.contains(ResourcesUtil.name(cluster))
44+
|| !model().ingressModel().clusterIngressModel(cluster).stream()
45+
.allMatch(ingressModel -> ingressModel.ingressExceptions().isEmpty());
46+
47+
}
48+
49+
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/KafkaProxyReconciler.java

Lines changed: 16 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package io.kroxylicious.kubernetes.operator;
77

8+
import java.time.Clock;
89
import java.time.Duration;
910
import java.time.ZoneId;
1011
import java.time.ZonedDateTime;
@@ -45,6 +46,8 @@
4546
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
4647
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterSpec;
4748
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
49+
import io.kroxylicious.kubernetes.operator.model.ProxyModel;
50+
import io.kroxylicious.kubernetes.operator.model.ProxyModelBuilder;
4851
import io.kroxylicious.proxy.tag.VisibleForTesting;
4952

5053
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -83,22 +86,22 @@ public class KafkaProxyReconciler implements
8386
public static final String CONFIG_DEP = "config";
8487
public static final String DEPLOYMENT_DEP = "deployment";
8588
public static final String CLUSTERS_DEP = "clusters";
86-
static final String SEC = "sec";
89+
90+
private final Clock clock;
8791
private final SecureConfigInterpolator secureConfigInterpolator;
8892

89-
public KafkaProxyReconciler(SecureConfigInterpolator secureConfigInterpolator) {
93+
public KafkaProxyReconciler(Clock clock, SecureConfigInterpolator secureConfigInterpolator) {
94+
this.clock = clock;
9095
this.secureConfigInterpolator = secureConfigInterpolator;
9196
}
9297

93-
static SecureConfigInterpolator secureConfigInterpolator(Context<KafkaProxy> context) {
94-
return context.managedWorkflowAndDependentResourceContext().getMandatory(SEC, SecureConfigInterpolator.class);
95-
}
96-
9798
@Override
9899
public void initContext(
99-
KafkaProxy primary,
100+
KafkaProxy proxy,
100101
Context<KafkaProxy> context) {
101-
context.managedWorkflowAndDependentResourceContext().put(SEC, secureConfigInterpolator);
102+
ProxyModelBuilder proxyModelBuilder = ProxyModelBuilder.contextBuilder();
103+
ProxyModel model = proxyModelBuilder.build(proxy, context);
104+
KafkaProxyContext.init(context, clock, secureConfigInterpolator, model);
102105
}
103106

104107
/**
@@ -111,7 +114,7 @@ public UpdateControl<KafkaProxy> reconcile(KafkaProxy primary,
111114
LOGGER.info("Completed reconciliation of {}/{}", namespace(primary), name(primary));
112115
}
113116
return UpdateControl.patchStatus(
114-
buildStatus(primary, context, null));
117+
buildStatus(primary, null));
115118
}
116119

117120
/**
@@ -123,7 +126,7 @@ public ErrorStatusUpdateControl<KafkaProxy> updateErrorStatus(KafkaProxy primary
123126
Exception exception) {
124127
// Post-condition: status.conditions should be in a canonical order (to avoid non-terminating reconciliations)
125128
// Post-condition: There is only one Ready condition
126-
var control = ErrorStatusUpdateControl.patchStatus(buildStatus(primary, context, exception));
129+
var control = ErrorStatusUpdateControl.patchStatus(buildStatus(primary, exception));
127130
if (exception instanceof InvalidResourceException) {
128131
control.withNoRetry();
129132
}
@@ -135,7 +138,6 @@ public ErrorStatusUpdateControl<KafkaProxy> updateErrorStatus(KafkaProxy primary
135138
}
136139

137140
private static KafkaProxy buildStatus(KafkaProxy primary,
138-
Context<KafkaProxy> context,
139141
@Nullable Exception exception) {
140142
if (exception instanceof AggregatedOperatorException aoe && aoe.getAggregatedExceptions().size() == 1) {
141143
exception = aoe.getAggregatedExceptions().values().iterator().next();
@@ -150,30 +152,17 @@ private static KafkaProxy buildStatus(KafkaProxy primary,
150152
.endMetadata()
151153
.withNewStatus()
152154
.withObservedGeneration(generation(primary))
153-
.withConditions(effectiveReadyCondition(now, primary, exception ))
154-
.withClusters(clusterConditions(now, primary, context ))
155+
.withConditions(effectiveReadyCondition(now, primary, exception))
155156
.endStatus()
156157
.build();
157158
// @formatter:on
158159
}
159160

160-
private static List<io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.Clusters> clusterConditions(ZonedDateTime now,
161-
KafkaProxy primary,
162-
Context<KafkaProxy> context) {
163-
return ResourcesUtil.clustersInNameOrder(context).map(cluster -> {
164-
ClusterCondition clusterCondition = SharedKafkaProxyContext.clusterCondition(context, cluster);
165-
var conditions = newClusterCondition(now, primary, clusterCondition);
166-
return new io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxystatus.ClustersBuilder()
167-
.withName(name(cluster))
168-
.withConditions(conditions).build();
169-
}).toList();
170-
}
171-
172161
/**
173162
* Determines whether the {@code Ready} condition has had a state transition,
174163
* and returns an appropriate new {@code Ready} condition.
175164
*
176-
* @param now
165+
* @param now The current time
177166
* @param primary The primary.
178167
* @param exception An exception, or null if the reconciliation was successful.
179168
* @return The {@code Ready} condition to use in {@code status.conditions}.
@@ -224,7 +213,7 @@ else if (exception instanceof InvalidResourceException) {
224213
}
225214

226215
/**
227-
* @param now
216+
* @param now The current time
228217
* @param primary The primary.
229218
* @param exception An exception, or null if the reconciliation was successful.
230219
* @return The {@code Ready} condition to use in {@code status.conditions}
@@ -245,20 +234,6 @@ private static Condition newCondition(
245234
.build();
246235
}
247236

248-
private static Condition newClusterCondition(
249-
ZonedDateTime now,
250-
KafkaProxy primary,
251-
ClusterCondition clusterCondition) {
252-
return new ConditionBuilder()
253-
.withLastTransitionTime(now)
254-
.withMessage(clusterCondition.message())
255-
.withObservedGeneration(generation(primary))
256-
.withReason(clusterCondition.reason())
257-
.withStatus(clusterCondition.status())
258-
.withType(clusterCondition.type())
259-
.build();
260-
}
261-
262237
private static boolean isTransition(@Nullable Condition oldReady, @Nullable Exception exception) {
263238
if (oldReady == null) {
264239
return true;

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/OperatorMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public static void main(String[] args) {
7979
*/
8080
void start() {
8181
operator.installShutdownHook(Duration.ofSeconds(10));
82-
operator.register(new KafkaProxyReconciler(SecureConfigInterpolator.DEFAULT_INTERPOLATOR));
82+
operator.register(new KafkaProxyReconciler(Clock.systemUTC(), SecureConfigInterpolator.DEFAULT_INTERPOLATOR));
8383
operator.register(new VirtualKafkaClusterReconciler(Clock.systemUTC()));
8484
operator.register(new KafkaProxyIngressReconciler(Clock.systemUTC()));
8585
operator.register(new KafkaServiceReconciler(Clock.systemUTC()));

0 commit comments

Comments
 (0)