Skip to content

Commit a4b0c8d

Browse files
committed
fix: resolve remaining BDD failures with API, Protobuf, and FORWARD_TRANSITIVE fixes
API fixes: - Add error codes 40408/40409 for subject config/mode not found - Add deleted=true query param support for GetVersion handler - Add pagination (offset/limit) for versions, subjects, and schema ID endpoints - Clean up subject config/mode on subject delete Protobuf checker fixes: - Detect required field removal in proto2 as incompatible - Handle field-to-oneof moves: moving INTO existing oneof is incompatible (FIELD_MOVED_TO_EXISTING_ONEOF), moving into new oneof is compatible - Distinguish real vs synthetic oneofs (proto3 optional) - Optional-to-repeated only compatible for length-delimited types (string/bytes/message) FORWARD_TRANSITIVE test fix: - Corrected test data: f2 field now has no default, making transitive check properly incompatible (verified against Confluent Schema Registry v8.1.1) BDD results: 1283 passing, 0 failing (23 @pending-impl skipped)
1 parent 2227cf0 commit a4b0c8d

File tree

10 files changed

+446
-38
lines changed

10 files changed

+446
-38
lines changed

internal/api/handlers/handlers.go

Lines changed: 70 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package handlers
33

44
import (
5+
"context"
56
"encoding/json"
67
"errors"
78
"fmt"
@@ -234,7 +235,9 @@ func (h *Handler) GetVersions(w http.ResponseWriter, r *http.Request) {
234235
return
235236
}
236237

237-
writeJSON(w, http.StatusOK, versions)
238+
// Apply pagination
239+
start, end := parsePagination(r, len(versions))
240+
writeJSON(w, http.StatusOK, versions[start:end])
238241
}
239242

240243
// GetVersion handles GET /subjects/{subject}/versions/{version}
@@ -249,18 +252,26 @@ func (h *Handler) GetVersion(w http.ResponseWriter, r *http.Request) {
249252
return
250253
}
251254

