Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions backend/pkg/api/handle_schema_registry_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,93 @@ enum MyEnumA {
assert.Equal(thirdSchemaID, fourthSchemaID, "with normalize=true, schemas with different enum value order should produce the same schema ID")
})
}

func (s *APIIntegrationTestSuite) TestSchemaMetadata() {
t := s.T()
t.Skip() // todo remove skip once redpanda v26.1 is GA
require := require.New(t)
assert := assert.New(t)

t.Run("create schema with metadata properties", func(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second)
defer cancel()

schemaStr := `{"type":"record","name":"User","fields":[{"name":"id","type":"string"}]}`
req := struct {
Schema string `json:"schema"`
Type string `json:"schemaType"`
Metadata struct {
Properties map[string]string `json:"properties"`
} `json:"metadata"`
}{
Schema: schemaStr,
Type: sr.TypeAvro.String(),
}
req.Metadata.Properties = map[string]string{
"owner": "team-platform",
"version": "1.0.0",
}

res, body := s.apiRequest(ctx, http.MethodPost, "/api/schema-registry/subjects/test-metadata/versions", req)
require.Equal(200, res.StatusCode)

createResponse := struct {
ID int `json:"id"`
}{}
err := json.Unmarshal(body, &createResponse)
require.NoError(err)
assert.Greater(createResponse.ID, 0, "schema ID should be returned")
})

t.Run("retrieve schema with metadata", func(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second)
defer cancel()

res, body := s.apiRequest(ctx, http.MethodGet, "/api/schema-registry/subjects/test-metadata/versions/latest", nil)
require.Equal(200, res.StatusCode)

var details console.SchemaRegistrySubjectDetails
err := json.Unmarshal(body, &details)
require.NoError(err)

// Verify metadata is present in response
require.Len(details.Schemas, 1, "should have one schema")
require.NotNil(details.Schemas[0].Metadata, "metadata should not be nil")
assert.Equal("team-platform", details.Schemas[0].Metadata.Properties["owner"], "owner property should match")
assert.Equal("1.0.0", details.Schemas[0].Metadata.Properties["version"], "version property should match")
})

t.Run("create schema without metadata (backward compatibility)", func(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second)
defer cancel()

schemaStr := `{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}`
req := struct {
Schema string `json:"schema"`
Type string `json:"schemaType"`
}{
Schema: schemaStr,
Type: sr.TypeAvro.String(),
}

res, body := s.apiRequest(ctx, http.MethodPost, "/api/schema-registry/subjects/test-no-metadata/versions", req)
require.Equal(200, res.StatusCode)

createResponse := struct {
ID int `json:"id"`
}{}
err := json.Unmarshal(body, &createResponse)
require.NoError(err)
assert.Greater(createResponse.ID, 0, "schema ID should be returned")

// Verify schema without metadata retrieves correctly
res, body = s.apiRequest(ctx, http.MethodGet, "/api/schema-registry/subjects/test-no-metadata/versions/latest", nil)
require.Equal(200, res.StatusCode)

var details console.SchemaRegistrySubjectDetails
err = json.Unmarshal(body, &details)
require.NoError(err)
require.Len(details.Schemas, 1, "should have one schema")
assert.Nil(details.Schemas[0].Metadata, "metadata should be nil for schema without metadata")
})
}
43 changes: 37 additions & 6 deletions backend/pkg/console/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,24 @@ func mapSubjectSchema(in sr.SubjectSchema, isSoftDeleted bool) SchemaRegistryVer
Version: ref.Version,
}
}

var metadata *SchemaMetadata
if in.SchemaMetadata != nil {
metadata = &SchemaMetadata{
Tags: in.SchemaMetadata.Tags,
Properties: in.SchemaMetadata.Properties,
Sensitive: in.SchemaMetadata.Sensitive,
}
}

return SchemaRegistryVersionedSchema{
ID: in.ID,
Version: in.Version,
IsSoftDeleted: isSoftDeleted,
Type: in.Type,
Schema: in.Schema.Schema,
References: references,
Metadata: metadata,
}
}

