Skip to content

Commit 7115cf4

Browse files
committed
feat: implement reserved fields validation (validateFields config)
Add Confluent-compatible reserved fields validation, controlled by the validateFields config option. When enabled, two rules are enforced:
1. Reserved fields (listed in the confluent:reserved metadata property) must not conflict with actual top-level schema fields.
2. Reserved fields from previous versions must not be removed.

Changes:
- Add HasTopLevelField() to the ParsedSchema interface (Avro, Protobuf, JSON Schema)
- Add ValidateFields config to storage, API types, and all 4 backends
- Add validateReservedFields() and isValidateFieldsEnabled() to the registry
- Add DB migrations for the validate_fields column (Postgres, MySQL, Cassandra)
- Add 12 BDD scenarios covering all schema types and config overrides
1 parent ae70703 commit 7115cf4

File tree

16 files changed

+507
-31
lines changed

16 files changed

+507
-31
lines changed

internal/api/handlers/handlers.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,7 @@ func (h *Handler) GetConfig(w http.ResponseWriter, r *http.Request) {
575575
writeJSON(w, http.StatusOK, types.ConfigResponse{
576576
CompatibilityLevel: config.CompatibilityLevel,
577577
Normalize: config.Normalize,
578+
ValidateFields: config.ValidateFields,
578579
Alias: config.Alias,
579580
CompatibilityGroup: config.CompatibilityGroup,
580581
DefaultMetadata: config.DefaultMetadata,
@@ -594,6 +595,7 @@ func (h *Handler) GetConfig(w http.ResponseWriter, r *http.Request) {
594595
writeJSON(w, http.StatusOK, types.ConfigResponse{
595596
CompatibilityLevel: config.CompatibilityLevel,
596597
Normalize: config.Normalize,
598+
ValidateFields: config.ValidateFields,
597599
Alias: config.Alias,
598600
CompatibilityGroup: config.CompatibilityGroup,
599601
DefaultMetadata: config.DefaultMetadata,
@@ -627,6 +629,7 @@ func (h *Handler) SetConfig(w http.ResponseWriter, r *http.Request) {
627629
configOpts := registry.SetConfigOpts{
628630
Alias: req.Alias,
629631
CompatibilityGroup: req.CompatibilityGroup,
632+
ValidateFields: req.ValidateFields,
630633
DefaultMetadata: req.DefaultMetadata,
631634
OverrideMetadata: req.OverrideMetadata,
632635
DefaultRuleSet: req.DefaultRuleSet,
@@ -644,6 +647,7 @@ func (h *Handler) SetConfig(w http.ResponseWriter, r *http.Request) {
644647
resp := types.ConfigRequest{
645648
Compatibility: strings.ToUpper(req.Compatibility),
646649
Normalize: req.Normalize,
650+
ValidateFields: req.ValidateFields,
647651
Alias: req.Alias,
648652
CompatibilityGroup: req.CompatibilityGroup,
649653
DefaultMetadata: req.DefaultMetadata,

internal/api/types/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type LookupSchemaResponse struct {
7070
type ConfigResponse struct {
7171
CompatibilityLevel string `json:"compatibilityLevel"`
7272
Normalize *bool `json:"normalize,omitempty"`
73+
ValidateFields *bool `json:"validateFields,omitempty"`
7374
Alias string `json:"alias,omitempty"`
7475
CompatibilityGroup string `json:"compatibilityGroup,omitempty"`
7576
DefaultMetadata *storage.Metadata `json:"defaultMetadata,omitempty"`
@@ -82,6 +83,7 @@ type ConfigResponse struct {
8283
type ConfigRequest struct {
8384
Compatibility string `json:"compatibility"`
8485
Normalize *bool `json:"normalize,omitempty"`
86+
ValidateFields *bool `json:"validateFields,omitempty"`
8587
Alias string `json:"alias,omitempty"`
8688
CompatibilityGroup string `json:"compatibilityGroup,omitempty"`
8789
DefaultMetadata *storage.Metadata `json:"defaultMetadata,omitempty"`

internal/registry/registry.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,13 @@ func (r *Registry) RegisterSchema(ctx context.Context, subject string, schemaStr
135135
}
136136
}
137137

138+
// Validate reserved fields if enabled
139+
if r.isValidateFieldsEnabled(ctx, subject) {
140+
if msgs := r.validateReservedFields(ctx, subject, parsed, opt.Metadata); len(msgs) > 0 {
141+
return nil, fmt.Errorf("%w: %s", ErrIncompatibleSchema, strings.Join(msgs, "; "))
142+
}
143+
}
144+
138145
// Check confluent:version (compare-and-set) if present in metadata
139146
if err := r.checkConfluentVersion(ctx, subject, opt.Metadata); err != nil {
140147
return nil, err
@@ -627,6 +634,7 @@ func (r *Registry) GetConfigFull(ctx context.Context, subject string) (*storage.
627634
type SetConfigOpts struct {
628635
Alias string
629636
CompatibilityGroup string
637+
ValidateFields *bool
630638
DefaultMetadata *storage.Metadata
631639
OverrideMetadata *storage.Metadata
632640
DefaultRuleSet *storage.RuleSet
@@ -649,6 +657,7 @@ func (r *Registry) SetConfig(ctx context.Context, subject string, level string,
649657
opt := opts[0]
650658
config.Alias = opt.Alias
651659
config.CompatibilityGroup = opt.CompatibilityGroup
660+
config.ValidateFields = opt.ValidateFields
652661
config.DefaultMetadata = opt.DefaultMetadata
653662
config.OverrideMetadata = opt.OverrideMetadata
654663
config.DefaultRuleSet = opt.DefaultRuleSet
@@ -1060,6 +1069,77 @@ func (r *Registry) filterByCompatibilityGroup(ctx context.Context, subject strin
10601069
return filtered
10611070
}
10621071

1072+
// isValidateFieldsEnabled checks if reserved field validation is enabled for a subject.
1073+
func (r *Registry) isValidateFieldsEnabled(ctx context.Context, subject string) bool {
1074+
// Check subject config first
1075+
if subject != "" {
1076+
config, err := r.storage.GetConfig(ctx, subject)
1077+
if err == nil && config != nil && config.ValidateFields != nil {
1078+
return *config.ValidateFields
1079+
}
1080+
}
1081+
// Fall back to global config
1082+
config, err := r.storage.GetGlobalConfig(ctx)
1083+
if err == nil && config != nil && config.ValidateFields != nil {
1084+
return *config.ValidateFields
1085+
}
1086+
return false
1087+
}
1088+
1089+
// getReservedFields extracts reserved field names from the "confluent:reserved"
1090+
// metadata property. Returns an empty set if not present.
1091+
func getReservedFields(metadata *storage.Metadata) map[string]bool {
1092+
if metadata == nil || metadata.Properties == nil {
1093+
return nil
1094+
}
1095+
val, ok := metadata.Properties["confluent:reserved"]
1096+
if !ok || val == "" {
1097+
return nil
1098+
}
1099+
fields := make(map[string]bool)
1100+
for _, f := range strings.Split(val, ",") {
1101+
f = strings.TrimSpace(f)
1102+
if f != "" {
1103+
fields[f] = true
1104+
}
1105+
}
1106+
if len(fields) == 0 {
1107+
return nil
1108+
}
1109+
return fields
1110+
}
1111+
1112+
// validateReservedFields checks two invariants when validateFields is enabled:
1113+
// 1. Reserved fields listed in metadata must not exist as top-level schema fields.
1114+
// 2. Reserved fields from the previous version must not be removed.
1115+
func (r *Registry) validateReservedFields(ctx context.Context, subject string, parsed schema.ParsedSchema, metadata *storage.Metadata) []string {
1116+
var msgs []string
1117+
1118+
reservedFields := getReservedFields(metadata)
1119+
1120+
// Rule 2: Check that reserved fields from previous version are not removed
1121+
latest, err := r.storage.GetLatestSchema(ctx, subject)
1122+
if err == nil && latest != nil {
1123+
prevReserved := getReservedFields(latest.Metadata)
1124+
for field := range prevReserved {
1125+
if !reservedFields[field] {
1126+
msgs = append(msgs, fmt.Sprintf(
1127+
"The new schema has reserved field %s removed from its metadata which is present in the old schema's metadata.", field))
1128+
}
1129+
}
1130+
}
1131+
1132+
// Rule 1: Reserved fields must not conflict with actual schema fields
1133+
for field := range reservedFields {
1134+
if parsed.HasTopLevelField(field) {
1135+
msgs = append(msgs, fmt.Sprintf(
1136+
"The new schema has field that conflicts with the reserved field %s.", field))
1137+
}
1138+
}
1139+
1140+
return msgs
1141+
}
1142+
10631143
// resolveReferences looks up the schema content for each reference from storage.
10641144
func (r *Registry) resolveReferences(ctx context.Context, refs []storage.Reference) ([]storage.Reference, error) {
10651145
if len(refs) == 0 {

internal/schema/avro/parser.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,19 @@ func (s *ParsedSchema) Normalize() schema.ParsedSchema {
104104
}
105105
}
106106

107+
// HasTopLevelField reports whether the Avro record schema contains a field
108+
// with the given name. Returns false for non-record schemas.
109+
func (s *ParsedSchema) HasTopLevelField(field string) bool {
110+
if rs, ok := s.rawSchema.(*avro.RecordSchema); ok {
111+
for _, f := range rs.Fields() {
112+
if f.Name() == field {
113+
return true
114+
}
115+
}
116+
}
117+
return false
118+
}
119+
107120
// FormattedString returns the schema in the requested format.
108121
// Supported formats: "resolved" (inlines all references), "default" (canonical).
109122
func (s *ParsedSchema) FormattedString(format string) string {

internal/schema/jsonschema/parser.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ func (p *ParsedJSONSchema) Normalize() schema.ParsedSchema {
115115
}
116116
}
117117

118+
// HasTopLevelField reports whether the JSON Schema "properties" object
119+
// contains a key with the given name.
120+
func (p *ParsedJSONSchema) HasTopLevelField(field string) bool {
121+
props, ok := p.schemaMap["properties"].(map[string]interface{})
122+
if !ok {
123+
return false
124+
}
125+
_, exists := props[field]
126+
return exists
127+
}
128+
118129
// FormattedString returns the schema in the requested format.
119130
// JSON Schema does not support special format values; always returns canonical string.
120131
func (p *ParsedJSONSchema) FormattedString(format string) string {

internal/schema/protobuf/parser.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,24 @@ func (p *ParsedProtobuf) Normalize() schema.ParsedSchema {
116116
}
117117
}
118118

119+
// HasTopLevelField reports whether any top-level message in the Protobuf
120+
// schema contains a field with the given name.
121+
func (p *ParsedProtobuf) HasTopLevelField(field string) bool {
122+
if p.descriptor == nil {
123+
return false
124+
}
125+
msgs := p.descriptor.Messages()
126+
for i := 0; i < msgs.Len(); i++ {
127+
fields := msgs.Get(i).Fields()
128+
for j := 0; j < fields.Len(); j++ {
129+
if string(fields.Get(j).Name()) == field {
130+
return true
131+
}
132+
}
133+
}
134+
return false
135+
}
136+
119137
// FormattedString returns the schema in the requested format.
120138
// Supported formats: "serialized" (base64-encoded FileDescriptorProto), "default" (canonical).
121139
func (p *ParsedProtobuf) FormattedString(format string) string {

internal/schema/types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ type ParsedSchema interface {
2727
// Normalize returns a normalized copy of this schema with deterministic
2828
// representation for deduplication and comparison purposes.
2929
Normalize() ParsedSchema
30+
31+
// HasTopLevelField reports whether the schema contains a top-level field
32+
// with the given name. For Avro records this checks record fields, for
33+
// Protobuf it checks fields across all top-level messages, and for JSON
34+
// Schema it checks the "properties" object.
35+
HasTopLevelField(field string) bool
3036
}
3137

3238
// Parser is the interface for schema parsers.

internal/storage/cassandra/migrations.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,10 @@ func Migrate(session *gocql.Session, keyspace string) error {
229229
// subject_configs and global_config: compatibility_group
230230
fmt.Sprintf(`ALTER TABLE %s.subject_configs ADD compatibility_group text`, qident(keyspace)),
231231
fmt.Sprintf(`ALTER TABLE %s.global_config ADD compatibility_group text`, qident(keyspace)),
232+
233+
// subject_configs and global_config: validate_fields
234+
fmt.Sprintf(`ALTER TABLE %s.subject_configs ADD validate_fields boolean`, qident(keyspace)),
235+
fmt.Sprintf(`ALTER TABLE %s.global_config ADD validate_fields boolean`, qident(keyspace)),
232236
}
233237
for _, stmt := range alterStmts {
234238
if err := session.Query(stmt).Exec(); err != nil {

internal/storage/cassandra/store.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1361,13 +1361,13 @@ func (s *Store) GetConfig(ctx context.Context, subject string) (*storage.ConfigR
13611361
return s.GetGlobalConfig(ctx)
13621362
}
13631363
var compat, alias string
1364-
var normalize *bool
1364+
var normalize, validateFields *bool
13651365
var compatibilityGroup string
13661366
var defaultMetadataStr, overrideMetadataStr, defaultRulesetStr, overrideRulesetStr string
13671367
err := s.readQuery(
1368-
fmt.Sprintf(`SELECT compatibility, alias, normalize, default_metadata, override_metadata, default_ruleset, override_ruleset, compatibility_group FROM %s.subject_configs WHERE subject = ?`, qident(s.cfg.Keyspace)),
1368+
fmt.Sprintf(`SELECT compatibility, alias, normalize, validate_fields, default_metadata, override_metadata, default_ruleset, override_ruleset, compatibility_group FROM %s.subject_configs WHERE subject = ?`, qident(s.cfg.Keyspace)),
13691369
subject,
1370-
).WithContext(ctx).Scan(&compat, &alias, &normalize, &defaultMetadataStr, &overrideMetadataStr, &defaultRulesetStr, &overrideRulesetStr, &compatibilityGroup)
1370+
).WithContext(ctx).Scan(&compat, &alias, &normalize, &validateFields, &defaultMetadataStr, &overrideMetadataStr, &defaultRulesetStr, &overrideRulesetStr, &compatibilityGroup)
13711371
if err != nil {
13721372
if errors.Is(err, gocql.ErrNotFound) {
13731373
return nil, storage.ErrNotFound
@@ -1379,6 +1379,7 @@ func (s *Store) GetConfig(ctx context.Context, subject string) (*storage.ConfigR
13791379
CompatibilityLevel: compat,
13801380
Alias: alias,
13811381
Normalize: normalize,
1382+
ValidateFields: validateFields,
13821383
CompatibilityGroup: compatibilityGroup,
13831384
DefaultMetadata: unmarshalJSONText[storage.Metadata](defaultMetadataStr),
13841385
OverrideMetadata: unmarshalJSONText[storage.Metadata](overrideMetadataStr),
@@ -1394,9 +1395,9 @@ func (s *Store) SetConfig(ctx context.Context, subject string, config *storage.C
13941395
}
13951396
compat := normalizeCompat(config.CompatibilityLevel)
13961397
return s.writeQuery(
1397-
fmt.Sprintf(`INSERT INTO %s.subject_configs (subject, compatibility, alias, normalize, default_metadata, override_metadata, default_ruleset, override_ruleset, compatibility_group, updated_at)
1398-
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, now())`, qident(s.cfg.Keyspace)),
1399-
subject, compat, config.Alias, config.Normalize,
1398+
fmt.Sprintf(`INSERT INTO %s.subject_configs (subject, compatibility, alias, normalize, validate_fields, default_metadata, override_metadata, default_ruleset, override_ruleset, compatibility_group, updated_at)
1399+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now())`, qident(s.cfg.Keyspace)),
1400+
subject, compat, config.Alias, config.Normalize, config.ValidateFields,
14001401
marshalJSONText(config.DefaultMetadata), marshalJSONText(config.OverrideMetadata),
14011402
marshalJSONText(config.DefaultRuleSet), marshalJSONText(config.OverrideRuleSet),
14021403
config.CompatibilityGroup,
@@ -1422,13 +1423,13 @@ func (s *Store) DeleteConfig(ctx context.Context, subject string) error {
14221423
// GetGlobalConfig retrieves the global compatibility config.
14231424
func (s *Store) GetGlobalConfig(ctx context.Context) (*storage.ConfigRecord, error) {
14241425
var compat, alias string
1425-
var normalize *bool
1426+
var normalize, validateFields *bool
14261427
var compatibilityGroup string
14271428
var defaultMetadataStr, overrideMetadataStr, defaultRulesetStr, overrideRulesetStr string
14281429
err := s.readQuery(
1429-
fmt.Sprintf(`SELECT compatibility, alias, normalize, default_metadata, override_metadata, default_ruleset, override_ruleset, compatibility_group FROM %s.global_config WHERE key = ?`, qident(s.cfg.Keyspace)),
1430+
fmt.Sprintf(`SELECT compatibility, alias, normalize, validate_fields, default_metadata, override_metadata, default_ruleset, override_ruleset, compatibility_group FROM %s.global_config WHERE key = ?`, qident(s.cfg.Keyspace)),
14301431
"global",
1431-
).WithContext(ctx).Scan(&compat, &alias, &normalize, &defaultMetadataStr, &overrideMetadataStr, &defaultRulesetStr, &overrideRulesetStr, &compatibilityGroup)
1432+
).WithContext(ctx).Scan(&compat, &alias, &normalize, &validateFields, &defaultMetadataStr, &overrideMetadataStr, &defaultRulesetStr, &overrideRulesetStr, &compatibilityGroup)
14321433
if err != nil {
14331434
if errors.Is(err, gocql.ErrNotFound) {
14341435
return nil, storage.ErrNotFound
@@ -1440,6 +1441,7 @@ func (s *Store) GetGlobalConfig(ctx context.Context) (*storage.ConfigRecord, err
14401441
CompatibilityLevel: compat,
14411442
Alias: alias,
14421443
Normalize: normalize,
1444+
ValidateFields: validateFields,
14431445
CompatibilityGroup: compatibilityGroup,
14441446
DefaultMetadata: unmarshalJSONText[storage.Metadata](defaultMetadataStr),
14451447
OverrideMetadata: unmarshalJSONText[storage.Metadata](overrideMetadataStr),
@@ -1452,24 +1454,25 @@ func (s *Store) GetGlobalConfig(ctx context.Context) (*storage.ConfigRecord, err
14521454
func (s *Store) SetGlobalConfig(ctx context.Context, config *storage.ConfigRecord) error {
14531455
compat := "BACKWARD"
14541456
var alias string
1455-
var normalize *bool
1457+
var normalize, validateFields *bool
14561458
var compatibilityGroup string
14571459
var defaultMetadata, overrideMetadata *storage.Metadata
14581460
var defaultRuleSet, overrideRuleSet *storage.RuleSet
14591461
if config != nil {
14601462
compat = normalizeCompat(config.CompatibilityLevel)
14611463
alias = config.Alias
14621464
normalize = config.Normalize
1465+
validateFields = config.ValidateFields
14631466
compatibilityGroup = config.CompatibilityGroup
14641467
defaultMetadata = config.DefaultMetadata
14651468
overrideMetadata = config.OverrideMetadata
14661469
defaultRuleSet = config.DefaultRuleSet
14671470
overrideRuleSet = config.OverrideRuleSet
14681471
}
14691472
return s.writeQuery(
1470-
fmt.Sprintf(`INSERT INTO %s.global_config (key, compatibility, alias, normalize, default_metadata, override_metadata, default_ruleset, override_ruleset, compatibility_group, updated_at)
1471-
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, now())`, qident(s.cfg.Keyspace)),
1472-
"global", compat, alias, normalize,
1473+
fmt.Sprintf(`INSERT INTO %s.global_config (key, compatibility, alias, normalize, validate_fields, default_metadata, override_metadata, default_ruleset, override_ruleset, compatibility_group, updated_at)
1474+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now())`, qident(s.cfg.Keyspace)),
1475+
"global", compat, alias, normalize, validateFields,
14731476
marshalJSONText(defaultMetadata), marshalJSONText(overrideMetadata),
14741477
marshalJSONText(defaultRuleSet), marshalJSONText(overrideRuleSet),
14751478
compatibilityGroup,
@@ -1479,9 +1482,9 @@ func (s *Store) SetGlobalConfig(ctx context.Context, config *storage.ConfigRecor
14791482
// DeleteGlobalConfig resets the global config to the default (BACKWARD).
14801483
func (s *Store) DeleteGlobalConfig(ctx context.Context) error {
14811484
return s.writeQuery(
1482-
fmt.Sprintf(`INSERT INTO %s.global_config (key, compatibility, alias, normalize, default_metadata, override_metadata, default_ruleset, override_ruleset, compatibility_group, updated_at)
1483-
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, now())`, qident(s.cfg.Keyspace)),
1484-
"global", "BACKWARD", "", nil, "", "", "", "", "",
1485+
fmt.Sprintf(`INSERT INTO %s.global_config (key, compatibility, alias, normalize, validate_fields, default_metadata, override_metadata, default_ruleset, override_ruleset, compatibility_group, updated_at)
1486+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now())`, qident(s.cfg.Keyspace)),
1487+
"global", "BACKWARD", "", nil, nil, "", "", "", "", "",
14851488
).WithContext(ctx).Exec()
14861489
}
14871490

internal/storage/mysql/migrations.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,7 @@ var migrations = []string{
117117

118118
// Migration 19: Add compatibility_group column to configs
119119
"ALTER TABLE configs ADD COLUMN compatibility_group VARCHAR(255)",
120+
121+
// Migration 20: Add validate_fields column to configs
122+
"ALTER TABLE configs ADD COLUMN validate_fields BOOLEAN",
120123
}

0 commit comments

Comments
 (0)