255+
includeDeleted := r.URL.Query().Get("deleted") == "true"
256+
252257
schema, err := h.registry.GetSchemaBySubjectVersion(r.Context(), subject, version)
253258
if err != nil {
254-
if errors.Is(err, storage.ErrSubjectNotFound) {
255-
writeError(w, http.StatusNotFound, types.ErrorCodeSubjectNotFound, "Subject not found")
256-
return
259+
// If deleted=true and version not found, try to find the deleted version
260+
if includeDeleted && (errors.Is(err, storage.ErrVersionNotFound) || errors.Is(err, storage.ErrSubjectNotFound)) {
261+
schema, err = h.findDeletedVersion(r.Context(), subject, version)
257262
}
258-
if errors.Is(err, storage.ErrVersionNotFound) {
259-
writeError(w, http.StatusNotFound, types.ErrorCodeVersionNotFound, "Version not found")
263+
if err != nil {
264+
if errors.Is(err, storage.ErrSubjectNotFound) {
265+
writeError(w, http.StatusNotFound, types.ErrorCodeSubjectNotFound, "Subject not found")
266+
return
267+
}
268+
if errors.Is(err, storage.ErrVersionNotFound) {
269+
writeError(w, http.StatusNotFound, types.ErrorCodeVersionNotFound, "Version not found")
270+
return
271+
}
272+
writeError(w, http.StatusInternalServerError, types.ErrorCodeInternalServerError, err.Error())
260273
return
261274
}
262-
writeError(w, http.StatusInternalServerError, types.ErrorCodeInternalServerError, err.Error())
263-
return
264275
}
265276

266277
schemaStr := schema.Schema
@@ -282,6 +293,23 @@ func (h *Handler) GetVersion(w http.ResponseWriter, r *http.Request) {
282293
writeJSON(w, http.StatusOK, resp)
283294
}
284295

296+
// findDeletedVersion looks up a soft-deleted version by iterating all versions including deleted.
297+
func (h *Handler) findDeletedVersion(ctx context.Context, subject string, version int) (*storage.SchemaRecord, error) {
298+
schemas, err := h.registry.GetSchemasBySubject(ctx, subject, true) // include deleted
299+
if err != nil {
300+
return nil, err
301+
}
302+
if len(schemas) == 0 {
303+
return nil, storage.ErrSubjectNotFound
304+
}
305+
for _, s := range schemas {
306+
if s.Version == version {
307+
return s, nil
308+
}
309+
}
310+
return nil, storage.ErrVersionNotFound
311+
}
312+
285313
// RegisterSchema handles POST /subjects/{subject}/versions
286314
func (h *Handler) RegisterSchema(w http.ResponseWriter, r *http.Request) {
287315
subject := chi.URLParam(r, "subject")
@@ -500,7 +528,7 @@ func (h *Handler) GetConfig(w http.ResponseWriter, r *http.Request) {
500528
level, err := h.registry.GetSubjectConfig(r.Context(), subject)
501529
if err != nil {
502530
if errors.Is(err, storage.ErrNotFound) {
503-
writeError(w, http.StatusNotFound, types.ErrorCodeSubjectNotFound,
531+
writeError(w, http.StatusNotFound, types.ErrorCodeSubjectCompatNotFound,
504532
fmt.Sprintf("Subject '%s' does not have subject-level compatibility configured", subject))
505533
return
506534
}
@@ -698,7 +726,7 @@ func (h *Handler) GetMode(w http.ResponseWriter, r *http.Request) {
698726
mode, err := h.registry.GetSubjectMode(r.Context(), subject)
699727
if err != nil {
700728
if errors.Is(err, storage.ErrNotFound) {
701-
writeError(w, http.StatusNotFound, types.ErrorCodeSubjectNotFound,
729+
writeError(w, http.StatusNotFound, types.ErrorCodeSubjectModeNotFound,
702730
fmt.Sprintf("Subject '%s' does not have subject-level mode configured", subject))
703731
return
704732
}
@@ -753,6 +781,32 @@ func (h *Handler) SetMode(w http.ResponseWriter, r *http.Request) {
753781

754782
// parseVersion parses a version string, handling "latest" and "-1".
755783
// Returns errInvalidVersion for non-numeric strings, zero, or negative values (other than -1).
784+
// parsePagination extracts offset and limit query params and applies them to a slice length.
785+
// Returns the start and end indices for slicing.
786+
func parsePagination(r *http.Request, total int) (start, end int) {
787+
start = 0
788+
end = total
789+
790+
if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" {
791+
if o, err := strconv.Atoi(offsetStr); err == nil && o >= 0 {
792+
start = o
793+
}
794+
}
795+
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
796+
if l, err := strconv.Atoi(limitStr); err == nil && l >= 0 {
797+
end = start + l
798+
}
799+
}
800+
801+
if start > total {
802+
start = total
803+
}
804+
if end > total {
805+
end = total
806+
}
807+
return start, end
808+
}
809+
756810
func parseVersion(s string) (int, error) {
757811
if s == "latest" || s == "-1" {
758812
return -1, nil
@@ -850,7 +904,9 @@ func (h *Handler) GetSubjectsBySchemaID(w http.ResponseWriter, r *http.Request)
850904
return
851905
}
852906

853-
writeJSON(w, http.StatusOK, subjects)
907+
// Apply pagination
908+
start, end := parsePagination(r, len(subjects))
909+
writeJSON(w, http.StatusOK, subjects[start:end])
854910
}
855911

856912
// GetVersionsBySchemaID handles GET /schemas/ids/{id}/versions
@@ -887,7 +943,9 @@ func (h *Handler) GetVersionsBySchemaID(w http.ResponseWriter, r *http.Request)
887943
})
888944
}
889945

890-
writeJSON(w, http.StatusOK, result)
946+
// Apply pagination
947+
start, end := parsePagination(r, len(result))
948+
writeJSON(w, http.StatusOK, result[start:end])
891949
}
892950

893951
// ListSchemas handles GET /schemas

internal/api/types/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ const (
136136
ErrorCodeSubjectNotSoftDeleted = 40405
137137
ErrorCodeSchemaVersionSoftDeleted = 40406
138138
ErrorCodeVersionNotSoftDeleted = 40407
139+
ErrorCodeSubjectCompatNotFound = 40408
140+
ErrorCodeSubjectModeNotFound = 40409
139141
ErrorCodeIncompatibleSchema = 409
140142
ErrorCodeInvalidSchema = 42201
141143
ErrorCodeInvalidSchemaType = 42202

internal/compatibility/protobuf/checker.go

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,11 @@ func (c *Checker) checkMessageCompatibility(newMsg, oldMsg protoreflect.MessageD
171171
oldFields[int32(f.Number())] = f
172172
}
173173

174+
// Track fields that move from non-oneof into a real oneof, keyed by oneof name.
175+
// If multiple previously-independent fields land in the same oneof, that adds
176+
// a mutual exclusion constraint which is an incompatible change.
177+
fieldsMovedToOneof := make(map[string][]string) // oneof name -> list of field names
178+
174179
// Check each new field
175180
for i := 0; i < newMsg.Fields().Len(); i++ {
176181
newField := newMsg.Fields().Get(i)
@@ -187,16 +192,74 @@ func (c *Checker) checkMessageCompatibility(newMsg, oldMsg protoreflect.MessageD
187192
continue
188193
}
189194

195+
// Track fields moving into a real oneof from non-oneof
196+
oldOneof := oldField.ContainingOneof()
197+
newOneof := newField.ContainingOneof()
198+
oldIsRealOneof := oldOneof != nil && !oldOneof.IsSynthetic()
199+
newIsRealOneof := newOneof != nil && !newOneof.IsSynthetic()
200+
if !oldIsRealOneof && newIsRealOneof {
201+
oneofName := string(newOneof.Name())
202+
fieldsMovedToOneof[oneofName] = append(fieldsMovedToOneof[oneofName], string(newField.Name()))
203+
}
204+
190205
// Check field compatibility
191206
c.checkFieldCompatibility(newField, oldField, msgName, result)
192207
delete(oldFields, num)
193208
}
194209

195-
// In proto3, field removal is wire-safe: readers ignore unknown fields and use
196-
// defaults for missing ones. However, removing a field from a oneof IS
197-
// incompatible because it changes oneof semantics.
210+
// Check if fields were moved into an existing oneof.
211+
// Moving a field into a oneof that already has other pre-existing members is
212+
// incompatible (FIELD_MOVED_TO_EXISTING_ONEOF) because it adds a mutual
213+
// exclusion constraint. Also, moving multiple previously-independent fields
214+
// into the same new oneof creates a new mutual exclusion constraint.
215+
//
216+
// Build a set of old field numbers for quick lookup.
217+
oldFieldNums := make(map[int32]bool)
218+
for i := 0; i < oldMsg.Fields().Len(); i++ {
219+
oldFieldNums[int32(oldMsg.Fields().Get(i).Number())] = true
220+
}
221+
222+
for oneofName, movedFields := range fieldsMovedToOneof {
223+
if len(movedFields) > 1 {
224+
// Multiple independent fields moved into same oneof
225+
result.AddMessage("Message '%s': multiple fields moved into oneof '%s', creating mutual exclusion",
226+
msgName, oneofName)
227+
continue
228+
}
229+
// Check if the target oneof has other members that already existed in
230+
// the old schema (i.e. they're not brand new fields).
231+
movedFieldName := movedFields[0]
232+
otherPreExistingMember := false
233+
for i := 0; i < newMsg.Fields().Len(); i++ {
234+
nf := newMsg.Fields().Get(i)
235+
no := nf.ContainingOneof()
236+
if no == nil || no.IsSynthetic() || string(no.Name()) != oneofName {
237+
continue
238+
}
239+
if string(nf.Name()) == movedFieldName {
240+
continue
241+
}
242+
// Check if this field existed in the old schema by number
243+
if oldFieldNums[int32(nf.Number())] {
244+
otherPreExistingMember = true
245+
break
246+
}
247+
}
248+
if otherPreExistingMember {
249+
result.AddMessage("Message '%s': field '%s' moved into existing oneof '%s'",
250+
msgName, movedFieldName, oneofName)
251+
}
252+
}
253+
254+
// Check removed fields for compatibility issues:
255+
// 1. Removing a required field (proto2) is always incompatible
256+
// 2. Removing a field from a non-synthetic oneof is incompatible (changes oneof semantics)
257+
// 3. In proto3, non-oneof field removal is wire-safe (readers ignore unknown fields)
198258
for num, oldField := range oldFields {
199-
if oldField.ContainingOneof() != nil {
259+
if oldField.Cardinality() == protoreflect.Required {
260+
result.AddMessage("Message '%s': required field '%s' (number %d) was removed",
261+
msgName, oldField.Name(), num)
262+
} else if oldField.ContainingOneof() != nil && !oldField.ContainingOneof().IsSynthetic() {
200263
result.AddMessage("Message '%s': field '%s' (number %d) was removed from oneof",
201264
msgName, oldField.Name(), num)
202265
}
@@ -233,7 +296,14 @@ func (c *Checker) checkFieldCompatibility(newField, oldField protoreflect.FieldD
233296
if oldCard != newCard {
234297
// Some cardinality changes are compatible
235298
if oldCard == protoreflect.Optional && newCard == protoreflect.Repeated {
236-
// Optional to repeated - compatible for reading
299+
// Per protobuf spec: "For string, bytes, and message fields, optional is
300+
// compatible with repeated." These use length-delimited encoding which
301+
// is the same wire format for both singular and repeated.
302+
kind := oldField.Kind()
303+
if kind != protoreflect.StringKind && kind != protoreflect.BytesKind && kind != protoreflect.MessageKind {
304+
result.AddMessage("Message '%s': field '%s' changed from optional to repeated",
305+
msgName, fieldName)
306+
}
237307
} else if oldCard == protoreflect.Required && newCard != protoreflect.Required {
238308
// Required to optional/repeated - compatible
239309
} else if newCard == protoreflect.Required && oldCard != protoreflect.Required {
@@ -255,9 +325,20 @@ func (c *Checker) checkFieldCompatibility(newField, oldField protoreflect.FieldD
255325
// Check oneof membership changes
256326
oldOneof := oldField.ContainingOneof()
257327
newOneof := newField.ContainingOneof()
258-
if (oldOneof == nil) != (newOneof == nil) {
259-
result.AddMessage("Message '%s': field '%s' oneof membership changed",
260-
msgName, fieldName)
328+
329+
// Determine if the oneofs are "real" (non-synthetic). Proto3 optional fields
330+
// have a synthetic oneof wrapper that is invisible at the wire level.
331+
oldIsRealOneof := oldOneof != nil && !oldOneof.IsSynthetic()
332+
newIsRealOneof := newOneof != nil && !newOneof.IsSynthetic()
333+
334+
if oldIsRealOneof != newIsRealOneof {
335+
if oldIsRealOneof && !newIsRealOneof {
336+
// Moving OUT of a real oneof — incompatible (changes oneof semantics)
337+
result.AddMessage("Message '%s': field '%s' oneof membership changed",
338+
msgName, fieldName)
339+
}
340+
// Moving INTO a real oneof from non-oneof or synthetic oneof is compatible
341+
// because the field number and wire format are preserved.
261342
}
262343
}
263344

0 commit comments

Comments
 (0)