diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClient.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClient.java index e896ca9d2fb..94f2c5f791f 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClient.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClient.java @@ -24,16 +24,28 @@ import com.google.common.collect.Lists; import io.confluent.kafka.schemaregistry.ParsedSchemaHolder; import io.confluent.kafka.schemaregistry.SimpleParsedSchemaHolder; +import io.confluent.kafka.schemaregistry.client.rest.entities.Association; import io.confluent.kafka.schemaregistry.client.rest.entities.Config; +import io.confluent.kafka.schemaregistry.client.rest.entities.LifecyclePolicy; import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata; +import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryDeployment; +import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationCreateInfo; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationCreateRequest; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationInfo; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationResponse; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; import io.confluent.kafka.schemaregistry.utils.QualifiedSubject; import java.util.LinkedHashSet; import java.util.SortedMap; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.Arrays; +import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,9 +54,6 @@ import io.confluent.kafka.schemaregistry.SchemaProvider; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; -import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; -import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; -import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import java.io.IOException; @@ -70,6 +79,13 @@ public class MockSchemaRegistryClient implements SchemaRegistryClient { private static final int DEFAULT_CAPACITY = 1000; private static final String WILDCARD = "*"; + private static final String DEFAULT_RESOURCE_TYPE = "topic"; + private static final String DEFAULT_ASSOCIATION_TYPE = "value"; + private static final LifecyclePolicy DEFAULT_LIFECYCLE_POLICY = LifecyclePolicy.STRONG; + private static final Map> RESOURCE_TYPE_TO_ASSOC_TYPE_MAP = + new HashMap>() {{ + put(DEFAULT_RESOURCE_TYPE, Arrays.asList("key", DEFAULT_ASSOCIATION_TYPE)); + }}; private Config defaultConfig = new Config("BACKWARD"); private final Map> schemaToResponseCache; @@ -78,6 +94,9 @@ public class MockSchemaRegistryClient implements SchemaRegistryClient { private final Map guidToSchemaCache; private final Map> schemaToVersionCache; private final Map configCache; + private final Map> subjectToAssocCache; + private final Map resourceAndAssocTypeCache; + private final Map> resourceIdToAssocCache; private final Map modes; private final Map ids; private final LoadingCache parsedSchemaCache; @@ -85,6 +104,39 @@ public class MockSchemaRegistryClient implements SchemaRegistryClient { private static final String NO_SUBJECT = ""; + private static class ResourceAndAssocType { + String resourceId; + String resourceType; + String associationType; + + public ResourceAndAssocType(String resourceId, + String resourceType, + String associationType) { + this.resourceId = resourceId; + this.resourceType = resourceType; + this.associationType = associationType; + } + + @Override + public int hashCode() { + return Objects.hash(resourceId, resourceType, associationType); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ResourceAndAssocType other = (ResourceAndAssocType) obj; + return Objects.equals(resourceId, other.resourceId) + && Objects.equals(resourceType, other.resourceType) + && Objects.equals(associationType, other.associationType); + } + } + public MockSchemaRegistryClient() { this(null); } @@ -96,6 +148,9 @@ public MockSchemaRegistryClient(List providers) { guidToSchemaCache = new ConcurrentHashMap<>(); schemaToVersionCache = new ConcurrentHashMap<>(); configCache = new ConcurrentHashMap<>(); + subjectToAssocCache = new ConcurrentHashMap<>(); + resourceAndAssocTypeCache = new ConcurrentHashMap<>(); + resourceIdToAssocCache = new ConcurrentHashMap<>(); modes = new ConcurrentHashMap<>(); ids = new ConcurrentHashMap<>(); this.providers = providers != null && !providers.isEmpty() @@ -545,6 +600,14 @@ private List allVersions(String subject) { return allVersions; } + private int latestVersion(String subject) { + List versions = allVersions(subject); + if (versions.isEmpty()) { + return -1; + } + return versions.get(versions.size() - 1); + } + @Override public boolean testCompatibility(String subject, ParsedSchema newSchema) throws IOException, RestClientException { @@ -636,13 +699,26 @@ public synchronized List deleteSubject( String subject, boolean isPermanent) throws IOException, RestClientException { + // make sure this subject has no associations + List associations = getAssociationsBySubject(subject, null, null, null, 0, -1); + if (!associations.isEmpty()) { + throw new RestClientException("Associations found", 409, 40921); + } + return deleteSubjectNoAssociationsCheck(requestProperties, subject, isPermanent); + } + + private List deleteSubjectNoAssociationsCheck( + Map requestProperties, + String subject, + boolean isPermanent) + throws IOException, RestClientException { schemaToResponseCache.remove(subject); idToSchemaCache.remove(subject); Map versions = schemaToVersionCache.remove(subject); configCache.remove(subject); return versions != null - ? versions.values().stream().sorted().collect(Collectors.toList()) - : Collections.emptyList(); + ? versions.values().stream().sorted().collect(Collectors.toList()) + : Collections.emptyList(); } @Override @@ -806,4 +882,352 @@ private static String toQualifiedContext(String subject) { QualifiedSubject.create(DEFAULT_TENANT, subject); return qualifiedSubject != null ? qualifiedSubject.toQualifiedContext() : NO_SUBJECT; } + + private boolean validateResourceTypeAndAssociationType(String resourceType, + String associationType) { + if (!RESOURCE_TYPE_TO_ASSOC_TYPE_MAP.containsKey(resourceType)) { + return false; + } + if (!RESOURCE_TYPE_TO_ASSOC_TYPE_MAP.get(resourceType).contains(associationType)) { + return false; + } + return true; + } + + private void validateAssociationCreateRequest(AssociationCreateRequest request) { + // Validate required fields + if (request.getResourceName() == null || request.getResourceName().isEmpty()) { + throw new IllegalArgumentException( + "reourceId, resourceName, resourceNamespace and associations can't be null or empty"); + } + if (request.getResourceNamespace() == null || request.getResourceNamespace().isEmpty()) { + throw new IllegalArgumentException( + "reourceId, resourceName, resourceNamespace and associations can't be null or empty"); + } + if (request.getResourceId() == null || request.getResourceId().isEmpty()) { + throw new IllegalArgumentException( + "reourceId, resourceName, resourceNamespace and associations can't be null or empty"); + } + if (request.getAssociations() == null) { + throw new IllegalArgumentException( + "reourceId, resourceName, resourceNamespace and associations can't be null or empty"); + } + // Set default resource type if not provided + if (request.getResourceType() == null || request.getResourceType().isEmpty()) { + request.setResourceType(DEFAULT_RESOURCE_TYPE); + } + // Validate each association + for (AssociationCreateInfo associationCreateInfo : request.getAssociations()) { + // Check subject is required + if (associationCreateInfo.getSubject() == null + || associationCreateInfo.getSubject().isEmpty()) { + throw new IllegalArgumentException("subject in the association can't be null or empty"); + } + + // Set default association type if not provided + if (associationCreateInfo.getAssociationType() == null + || associationCreateInfo.getAssociationType().isEmpty()) { + associationCreateInfo.setAssociationType(DEFAULT_ASSOCIATION_TYPE); + } + + // Validate resource type and association type + if (!validateResourceTypeAndAssociationType( + request.getResourceType(), associationCreateInfo.getAssociationType())) { + throw new IllegalArgumentException( + String.format("resourceType {} and associationType {} don't match", + request.getResourceType(), associationCreateInfo.getAssociationType())); + } + + // Set default lifecycle if not provided + if (associationCreateInfo.getLifecycle() == null) { + associationCreateInfo.setLifecycle(DEFAULT_LIFECYCLE_POLICY); + } + + // The association can't be both weak and frozen + if (associationCreateInfo.getLifecycle() == LifecyclePolicy.WEAK + && associationCreateInfo.isFrozen()) { + throw new IllegalArgumentException("the association can't be both weak and frozen"); + } + } + } + + private synchronized void createAssociationsHelper(AssociationCreateRequest request) + throws IOException, RestClientException { + // Check that association types are unique + Map infosByType = new HashMap<>(); + for (AssociationCreateInfo info : request.getAssociations()) { + String associationType = info.getAssociationType(); + if (infosByType.containsKey(associationType)) { + throw new RestClientException( + String.format( + "The association specified an invalid value for property: %s", + associationType), + 422, 42212); + } + infosByType.put(associationType, info); + } + + // Make sure subject exists + for (AssociationCreateInfo associationInRequest : request.getAssociations()) { + String subject = associationInRequest.getSubject(); + int latestVersion = latestVersion(subject); + + if (associationInRequest.getSchema() == null && latestVersion < 0) { + throw new RestClientException( + String.format("No active (non-deleted) version exists for subject '%s", subject), + 409, 40907); + } + } + + // Find existing associations + for (AssociationCreateInfo associationInRequest : request.getAssociations()) { + ResourceAndAssocType key = new ResourceAndAssocType( + request.getResourceId(), + request.getResourceType(), + associationInRequest.getAssociationType() + ); + + Association existingAssociation = resourceAndAssocTypeCache.get(key); + + if (existingAssociation != null) { + // Association exists - must be equal + if (!existingAssociation.getResourceName().equals(request.getResourceName()) + || !existingAssociation.getResourceNamespace() + .equals(request.getResourceNamespace()) + || !existingAssociation.isEquivalent(associationInRequest)) { + throw new RestClientException( + String.format( + "An association of type '%s' already exists for resource '%s", + associationInRequest.getAssociationType(), + request.getResourceId()), 422, 42212); + } + } else { + // Check if subject can accept new association + String subject = associationInRequest.getSubject(); + List existingAssociations = subjectToAssocCache.get(subject); + + if (existingAssociations != null && !existingAssociations.isEmpty()) { + if (associationInRequest.getLifecycle() == LifecyclePolicy.STRONG) { + throw new RestClientException( + String.format( + "An association of type '%s', already exists for subject '%s", + associationInRequest.getAssociationType(), subject), 409, 40904); + } + if (existingAssociations.get(0).getLifecycle() == LifecyclePolicy.STRONG) { + throw new RestClientException( + String.format( + "A strong association of type '%s' already exists for subject '%s", + associationInRequest.getAssociationType(), subject), 409, 40905); + } + } + } + } + // Post all schemas + for (AssociationCreateInfo associationInRequest : request.getAssociations()) { + String subject = associationInRequest.getSubject(); + Schema schema = associationInRequest.getSchema(); + boolean normalize = associationInRequest.isNormalize(); + + if (schema != null) { + register(subject, parseSchema(schema).get(), normalize); + } + } + // Write associations to caches + for (AssociationCreateInfo associationInRequest : request.getAssociations()) { + ResourceAndAssocType key = new ResourceAndAssocType( + request.getResourceId(), + request.getResourceType(), + associationInRequest.getAssociationType() + ); + + Association newAssociation = new Association( + associationInRequest.getSubject(), + UUID.randomUUID().toString(), + request.getResourceName(), + request.getResourceNamespace(), + request.getResourceId(), + request.getResourceType(), + associationInRequest.getAssociationType(), + associationInRequest.getLifecycle(), + associationInRequest.isFrozen() + ); + + // Update caches + resourceAndAssocTypeCache.put(key, newAssociation); + + subjectToAssocCache.computeIfAbsent( + associationInRequest.getSubject(), + k -> new ArrayList<>() + ).add(newAssociation); + + resourceIdToAssocCache.computeIfAbsent( + request.getResourceId(), + k -> new ArrayList<>() + ).add(newAssociation); + } + } + + public AssociationResponse createAssociation(AssociationCreateRequest request) + throws IOException, RestClientException { + try { + validateAssociationCreateRequest(request); + } catch (Exception e) { + throw new RestClientException( + String.format( + "The association specified an invalid value for property, %s", + e.getMessage()), + 422, 42212); + } + createAssociationsHelper(request); + List infos = request.getAssociations().stream() + .map(associationCreateInfo -> + new AssociationInfo(associationCreateInfo.getSubject(), + associationCreateInfo.getAssociationType(), + associationCreateInfo.getLifecycle(), associationCreateInfo.isFrozen(), + associationCreateInfo.getSchema())).collect(Collectors.toList()); + AssociationResponse response = new AssociationResponse( + request.getResourceName(), request.getResourceNamespace(), + request.getResourceId(), request.getResourceType(), infos); + return response; + } + + public List getAssociationsBySubject(String subject, String resourceType, + List associationTypes, + String lifecycle, int offset, int limit) + throws IOException, RestClientException { + if (subject == null || subject.isEmpty()) { + throw new RestClientException("Association parameters are invalid", 422, 42212); + } + if (lifecycle != null) { + try { + LifecyclePolicy.valueOf(lifecycle); + } catch (IllegalArgumentException e) { + throw new RestClientException("Association parameters are invalid", 422, 42212); + } + } + + List associations = subjectToAssocCache.get(subject); + + if (associations == null || associations.isEmpty()) { + return new ArrayList<>(); // Return empty list + } + List filtered = associations.stream() + .filter(association -> + resourceType == null || association.getResourceType().equals(resourceType)) + .filter(association -> + associationTypes == null + || associationTypes.isEmpty() + || associationTypes.contains(association.getAssociationType())) + .filter(association -> + lifecycle == null || association.getLifecycle().toString().equals(lifecycle)) + .collect(Collectors.toList()); + + // Apply pagination + int start = offset; + if (start > filtered.size()) { + start = filtered.size(); + } + + int end = start + limit; + if (limit <= 0 || end > filtered.size()) { + end = filtered.size(); + } + return filtered.subList(start, end); + } + + public List getAssociationsByResourceId(String resourceId, String resourceType, + List associationTypes, + String lifecycle, int offset, int limit) + throws IOException, RestClientException { + if (resourceId == null || resourceId.isEmpty()) { + throw new RestClientException("Association parameters are invalid", 422, 42212); + } + List associations = resourceIdToAssocCache.get(resourceId); + if (lifecycle != null) { + try { + LifecyclePolicy.valueOf(lifecycle); + } catch (IllegalArgumentException e) { + throw new RestClientException("Association parameters are invalid", 422, 42212); + } + } + + if (associations == null || associations.isEmpty()) { + return new ArrayList<>(); // Return empty list + } + List filtered = associations.stream() + .filter(association -> + resourceType == null || association.getResourceType().equals(resourceType)) + .filter(association -> + associationTypes == null + || associationTypes.isEmpty() + || associationTypes.contains(association.getAssociationType())) + .filter(association -> + lifecycle == null || association.getLifecycle().toString().equals(lifecycle)) + .collect(Collectors.toList()); + + // Apply pagination + int start = offset; + if (start > filtered.size()) { + start = filtered.size(); + } + + int end = start + limit; + if (limit <= 0 || end > filtered.size()) { + end = filtered.size(); + } + return filtered.subList(start, end); + } + + private void checkDeleteAssociation(Association association, boolean cascadeLifecycle) + throws RestClientException { + if (!cascadeLifecycle && association.getLifecycle() == LifecyclePolicy.STRONG + && association.isFrozen()) { + throw new RestClientException(String.format( + "The association of type '%s' is frozen for subject '%s", + association.getAssociationType(), association.getSubject()), 409, 40908); + } + } + + private void deleteAssociation(Association association, boolean cascadeLifecycle) + throws IOException, RestClientException { + String subject = association.getSubject(); + String resourceId = association.getResourceId(); + if (cascadeLifecycle && association.getLifecycle() == LifecyclePolicy.STRONG) { + deleteSubjectNoAssociationsCheck(null, subject, false); + deleteSubjectNoAssociationsCheck(null, subject, true); + } + resourceIdToAssocCache.computeIfPresent(resourceId, (k, list) -> { + list.remove(association); + return list.isEmpty() ? null : list; + }); + subjectToAssocCache.computeIfPresent(subject, (k, list) -> { + list.remove(association); + return list.isEmpty() ? null : list; + }); + ResourceAndAssocType resourceAndAssocType = new ResourceAndAssocType( + association.getResourceId(), association.getResourceType(), + association.getAssociationType() + ); + resourceAndAssocTypeCache.remove(resourceAndAssocType); + } + + public synchronized void deleteAssociations(String resourceId, String resourceType, + List associationTypes, + boolean cascadeLifecycle) + throws IOException, RestClientException { + List associationsToDelete = getAssociationsByResourceId( + resourceId, resourceType, associationTypes, null, 0, -1); + + if (associationsToDelete == null || associationsToDelete.isEmpty()) { + return; + } + + for (Association associationToDelete : associationsToDelete) { + checkDeleteAssociation(associationToDelete, cascadeLifecycle); + } + + for (Association associationToDelete : associationsToDelete) { + deleteAssociation(associationToDelete, cascadeLifecycle); + } + } } diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClient.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClient.java index 111079be4a2..70b86fd28f9 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClient.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClient.java @@ -19,11 +19,14 @@ import static io.confluent.kafka.schemaregistry.utils.QualifiedSubject.DEFAULT_TENANT; import com.google.common.base.Ticker; +import io.confluent.kafka.schemaregistry.client.rest.entities.Association; import io.confluent.kafka.schemaregistry.client.rest.entities.Config; import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata; import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryDeployment; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryServerVersion; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationCreateRequest; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationResponse; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; import java.io.Closeable; import java.io.IOException; @@ -403,4 +406,33 @@ public default Integer deleteSchemaVersion( @Override default void close() throws IOException {} + + public default AssociationResponse createAssociation(AssociationCreateRequest request) + throws IOException, RestClientException { + throw new UnsupportedOperationException(); + } + + public default List getAssociationsBySubject(String subject, + String resourceType, + List associationTypes, + String lifecycle, + int offset, int limit) + throws IOException, RestClientException { + throw new UnsupportedOperationException(); + } + + public default List getAssociationsByResourceId(String resourceId, + String resourceType, + List associationTypes, + String lifecycle, + int offset, int limit) + throws IOException, RestClientException { + throw new UnsupportedOperationException(); + } + + public default void deleteAssociations(String resourceId, String resourceType, + List associationTypes, boolean cascadeLifecycle) + throws IOException, RestClientException { + throw new UnsupportedOperationException(); + } } diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Association.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Association.java new file mode 100644 index 00000000000..ad69b0c106f --- /dev/null +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Association.java @@ -0,0 +1,190 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client.rest.entities; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationCreateInfo; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationUpdateInfo; +import java.util.Objects; + +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@JsonIgnoreProperties(ignoreUnknown = true) +public class Association { + + private String subject; + private String guid; + private String resourceName; + private String resourceNamespace; + private String resourceId; + private String resourceType; + private String associationType; + private LifecyclePolicy lifecycle; + private boolean frozen; + + @JsonCreator + public Association(@JsonProperty("subject") String subject, + @JsonProperty("guid") String guid, + @JsonProperty("resourceName") String resourceName, + @JsonProperty("resourceNamespace") String resourceNamespace, + @JsonProperty("resourceId") String resourceId, + @JsonProperty("resourceType") String resourceType, + @JsonProperty("associationType") String associationType, + @JsonProperty("lifecycle") LifecyclePolicy lifecycle, + @JsonProperty("frozen") boolean frozen) { + this.subject = subject; + this.guid = guid; + this.resourceName = resourceName; + this.resourceNamespace = resourceNamespace; + this.resourceId = resourceId; + this.resourceType = resourceType; + this.associationType = associationType; + this.lifecycle = lifecycle; + this.frozen = frozen; + } + + @JsonProperty("subject") + public String getSubject() { + return subject; + } + + @JsonProperty("subject") + public void setSubject(String subject) { + this.subject = subject; + } + + @JsonProperty("guid") + public String getGuid() { + return guid; + } + + @JsonProperty("guid") + public void setGuid(String guid) { + this.guid = guid; + } + + @JsonProperty("resourceName") + public String getResourceName() { + return resourceName; + } + + @JsonProperty("resourceName") + public void setResourceName(String resourceName) { + this.resourceName = resourceName; + } + + @JsonProperty("resourceNamespace") + public String getResourceNamespace() { + return resourceNamespace; + } + + @JsonProperty("resourceNamespace") + public void setResourceNamespace(String resourceNamespace) { + this.resourceNamespace = resourceNamespace; + } + + @JsonProperty("resourceId") + public String getResourceId() { + return resourceId; + } + + @JsonProperty("resourceId") + public void setResourceId(String resourceId) { + this.resourceId = resourceId; + } + + @JsonProperty("resourceType") + public String getResourceType() { + return resourceType; + } + + @JsonProperty("resourceType") + public void setResourceType(String resourceType) { + this.resourceType = resourceType; + } + + @JsonProperty("associationType") + public String getAssociationType() { + return associationType; + } + + @JsonProperty("associationType") + public void setAssociationType(String associationType) { + this.associationType = associationType; + } + + @JsonProperty("lifecycle") + public LifecyclePolicy getLifecycle() { + return lifecycle; + } + + @JsonProperty("lifecycle") + public void setLifecycle(LifecyclePolicy lifecycle) { + this.lifecycle = lifecycle; + } + + @JsonProperty("frozen") + public boolean isFrozen() { + return frozen; + } + + @JsonProperty("frozen") + public void setFrozen(boolean frozen) { + this.frozen = frozen; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + Association that = (Association) o; + return frozen == that.frozen + && Objects.equals(subject, that.subject) + && Objects.equals(guid, that.guid) + && Objects.equals(resourceName, that.resourceName) + && Objects.equals(resourceNamespace, that.resourceNamespace) + && Objects.equals(resourceId, that.resourceId) + && Objects.equals(resourceType, that.resourceType) + && Objects.equals(associationType, that.associationType) + && lifecycle == that.lifecycle; + } + + @Override + public int hashCode() { + return Objects.hash( + subject, guid, resourceName, resourceNamespace, resourceId, + resourceType, associationType, lifecycle, frozen); + } + + public boolean isEquivalent(AssociationCreateInfo info) { + return Objects.equals(subject, info.getSubject()) + && Objects.equals(associationType, info.getAssociationType()) + && lifecycle == info.getLifecycle() + && frozen == info.isFrozen(); + } + + public boolean isEquivalent(AssociationUpdateInfo info) { + boolean associationTypeEquals = Objects.equals(associationType, info.getAssociationType()); + boolean lifecycleEquals = info.getLifecycle().isEmpty() + || info.getLifecycle().get() == getLifecycle(); + boolean frozenEquals = info.getFrozen().isEmpty() || info.getFrozen().get() == isFrozen(); + return associationTypeEquals && lifecycleEquals && frozenEquals; + } +} \ No newline at end of file diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/LifecyclePolicy.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/LifecyclePolicy.java new file mode 100644 index 00000000000..24579bb7526 --- /dev/null +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/LifecyclePolicy.java @@ -0,0 +1,22 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client.rest.entities; + +public enum LifecyclePolicy { + STRONG, + WEAK +} diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationCreateInfo.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationCreateInfo.java new file mode 100644 index 00000000000..c6a062e7d2d --- /dev/null +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationCreateInfo.java @@ -0,0 +1,139 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client.rest.entities.requests; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.confluent.kafka.schemaregistry.client.rest.entities.LifecyclePolicy; +import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; +import io.confluent.kafka.schemaregistry.utils.JacksonMapper; +import java.io.IOException; +import java.util.Objects; + +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AssociationCreateInfo { + + private String subject; + private String associationType; + private LifecyclePolicy lifecycle; + private boolean frozen; + private Schema schema; + private boolean normalize; + + @JsonCreator + public AssociationCreateInfo( + @JsonProperty("subject") String subject, + @JsonProperty("associationType") String associationType, + @JsonProperty("lifecycle") LifecyclePolicy lifecycle, + @JsonProperty("frozen") boolean frozen, + @JsonProperty("schema") Schema schema, + @JsonProperty("normalize") boolean normalize) { + this.subject = subject; + this.associationType = associationType; + this.lifecycle = lifecycle; + this.frozen = frozen; + this.schema = schema; + this.normalize = normalize; + } + + @JsonProperty("subject") + public String getSubject() { + return subject; + } + + @JsonProperty("subject") + public void setSubject(String subject) { + this.subject = subject; + } + + @JsonProperty("associationType") + public String getAssociationType() { + return associationType; + } + + @JsonProperty("associationType") + public void setAssociationType(String associationType) { + this.associationType = associationType; + } + + @JsonProperty("lifecycle") + public LifecyclePolicy getLifecycle() { + return lifecycle; + } + + @JsonProperty("lifecycle") + public void setLifecycle(LifecyclePolicy lifecycle) { + this.lifecycle = lifecycle; + } + + @JsonProperty("frozen") + public boolean isFrozen() { + return frozen; + } + + @JsonProperty("frozen") + public void setFrozen(boolean frozen) { + this.frozen = frozen; + } + + @JsonProperty("schema") + public Schema getSchema() { + return schema; + } + + @JsonProperty("schema") + public void setSchema(Schema schema) { + this.schema = schema; + } + + @JsonProperty("normalize") + public boolean isNormalize() { + return normalize; + } + + @JsonProperty("normalize") + public void setNormalize(boolean normalize) { + this.normalize = normalize; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + AssociationCreateInfo that = (AssociationCreateInfo) o; + return frozen == that.frozen + && Objects.equals(subject, that.subject) + && Objects.equals(associationType, that.associationType) + && lifecycle == that.lifecycle + && Objects.equals(schema, that.schema) + && normalize == that.normalize; + } + + @Override + public int hashCode() { + return Objects.hash( + subject, associationType, lifecycle, frozen, schema, normalize); + } + + public String toJson() throws IOException { + return JacksonMapper.INSTANCE.writeValueAsString(this); + } +} \ No newline at end of file diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationCreateRequest.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationCreateRequest.java new file mode 100644 index 00000000000..e7678ab5683 --- /dev/null +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationCreateRequest.java @@ -0,0 +1,162 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client.rest.entities.requests; + +import static io.confluent.kafka.schemaregistry.client.rest.utils.RestValidation.checkName; +import static io.confluent.kafka.schemaregistry.client.rest.utils.RestValidation.checkSubject; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.confluent.kafka.schemaregistry.client.rest.entities.LifecyclePolicy; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.IllegalPropertyException; +import io.confluent.kafka.schemaregistry.utils.JacksonMapper; +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AssociationCreateRequest { + + private static final String DEFAULT_RESOURCE_TYPE = "topic"; + private static final String DEFAULT_ASSOCIATION_TYPE = "value"; + private static final LifecyclePolicy DEFAULT_LIFECYCLE = LifecyclePolicy.STRONG; + + private String resourceName; + private String resourceNamespace; + private String resourceId; + private String resourceType; + private List associations; + + @JsonCreator + public AssociationCreateRequest( + @JsonProperty("resourceName") String resourceName, + @JsonProperty("resourceNamespace") String resourceNamespace, + @JsonProperty("resourceId") String resourceId, + @JsonProperty("resourceType") String resourceType, + @JsonProperty("associations") List associations) { + this.resourceName = resourceName; + this.resourceNamespace = resourceNamespace; + this.resourceId = resourceId; + this.resourceType = resourceType; + this.associations = associations; + } + + @JsonProperty("resourceName") + public String getResourceName() { + return resourceName; + } + + @JsonProperty("resourceName") + public void setResourceName(String resourceName) { + this.resourceName = resourceName; + } + + @JsonProperty("resourceNamespace") + public String getResourceNamespace() { + return resourceNamespace; + } + + @JsonProperty("resourceNamespace") + public void setResourceNamespace(String resourceNamespace) { + this.resourceNamespace = resourceNamespace; + } + + @JsonProperty("resourceId") + public String getResourceId() { + return resourceId; + } + + @JsonProperty("resourceId") + public void setResourceId(String resourceId) { + this.resourceId = resourceId; + } + + @JsonProperty("resourceType") + public String getResourceType() { + return resourceType; + } + + @JsonProperty("resourceType") + public void setResourceType(String resourceType) { + this.resourceType = resourceType; + } + + @JsonProperty("associations") + public List getAssociations() { + return associations; + } + + @JsonProperty("associations") + public void setAssociations(List associations) { + this.associations = associations; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + AssociationCreateRequest that = (AssociationCreateRequest) o; + return Objects.equals(resourceName, that.resourceName) + && Objects.equals(resourceNamespace, that.resourceNamespace) + && Objects.equals(resourceId, that.resourceId) + && Objects.equals(resourceType, that.resourceType) + && Objects.equals(associations, that.associations); + } + + @Override + public int hashCode() { + return Objects.hash( + resourceName, resourceNamespace, resourceId, resourceType, associations); + } + + public String toJson() throws IOException { + return JacksonMapper.INSTANCE.writeValueAsString(this); + } + + public void validate() { + checkName(getResourceName(), "resourceName"); + checkName(getResourceNamespace(), "resourceNamespace"); + if (getResourceId() == null || getResourceId().isEmpty()) { + throw new IllegalPropertyException("resourceId", "cannot be null or empty"); + } + if (getResourceType() != null && !getResourceType().isEmpty()) { + checkName(getResourceType(), "resourceType"); + } else { + setResourceType(DEFAULT_RESOURCE_TYPE); + } + for (AssociationCreateInfo info : getAssociations()) { + checkSubject(info.getSubject()); + if (info.getAssociationType() != null && !info.getAssociationType().isEmpty()) { + checkName(info.getAssociationType(), "associationType"); + } else { + info.setAssociationType(DEFAULT_ASSOCIATION_TYPE); + } + if (info.getLifecycle() == LifecyclePolicy.WEAK) { + if (info.isFrozen()) { + throw new IllegalPropertyException( + "frozen", "association with lifecycle of WEAK cannot be frozen"); + } + } else if (info.getLifecycle() == null) { + info.setLifecycle(DEFAULT_LIFECYCLE); + } + } + } +} \ No newline at end of file diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationInfo.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationInfo.java new file mode 100644 index 00000000000..7b49a939228 --- /dev/null +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationInfo.java @@ -0,0 +1,125 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client.rest.entities.requests; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.confluent.kafka.schemaregistry.client.rest.entities.LifecyclePolicy; +import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; +import io.confluent.kafka.schemaregistry.utils.JacksonMapper; +import java.io.IOException; +import java.util.Objects; + +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AssociationInfo { + + private String subject; + private String associationType; + private LifecyclePolicy lifecycle; + private boolean frozen; + private Schema schema; + + @JsonCreator + public AssociationInfo( + @JsonProperty("subject") String subject, + @JsonProperty("associationType") String associationType, + @JsonProperty("lifecycle") LifecyclePolicy lifecycle, + @JsonProperty("frozen") boolean frozen, + @JsonProperty("schema") Schema schema) { + this.subject = subject; + this.associationType = associationType; + this.lifecycle = lifecycle; + this.frozen = frozen; + this.schema = schema; + } + + @JsonProperty("subject") + public String getSubject() { + return subject; + } + + @JsonProperty("subject") + public void setSubject(String subject) { + this.subject = subject; + } + + @JsonProperty("associationType") + public String getAssociationType() { + return associationType; + } + + @JsonProperty("associationType") + public void setAssociationType(String associationType) { + this.associationType = associationType; + } + + @JsonProperty("lifecycle") + public LifecyclePolicy getLifecycle() { + return lifecycle; + } + + @JsonProperty("lifecycle") + public void setLifecycle(LifecyclePolicy lifecycle) { + this.lifecycle = lifecycle; + } + + @JsonProperty("frozen") + public boolean isFrozen() { + return frozen; + } + + @JsonProperty("frozen") + public void setFrozen(boolean frozen) { + this.frozen = frozen; + } + + @JsonProperty("schema") + public Schema getSchema() { + return schema; + } + + @JsonProperty("schema") + public void setSchema(Schema schema) { + this.schema = schema; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + AssociationInfo that = (AssociationInfo) o; + return frozen == that.frozen + && Objects.equals(subject, that.subject) + && Objects.equals(associationType, that.associationType) + && lifecycle == that.lifecycle + && Objects.equals(schema, that.schema); + } + + @Override + public int hashCode() { + return Objects.hash( + subject, associationType, lifecycle, frozen, schema); + } + + public String toJson() throws IOException { + return JacksonMapper.INSTANCE.writeValueAsString(this); + } +} \ No newline at end of file diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationResponse.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationResponse.java new file mode 100644 index 00000000000..84c31cc14be --- /dev/null +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationResponse.java @@ -0,0 +1,124 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client.rest.entities.requests; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.confluent.kafka.schemaregistry.utils.JacksonMapper; +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AssociationResponse { + + private String resourceName; + private String resourceNamespace; + private String resourceId; + private String resourceType; + private List associations; + + @JsonCreator + public AssociationResponse( + @JsonProperty("resourceName") String resourceName, + @JsonProperty("resourceNamespace") String resourceNamespace, + @JsonProperty("resourceId") String resourceId, + @JsonProperty("resourceType") String resourceType, + @JsonProperty("associations") List associations) { + this.resourceName = resourceName; + this.resourceNamespace = resourceNamespace; + this.resourceId = resourceId; + this.resourceType = resourceType; + this.associations = associations; + } + + @JsonProperty("resourceName") + public String getResourceName() { + return resourceName; + } + + @JsonProperty("resourceName") + public void setResourceName(String resourceName) { + this.resourceName = resourceName; + } + + @JsonProperty("resourceNamespace") + public String getResourceNamespace() { + return resourceNamespace; + } + + @JsonProperty("resourceNamespace") + public void setResourceNamespace(String resourceNamespace) { + this.resourceNamespace = resourceNamespace; + } + + @JsonProperty("resourceId") + public String getResourceId() { + return resourceId; + } + + @JsonProperty("resourceId") + public void setResourceId(String resourceId) { + this.resourceId = resourceId; + } + + @JsonProperty("resourceType") + public String getResourceType() { + return resourceType; + } + + @JsonProperty("resourceType") + public void setResourceType(String resourceType) { + this.resourceType = resourceType; + } + + @JsonProperty("associations") + public List getAssociations() { + return associations; + } + + @JsonProperty("associations") + public void setAssociations(List associations) { + this.associations = associations; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + AssociationResponse that = (AssociationResponse) o; + return Objects.equals(resourceName, that.resourceName) + && Objects.equals(resourceNamespace, that.resourceNamespace) + && Objects.equals(resourceId, that.resourceId) + && Objects.equals(resourceType, that.resourceType) + && Objects.equals(associations, that.associations); + } + + @Override + public int hashCode() { + return Objects.hash( + resourceName, resourceNamespace, resourceId, resourceType, associations); + } + + public String toJson() throws IOException { + return JacksonMapper.INSTANCE.writeValueAsString(this); + } +} \ No newline at end of file diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationResult.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationResult.java new file mode 100644 index 00000000000..2dcdc24ee36 --- /dev/null +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationResult.java @@ -0,0 +1,81 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client.rest.entities.requests; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage; +import io.confluent.kafka.schemaregistry.utils.JacksonMapper; +import java.io.IOException; +import java.util.Objects; + +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AssociationResult { + + private ErrorMessage error; + private AssociationResponse result; + + @JsonCreator + public AssociationResult( + @JsonProperty("error") ErrorMessage error, + @JsonProperty("result") AssociationResponse result) { + this.error = error; + this.result = result; + } + + @JsonProperty("error") + public ErrorMessage getError() { + return error; + } + + @JsonProperty("error") + public void setError(ErrorMessage error) { + this.error = error; + } + + @JsonProperty("result") + public AssociationResponse getResult() { + return result; + } + + @JsonProperty("result") + public void setResult(AssociationResponse result) { + this.result = result; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + AssociationResult that = (AssociationResult) o; + return Objects.equals(error, that.error) + && Objects.equals(result, that.result); + } + + @Override + public int hashCode() { + return Objects.hash(error, result); + } + + public String toJson() throws IOException { + return JacksonMapper.INSTANCE.writeValueAsString(this); + } +} \ No newline at end of file diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationUpdateInfo.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationUpdateInfo.java new file mode 100644 index 00000000000..a80b06e72c8 --- /dev/null +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationUpdateInfo.java @@ -0,0 +1,96 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client.rest.entities.requests; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.confluent.kafka.schemaregistry.client.rest.entities.LifecyclePolicy; +import io.confluent.kafka.schemaregistry.utils.JacksonMapper; +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AssociationUpdateInfo { + + private String associationType; + private Optional lifecycle; + private Optional frozen; + + @JsonCreator + public AssociationUpdateInfo( + @JsonProperty("associationType") String associationType, + @JsonProperty("lifecycle") Optional lifecycle, + @JsonProperty("frozen") Optional frozen) { + this.associationType = associationType; + this.lifecycle = lifecycle; + this.frozen = frozen; + } + + @JsonProperty("associationType") + public String getAssociationType() { + return associationType; + } + + @JsonProperty("associationType") + public void setAssociationType(String associationType) { + this.associationType = associationType; + } + + @JsonProperty("lifecycle") + public Optional getLifecycle() { + return lifecycle; + } + + @JsonProperty("lifecycle") + public void setLifecycle(Optional lifecycle) { + this.lifecycle = lifecycle; + } + + @JsonProperty("frozen") + public Optional getFrozen() { + return frozen; + } + + @JsonProperty("frozen") + public void setFrozen(Optional frozen) { + this.frozen = frozen; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + AssociationUpdateInfo that = (AssociationUpdateInfo) o; + return Objects.equals(frozen, that.frozen) + && Objects.equals(associationType, that.associationType) + && Objects.equals(lifecycle, that.lifecycle); + } + + @Override + public int hashCode() { + return Objects.hash(associationType, lifecycle, frozen); + } + + public String toJson() throws IOException { + return JacksonMapper.INSTANCE.writeValueAsString(this); + } +} \ No newline at end of file diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/exceptions/IllegalPropertyException.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/exceptions/IllegalPropertyException.java new file mode 100644 index 00000000000..19e0e587430 --- /dev/null +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/exceptions/IllegalPropertyException.java @@ -0,0 +1,37 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client.rest.exceptions; + +public class IllegalPropertyException extends RuntimeException { + + private final String propertyName; + private final String detail; + + public IllegalPropertyException(final String propertyName, final String detail) { + super("property: " + propertyName + "; detail: " + detail); + this.propertyName = propertyName; + this.detail = detail; + } + + public String getPropertyName() { + return propertyName; + } + + public String getDetail() { + return detail; + } +} \ No newline at end of file diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/utils/RestValidation.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/utils/RestValidation.java new file mode 100644 index 00000000000..753008c0474 --- /dev/null +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/utils/RestValidation.java @@ -0,0 +1,51 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client.rest.utils; + +import com.google.common.base.CharMatcher; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.IllegalPropertyException; + +public class RestValidation { + + public static final int NAME_MAX_LENGTH = 256; + + public static void checkName(String name, String propertyName) { + if (name == null || name.isEmpty()) { + throw new IllegalPropertyException(propertyName, "cannot be null or empty"); + } + if (name.length() > NAME_MAX_LENGTH) { + throw new IllegalPropertyException(propertyName, "exceeds max length of " + NAME_MAX_LENGTH); + } + char first = name.charAt(0); + if (!(Character.isLetter(first) || first == '_')) { + throw new IllegalPropertyException(propertyName, "must start with a letter or underscore"); + } + for (int i = 1; i < name.length(); i++) { + char c = name.charAt(i); + if (!(Character.isLetterOrDigit(c) || c == '_' || c == '-')) { + throw new IllegalPropertyException(propertyName, "illegal character '" + c + "'"); + } + } + } + + public static void checkSubject(String subject) { + if (subject == null || subject.isEmpty() + || CharMatcher.javaIsoControl().matchesAnyOf(subject)) { + throw new IllegalPropertyException("subject", "must not be empty"); + } + } +} diff --git a/client/src/test/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClientTest.java b/client/src/test/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClientTest.java new file mode 100644 index 00000000000..869a87b4546 --- /dev/null +++ b/client/src/test/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClientTest.java @@ -0,0 +1,792 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.client; + +import io.confluent.kafka.schemaregistry.client.rest.entities.Association; +import io.confluent.kafka.schemaregistry.client.rest.entities.LifecyclePolicy; +import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationCreateInfo; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationCreateRequest; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationInfo; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationResponse; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.*; + +public class MockSchemaRegistryClientTest { + private SchemaRegistryClient client; + + @Before + public void setUp() { + this.client = new MockSchemaRegistryClient(); + } + + @Test + public void testAssociationCreateRequestValidationLogic() { + // Pre-create subjects used for testing + try { + client.register("testKey", new AvroSchema("{\"type\": \"string\"}"), true); + client.register("testValue", new AvroSchema("{\"type\": \"string\"}"), true); + } catch (Exception e) { + assertNull("Schema registration should succeed.", e); + } + + AssociationCreateInfo createInfo1 = new AssociationCreateInfo( + "testKey", "key", null, false, null, false); + AssociationCreateInfo createInfo2 = new AssociationCreateInfo( + "testValue", "value", null, false, null, false); + + // Invalid requests + List invalidRequests = new ArrayList<>(); + + // No resource name + invalidRequests.add(new AssociationCreateRequest( + null, "lkc1", "test-id", "topic", + Arrays.asList(createInfo1, createInfo2))); + + // No resource namespace + invalidRequests.add(new AssociationCreateRequest( + "test", null, "test-id", "topic", + Arrays.asList(createInfo1, createInfo2))); + + // No resource id + invalidRequests.add(new AssociationCreateRequest( + "test", "lkc1", null, "topic", + Arrays.asList(createInfo1, createInfo2))); + + // No associations + invalidRequests.add(new AssociationCreateRequest( + "test", "lkc1", "test-id", "topic", null)); + + // No subject name in AssociationCreateInfo + invalidRequests.add(new AssociationCreateRequest( + "test", "lkc1", "test-id", "topic", + Collections.singletonList( + new AssociationCreateInfo(null, "value", null, false, null, false) + ))); + + // Unsupported ResourceType + invalidRequests.add(new AssociationCreateRequest( + "test", "lkc1", "test-id", "topic2", + Arrays.asList(createInfo1, createInfo2))); + + // Unsupported AssociationType + invalidRequests.add(new AssociationCreateRequest( + "test", "lkc1", "test-id", "topic", + Collections.singletonList( + new AssociationCreateInfo("testValue", "value2", null, false, null, false) + ))); + + // Duplicate AssociationType in the request + invalidRequests.add(new AssociationCreateRequest( + "test", "lkc1", "test-id", "topic", + Arrays.asList( + new AssociationCreateInfo("testKey", "value", null, false, null, false), + new AssociationCreateInfo("testValue", "value", null, false, null, false) + ))); + + // Weak association with frozen to be true + invalidRequests.add(new AssociationCreateRequest( + "test", "lkc1", "test-id", "topic", + Collections.singletonList( + new AssociationCreateInfo("testValue", null, LifecyclePolicy.WEAK, true, null, false) + ))); + + // Test all invalid requests - they should throw exceptions + for (AssociationCreateRequest invalidRequest : invalidRequests) { + try { + client.createAssociation(invalidRequest); + fail("Expected exception for invalid request"); + } catch (Exception e) { + // Expected - validation should fail + assertNotNull("Error should not be null", e); + } + } + + // Minimum valid request + AssociationCreateRequest createRequest = new AssociationCreateRequest( + "test", "lkc1", "test-id", null, + Collections.singletonList( + new AssociationCreateInfo("testValue", null, null, false, null, false) + )); + AssociationResponse createResponse = null; + try { + createResponse = client.createAssociation(createRequest); + } catch (Exception e) { + assertNull("Error should be null", e); + } + + // Assertions + assertNotNull("Response should not be null", createResponse); + assertEquals("ResourceName should match", + createRequest.getResourceName(), createResponse.getResourceName()); + assertEquals("ResourceNamespace should match", + createRequest.getResourceNamespace(), createResponse.getResourceNamespace()); + assertEquals("ResourceId should match", + createRequest.getResourceId(), createResponse.getResourceId()); + assertEquals("ResourceType should be 'topic'", + "topic", createResponse.getResourceType()); + assertEquals("Should have 1 association", + 1, createResponse.getAssociations().size()); + + AssociationInfo association = createResponse.getAssociations().get(0); + assertEquals("Subject should match", + createRequest.getAssociations().get(0).getSubject(), association.getSubject()); + assertEquals("AssociationType should be 'value'", + "value", association.getAssociationType()); + assertEquals("Lifecycle should be STRONG", + LifecyclePolicy.STRONG, association.getLifecycle()); + assertFalse("Frozen should be false", association.isFrozen()); + assertNull("Schema should be null", association.getSchema()); + //assertFalse("Normalize should be false", association.isNormalize()); + } + + @Test + public void testCreateOneAssociationInCreateRequest() { + // Pre-create subjects + String testValueSubject = "testValue"; + AvroSchema schemaInfo = new AvroSchema( + "{\"namespace\":\"basicavro\",\"type\":\"record\",\"name\":\"Payment\"," + + "\"fields\":[{\"type\":\"string\",\"name\":\"id\"}]}"); + try { + client.register(testValueSubject, schemaInfo, true); + } catch (Exception e) { + assertNull("Schema registration should succeed.", e); + } + + // Make an association with an existing subject without new schema + AssociationCreateRequest createRequest = new AssociationCreateRequest( + "test", "lkc1", "test-id", null, + Collections.singletonList(new AssociationCreateInfo(testValueSubject, null, + null, false, null, false) + )); + try { + client.createAssociation(createRequest); + } catch (Exception e) { + assertNull("AssociationCreateRequest should succeed.", e); + } + + // Create association request is idempotent. Re-issue the same create request should succeed. + try { + client.createAssociation(createRequest); + } catch (Exception e) { + assertNull("AssociationCreateRequest should succeed.", e); + } + + // Re-issue the same request with different association property (except schema) will error out. + createRequest = new AssociationCreateRequest( + "test", "lkc1", "test-id", null, + Collections.singletonList( + new AssociationCreateInfo(testValueSubject, null, LifecyclePolicy.WEAK, false, null, false) + )); + + try { + client.createAssociation(createRequest); + fail("Expected exception - existing association gets modified"); + } catch (Exception e) { + assertNotNull("Error should not be null", e); + } + + // Make an association with an existing subject with new schema + Schema updatedSchema = new Schema(null, null, null, null, null, + "{\"namespace\":\"basicavro\",\"type\":\"record\",\"name\":\"Payment\"," + + "\"fields\":[{\"type\":\"string\",\"name\":\"id\"}, {\"type\":\"string\",\"name\":\"id2\"}]}"); + createRequest = new AssociationCreateRequest( + "test", "lkc1", "test-id", null, + Collections.singletonList( + new AssociationCreateInfo(testValueSubject, null, null, false, updatedSchema, false) + )); + try { + client.createAssociation(createRequest); + } catch (Exception e) { + assertNull("AssociationCreateRequest should succeed.", e); + } + + // Make an association with a new subject without new schema. Test should fail. + testValueSubject = "testValue2"; + createRequest = new AssociationCreateRequest( + "test2", "lkc1", "test-id2", null, + Collections.singletonList( + new AssociationCreateInfo(testValueSubject, null, null, false, null, false) + )); + + try { + client.createAssociation(createRequest); + fail("Expected exception - new subject without schema"); + } catch (Exception e) { + assertNotNull("Error should not be null", e); + } + + // Make an association with a new subject with new schema + testValueSubject = "testValue2"; + createRequest = new AssociationCreateRequest( + "test2", "lkc1", "test-id2", null, + Collections.singletonList( + new AssociationCreateInfo(testValueSubject, null, null, false, updatedSchema, false) + )); + try { + client.createAssociation(createRequest); + } catch (Exception e) { + assertNull("AssociationCreateRequest should succeed.", e); + } + } + + @Test + public void testCreateMultipleAssociationsInCreateRequest() { + String schemaString = + "{\"namespace\":\"basicavro\",\"type\":\"record\",\"name\":\"Payment\"," + + "\"fields\":[{\"type\":\"string\",\"name\":\"id\"}]}"; + + AvroSchema schemaInfo = new AvroSchema(schemaString); + + // Scenario 1: Both associations using existing subjects + String keySubject = "test1Key"; + String valueSubject = "test1Value"; + String resourceName = "test1"; + String resourceID = "test1-id"; + + // Pre-create subjects + try { + client.register(keySubject, schemaInfo, true); + client.register(valueSubject, schemaInfo, true); + } catch (Exception e) { + assertNull("Schema registration should succeed.", e); + } + + AssociationCreateRequest createRequest = new AssociationCreateRequest( + resourceName, "lkc1", resourceID, null, + Arrays.asList( + new AssociationCreateInfo(keySubject, "key", null, false, null, false), + new AssociationCreateInfo(valueSubject, "value", null, false, null, false) + )); + try { + client.createAssociation(createRequest); + } catch (Exception e) { + assertNull("AssociationCreateRequest should succeed.", e); + } + + // Scenario 2: One using existing subject, one creating new subject + keySubject = "test2Key"; + valueSubject = "test2Value"; + resourceName = "test2"; + resourceID = "test2-id"; + + try { + client.register(keySubject, schemaInfo, true); + } catch (Exception e) { + assertNull("Schema registration should succeed.", e); + } + + Schema schema = new Schema(null, null, null, "AVRO", Collections.emptyList(), schemaString); + + createRequest = new AssociationCreateRequest( + resourceName, "lkc1", resourceID, null, + Arrays.asList( + new AssociationCreateInfo(keySubject, "key", null, false, null, false), + new AssociationCreateInfo(valueSubject, "value", null, false, schema, false) + )); + try { + client.createAssociation(createRequest); + } catch (Exception e) { + assertNull("AssociationCreateRequest should succeed.", e); + } + + // Scenario 3: Both creating new subjects + keySubject = "test3Key"; + valueSubject = "test3Value"; + resourceName = "test3"; + resourceID = "test3-id"; + + createRequest = new AssociationCreateRequest( + resourceName, "lkc1", resourceID, null, + Arrays.asList( + new AssociationCreateInfo(keySubject, "key", null, false, schema, false), + new AssociationCreateInfo(valueSubject, "value", null, false, schema, false) + )); + try { + client.createAssociation(createRequest); + } catch (Exception e) { + assertNull("AssociationCreateRequest should succeed.", e); + } + } + + @Test + public void testCreateStrongAndWeakAssociationsForTheSameSubject() { + // Resources for testing + Resource resourceFoo = new Resource("foo", "lkc1", "id-foo", "topic"); + Resource resourceBar = new Resource("bar", "lkc1", "id-bar", "topic"); + String fooValueSubject = "fooValue"; + String value = "value"; + + // Pre-create subject + String schemaString = + "{\"namespace\":\"basicavro\",\"type\":\"record\",\"name\":\"Payment\"," + + "\"fields\":[{\"type\":\"string\",\"name\":\"id\"}]}"; + AvroSchema schemaInfo = new AvroSchema(schemaString); + try { + client.register(fooValueSubject, schemaInfo, true); + } catch (Exception e) { + assertNull("Schema registration should succeed.", e); + } + + // Scenario 1: Same subject, Foo=STRONG, Bar=STRONG -> Bar should fail + AssociationCreateRequest fooRequest = generateAssociationCreateRequest( + resourceFoo, + new AssociationCreateInfo(fooValueSubject, value, LifecyclePolicy.STRONG, false, null, false) + ); + try { + client.createAssociation(fooRequest); + } catch (Exception e) { + assertNull("AssociationCreateRequest should succeed.", e); + } + + List result = null; + try { + result = client.getAssociationsByResourceId( + resourceFoo.getResourceId(), null, null, null, 0, -1); + } catch (Exception e) { + assertNull("AssociationCreateRequest should succeed.", e); + } + assertEquals(1, result.size()); + assertNotNull(result.get(0).getGuid()); + assertFalse(result.get(0).getGuid().isEmpty()); + + AssociationCreateRequest barRequest = generateAssociationCreateRequest( + resourceBar, + new AssociationCreateInfo(fooValueSubject, value, LifecyclePolicy.STRONG, false, null, false) + ); + + try { + client.createAssociation(barRequest); + fail("Expected exception - cannot create strong association when subject already has strong"); + } catch (Exception e) { + assertNotNull(e); + } + + // Scenario 2: Foo=STRONG, Bar=WEAK -> Bar should fail + barRequest = generateAssociationCreateRequest( + resourceBar, + new AssociationCreateInfo(fooValueSubject, value, LifecyclePolicy.WEAK, false, null, false) + ); + + try { + client.createAssociation(barRequest); + fail("Expected exception - cannot create weak when subject has strong"); + } catch (Exception e) { + assertNotNull(e); + } + + // Scenario 3: Foo=WEAK, Bar=STRONG -> Bar should fail + // Delete Foo association without cascade + try { + client.deleteAssociations(fooRequest.getResourceId(), null, null, false); + } catch (Exception e) { + assertNull("AssociationDeleteRequest should succeed.", e); + } + List associations = null; + try { + associations = client.getAssociationsByResourceId( + resourceFoo.getResourceId(), null, null, null, 0, -1); + } catch (Exception e) { + assertNull("getAssociationsByResourceId should succeed.", e); + } + assertEquals(0, associations.size()); + + // Verify subject still exists + SchemaMetadata metadata = null; + try { + metadata = client.getLatestSchemaMetadata(fooValueSubject); + } catch (Exception e) { + assertNull("getLatestSchemaMetadata should succeed.", e); + } + assertTrue(metadata.getId() > 0); + + // Create Foo weak association + fooRequest = generateAssociationCreateRequest( + resourceFoo, + new AssociationCreateInfo(fooValueSubject, value, LifecyclePolicy.WEAK, false, null, false) + ); + try { + client.createAssociation(fooRequest); + } catch (Exception e) { + assertNull("AssociationCreateRequest should succeed.", e); + } + + try { + result = client.getAssociationsByResourceId( + resourceFoo.getResourceId(), null, null, null, 0, -1); + } catch (Exception e) { + assertNull("getAssociationsByResourceId succeed.", e); + } + assertEquals(1, result.size()); + assertNotNull(result.get(0).getGuid()); + assertFalse(result.get(0).getGuid().isEmpty()); + + // Try to create Bar strong - should fail + barRequest = generateAssociationCreateRequest( + resourceBar, + new AssociationCreateInfo(fooValueSubject, value, LifecyclePolicy.STRONG, false, null, false) + ); + + try { + client.createAssociation(barRequest); + fail("Expected exception - cannot create strong when subject has weak"); + } catch (Exception e) { + assertNotNull(e); + } + + // Scenario 4: Foo=WEAK, Bar=WEAK -> Bar should succeed + barRequest = generateAssociationCreateRequest( + resourceBar, + new AssociationCreateInfo(fooValueSubject, value, LifecyclePolicy.WEAK, false, null, false) + ); + try { + client.createAssociation(barRequest); + } catch (Exception e) { + assertNull("AssociationCreateRequest should succeed.", e); + } + + try { + result = client.getAssociationsByResourceId( + resourceBar.getResourceId(), null, null, null, 0, -1); + } catch (Exception e) { + assertNull("getAssociationsByResourceId should succeed.", e); + } + assertEquals(1, result.size()); + assertNotNull(result.get(0).getGuid()); + assertFalse(result.get(0).getGuid().isEmpty()); + + // Verify subject has 2 associations + try { + result = client.getAssociationsBySubject(fooValueSubject, null, null, null, 0, -1); + } catch (Exception e) { + assertNull("getAssociationsBySubject should succeed.", e); + } + assertEquals(2, result.size()); + } + + // Helper method + private AssociationCreateRequest generateAssociationCreateRequest( + Resource resource, AssociationCreateInfo info) { + return new AssociationCreateRequest( + resource.getResourceName(), + resource.getResourceNamespace(), + resource.getResourceId(), + resource.getResourceType(), + Collections.singletonList(info) + ); + } + + // Helper class + private static class Resource { + private final String resourceName; + private final String resourceNamespace; + private final String resourceId; + private final String resourceType; + + public Resource(String name, String namespace, String id, String type) { + this.resourceName = name; + this.resourceNamespace = namespace; + this.resourceId = id; + this.resourceType = type; + } + + public String getResourceName() { + return resourceName; + } + + public String getResourceNamespace() { + return resourceNamespace; + } + + public String getResourceId() { + return resourceId; + } + + public String getResourceType() { + return resourceType; + } + } + + @Test + public void testGetAssociationsWithFilters() { + // Setup + String keySubject = "test1Key"; + String valueSubject = "test1Value"; + String resourceName = "test1"; + String resourceID = "test1-id"; + + String schemaString = + "{\"namespace\":\"basicavro\",\"type\":\"record\",\"name\":\"Payment\"," + + "\"fields\":[{\"type\":\"string\",\"name\":\"id\"}]}"; + + Schema schema = new Schema( + null, null, null, null, + Collections.emptyList(), + schemaString + ); + + // Create associations: key=STRONG, value=WEAK + AssociationCreateRequest createRequest = new AssociationCreateRequest( + resourceName, "lkc1", resourceID, null, + Arrays.asList( + new AssociationCreateInfo( + keySubject, "key", LifecyclePolicy.STRONG, false, schema, false), + new AssociationCreateInfo( + valueSubject, "value", LifecyclePolicy.WEAK, false, schema, false) + )); + + try { + client.createAssociation(createRequest); + } catch (Exception e) { + assertNotNull("createAssociation should succeed.", e); + } + + // Query by subject with lifecycle filter "weak" - should return error + List associations = null; + try { + associations = client.getAssociationsBySubject( + keySubject, null, Arrays.asList("key", "value"), "weak", 0, -1); + fail("getAssociationsBySubject with lower case lifecycle should fail."); + } catch (Exception e) { + assertNotNull("getAssociationsBySubject should return error.", e); + } + + // Query by subject with lifecycle filter "WEAK" - should return 0 + try { + associations = client.getAssociationsBySubject( + keySubject, null, Arrays.asList("key", "value"), "WEAK", 0, -1); + } catch (Exception e) { + assertNull("getAssociationsBySubject should succeed.", e); + } + assertEquals(0, associations.size()); + + // Query by subject with lifecycle filter "STRONG" - should return 1 + try { + associations = client.getAssociationsBySubject( + keySubject, null, Arrays.asList("key", "value"), "STRONG", 0, -1); + } catch (Exception e) { + assertNull("getAssociationsBySubject should succeed.", e); + } + assertEquals(1, associations.size()); + + // Query by resourceID without lifecycle filter - should return 2 + try { + associations = client.getAssociationsByResourceId( + resourceID, null, Arrays.asList("key", "value"), null, 0, -1); + } catch (Exception e) { + assertNull("getAssociationsByResourceId should succeed.", e); + } + assertEquals(2, associations.size()); + } + + @Test + public void testDeleteAssociation() { + String schemaString = "{\"namespace\":\"basicavro\",\"type\":\"record\",\"name\":\"Payment\"," + + "\"fields\":[{\"type\":\"string\",\"name\":\"id\"}]}"; + Schema schema = new Schema(null, null, null, null, Collections.emptyList(), schemaString); + + // Test 1: Delete with cascade=true + // Strong association subject should be deleted, weak should remain + testCascadeDelete(schema); + + // Test 2: Delete with cascade=false + // Both subjects should remain + testNoCascadeDelete(schema); + + // Test 3: Delete non-existing association + // Should succeed without error + testDeleteNonExisting(); + + // Test 4: Delete frozen association + // cascade=false should fail, cascade=true should delete strong only + testDeleteFrozenAndNonCascade(schema); + } + + + private void testCascadeDelete(Schema schema) { + String keySubject = "test1Key"; + String valueSubject = "test1Value"; + String resourceID = "test1-id"; + + // Create associations + try { + client.createAssociation(new AssociationCreateRequest( + "test1", "lkc1", resourceID, null, + Arrays.asList( + new AssociationCreateInfo(keySubject, "key", LifecyclePolicy.STRONG, false, schema, false), + new AssociationCreateInfo(valueSubject, "value", LifecyclePolicy.WEAK, false, schema, false) + ))); + } catch (Exception e) { + assertNull("createAssociation should succeed.", e); + } + + // Delete with cascade=true + try { + client.deleteAssociations(resourceID, null, null, true); + } catch (Exception e) { + assertNull("deleteAssociation should succeed.", e); + } + + // Key subject (STRONG) should be deleted + try { + SchemaMetadata metadata = client.getLatestSchemaMetadata(keySubject); + fail("Expected exception - key subject should be deleted"); + } catch (Exception e) { + assertNotNull(e); + String errorMsg = e.getMessage().toLowerCase(); + assertTrue("Error should contain 'not found'", errorMsg.contains("not found")); + } + + // Value subject (WEAK) should still exist + try { + SchemaMetadata valueMetadata = client.getLatestSchemaMetadata(valueSubject); + assertTrue("Value subject should still exist", valueMetadata.getId() > 0); + } catch (Exception e) { + assertNull("getLatestSchemaMetadata should succeed.", e); + } + + // Associations should be deleted + try { + List associations = client.getAssociationsByResourceId( + resourceID, null, null, null, 0, -1); + assertTrue("Associations should be empty", + associations == null || associations.isEmpty()); + } catch (Exception e) { + assertNull("getAssociationsByResourceId should succeed.", e); + } + } + + private void testNoCascadeDelete(Schema schema) { + String keySubject = "test2Key"; + String valueSubject = "test2Value"; + String resourceID = "test2-id"; + + // Create associations + try { + client.createAssociation(new AssociationCreateRequest( + "test2", "lkc1", resourceID, null, + Arrays.asList( + new AssociationCreateInfo(keySubject, "key", LifecyclePolicy.STRONG, false, schema, false), + new AssociationCreateInfo(valueSubject, "value", LifecyclePolicy.WEAK, false, schema, false) + ))); + } catch (Exception e) { + assertNull("createAssociation should succeed.", e); + } + + // Delete with cascade=false + try { + client.deleteAssociations(resourceID, null, null, false); + } catch (Exception e) { + assertNull("deleteAssociation should succeed.", e); + } + + // Both subjects should still exist + try { + SchemaMetadata keyMetadata = client.getLatestSchemaMetadata(keySubject); + assertTrue("Key subject should still exist", keyMetadata.getId() > 0); + } catch (Exception e) { + assertNull("getLatestSchemaMetadata should succeed.", e); + } + + try { + SchemaMetadata valueMetadata = client.getLatestSchemaMetadata(valueSubject); + assertTrue("Value subject should still exist", valueMetadata.getId() > 0); + } catch (Exception e) { + assertNull("getLatestSchemaMetadata should succeed.", e); + } + + // Associations should be deleted + try { + List associations = client.getAssociationsByResourceId( + resourceID, null, null, null, 0, -1); + assertTrue("Associations should be empty", + associations == null || associations.isEmpty()); + } catch (Exception e) { + assertNull("getAssociationsByResourceId should succeed.", e); + } + } + + private void testDeleteNonExisting() { + // Delete non-existing association should succeed + try { + client.deleteAssociations("non-existing-id", null, null, false); + } catch (Exception e) { + assertNull("deleteAssociation should succeed.", e); + } + // No exception = success + } + + private void testDeleteFrozenAndNonCascade(Schema schema) { + String keySubject = "test3Key"; + String valueSubject = "test3Value"; + String resourceID = "test3-id"; + + // Create associations with frozen=true for key + try { + client.createAssociation(new AssociationCreateRequest( + "test3", "lkc1", resourceID, null, + Arrays.asList( + new AssociationCreateInfo(keySubject, "key", LifecyclePolicy.STRONG, true, schema, false), + new AssociationCreateInfo(valueSubject, "value", LifecyclePolicy.WEAK, false, schema, false) + ))); + } catch (Exception e) { + assertNull("createAssociation should succeed.", e); + } + + // Delete with cascade=false should fail (frozen association) + try { + client.deleteAssociations(resourceID, null, null, false); + fail("Expected exception - cannot delete frozen association"); + } catch (Exception e) { + assertNotNull(e); + } + + // Delete with cascade=true should succeed + // Only STRONG (key) subject gets deleted, WEAK (value) remains + try { + client.deleteAssociations(resourceID, null, null, true); + } catch (Exception e) { + assertNull("deleteAssociation should succeed.", e); + } + + // Key subject should not exist + try { + client.getAllVersions(keySubject); + fail("Expected exception - key subject should be deleted"); + } catch (Exception e) { + assertNotNull(e); + } + + // Value subject should exist + try { + List valueVersions = client.getAllVersions(valueSubject); + assertNotNull("Value subject should exist", valueVersions); + assertFalse("Value subject should have versions", valueVersions.isEmpty()); + } catch (Exception e) { + assertNull("getAllVersions should succeed.", e); + } + } + +}