Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/generated/swagger-ui/schema-registry-api-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2045,7 +2045,7 @@ paths:
summary: Get schema string by version
description: Retrieves the schema for the specified version of this subject.
Only the unescaped schema string is returned.
operationId: getSchemaOnly_2
operationId: getSchemaOnly_1
parameters:
- name: subject
in: path
Expand Down Expand Up @@ -2145,7 +2145,7 @@ paths:
- Schemas (v1)
summary: Get schema by ID
description: Retrieves the schema identified by the input ID.
operationId: getSchemaOnly
operationId: getSchemaOnly_2
parameters:
- name: id
in: path
Expand Down Expand Up @@ -2757,6 +2757,8 @@ components:
type: array
items:
$ref: "#/components/schemas/Rule"
empty:
type: boolean
description: Schema rule set
SchemaEntity:
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import io.confluent.kafka.schemaregistry.rest.handlers.CompositeUpdateRequestHandler;
import io.confluent.kafka.schemaregistry.rest.handlers.UpdateRequestHandler;
import io.confluent.kafka.schemaregistry.storage.encoder.MetadataEncoderService;
import io.confluent.kafka.schemaregistry.storage.encoder.MetadataEncoderServiceInterface;
import io.confluent.kafka.schemaregistry.storage.exceptions.EntryTooLargeException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
Expand Down Expand Up @@ -142,7 +143,7 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg
private final LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache;
// visible for testing
final KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore;
private final MetadataEncoderService metadataEncoder;
private final MetadataEncoderServiceInterface metadataEncoder;
private RuleSetHandler ruleSetHandler;
private final List<UpdateRequestHandler> updateRequestHandlers = new CopyOnWriteArrayList<>();
private final Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer;
Expand Down Expand Up @@ -252,11 +253,15 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config,
this.lookupCache = lookupCache();
this.idGenerator = identityGenerator(config);
this.kafkaStore = kafkaStore(config);
this.metadataEncoder = new MetadataEncoderService(this);
this.metadataEncoder = createMetadataEncoderService();
this.ruleSetHandler = new RuleSetHandler();
this.time = config.getTime();
}

/**
 * Factory hook for the metadata encoder used by this registry.
 *
 * <p>Subclasses may override this to supply an alternative
 * {@link MetadataEncoderServiceInterface} implementation (e.g. for testing).
 * NOTE(review): this is invoked from the constructor, so an override must not
 * depend on subclass state that is initialized after the super constructor runs.
 *
 * @return the encoder service instance to use for metadata encryption/decryption
 */
protected MetadataEncoderServiceInterface createMetadataEncoderService() {
  MetadataEncoderServiceInterface encoderService = new MetadataEncoderService(this);
  return encoderService;
}