Expand Down Expand Up @@ -395,12 +406,13 @@ func (s *Service) getSubjectCompatibilityLevel(ctx context.Context, srClient *rp

// SchemaRegistryVersionedSchema describes a retrieved schema.
type SchemaRegistryVersionedSchema struct {
ID int `json:"id"`
Version int `json:"version"`
IsSoftDeleted bool `json:"isSoftDeleted"`
Type sr.SchemaType `json:"type"`
Schema string `json:"schema"`
References []Reference `json:"references"`
ID int `json:"id"`
Version int `json:"version"`
IsSoftDeleted bool `json:"isSoftDeleted"`
Type sr.SchemaType `json:"type"`
Schema string `json:"schema"`
References []Reference `json:"references"`
Metadata *SchemaMetadata `json:"metadata,omitempty"`
}

// Reference describes a reference to a different schema stored in the schema registry.
Expand All @@ -410,6 +422,13 @@ type Reference struct {
Version int `json:"version"`
}

// SchemaMetadata contains metadata associated with a schema version.
type SchemaMetadata struct {
Tags map[string][]string `json:"tags,omitempty"`
Properties map[string]string `json:"properties,omitempty"`
Sensitive []string `json:"sensitive,omitempty"`
}

// GetSchemaRegistrySchema retrieves a schema for a given subject, version tuple from the
// schema registry. You can use -1 as the version to return the latest schema,
func (s *Service) GetSchemaRegistrySchema(ctx context.Context, subjectName string, version int, showSoftDeleted bool) (*SchemaRegistryVersionedSchema, error) {
Expand Down Expand Up @@ -585,6 +604,18 @@ func (s *Service) CreateSchemaRegistrySchema(ctx context.Context, subjectName st

subjectSchema, err := srClient.CreateSchema(ctx, subjectName, schema)
if err != nil {
// If metadata was included and we got a parse error, retry without metadata.
// Older Redpanda versions don't support the metadata field.
if schema.SchemaMetadata != nil {
s.logger.WarnContext(ctx, "retrying schema creation without metadata (unsupported by this Redpanda version)",
slog.String("subject", subjectName))
schema.SchemaMetadata = nil
subjectSchema, err = srClient.CreateSchema(ctx, subjectName, schema)
if err != nil {
return nil, err
}
return &CreateSchemaResponse{ID: subjectSchema.ID}, nil
}
return nil, err
}

Expand Down
109 changes: 96 additions & 13 deletions frontend/src/components/pages/schemas/schema-create.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import {
IconButton,
Input,
RadioGroup,
Text,
useToast,
} from '@redpanda-data/ui';
import { useQueryClient } from '@tanstack/react-query';
import { TrashIcon } from 'components/icons';
import { InfoIcon } from 'lucide-react';
import { observable } from 'mobx';
Expand Down Expand Up @@ -150,6 +152,16 @@ export class SchemaAddVersionPage extends PageComponent<{ subjectName: string }>
this.editorState.references = schema.references;
this.editorState.strategy = 'CUSTOM';
this.editorState.userInput = subject.name;

// Load existing metadata properties for editing
if (schema.metadata?.properties) {
this.editorState.metadataProperties = Object.entries(schema.metadata.properties).map(([key, value]) => ({
key,
value,
}));
// Add an empty row for adding new properties
this.editorState.metadataProperties.push({ key: '', value: '' });
}
}

return (
Expand All @@ -176,6 +188,7 @@ const SchemaPageButtons = observer(
editorState: SchemaEditorStateHelper;
}) => {
const toast = useToast();
const queryClient = useQueryClient();
const [isValidating, setValidating] = useState(false);
const [isCreating, setCreating] = useState(false);
const [persistentValidationError, setPersistentValidationError] = useState<{
Expand Down Expand Up @@ -243,11 +256,12 @@ const SchemaPageButtons = observer(
setCreating(true);
try {
const subjectName = editorState.computedSubjectName;
const r = await api
await api
.createSchema(editorState.computedSubjectName, {
schemaType: editorState.format as SchemaTypeType,
schema: editorState.schemaText,
references: editorState.references.filter((x) => x.name && x.subject),
metadata: editorState.computedMetadata,
params: {
normalize: editorState.normalize,
},
Expand All @@ -256,19 +270,14 @@ const SchemaPageButtons = observer(

await api.refreshSchemaDetails(subjectName, true);

// success: navigate to details
const latestVersion = api.schemaDetails.get(subjectName)?.latestActiveVersion;
// biome-ignore lint/suspicious/noConsole: intentional console usage
console.log('schema created', { response: r });
// biome-ignore lint/suspicious/noConsole: intentional console usage
console.log('navigating to details', { subjectName, latestVersion });
appGlobal.historyReplace(
`/schema-registry/subjects/${encodeURIComponent(subjectName)}?version=${latestVersion}`
);
// Invalidate React Query cache so details page shows latest data
await queryClient.invalidateQueries({
queryKey: ['schemaRegistry', 'subjects', subjectName, 'details'],
});

// success: navigate to details with "latest" so it picks up the new version
appGlobal.historyReplace(`/schema-registry/subjects/${encodeURIComponent(subjectName)}?version=latest`);
} catch (err) {
// error: open modal
// biome-ignore lint/suspicious/noConsole: intentional console usage
console.log('failed to create schema', { err });
toast({
status: 'error',
duration: undefined,
Expand Down Expand Up @@ -547,6 +556,16 @@ const SchemaEditor = observer((p: { state: SchemaEditorStateHelper; mode: 'CREAT
{/* <Text>This is an example help text about the references list, to be updated later</Text> */}

<ReferencesEditor state={state} />

<Heading mt="8" variant="lg">
Schema metadata
</Heading>
<Text>
Optional key-value properties to associate with this schema. Metadata will be ignored if not supported by
schema registry.
</Text>

<MetadataPropertiesEditor state={state} />
</Flex>
</>
);
Expand Down Expand Up @@ -636,6 +655,59 @@ const ReferencesEditor = observer((p: { state: SchemaEditorStateHelper }) => {
);
});

const MetadataPropertiesEditor = observer((p: { state: SchemaEditorStateHelper }) => {
const { state } = p;
const props = state.metadataProperties;

const renderRow = (prop: { key: string; value: string }, index: number) => (
<Flex alignItems="flex-end" gap="4" key={index}>
<FormField label="Key">
<Input
data-testid={`schema-create-metadata-key-input-${index}`}
onChange={(e) => {
prop.key = e.target.value;
}}
placeholder="e.g. owner"
value={prop.key}
/>
</FormField>
<FormField label="Value">
<Input
data-testid={`schema-create-metadata-value-input-${index}`}
onChange={(e) => {
prop.value = e.target.value;
}}
placeholder="e.g. team-platform"
value={prop.value}
/>
</FormField>
<IconButton
aria-label="delete"
data-testid={`schema-create-metadata-delete-btn-${index}`}
icon={<TrashIcon fontSize="19px" />}
onClick={() => props.remove(prop)}
variant="ghost"
/>
</Flex>
);

return (
<Flex direction="column" gap="4">
{props.map((x, index) => renderRow(x, index))}

<Button
data-testid="schema-create-add-metadata-btn"
onClick={() => props.push({ key: '', value: '' })}
size="sm"
variant="outline"
width="fit-content"
>
Add property
</Button>
</Flex>
);
});

function createSchemaState() {
return observable({
strategy: 'TOPIC' as
Expand All @@ -654,6 +726,17 @@ function createSchemaState() {
version: number;
}[],
normalize: false,
metadataProperties: [{ key: '', value: '' }] as { key: string; value: string }[],

get computedMetadata(): { properties: Record<string, string> } | undefined {
const properties: Record<string, string> = {};
for (const prop of this.metadataProperties) {
if (prop.key && prop.value) {
properties[prop.key] = prop.value;
}
}
return Object.keys(properties).length > 0 ? { properties } : undefined;
},

get isInvalidKeyOrValue() {
return this.strategy === 'TOPIC' && this.userInput.length > 0 && !this.keyOrValue;
Expand Down
Loading
Loading