diff --git a/core/generated/swagger-ui/schema-registry-api-spec.yaml b/core/generated/swagger-ui/schema-registry-api-spec.yaml index 1f1762fd0cc..f623141aa8c 100644 --- a/core/generated/swagger-ui/schema-registry-api-spec.yaml +++ b/core/generated/swagger-ui/schema-registry-api-spec.yaml @@ -2045,7 +2045,7 @@ paths: summary: Get schema string by version description: Retrieves the schema for the specified version of this subject. Only the unescaped schema string is returned. - operationId: getSchemaOnly_2 + operationId: getSchemaOnly_1 parameters: - name: subject in: path @@ -2145,7 +2145,7 @@ paths: - Schemas (v1) summary: Get schema by ID description: Retrieves the schema identified by the input ID. - operationId: getSchemaOnly + operationId: getSchemaOnly_2 parameters: - name: id in: path @@ -2757,6 +2757,8 @@ components: type: array items: $ref: "#/components/schemas/Rule" + empty: + type: boolean description: Schema rule set SchemaEntity: type: object diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java index b5118e7455a..8233b3dfff6 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java @@ -79,6 +79,7 @@ import io.confluent.kafka.schemaregistry.rest.handlers.CompositeUpdateRequestHandler; import io.confluent.kafka.schemaregistry.rest.handlers.UpdateRequestHandler; import io.confluent.kafka.schemaregistry.storage.encoder.MetadataEncoderService; +import io.confluent.kafka.schemaregistry.storage.encoder.MetadataEncoderServiceInterface; import io.confluent.kafka.schemaregistry.storage.exceptions.EntryTooLargeException; import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; @@ -142,7 +143,7 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg private final LookupCache lookupCache; // visible for testing final KafkaStore kafkaStore; - private final MetadataEncoderService metadataEncoder; + private final MetadataEncoderServiceInterface metadataEncoder; private RuleSetHandler ruleSetHandler; private final List updateRequestHandlers = new CopyOnWriteArrayList<>(); private final Serializer serializer; @@ -252,11 +253,15 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config, this.lookupCache = lookupCache(); this.idGenerator = identityGenerator(config); this.kafkaStore = kafkaStore(config); - this.metadataEncoder = new MetadataEncoderService(this); + this.metadataEncoder = createMetadataEncoderService(); this.ruleSetHandler = new RuleSetHandler(); this.time = config.getTime(); } + protected MetadataEncoderServiceInterface createMetadataEncoderService() { + return new MetadataEncoderService(this); + } + @VisibleForTesting static SchemaRegistryIdentity getMyIdentity(NamedURI internalListener, boolean isEligibleForLeaderElector, SchemaRegistryConfig config) { @@ -350,7 +355,7 @@ public Serializer getSerializer() { } public MetadataEncoderService getMetadataEncoder() { - return metadataEncoder; + return (MetadataEncoderService) metadataEncoder; } public RuleSetHandler getRuleSetHandler() { @@ -719,6 +724,7 @@ public Schema register(String subject, boolean modifiedSchema = false; if (mode != Mode.IMPORT) { + log.info("Is mode!=import triggering decode?"); modifiedSchema = maybePopulateFromPrevious( config, schema, undeletedVersions, newVersion, propagateSchemaTags); } @@ -1766,6 +1772,7 @@ public SchemaString get( } public Schema toSchemaEntity(SchemaValue schemaValue) throws SchemaRegistryStoreException { + log.info("toSchemaEntity triggered decode"); metadataEncoder.decodeMetadata(schemaValue); return schemaValue.toSchemaEntity(); } @@ -2128,6 +2135,7 @@ private CloseableIterator allVersions( return filter(transform(kafkaStore.getAll(key1, key2), v -> { if (v instanceof SchemaValue) { try { + log.info("allVersions triggered decode"); metadataEncoder.decodeMetadata(((SchemaValue) v)); } catch (SchemaRegistryStoreException e) { log.error("Failed to decode metadata for schema id {}", ((SchemaValue) v).getId(), e); diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderService.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderService.java index bfaec5352cd..e30abc901c2 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderService.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderService.java @@ -62,7 +62,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MetadataEncoderService implements Closeable { +public class MetadataEncoderService implements Closeable, MetadataEncoderServiceInterface { private static final Logger log = LoggerFactory.getLogger(MetadataEncoderService.class); @@ -271,7 +271,7 @@ public void encodeMetadata(SchemaValue schema) throws SchemaRegistryStoreExcepti return; } try { - transformMetadata(schema, true, (aead, value) -> { + transformMetadata(schema, true, true, (aead, value) -> { try { byte[] ciphertext = aead.encrypt(value.getBytes(StandardCharsets.UTF_8), EMPTY_AAD); return Base64.getEncoder().encodeToString(ciphertext); @@ -299,7 +299,7 @@ public void decodeMetadata(SchemaValue schema) throws SchemaRegistryStoreExcepti return; } try { - transformMetadata(schema, false, (aead, value) -> { + transformMetadata(schema, false, false, (aead, value) -> { try { byte[] plaintext = aead.decrypt(Base64.getDecoder().decode(value), EMPTY_AAD); return new String(plaintext, StandardCharsets.UTF_8); @@ -313,8 +313,8 @@ public void decodeMetadata(SchemaValue schema) throws SchemaRegistryStoreExcepti } } - private void transformMetadata( - SchemaValue schema, boolean isEncode, BiFunction func) + public void transformMetadata( + SchemaValue schema, boolean rotationNeeded, boolean isEncode, BiFunction func) throws SchemaRegistryStoreException { Metadata metadata = schema.getMetadata(); if (metadata == null @@ -333,7 +333,7 @@ private void transformMetadata( String tenant = qualifiedSubject.getTenant(); // Only create the encoder if we are encoding during writes and not decoding during reads - KeysetHandle handle = isEncode ? getOrCreateEncoder(tenant) : getEncoder(tenant); + KeysetHandle handle = isEncode ? getOrCreateEncoder(tenant, rotationNeeded) : getEncoder(tenant); if (handle == null) { throw new SchemaRegistryStoreException("Could not get encoder for tenant " + tenant); } @@ -366,14 +366,30 @@ private void transformMetadata( } } - private KeysetHandle getOrCreateEncoder(String tenant) { + public KeysetHandle getOrCreateEncoder(String tenant, Boolean rotationNeeded) { // Ensure encoders are up to date + if (encoders == null) { + throw new IllegalStateException("Encoders not initialized when we call getOrCreateEncoder"); + } encoders.sync(); + if (rotationNeeded) { + KeysetWrapper wrapper = encoders.compute(tenant, + (k, v) -> { + try { + KeysetHandle handle = KeysetHandle.generateNew(keyTemplate); + return new KeysetWrapper(handle, rotationNeeded); + } catch (GeneralSecurityException e) { + throw new IllegalStateException("Could not create key template"); + } + + }); + return wrapper.getKeysetHandle(); + } KeysetWrapper wrapper = encoders.computeIfAbsent(tenant, k -> { try { KeysetHandle handle = KeysetHandle.generateNew(keyTemplate); - return new KeysetWrapper(handle, false); + return new KeysetWrapper(handle, rotationNeeded); } catch (GeneralSecurityException e) { throw new IllegalStateException("Could not create key template"); } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderServiceInterface.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderServiceInterface.java new file mode 100644 index 00000000000..5ebbe951fc8 --- /dev/null +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/encoder/MetadataEncoderServiceInterface.java @@ -0,0 +1,43 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.storage.encoder; + +import com.google.crypto.tink.Aead; +import com.google.crypto.tink.KeysetHandle; +import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException; +import io.confluent.kafka.schemaregistry.storage.SchemaValue; +import io.kcache.Cache; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; + +public interface MetadataEncoderServiceInterface { + + void encodeMetadata(SchemaValue schema) throws SchemaRegistryStoreException; + + void decodeMetadata(SchemaValue schema) throws SchemaRegistryStoreException; + + void init(); + + void close(); + + void transformMetadata(SchemaValue schema, boolean rotationNeeded, boolean isEncode, + BiFunction func) + throws SchemaRegistryStoreException; + + KeysetHandle getOrCreateEncoder(String tenant, Boolean rotationNeeded); +}