@VisibleForTesting
static SchemaRegistryIdentity getMyIdentity(NamedURI internalListener,
boolean isEligibleForLeaderElector, SchemaRegistryConfig config) {
Expand Down Expand Up @@ -350,7 +355,7 @@ public Serializer<SchemaRegistryKey, SchemaRegistryValue> getSerializer() {
}

public MetadataEncoderService getMetadataEncoder() {
return metadataEncoder;
return (MetadataEncoderService) metadataEncoder;
Copy link

Copilot AI May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider returning the service as a MetadataEncoderServiceInterface rather than casting to the concrete type to preserve abstraction.

Copilot uses AI. Check for mistakes.
}

public RuleSetHandler getRuleSetHandler() {
Expand Down Expand Up @@ -719,6 +724,7 @@ public Schema register(String subject,

boolean modifiedSchema = false;
if (mode != Mode.IMPORT) {
log.info("Is mode!=import triggering decode?");
modifiedSchema = maybePopulateFromPrevious(
config, schema, undeletedVersions, newVersion, propagateSchemaTags);
}
Expand Down Expand Up @@ -1766,6 +1772,7 @@ public SchemaString get(
}

/**
 * Converts a stored {@link SchemaValue} into its API-facing {@link Schema} entity,
 * first decrypting any encoded metadata in place via the metadata encoder.
 *
 * @param schemaValue the stored schema record to convert
 * @return the schema entity with decrypted metadata
 * @throws SchemaRegistryStoreException if metadata decryption fails
 */
public Schema toSchemaEntity(SchemaValue schemaValue) throws SchemaRegistryStoreException {
  // Removed leftover debug statement: it logged at INFO on every single schema read,
  // adding log noise with no diagnostic value in production.
  metadataEncoder.decodeMetadata(schemaValue);
  return schemaValue.toSchemaEntity();
}
Expand Down Expand Up @@ -2128,6 +2135,7 @@ private CloseableIterator<SchemaRegistryValue> allVersions(
return filter(transform(kafkaStore.getAll(key1, key2), v -> {
if (v instanceof SchemaValue) {
try {
log.info("allVersions triggered decode");
metadataEncoder.decodeMetadata(((SchemaValue) v));
} catch (SchemaRegistryStoreException e) {
log.error("Failed to decode metadata for schema id {}", ((SchemaValue) v).getId(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetadataEncoderService implements Closeable {
public class MetadataEncoderService implements Closeable, MetadataEncoderServiceInterface {

private static final Logger log = LoggerFactory.getLogger(MetadataEncoderService.class);

Expand Down Expand Up @@ -271,7 +271,7 @@ public void encodeMetadata(SchemaValue schema) throws SchemaRegistryStoreExcepti
return;
}
try {
transformMetadata(schema, true, (aead, value) -> {
transformMetadata(schema, true, true, (aead, value) -> {
try {
byte[] ciphertext = aead.encrypt(value.getBytes(StandardCharsets.UTF_8), EMPTY_AAD);
return Base64.getEncoder().encodeToString(ciphertext);
Expand Down Expand Up @@ -299,7 +299,7 @@ public void decodeMetadata(SchemaValue schema) throws SchemaRegistryStoreExcepti
return;
}
try {
transformMetadata(schema, false, (aead, value) -> {
transformMetadata(schema, false, false, (aead, value) -> {
try {
byte[] plaintext = aead.decrypt(Base64.getDecoder().decode(value), EMPTY_AAD);
return new String(plaintext, StandardCharsets.UTF_8);
Expand All @@ -313,8 +313,8 @@ public void decodeMetadata(SchemaValue schema) throws SchemaRegistryStoreExcepti
}
}

private void transformMetadata(
SchemaValue schema, boolean isEncode, BiFunction<Aead, String, String> func)
public void transformMetadata(
SchemaValue schema, boolean rotationNeeded, boolean isEncode, BiFunction<Aead, String, String> func)
throws SchemaRegistryStoreException {
Metadata metadata = schema.getMetadata();
if (metadata == null
Expand All @@ -333,7 +333,7 @@ private void transformMetadata(
String tenant = qualifiedSubject.getTenant();

// Only create the encoder if we are encoding during writes and not decoding during reads
KeysetHandle handle = isEncode ? getOrCreateEncoder(tenant) : getEncoder(tenant);
KeysetHandle handle = isEncode ? getOrCreateEncoder(tenant, rotationNeeded) : getEncoder(tenant);
if (handle == null) {
throw new SchemaRegistryStoreException("Could not get encoder for tenant " + tenant);
}
Expand Down Expand Up @@ -366,14 +366,30 @@ private void transformMetadata(
}
}

private KeysetHandle getOrCreateEncoder(String tenant) {
public KeysetHandle getOrCreateEncoder(String tenant, Boolean rotationNeeded) {
// Ensure encoders are up to date
if (encoders == null) {
throw new IllegalStateException("Encoders not initialized when we call getOrCreateEncoder");
}
encoders.sync();
if (rotationNeeded) {
KeysetWrapper wrapper = encoders.compute(tenant,
(k, v) -> {
try {
KeysetHandle handle = KeysetHandle.generateNew(keyTemplate);
return new KeysetWrapper(handle, rotationNeeded);
} catch (GeneralSecurityException e) {
throw new IllegalStateException("Could not create key template");
}

});
return wrapper.getKeysetHandle();
}
KeysetWrapper wrapper = encoders.computeIfAbsent(tenant,
k -> {
try {
KeysetHandle handle = KeysetHandle.generateNew(keyTemplate);
return new KeysetWrapper(handle, false);
return new KeysetWrapper(handle, rotationNeeded);
} catch (GeneralSecurityException e) {
throw new IllegalStateException("Could not create key template");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2025 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafka.schemaregistry.storage.encoder;

import com.google.crypto.tink.Aead;
import com.google.crypto.tink.KeysetHandle;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.storage.SchemaValue;
import io.kcache.Cache;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

public interface MetadataEncoderServiceInterface {
Copy link

Copilot AI May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider adding JavaDoc comments to the interface methods to better document expected behaviors and usage for implementers.

Copilot uses AI. Check for mistakes.

void encodeMetadata(SchemaValue schema) throws SchemaRegistryStoreException;

void decodeMetadata(SchemaValue schema) throws SchemaRegistryStoreException;

void init();

void close();

void transformMetadata(SchemaValue schema, boolean rotationNeeded, boolean isEncode,
BiFunction<Aead, String, String> func)
throws SchemaRegistryStoreException;

KeysetHandle getOrCreateEncoder(String tenant, Boolean rotationNeeded);
}