diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java new file mode 100644 index 00000000000..1e420713faa --- /dev/null +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/PrincipalLoggingFilter.java @@ -0,0 +1,67 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.rest; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.security.Principal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.confluent.kafka.schemaregistry.utils.PrincipalContext; + +/** +* This class is a servlet filter that logs the user principal for each incoming request to +* Schema Registry. It is a necessary step to allow for building resource associations +*/ +public class PrincipalLoggingFilter implements Filter { + + private static final Logger log = LoggerFactory.getLogger(PrincipalLoggingFilter.class.getName()); + + @Override + public void init(FilterConfig filterConfig) throws ServletException { + } + + @Override + public void doFilter(ServletRequest request, ServletResponse servletResponse, + FilterChain filterChain) throws IOException, ServletException { + HttpServletRequest req = (HttpServletRequest) request; + Principal principal = req.getUserPrincipal(); + + if (principal != null) { + log.info("User Principal: {}", principal.getName()); + PrincipalContext.setPrincipal(principal.getName()); + } else { + log.info("No User Principal found for the request."); + PrincipalContext.clear(); + } + + try { + filterChain.doFilter(request, servletResponse); + } finally { + PrincipalContext.clear(); // Clear the principal after the request is processed + } + } + + @Override + public void destroy() { + } +} diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java index 6ddf2d1d4dd..c4d58accbee 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java @@ -35,14 +35,17 @@ import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer; import io.confluent.rest.Application; import io.confluent.rest.RestConfigException; +import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.resource.ResourceCollection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.DispatcherType; import javax.ws.rs.core.Configurable; import java.io.IOException; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -59,6 +62,10 @@ public SchemaRegistryRestApplication(Properties props) throws RestConfigExceptio @Override protected void configurePreResourceHandling(ServletContextHandler context) { super.configurePreResourceHandling(context); + PrincipalLoggingFilter principalLoggingFilter = new PrincipalLoggingFilter(); + FilterHolder filterHolder = new FilterHolder(principalLoggingFilter); + filterHolder.setName("PrincipalLoggingFilter"); + context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST)); context.setErrorHandler(new JsonErrorHandler()); // This handler runs before first Session, Security or ServletHandler context.insertHandler(new RequestHeaderHandler()); diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java index d049034ba1a..89a12bf4dcf 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java @@ -84,6 +84,7 @@ import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException; import io.confluent.kafka.schemaregistry.storage.serialization.Serializer; +import io.confluent.kafka.schemaregistry.utils.PrincipalContext; import io.confluent.kafka.schemaregistry.utils.QualifiedSubject; import io.confluent.rest.NamedURI; import io.confluent.rest.RestConfig; @@ -658,6 +659,8 @@ public Schema register(String subject, RegisterSchemaRequest request, boolean no schema = new Schema(subject, version, schema.getId(), newSchema); } + logResourceAssociation(schema); + return register(subject, schema, normalize, request.doPropagateSchemaTags()); } catch (IllegalArgumentException e) { throw new InvalidSchemaException(e); @@ -1145,6 +1148,7 @@ public void deleteSchemaVersion(String subject, } else { kafkaStore.put(key, null); } + logResourceAssociation(schema); } catch (StoreTimeoutException te) { throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te); } catch (StoreException e) { @@ -1298,6 +1302,7 @@ public Schema lookUpSchemaUnderSubject( if (schema == null) { return null; } + logResourceAssociation(schema); Config config = getConfigInScope(subject); Schema existingSchema = lookUpSchemaUnderSubject( config, subject, schema, normalize, lookupDeletedSchema, false); @@ -1715,6 +1720,9 @@ public Schema get(String subject, int version, boolean returnDeletedSchema) if (schemaValue != null && (!schemaValue.isDeleted() || returnDeletedSchema)) { schema = toSchemaEntity(schemaValue); } + if (schema != null) { + logResourceAssociation(schema); + } return schema; } } @@ -1887,6 +1895,7 @@ public List listVersionsForId(int id, String subject, boolean lo if (schema == null) { return null; } + logResourceAssociation(toSchemaEntity(schema)); SchemaIdAndSubjects schemaIdAndSubjects = this.lookupCache.schemaIdAndSubjects(toSchemaEntity(schema)); @@ -2626,6 +2635,12 @@ private static boolean shouldInclude(boolean isDeleted, LookupFilter filter) { } } + private void logResourceAssociation(Schema schema) { + String schemaHash = MD5.ofSchema(schema).toHexString(); + String principal = PrincipalContext.getPrincipal(); + log.info("Resource association log - (Principal, schemaHash): ({}, {})", principal, schemaHash); + } + @Override public SchemaRegistryConfig config() { return config; diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/MD5.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/MD5.java index 1692ccf1fb0..b889c52a99a 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/MD5.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/MD5.java @@ -81,4 +81,16 @@ public boolean equals(Object o) { MD5 otherMd5 = (MD5) o; return Arrays.equals(this.md5, otherMd5.md5); } + + public String toHexString() { + StringBuilder hexString = new StringBuilder(); + for (byte b : md5) { + String hex = Integer.toHexString(0xFF & b); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex); + } + return hexString.toString(); + } } diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/utils/PrincipalContext.java b/core/src/main/java/io/confluent/kafka/schemaregistry/utils/PrincipalContext.java new file mode 100644 index 00000000000..458b21549f1 --- /dev/null +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/utils/PrincipalContext.java @@ -0,0 +1,32 @@ +/* + * Copyright 2025 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.kafka.schemaregistry.utils; + +public class PrincipalContext { + private static final ThreadLocal principalHolder = new ThreadLocal<>(); + + public static void setPrincipal(String principal) { + principalHolder.set(principal); + } + + public static String getPrincipal() { + return principalHolder.get(); + } + + public static void clear() { + principalHolder.remove(); + } +} \ No newline at end of file