Skip to content

Commit fbd4c96

Browse files
authored
Merge pull request kroxylicious#2021 from tombentley/conditions-attempt-two
Conditions attempt two
2 parents b144a9b + 478fa71 commit fbd4c96

File tree

40 files changed

+1354
-881
lines changed

40 files changed

+1354
-881
lines changed

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/api/common/Condition.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
})
2727
public class Condition implements io.fabric8.kubernetes.api.builder.Editable<ConditionBuilder>, io.fabric8.kubernetes.api.model.KubernetesResource {
2828

29+
public static final String REASON_INTERPOLATED_REFS_NOT_FOUND = "InterpolatedReferencedResourcesNotFound";
30+
public static final String REASON_REFS_NOT_FOUND = "ReferencedResourcesNotFound";
31+
public static final String REASON_TRANSITIVE_REFS_NOT_FOUND = "TransitivelyReferencedResourcesNotFound";
32+
public static final String REASON_INVALID = "Invalid";
33+
2934
@Override
3035
public ConditionBuilder edit() {
3136
return new ConditionBuilder(this);

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/KafkaProtocolFilterReconciler.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ public class KafkaProtocolFilterReconciler implements
4949
Reconciler<KafkaProtocolFilter> {
5050

5151
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProtocolFilterReconciler.class);
52-
private final Clock clock;
52+
private final KafkaProtocolFilterStatusFactory statusFactory;
5353
private final SecureConfigInterpolator secureConfigInterpolator;
5454

5555
KafkaProtocolFilterReconciler(Clock clock, SecureConfigInterpolator secureConfigInterpolator) {
56-
this.clock = Objects.requireNonNull(clock);
56+
this.statusFactory = new KafkaProtocolFilterStatusFactory(Objects.requireNonNull(clock));
5757
this.secureConfigInterpolator = Objects.requireNonNull(secureConfigInterpolator);
5858
}
5959

@@ -131,22 +131,26 @@ public UpdateControl<KafkaProtocolFilter> reconcile(
131131
LOGGER.debug("Existing configmaps: {}", existingConfigMaps);
132132

133133
var interpolationResult = secureConfigInterpolator.interpolate(filter.getSpec().getConfigTemplate());
134-
var referencedSecrets = interpolationResult.volumes().stream().flatMap(volume -> Optional.ofNullable(volume.getSecret())
135-
.map(SecretVolumeSource::getSecretName)
136-
.stream())
134+
var referencedSecrets = interpolationResult.volumes().stream()
135+
.flatMap(volume -> Optional.ofNullable(volume.getSecret())
136+
.map(SecretVolumeSource::getSecretName)
137+
.stream())
137138
.collect(Collectors.toCollection(TreeSet::new));
138139
LOGGER.debug("Referenced secrets: {}", referencedSecrets);
139140

140-
var referencedConfigMaps = interpolationResult.volumes().stream().flatMap(volume -> Optional.ofNullable(volume.getConfigMap())
141-
.map(ConfigMapVolumeSource::getName)
142-
.stream())
141+
var referencedConfigMaps = interpolationResult.volumes().stream()
142+
.flatMap(volume -> Optional.ofNullable(volume.getConfigMap())
143+
.map(ConfigMapVolumeSource::getName)
144+
.stream())
143145
.collect(Collectors.toCollection(TreeSet::new));
144146
LOGGER.debug("Referenced configmaps: {}", referencedConfigMaps);
145147

146-
Condition condition;
148+
KafkaProtocolFilter patch;
147149
if (existingSecrets.containsAll(referencedSecrets)
148150
&& existingConfigMaps.containsAll(referencedConfigMaps)) {
149-
condition = ResourcesUtil.newResolvedRefsTrue(clock, filter);
151+
patch = statusFactory.newTrueConditionStatusPatch(
152+
filter,
153+
Condition.Type.ResolvedRefs);
150154
}
151155
else {
152156
referencedSecrets.removeAll(existingSecrets);
@@ -159,19 +163,17 @@ public UpdateControl<KafkaProtocolFilter> reconcile(
159163
message += " ConfigMaps [" + String.join(", ", referencedConfigMaps) + "]";
160164
}
161165
message += " not found";
162-
condition = ResourcesUtil.newResolvedRefsFalse(clock,
166+
patch = statusFactory.newFalseConditionStatusPatch(
163167
filter,
164-
"MissingInterpolationReferences",
168+
Condition.Type.ResolvedRefs,
169+
Condition.REASON_INTERPOLATED_REFS_NOT_FOUND,
165170
message);
166171
}
167172

168-
KafkaProtocolFilter newFilter = ResourcesUtil.patchWithCondition(filter, condition);
169-
LOGGER.debug("Patching with status {}", newFilter.getStatus());
170-
UpdateControl<KafkaProtocolFilter> uc = UpdateControl.patchStatus(newFilter);
171173
if (LOGGER.isInfoEnabled()) {
172174
LOGGER.info("Completed reconciliation of {}/{}", namespace(filter), name(filter));
173175
}
174-
return uc;
176+
return UpdateControl.patchStatus(patch);
175177
}
176178

177179
@Override
@@ -180,11 +182,10 @@ public ErrorStatusUpdateControl<KafkaProtocolFilter> updateErrorStatus(
180182
Context<KafkaProtocolFilter> context,
181183
Exception e) {
182184
// ResolvedRefs to UNKNOWN
183-
Condition condition = ResourcesUtil.resolvedRefsUnknown(clock, filter, e);
184-
ErrorStatusUpdateControl<KafkaProtocolFilter> uc = ErrorStatusUpdateControl.patchStatus(
185-
ResourcesUtil.patchWithCondition(filter, condition));
185+
ErrorStatusUpdateControl<KafkaProtocolFilter> uc = ErrorStatusUpdateControl
186+
.patchStatus(statusFactory.newUnknownConditionStatusPatch(filter, Condition.Type.ResolvedRefs, e));
186187
if (LOGGER.isInfoEnabled()) {
187-
LOGGER.info("Completed reconciliation of {}/{} for error {}", namespace(filter), name(filter), e.toString());
188+
LOGGER.info("Completed reconciliation of {}/{} with error {}", namespace(filter), name(filter), e.toString());
188189
}
189190
return uc;
190191
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.kubernetes.operator;
8+
9+
import java.time.Clock;
10+
import java.util.List;
11+
import java.util.Optional;
12+
13+
import io.kroxylicious.kubernetes.api.common.Condition;
14+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
15+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterBuilder;
16+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterStatus;
17+
18+
public class KafkaProtocolFilterStatusFactory extends StatusFactory<KafkaProtocolFilter> {
19+
20+
public KafkaProtocolFilterStatusFactory(Clock clock) {
21+
super(clock);
22+
}
23+
24+
private KafkaProtocolFilter filterStatusPatch(KafkaProtocolFilter observedProxy,
25+
Condition condition) {
26+
// @formatter:off
27+
return new KafkaProtocolFilterBuilder()
28+
.withNewMetadata()
29+
.withUid(ResourcesUtil.uid(observedProxy))
30+
.withName(ResourcesUtil.name(observedProxy))
31+
.withNamespace(ResourcesUtil.namespace(observedProxy))
32+
.endMetadata()
33+
.withNewStatus()
34+
.withObservedGeneration(ResourcesUtil.generation(observedProxy))
35+
.withConditions(ResourceState.newConditions(Optional.ofNullable(observedProxy.getStatus()).map(KafkaProtocolFilterStatus::getConditions).orElse(List.of()), ResourceState.of(condition)))
36+
.endStatus()
37+
.build();
38+
// @formatter:on
39+
}
40+
41+
@Override
42+
KafkaProtocolFilter newUnknownConditionStatusPatch(KafkaProtocolFilter observedFilter,
43+
Condition.Type type,
44+
Exception e) {
45+
Condition unknownCondition = newUnknownCondition(observedFilter, type, e);
46+
return filterStatusPatch(observedFilter, unknownCondition);
47+
}
48+
49+
@Override
50+
KafkaProtocolFilter newFalseConditionStatusPatch(KafkaProtocolFilter observedProxy,
51+
Condition.Type type,
52+
String reason,
53+
String message) {
54+
Condition falseCondition = newFalseCondition(observedProxy, type, reason, message);
55+
return filterStatusPatch(observedProxy, falseCondition);
56+
}
57+
58+
@Override
59+
KafkaProtocolFilter newTrueConditionStatusPatch(KafkaProtocolFilter observedProxy,
60+
Condition.Type type) {
61+
Condition trueCondition = newTrueCondition(observedProxy, type);
62+
return filterStatusPatch(observedProxy, trueCondition);
63+
}
64+
65+
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/KafkaProxyContext.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
package io.kroxylicious.kubernetes.operator;
88

9-
import java.time.Clock;
109
import java.util.stream.Collectors;
1110

1211
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -15,21 +14,21 @@
1514
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
1615
import io.kroxylicious.kubernetes.operator.model.ProxyModel;
1716

18-
public record KafkaProxyContext(Clock clock,
17+
public record KafkaProxyContext(VirtualKafkaClusterStatusFactory virtualKafkaClusterStatusFactory,
1918
SecureConfigInterpolator secureConfigInterpolator,
2019
ProxyModel model) {
2120

2221
private static final String KEY_CTX = KafkaProxyContext.class.getName();
2322

2423
static void init(Context<KafkaProxy> context,
25-
Clock clock,
24+
VirtualKafkaClusterStatusFactory virtualKafkaClusterStatusFactory,
2625
SecureConfigInterpolator secureConfigInterpolator,
2726
ProxyModel model) {
2827
var rc = context.managedWorkflowAndDependentResourceContext();
2928

3029
rc.put(KEY_CTX,
3130
new KafkaProxyContext(
32-
Clock.fixed(clock.instant(), clock.getZone()),
31+
virtualKafkaClusterStatusFactory,
3332
secureConfigInterpolator,
3433
model));
3534
}

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/KafkaProxyIngressReconciler.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import java.time.Clock;
1010
import java.util.List;
11+
import java.util.Objects;
1112

1213
import org.slf4j.Logger;
1314
import org.slf4j.LoggerFactory;
@@ -38,10 +39,10 @@ public class KafkaProxyIngressReconciler implements
3839

3940
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProxyIngressReconciler.class);
4041
public static final String PROXY_EVENT_SOURCE_NAME = "proxy";
41-
private final Clock clock;
42+
private final KafkaProxyIngressStatusFactory statusFactory;
4243

4344
KafkaProxyIngressReconciler(Clock clock) {
44-
this.clock = clock;
45+
this.statusFactory = new KafkaProxyIngressStatusFactory(Objects.requireNonNull(clock));
4546
}
4647

4748
@Override
@@ -68,22 +69,24 @@ public UpdateControl<KafkaProxyIngress> reconcile(
6869
var proxyOpt = context.getSecondaryResource(KafkaProxy.class, PROXY_EVENT_SOURCE_NAME);
6970
LOGGER.debug("spec.proxyRef.name resolves to: {}", proxyOpt);
7071

71-
Condition condition;
72+
KafkaProxyIngress patch;
7273
if (proxyOpt.isPresent()) {
73-
condition = ResourcesUtil.newResolvedRefsTrue(clock, ingress);
74+
patch = statusFactory.newTrueConditionStatusPatch(
75+
ingress,
76+
Condition.Type.ResolvedRefs);
7477
}
7578
else {
76-
condition = ResourcesUtil.newResolvedRefsFalse(clock,
79+
patch = statusFactory.newFalseConditionStatusPatch(
7780
ingress,
78-
"spec.proxyRef.name",
79-
"KafkaProxy not found");
81+
Condition.Type.ResolvedRefs,
82+
Condition.REASON_REFS_NOT_FOUND,
83+
"KafkaProxy spec.proxyRef.name not found");
8084
}
8185

82-
UpdateControl<KafkaProxyIngress> uc = UpdateControl.patchStatus(ResourcesUtil.patchWithCondition(ingress, condition));
8386
if (LOGGER.isInfoEnabled()) {
8487
LOGGER.info("Completed reconciliation of {}/{}", namespace(ingress), name(ingress));
8588
}
86-
return uc;
89+
return UpdateControl.patchStatus(patch);
8790
}
8891

8992
@Override
@@ -92,11 +95,10 @@ public ErrorStatusUpdateControl<KafkaProxyIngress> updateErrorStatus(
9295
Context<KafkaProxyIngress> context,
9396
Exception e) {
9497
// ResolvedRefs to UNKNOWN
95-
Condition condition = ResourcesUtil.resolvedRefsUnknown(clock, ingress, e);
96-
ErrorStatusUpdateControl<KafkaProxyIngress> uc = ErrorStatusUpdateControl.patchStatus(
97-
ResourcesUtil.patchWithCondition(ingress, condition));
98+
ErrorStatusUpdateControl<KafkaProxyIngress> uc = ErrorStatusUpdateControl
99+
.patchStatus(statusFactory.newUnknownConditionStatusPatch(ingress, Condition.Type.ResolvedRefs, e));
98100
if (LOGGER.isInfoEnabled()) {
99-
LOGGER.info("Completed reconciliation of {}/{} for error {}", namespace(ingress), name(ingress), e.toString());
101+
LOGGER.info("Completed reconciliation of {}/{} with error {}", namespace(ingress), name(ingress), e.toString());
100102
}
101103
return uc;
102104
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.kubernetes.operator;
8+
9+
import java.time.Clock;
10+
import java.util.List;
11+
import java.util.Optional;
12+
13+
import io.kroxylicious.kubernetes.api.common.Condition;
14+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress;
15+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngressBuilder;
16+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngressStatus;
17+
18+
public class KafkaProxyIngressStatusFactory extends StatusFactory<KafkaProxyIngress> {
19+
public KafkaProxyIngressStatusFactory(Clock clock) {
20+
super(clock);
21+
}
22+
23+
private KafkaProxyIngress ingressStatusPatch(KafkaProxyIngress observedIngress,
24+
Condition condition) {
25+
// @formatter:off
26+
return new KafkaProxyIngressBuilder()
27+
.withNewMetadata()
28+
.withUid(ResourcesUtil.uid(observedIngress))
29+
.withName(ResourcesUtil.name(observedIngress))
30+
.withNamespace(ResourcesUtil.namespace(observedIngress))
31+
.endMetadata()
32+
.withNewStatus()
33+
.withObservedGeneration(ResourcesUtil.generation(observedIngress))
34+
.withConditions(ResourceState.newConditions(Optional.ofNullable(observedIngress.getStatus()).map(KafkaProxyIngressStatus::getConditions).orElse(List.of()), ResourceState.of(condition)))
35+
.endStatus()
36+
.build();
37+
// @formatter:on
38+
}
39+
40+
@Override
41+
KafkaProxyIngress newUnknownConditionStatusPatch(KafkaProxyIngress observedFilter,
42+
Condition.Type type,
43+
Exception e) {
44+
Condition unknownCondition = newUnknownCondition(observedFilter, type, e);
45+
return ingressStatusPatch(observedFilter, unknownCondition);
46+
}
47+
48+
@Override
49+
KafkaProxyIngress newFalseConditionStatusPatch(KafkaProxyIngress observedProxy,
50+
Condition.Type type,
51+
String reason,
52+
String message) {
53+
Condition falseCondition = newFalseCondition(observedProxy, type, reason, message);
54+
return ingressStatusPatch(observedProxy, falseCondition);
55+
}
56+
57+
@Override
58+
KafkaProxyIngress newTrueConditionStatusPatch(KafkaProxyIngress observedProxy,
59+
Condition.Type type) {
60+
Condition trueCondition = newTrueCondition(observedProxy, type);
61+
return ingressStatusPatch(observedProxy, trueCondition);
62+
}
63+
}

0 commit comments

Comments
 (0)