diff --git a/kroxylicious-operator/pom.xml b/kroxylicious-operator/pom.xml index ed3f61b0b1..f51ff5c24e 100644 --- a/kroxylicious-operator/pom.xml +++ b/kroxylicious-operator/pom.xml @@ -357,7 +357,9 @@ io.kroxylicious.kubernetes.api.common.Condition - + + java.lang.Object + diff --git a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java index d0617f58fc..c69e47ef8d 100644 --- a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java +++ b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/ProxyConfigConfigMap.java @@ -33,6 +33,7 @@ import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy; import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterSpec; import io.kroxylicious.kubernetes.operator.model.ProxyModel; import io.kroxylicious.kubernetes.operator.model.ProxyModelBuilder; import io.kroxylicious.kubernetes.operator.model.ingress.ProxyIngressModel; @@ -203,17 +204,13 @@ private List filterDefinitions(Context contex String filterDefinitionName = filterDefinitionName(filterCrRef); var filterCr = resolutionResult.filter(filterCrRef).orElseThrow(); - if (filterCr.getAdditionalProperties().get("spec") instanceof Map spec) { - String type = (String) spec.get("type"); - SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(spec); - ManagedWorkflowAndDependentResourceContext ctx = context.managedWorkflowAndDependentResourceContext(); - putOrMerged(ctx, SECURE_VOLUME_KEY, interpolationResult.volumes()); - putOrMerged(ctx, SECURE_VOLUME_MOUNT_KEY, interpolationResult.mounts()); - return new NamedFilterDefinition(filterDefinitionName, type, interpolationResult.config()); - } - else { - throw new InvalidClusterException(ClusterCondition.filterInvalid(ResourcesUtil.name(cluster), filterDefinitionName, "`spec` was not an `object`.")); - } + var spec = filterCr.getSpec(); + String type = spec.getType(); + SecureConfigInterpolator.InterpolationResult interpolationResult = interpolateConfig(spec); + ManagedWorkflowAndDependentResourceContext ctx = context.managedWorkflowAndDependentResourceContext(); + putOrMerged(ctx, SECURE_VOLUME_KEY, interpolationResult.volumes()); + putOrMerged(ctx, SECURE_VOLUME_MOUNT_KEY, interpolationResult.mounts()); + return new NamedFilterDefinition(filterDefinitionName, type, interpolationResult.config()); }).toList(); } @@ -228,14 +225,14 @@ private static void putOrMerged(ManagedWorkflowAndDependentResourceContext c } } - private SecureConfigInterpolator.InterpolationResult interpolateConfig(Map spec) { + private SecureConfigInterpolator.InterpolationResult interpolateConfig(KafkaProtocolFilterSpec spec) { SecureConfigInterpolator.InterpolationResult result; - Object configTemplate = spec.get("configTemplate"); + Object configTemplate = spec.getConfigTemplate(); if (configTemplate != null) { result = secureConfigInterpolator.interpolate(configTemplate); } else { - result = new SecureConfigInterpolator.InterpolationResult(spec.get("config"), Set.of(), Set.of()); + throw new IllegalArgumentException("ConfigTemplate is not found in KafkaProtocolFilterSpec"); } return result; } diff --git a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImpl.java b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImpl.java index f945b7b274..3c4ce36eac 100644 --- a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImpl.java +++ b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImpl.java @@ -15,7 +15,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import io.fabric8.kubernetes.api.model.GenericKubernetesResource; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.kroxylicious.kubernetes.api.common.FilterRef; @@ -25,6 +24,7 @@ import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterSpec; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter; import io.kroxylicious.kubernetes.operator.ResourcesUtil; import io.kroxylicious.kubernetes.operator.resolver.ResolutionResult.ClusterResolutionResult; @@ -52,7 +52,7 @@ public ResolutionResult deepResolve(Context context, UnresolvedDepen .collect(ResourcesUtil.toByLocalRefMap()); Map, KafkaService> clusterRefs = context.getSecondaryResources(KafkaService.class).stream() .collect(ResourcesUtil.toByLocalRefMap()); - Map, GenericKubernetesResource> filters = context.getSecondaryResources(GenericKubernetesResource.class).stream() + Map, KafkaProtocolFilter> filters = context.getSecondaryResources(KafkaProtocolFilter.class).stream() .collect(ResourcesUtil.toByLocalRefMap()); var resolutionResult = virtualKafkaClusters.stream().map(cluster -> determineUnresolvedDependencies(cluster, ingresses, clusterRefs, filters)) .collect(Collectors.toSet()); @@ -64,7 +64,7 @@ public ResolutionResult deepResolve(Context context, UnresolvedDepen private ClusterResolutionResult determineUnresolvedDependencies(VirtualKafkaCluster cluster, Map, KafkaProxyIngress> ingresses, Map, KafkaService> clusterRefs, - Map, GenericKubernetesResource> filters) { + Map, KafkaProtocolFilter> filters) { VirtualKafkaClusterSpec spec = cluster.getSpec(); Set> unresolvedDependencies = new HashSet<>(); determineUnresolvedIngresses(spec, ingresses).forEach(unresolvedDependencies::add); @@ -74,7 +74,7 @@ private ClusterResolutionResult determineUnresolvedDependencies(VirtualKafkaClus } private Stream> determineUnresolvedFilters(VirtualKafkaClusterSpec spec, - Map, GenericKubernetesResource> filters) { + Map, KafkaProtocolFilter> filters) { List filtersList = spec.getFilterRefs(); if (filtersList == null) { return Stream.empty(); @@ -102,7 +102,7 @@ private static Stream> determineUnresolvedIngresses(Virtua .filter(ref -> !ingresses.containsKey(ref)); } - private static boolean filterResourceMatchesRef(FilterRef filterRef, GenericKubernetesResource filterResource) { + private static boolean filterResourceMatchesRef(FilterRef filterRef, KafkaProtocolFilter filterResource) { String apiVersion = filterResource.getApiVersion(); var filterResourceGroup = apiVersion.substring(0, apiVersion.indexOf("/")); return filterResourceGroup.equals(filterRef.getGroup()) diff --git a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/ResolutionResult.java b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/ResolutionResult.java index c92b6e3130..531c20eee8 100644 --- a/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/ResolutionResult.java +++ b/kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/resolver/ResolutionResult.java @@ -15,13 +15,12 @@ import java.util.Set; import java.util.function.Predicate; -import io.fabric8.kubernetes.api.model.GenericKubernetesResource; - import io.kroxylicious.kubernetes.api.common.FilterRef; import io.kroxylicious.kubernetes.api.common.LocalRef; import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress; import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter; import io.kroxylicious.kubernetes.operator.ResourcesUtil; import static java.util.Comparator.comparing; @@ -32,7 +31,7 @@ * describes which dependencies could not be resolved per VirtualKafkaCluster. */ public class ResolutionResult { - private final Map, GenericKubernetesResource> filters; + private final Map, KafkaProtocolFilter> filters; private final Map, KafkaProxyIngress> kafkaProxyIngresses; private final Map, KafkaService> kafkaServiceRefs; private final Set clusterResolutionResults; @@ -53,7 +52,7 @@ public boolean isFullyResolved() { } - ResolutionResult(Map, GenericKubernetesResource> filters, + ResolutionResult(Map, KafkaProtocolFilter> filters, Map, KafkaProxyIngress> kafkaProxyIngresses, Map, KafkaService> kafkaServiceRefs, Set clusterResolutionResults) { @@ -127,7 +126,7 @@ public Optional kafkaServiceRef(VirtualKafkaCluster cluster) { * Get all resolved Filters * @return filters */ - public Collection filters() { + public Collection filters() { return filters.values(); } @@ -135,7 +134,7 @@ public Collection filters() { * Get the resolved GenericKubernetesResource for a filterRef * @return optional containing the resource if resolved, else empty */ - public Optional filter(FilterRef filterRef) { + public Optional filter(FilterRef filterRef) { return filters().stream() .filter(filterResource -> { String apiVersion = filterResource.getApiVersion(); diff --git a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java index 9dfeba1457..cc2becadc4 100644 --- a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java +++ b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/DerivedResourcesTest.java @@ -35,7 +35,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; -import io.fabric8.kubernetes.api.model.GenericKubernetesResource; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -48,6 +47,7 @@ import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyIngress; import io.kroxylicious.kubernetes.api.v1alpha1.KafkaService; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter; import io.kroxylicious.kubernetes.operator.config.RuntimeDecl; import edu.umd.cs.findbugs.annotations.NonNull; @@ -363,18 +363,18 @@ private static Context buildContext(Path testDir, List filterInstances = new HashSet<>(); + Set filterInstances = new HashSet<>(); for (var filterApi : runtimeDecl.filterApis()) { String fileName = "in-" + filterApi.kind() + "-*.yaml"; try (var dirStream = Files.newDirectoryStream(testDir, fileName)) { for (Path p : dirStream) { - GenericKubernetesResource resource = YAML_MAPPER.readValue(p.toFile(), GenericKubernetesResource.class); + KafkaProtocolFilter resource = YAML_MAPPER.readValue(p.toFile(), KafkaProtocolFilter.class); assertMinimalMetadata(resource.getMetadata(), fileName); filterInstances.add(resource); } } } - doReturn(filterInstances).when(context).getSecondaryResources(GenericKubernetesResource.class); + doReturn(filterInstances).when(context).getSecondaryResources(KafkaProtocolFilter.class); doReturn(Set.copyOf(virtualKafkaClusters)).when(context).getSecondaryResources(VirtualKafkaCluster.class); doReturn(Set.copyOf(kafkaServiceRefs)).when(context).getSecondaryResources(KafkaService.class); doReturn(Set.copyOf(ingresses)).when(context).getSecondaryResources(KafkaProxyIngress.class); diff --git a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/ProxyReconcilerIT.java b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/ProxyReconcilerIT.java index e540df1d1e..7e06ebeeff 100644 --- a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/ProxyReconcilerIT.java +++ b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/ProxyReconcilerIT.java @@ -8,6 +8,7 @@ import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Set; import org.assertj.core.api.AbstractStringAssert; @@ -40,6 +41,8 @@ import io.kroxylicious.kubernetes.api.v1alpha1.KafkaServiceBuilder; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterBuilder; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterBuilder; import io.kroxylicious.kubernetes.operator.config.RuntimeDecl; import static io.kroxylicious.kubernetes.api.v1alpha1.kafkaproxyingressspec.ClusterIP.Protocol.TCP; @@ -56,6 +59,7 @@ class ProxyReconcilerIT { private static final String PROXY_A = "proxy-a"; private static final String PROXY_B = "proxy-b"; private static final String CLUSTER_FOO_REF = "fooref"; + private static final String FILTER_NAME = "validation"; private static final String CLUSTER_FOO = "foo"; private static final String CLUSTER_FOO_CLUSTERIP_INGRESS = "foo-cluster-ip"; private static final String CLUSTER_FOO_BOOTSTRAP = "my-cluster-kafka-bootstrap.foo.svc.cluster.local:9092"; @@ -88,6 +92,7 @@ void beforeEach() { .withAdditionalCustomResourceDefinition(VirtualKafkaCluster.class) .withAdditionalCustomResourceDefinition(KafkaService.class) .withAdditionalCustomResourceDefinition(KafkaProxyIngress.class) + .withAdditionalCustomResourceDefinition(KafkaProtocolFilter.class) .waitForNamespaceDeletion(true) .withConfigurationService(x -> x.withCloseClientOnStop(false)) .build(); @@ -119,12 +124,13 @@ public KafkaProxyIngress ingress(String name) { CreatedResources doCreate() { KafkaProxy proxy = extension.create(kafkaProxy(PROXY_A)); + KafkaProtocolFilter filter = extension.create(filter(FILTER_NAME)); KafkaService barClusterRef = extension.create(clusterRef(CLUSTER_BAR_REF, CLUSTER_BAR_BOOTSTRAP)); KafkaProxyIngress ingressBar = extension.create(clusterIpIngress(CLUSTER_BAR_CLUSTERIP_INGRESS, proxy)); Set clusterRefs = Set.of(barClusterRef); - VirtualKafkaCluster clusterBar = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxy, barClusterRef, ingressBar)); + VirtualKafkaCluster clusterBar = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxy, barClusterRef, ingressBar, filter)); Set clusters = Set.of(clusterBar); - assertProxyConfigContents(proxy, Set.of(CLUSTER_BAR_BOOTSTRAP), Set.of()); + assertProxyConfigContents(proxy, Set.of(CLUSTER_BAR_BOOTSTRAP, filter.getSpec().getType()), Set.of()); assertDeploymentMountsConfigConfigMap(proxy); assertDeploymentBecomesReady(proxy); assertServiceTargetsProxyInstances(proxy, clusterBar, ingressBar); @@ -274,9 +280,10 @@ void moveVirtualKafkaClusterToAnotherKafkaProxy() { KafkaService fooClusterRef = extension.create(clusterRef(CLUSTER_FOO_REF, CLUSTER_FOO_BOOTSTRAP)); KafkaService barClusterRef = extension.create(clusterRef(CLUSTER_BAR_REF, CLUSTER_BAR_BOOTSTRAP)); + KafkaProtocolFilter filter = extension.create(filter(FILTER_NAME)); - VirtualKafkaCluster clusterFoo = extension.create(virtualKafkaCluster(CLUSTER_FOO, proxyA, fooClusterRef, ingressFoo)); - VirtualKafkaCluster barCluster = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxyB, barClusterRef, ingressBar)); + VirtualKafkaCluster clusterFoo = extension.create(virtualKafkaCluster(CLUSTER_FOO, proxyA, fooClusterRef, ingressFoo, filter)); + VirtualKafkaCluster barCluster = extension.create(virtualKafkaCluster(CLUSTER_BAR, proxyB, barClusterRef, ingressBar, filter)); assertProxyConfigContents(proxyA, Set.of(CLUSTER_FOO_BOOTSTRAP), Set.of()); assertProxyConfigContents(proxyB, Set.of(CLUSTER_BAR_BOOTSTRAP), Set.of()); @@ -312,13 +319,13 @@ private AbstractStringAssert assertThatProxyConfigFor(KafkaProxy proxy) { } private static VirtualKafkaCluster virtualKafkaCluster(String clusterName, KafkaProxy proxy, KafkaService clusterRef, - KafkaProxyIngress ingress) { + KafkaProxyIngress ingress, KafkaProtocolFilter filter) { return new VirtualKafkaClusterBuilder().withNewMetadata().withName(clusterName).endMetadata() .withNewSpec() .withTargetKafkaServiceRef(new KafkaServiceRefBuilder().withName(name(clusterRef)).build()) .withNewProxyRef().withName(name(proxy)).endProxyRef() .addNewIngressRef().withName(name(ingress)).endIngressRef() - .withFilterRefs() + .withFilterRefs().addNewFilterRef().withName(name(filter)).endFilterRef() .endSpec().build(); } @@ -329,6 +336,12 @@ private static KafkaService clusterRef(String clusterRefName, String clusterBoot .endSpec().build(); } + private static KafkaProtocolFilter filter(String name) { + return new KafkaProtocolFilterBuilder().withNewMetadata().withName(name).endMetadata() + .withNewSpec().withType("RecordValidation").withConfigTemplate(Map.of("rules", List.of(Map.of("allowNulls", false)))) + .endSpec().build(); + } + KafkaProxy kafkaProxy(String name) { // @formatter:off return new KafkaProxyBuilder() diff --git a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImplTest.java b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImplTest.java index e9b6de0f9f..917f720a70 100644 --- a/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImplTest.java +++ b/kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/resolver/DependencyResolverImplTest.java @@ -9,17 +9,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.stubbing.OngoingStubbing; -import io.fabric8.kubernetes.api.model.GenericKubernetesResource; -import io.fabric8.kubernetes.api.model.GenericKubernetesResourceBuilder; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.kroxylicious.kubernetes.api.common.FilterRef; @@ -34,6 +30,8 @@ import io.kroxylicious.kubernetes.api.v1alpha1.KafkaServiceBuilder; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster; import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaClusterBuilder; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter; +import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilterBuilder; import io.kroxylicious.kubernetes.operator.resolver.ResolutionResult.ClusterResolutionResult; import static org.assertj.core.api.Assertions.assertThat; @@ -92,7 +90,7 @@ void testNullFiltersOnVirtualClusterTolerated() { @Test void testSingleFilterUnreferenced() { - GenericKubernetesResource filter = protocolFilter("filterName"); + KafkaProtocolFilter filter = protocolFilter("filterName"); givenFiltersInContext(filter); givenClusterRefsInContext(kafkaServiceRef("cluster")); givenIngressesInContext(); @@ -113,7 +111,7 @@ void testSingleFilterUnreferenced() { @Test void testSingleFilterReferenced() { - GenericKubernetesResource filter = protocolFilter("filterName"); + KafkaProtocolFilter filter = protocolFilter("filterName"); givenFiltersInContext(filter); givenClusterRefsInContext(kafkaServiceRef("cluster")); givenIngressesInContext(); @@ -134,8 +132,8 @@ void testSingleFilterReferenced() { @Test void testMultipleFiltersReferenced() { - GenericKubernetesResource filter = protocolFilter("filterName"); - GenericKubernetesResource filter2 = protocolFilter("filterName2"); + KafkaProtocolFilter filter = protocolFilter("filterName"); + KafkaProtocolFilter filter2 = protocolFilter("filterName2"); givenFiltersInContext(filter, filter2); givenClusterRefsInContext(kafkaServiceRef("cluster")); givenIngressesInContext(); @@ -157,7 +155,7 @@ void testMultipleFiltersReferenced() { @Test void testSubsetOfFiltersReferenced() { - GenericKubernetesResource filter = protocolFilter("filterName"); + KafkaProtocolFilter filter = protocolFilter("filterName"); givenFiltersInContext(filter); givenClusterRefsInContext(kafkaServiceRef("cluster")); givenIngressesInContext(); @@ -179,7 +177,7 @@ void testSubsetOfFiltersReferenced() { @Test void testUnresolvedFilter() { - GenericKubernetesResource filter = protocolFilter("filterName"); + KafkaProtocolFilter filter = protocolFilter("filterName"); givenFiltersInContext(filter); givenClusterRefsInContext(kafkaServiceRef("clusterRef")); givenIngressesInContext(); @@ -320,16 +318,17 @@ private static FilterRef filterRef(String name) { return new FilterRefBuilder().withName(name).build(); } - private static GenericKubernetesResource protocolFilter(String name) { - return new GenericKubernetesResourceBuilder() + private static KafkaProtocolFilter protocolFilter(String name) { + return new KafkaProtocolFilterBuilder() .withApiVersion("filter.kroxylicious.io/v1alpha") .withKind("KafkaProtocolFilter") .withNewMetadata().withName(name) - .endMetadata().build(); + .endMetadata() + .build(); } - private void givenFiltersInContext(GenericKubernetesResource... resources) { - givenSecondaryResourcesInContext(GenericKubernetesResource.class, resources); + private void givenFiltersInContext(KafkaProtocolFilter... resources) { + givenSecondaryResourcesInContext(KafkaProtocolFilter.class, resources); } private void givenIngressesInContext(KafkaProxyIngress... ingresses) { @@ -344,8 +343,9 @@ private void givenClusterRefsInContext(KafkaService... clusterRefs) { givenSecondaryResourcesInContext(KafkaService.class, clusterRefs); } - private OngoingStubbing> givenSecondaryResourcesInContext(Class type, T... resources) { - return when(mockContext.getSecondaryResources(type)).thenReturn(Arrays.stream(resources).collect(Collectors.toSet())); + @SafeVarargs + private void givenSecondaryResourcesInContext(Class type, T... resources) { + when(mockContext.getSecondaryResources(type)).thenReturn(Arrays.stream(resources).collect(Collectors.toSet())); } private static VirtualKafkaCluster virtualCluster(List filterRefs, String clusterRef, List ingressRefs) {