Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions 0001-Fix-Remove-environment-prefix-from-topic-names-befor.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
From 673d4a12f8196e100f131cc397cf76fd06f7e261 Mon Sep 17 00:00:00 2001
From: Ashok B <[email protected]>
Date: Wed, 5 Nov 2025 16:20:00 +0530
Subject: [PATCH] Fix: Remove environment prefix from topic names before schema
lookup and update tests for consistency

---
.../builtin/sr/SchemaRegistrySerde.java | 9 ++++++++-
.../integration/odd/TopicsExporter.java | 7 ++++++-
.../builtin/sr/SchemaRegistrySerdeTest.java | 20 +++++++++++++++++++
3 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java
index 91c2375d..3d566dcf 100644
--- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java
+++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java
@@ -243,7 +243,14 @@ public class SchemaRegistrySerde implements BuiltInSerde {
}

private String schemaSubject(String topic, Target type) {
- return String.format(type == Target.KEY ? keySchemaNameTemplate : valueSchemaNameTemplate, topic);
+ // Transform topic name from {env}.{topic_name} to {topic_name}
+ // Example: "development.integration_events.connection_management.entities.connection"
+ // becomes "integration_events.connection_management.entities.connection"
+ String transformedTopic = topic.contains(".") ?
+ topic.substring(topic.indexOf(".") + 1) : topic;
+
+ // Apply the schema naming template with the transformed topic name
+ return String.format(type == Target.KEY ? keySchemaNameTemplate : valueSchemaNameTemplate, transformedTopic);
}

@Override
diff --git a/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java b/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java
index d34e0a61..1960b97a 100644
--- a/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java
+++ b/api/src/main/java/io/kafbat/ui/service/integration/odd/TopicsExporter.java
@@ -103,7 +103,12 @@ class TopicsExporter {
if (cluster.getSchemaRegistryClient() == null) {
return Mono.just(List.of());
}
- String subject = topic + (isKey ? "-key" : "-value");
+ // Transform topic name from {env}.{topic_name} to {topic_name}
+ // Example: "development.integration_events.connection_management.entities.connection"
+ // becomes "integration_events.connection_management.entities.connection"
+ String transformedTopic = topic.contains(".") ?
+ topic.substring(topic.indexOf(".") + 1) : topic;
+ String subject = transformedTopic + (isKey ? "-key" : "-value");
return getSubjWithResolvedRefs(cluster, subject)
.map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey))
.onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java
index d66a8d00..54b1875f 100644
--- a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java
+++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java
@@ -70,6 +70,26 @@ class SchemaRegistrySerdeTest {
assertThat(serde.getSchema(topic, Serde.Target.VALUE)).isEmpty();
}

+ @Test
+ @SneakyThrows
+ void transformsEnvironmentPrefixedTopicNames() {
+ // Test with environment-prefixed topic name
+ String topic = "development.integration_events.connection_management.entities.connection";
+ String expectedSubject = "integration_events.connection_management.entities.connection-key";
+
+ int schemaId = registryClient.register(expectedSubject, new AvroSchema("{ \"type\": \"int\" }"));
+ int registeredVersion = registryClient.getLatestSchemaMetadata(expectedSubject).getVersion();
+
+ var schemaOptional = serde.getSchema(topic, Serde.Target.KEY);
+ assertThat(schemaOptional).isPresent();
+
+ SchemaDescription schemaDescription = schemaOptional.get();
+ assertThat(schemaDescription.getAdditionalProperties())
+ .containsEntry("subject", expectedSubject)
+ .containsEntry("schemaId", schemaId)
+ .containsEntry("latestVersion", registeredVersion);
+ }
+
@Test
void serializeTreatsInputAsJsonAvroSchemaPayload() throws RestClientException, IOException {
AvroSchema schema = new AvroSchema(
--
2.50.1

Empty file added DINF-2950.patch
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,15 @@ private <T> Optional<T> wrapWith404Handler(Callable<T> call) {
}

private String schemaSubject(String topic, Target type) {
return String.format(type == Target.KEY ? keySchemaNameTemplate : valueSchemaNameTemplate, topic);
// Transform topic name from {env}.{topic_name} to {topic_name}
// Example: "development.integration_events.connection_management.entities.connection"
// becomes "integration_events.connection_management.entities.connection"
String transformedTopic = topic.contains(".")
? topic.substring(topic.indexOf(".") + 1)
: topic;

// Apply the schema naming template with the transformed topic name
return String.format(type == Target.KEY ? keySchemaNameTemplate : valueSchemaNameTemplate, transformedTopic);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,13 @@ private Mono<List<DataSetField>> getTopicSchema(KafkaCluster cluster,
if (cluster.getSchemaRegistryClient() == null) {
return Mono.just(List.of());
}
String subject = topic + (isKey ? "-key" : "-value");
// Transform topic name from {env}.{topic_name} to {topic_name}
// Example: "development.integration_events.connection_management.entities.connection"
// becomes "integration_events.connection_management.entities.connection"
String transformedTopic = topic.contains(".")
? topic.substring(topic.indexOf(".") + 1)
: topic;
String subject = transformedTopic + (isKey ? "-key" : "-value");
return getSubjWithResolvedRefs(cluster, subject)
.map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey))
.onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@ void returnsEmptyDescriptorIfSchemaNotRegisteredInSR() {
assertThat(serde.getSchema(topic, Serde.Target.VALUE)).isEmpty();
}

@Test
@SneakyThrows
void transformsEnvironmentPrefixedTopicNames() {
// Test with environment-prefixed topic name
String topic = "development.integration_events.connection_management.entities.connection";
String expectedSubject = "integration_events.connection_management.entities.connection-key";

int schemaId = registryClient.register(expectedSubject, new AvroSchema("{ \"type\": \"int\" }"));
int registeredVersion = registryClient.getLatestSchemaMetadata(expectedSubject).getVersion();

var schemaOptional = serde.getSchema(topic, Serde.Target.KEY);
assertThat(schemaOptional).isPresent();

SchemaDescription schemaDescription = schemaOptional.get();
assertThat(schemaDescription.getAdditionalProperties())
.containsEntry("subject", expectedSubject)
.containsEntry("schemaId", schemaId)
.containsEntry("latestVersion", registeredVersion);
}

@Test
void serializeTreatsInputAsJsonAvroSchemaPayload() throws RestClientException, IOException {
AvroSchema schema = new AvroSchema(
Expand Down
Loading