diff --git a/0001-Fix-Remove-environment-prefix-from-topic-names-befor.patch b/0001-Fix-Remove-environment-prefix-from-topic-names-befor.patch new file mode 100644 index 000000000..f2384f8cf --- /dev/null +++ b/0001-Fix-Remove-environment-prefix-from-topic-names-befor.patch @@ -0,0 +1,84 @@ +From 673d4a12f8196e100f131cc397cf76fd06f7e261 Mon Sep 17 00:00:00 2001 +From: Ashok B +Date: Wed, 5 Nov 2025 16:20:00 +0530 +Subject: [PATCH] Fix: Remove environment prefix from topic names before schema + lookup and update tests for consistency + +--- + .../builtin/sr/SchemaRegistrySerde.java | 9 ++++++++- + .../integration/odd/TopicsExporter.java | 7 ++++++- + .../builtin/sr/SchemaRegistrySerdeTest.java | 20 +++++++++++++++++++ + 3 files changed, 34 insertions(+), 2 deletions(-) + +diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java +index 91c2375d..3d566dcf 100644 +--- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java ++++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java +@@ -243,7 +243,14 @@ public class SchemaRegistrySerde implements BuiltInSerde { + } + + private String schemaSubject(String topic, Target type) { +- return String.format(type == Target.KEY ? keySchemaNameTemplate : valueSchemaNameTemplate, topic); ++ // Transform topic name from {env}.{topic_name} to {topic_name} ++ // Example: "development.integration_events.connection_management.entities.connection" ++ // becomes "integration_events.connection_management.entities.connection" ++ String transformedTopic = topic.contains(".") ? ++ topic.substring(topic.indexOf(".") + 1) : topic; ++ ++ // Apply the schema naming template with the transformed topic name ++ return String.format(type == Target.KEY ? keySchemaNameTemplate : valueSchemaNameTemplate, transformedTopic); + } + + @Override +diff --git a/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java b/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java +index d34e0a61..1960b97a 100644 +--- a/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java ++++ b/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java +@@ -103,7 +103,12 @@ class TopicsExporter { + if (cluster.getSchemaRegistryClient() == null) { + return Mono.just(List.of()); + } +- String subject = topic + (isKey ? "-key" : "-value"); ++ // Transform topic name from {env}.{topic_name} to {topic_name} ++ // Example: "development.integration_events.connection_management.entities.connection" ++ // becomes "integration_events.connection_management.entities.connection" ++ String transformedTopic = topic.contains(".") ? ++ topic.substring(topic.indexOf(".") + 1) : topic; ++ String subject = transformedTopic + (isKey ? "-key" : "-value"); + return getSubjWithResolvedRefs(cluster, subject) + .map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey)) + .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of())) +diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +index d66a8d00..54b1875f 100644 +--- a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java ++++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +@@ -70,6 +70,26 @@ class SchemaRegistrySerdeTest { + assertThat(serde.getSchema(topic, Serde.Target.VALUE)).isEmpty(); + } + ++ @Test ++ @SneakyThrows ++ void transformsEnvironmentPrefixedTopicNames() { ++ // Test with environment-prefixed topic name ++ String topic = "development.integration_events.connection_management.entities.connection"; ++ String expectedSubject = "integration_events.connection_management.entities.connection-key"; ++ ++ int schemaId = registryClient.register(expectedSubject, new AvroSchema("{ \"type\": \"int\" }")); ++ int registeredVersion = registryClient.getLatestSchemaMetadata(expectedSubject).getVersion(); ++ ++ var schemaOptional = serde.getSchema(topic, Serde.Target.KEY); ++ assertThat(schemaOptional).isPresent(); ++ ++ SchemaDescription schemaDescription = schemaOptional.get(); ++ assertThat(schemaDescription.getAdditionalProperties()) ++ .containsEntry("subject", expectedSubject) ++ .containsEntry("schemaId", schemaId) ++ .containsEntry("latestVersion", registeredVersion); ++ } ++ + @Test + void serializeTreatsInputAsJsonAvroSchemaPayload() throws RestClientException, IOException { + AvroSchema schema = new AvroSchema( +-- +2.50.1 + diff --git a/DINF-2950.patch b/DINF-2950.patch new file mode 100644 index 000000000..e69de29bb diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java index 91c2375d8..f973a97a6 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -243,7 +243,15 @@ private Optional wrapWith404Handler(Callable call) { } private String schemaSubject(String topic, Target type) { - return String.format(type == Target.KEY ? keySchemaNameTemplate : valueSchemaNameTemplate, topic); + // Transform topic name from {env}.{topic_name} to {topic_name} + // Example: "development.integration_events.connection_management.entities.connection" + // becomes "integration_events.connection_management.entities.connection" + String transformedTopic = topic.contains(".") + ? topic.substring(topic.indexOf(".") + 1) + : topic; + + // Apply the schema naming template with the transformed topic name + return String.format(type == Target.KEY ? keySchemaNameTemplate : valueSchemaNameTemplate, transformedTopic); } @Override diff --git a/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java b/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java index d34e0a613..335047440 100644 --- a/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java +++ b/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java @@ -103,7 +103,13 @@ private Mono> getTopicSchema(KafkaCluster cluster, if (cluster.getSchemaRegistryClient() == null) { return Mono.just(List.of()); } - String subject = topic + (isKey ? "-key" : "-value"); + // Transform topic name from {env}.{topic_name} to {topic_name} + // Example: "development.integration_events.connection_management.entities.connection" + // becomes "integration_events.connection_management.entities.connection" + String transformedTopic = topic.contains(".") + ? topic.substring(topic.indexOf(".") + 1) + : topic; + String subject = transformedTopic + (isKey ? "-key" : "-value"); return getSubjWithResolvedRefs(cluster, subject) .map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey)) .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of())) diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java index d66a8d004..54b1875fb 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java @@ -70,6 +70,26 @@ void returnsEmptyDescriptorIfSchemaNotRegisteredInSR() { assertThat(serde.getSchema(topic, Serde.Target.VALUE)).isEmpty(); } + @Test + @SneakyThrows + void transformsEnvironmentPrefixedTopicNames() { + // Test with environment-prefixed topic name + String topic = "development.integration_events.connection_management.entities.connection"; + String expectedSubject = "integration_events.connection_management.entities.connection-key"; + + int schemaId = registryClient.register(expectedSubject, new AvroSchema("{ \"type\": \"int\" }")); + int registeredVersion = registryClient.getLatestSchemaMetadata(expectedSubject).getVersion(); + + var schemaOptional = serde.getSchema(topic, Serde.Target.KEY); + assertThat(schemaOptional).isPresent(); + + SchemaDescription schemaDescription = schemaOptional.get(); + assertThat(schemaDescription.getAdditionalProperties()) + .containsEntry("subject", expectedSubject) + .containsEntry("schemaId", schemaId) + .containsEntry("latestVersion", registeredVersion); + } + @Test void serializeTreatsInputAsJsonAvroSchemaPayload() throws RestClientException, IOException { AvroSchema schema = new AvroSchema(