Skip to content

Commit 23fe5e5

Browse files
committed
fix: address PR #263 feedback — Confluent compatibility and data integrity
Fixes 11 confirmed issues from PR review: - Issues 1-2: Add schema_fingerprints table to PostgreSQL and MySQL for stable global schema IDs and reference preservation after permanent delete - Issues 3-4: Enforce IMPORT mode for explicit ID registration and bulk import (error 42205) - Issue 5: Propagate mode check errors instead of failing open - Issue 7: Guard SetNextID against sequence rewind after import - Issue 8: Include soft-deleted versions when computing next version in RegisterSchemaWithID - Issue 9: Handle "latest" sentinel in findDeletedVersion for GET version?deleted=true - Issue 10: Add external reference resolution to JSON Schema compatibility checker - Issue 11: Fix Cassandra GetMaxSchemaID to query actual max instead of block allocator ceiling Also adds BDD conformance tests covering all fixes (pr_fixes_conformance.feature) and updates existing import feature files for IMPORT mode enforcement.
1 parent 5de8dba commit 23fe5e5

File tree

16 files changed

+898
-93
lines changed

16 files changed

+898
-93
lines changed

internal/api/handlers/handlers.go

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,20 @@ func NewWithConfig(reg *registry.Registry, cfg Config) *Handler {
6767
}
6868

6969
// checkModeForWrite checks if the current mode allows write operations for the given subject.
70-
// Returns an error message if writes are blocked, or empty string if allowed.
70+
// Returns:
71+
// - mode string: non-empty if writes are blocked by READONLY or READONLY_OVERRIDE
72+
// - error: non-nil if the mode check itself failed (e.g. storage unreachable)
73+
//
7174
// Both READONLY and READONLY_OVERRIDE block data and config writes.
72-
func (h *Handler) checkModeForWrite(r *http.Request, subject string) string {
73-
mode, _ := h.registry.GetMode(r.Context(), subject)
75+
func (h *Handler) checkModeForWrite(r *http.Request, subject string) (string, error) {
76+
mode, err := h.registry.GetMode(r.Context(), subject)
77+
if err != nil {
78+
return "", fmt.Errorf("failed to check mode: %w", err)
79+
}
7480
if mode == "READONLY" || mode == "READONLY_OVERRIDE" {
75-
return mode
81+
return mode, nil
7682
}
77-
return ""
83+
return "", nil
7884
}
7985

8086
// resolveAlias resolves a subject alias. If the subject has an alias configured,
@@ -319,6 +325,8 @@ func (h *Handler) GetVersion(w http.ResponseWriter, r *http.Request) {
319325
}
320326

321327
// findDeletedVersion looks up a soft-deleted version by iterating all versions including deleted.
328+
// When version is -1 (the "latest" sentinel), it returns the highest-versioned schema
329+
// among all versions (including soft-deleted), matching Confluent's behavior.
322330
func (h *Handler) findDeletedVersion(ctx context.Context, subject string, version int) (*storage.SchemaRecord, error) {
323331
schemas, err := h.registry.GetSchemasBySubject(ctx, subject, true) // include deleted
324332
if err != nil {
@@ -327,6 +335,21 @@ func (h *Handler) findDeletedVersion(ctx context.Context, subject string, versio
327335
if len(schemas) == 0 {
328336
return nil, storage.ErrSubjectNotFound
329337
}
338+
339+
// "latest" sentinel: return the highest version among all (including deleted)
340+
if version == -1 {
341+
var latest *storage.SchemaRecord
342+
for _, s := range schemas {
343+
if latest == nil || s.Version > latest.Version {
344+
latest = s
345+
}
346+
}
347+
if latest != nil {
348+
return latest, nil
349+
}
350+
return nil, storage.ErrVersionNotFound
351+
}
352+
330353
for _, s := range schemas {
331354
if s.Version == version {
332355
return s, nil
@@ -357,7 +380,10 @@ func (h *Handler) RegisterSchema(w http.ResponseWriter, r *http.Request) {
357380
subject := h.resolveAlias(r.Context(), chi.URLParam(r, "subject"))
358381

359382
// Check mode enforcement
360-
if mode := h.checkModeForWrite(r, subject); mode != "" {
383+
if mode, modeErr := h.checkModeForWrite(r, subject); modeErr != nil {
384+
writeError(w, http.StatusInternalServerError, types.ErrorCodeStorageError, modeErr.Error())
385+
return
386+
} else if mode != "" {
361387
writeError(w, http.StatusUnprocessableEntity, types.ErrorCodeOperationNotPermitted,
362388
fmt.Sprintf("Subject '%s' is in %s mode", subject, mode))
363389
return
@@ -384,8 +410,18 @@ func (h *Handler) RegisterSchema(w http.ResponseWriter, r *http.Request) {
384410
var schema *storage.SchemaRecord
385411
var err error
386412

387-
// In IMPORT mode with explicit ID, use import path
413+
// Explicit ID requires IMPORT mode (Confluent behavior)
388414
if req.ID > 0 {
415+
mode, modeErr := h.registry.GetMode(r.Context(), subject)
416+
if modeErr != nil {
417+
writeError(w, http.StatusInternalServerError, types.ErrorCodeStorageError, "Failed to check mode")
418+
return
419+
}
420+
if mode != "IMPORT" {
421+
writeError(w, http.StatusUnprocessableEntity, types.ErrorCodeOperationNotPermitted,
422+
fmt.Sprintf("Subject '%s' is not in import mode. Registering schemas with explicit IDs requires IMPORT mode.", subject))
423+
return
424+
}
389425
schema, err = h.registry.RegisterSchemaWithID(r.Context(), subject, req.Schema, schemaType, req.References, req.ID)
390426
} else {
391427
schema, err = h.registry.RegisterSchema(r.Context(), subject, req.Schema, schemaType, req.References, registry.RegisterOpts{
@@ -491,7 +527,10 @@ func (h *Handler) DeleteSubject(w http.ResponseWriter, r *http.Request) {
491527
permanent := r.URL.Query().Get("permanent") == "true"
492528

493529
// Check mode enforcement
494-
if mode := h.checkModeForWrite(r, subject); mode != "" {
530+
if mode, modeErr := h.checkModeForWrite(r, subject); modeErr != nil {
531+
writeError(w, http.StatusInternalServerError, types.ErrorCodeStorageError, modeErr.Error())
532+
return
533+
} else if mode != "" {
495534
writeError(w, http.StatusUnprocessableEntity, types.ErrorCodeOperationNotPermitted,
496535
fmt.Sprintf("Subject '%s' is in %s mode", subject, mode))
497536
return
@@ -531,7 +570,10 @@ func (h *Handler) DeleteVersion(w http.ResponseWriter, r *http.Request) {
531570
permanent := r.URL.Query().Get("permanent") == "true"
532571

533572
// Check mode enforcement
534-
if mode := h.checkModeForWrite(r, subject); mode != "" {
573+
if mode, modeErr := h.checkModeForWrite(r, subject); modeErr != nil {
574+
writeError(w, http.StatusInternalServerError, types.ErrorCodeStorageError, modeErr.Error())
575+
return
576+
} else if mode != "" {
535577
writeError(w, http.StatusUnprocessableEntity, types.ErrorCodeOperationNotPermitted,
536578
fmt.Sprintf("Subject '%s' is in %s mode", subject, mode))
537579
return
@@ -1083,6 +1125,18 @@ func (h *Handler) ListSchemas(w http.ResponseWriter, r *http.Request) {
10831125

10841126
// ImportSchemas handles POST /import/schemas
10851127
func (h *Handler) ImportSchemas(w http.ResponseWriter, r *http.Request) {
1128+
// Bulk import requires IMPORT mode (Confluent behavior)
1129+
mode, modeErr := h.registry.GetMode(r.Context(), "")
1130+
if modeErr != nil {
1131+
writeError(w, http.StatusInternalServerError, types.ErrorCodeStorageError, "Failed to check mode")
1132+
return
1133+
}
1134+
if mode != "IMPORT" {
1135+
writeError(w, http.StatusUnprocessableEntity, types.ErrorCodeOperationNotPermitted,
1136+
"Import is not permitted. The registry must be in IMPORT mode to import schemas.")
1137+
return
1138+
}
1139+
10861140
var req types.ImportSchemasRequest
10871141
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
10881142
writeError(w, http.StatusBadRequest, types.ErrorCodeInvalidSchema, "Invalid request body")

internal/api/handlers/handlers_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,25 @@ func registerSchema(t *testing.T, h *Handler, subject, schemaStr string) int64 {
5353
return resp.ID
5454
}
5555

56+
// setImportMode sets the global mode to IMPORT via the handler.
57+
func setImportMode(t *testing.T, h *Handler) {
58+
t.Helper()
59+
modeReq := types.ModeRequest{Mode: "IMPORT"}
60+
modeBytes, _ := json.Marshal(modeReq)
61+
62+
r := chi.NewRouter()
63+
r.Put("/mode", h.SetMode)
64+
65+
req := httptest.NewRequest("PUT", "/mode?force=true", bytes.NewReader(modeBytes))
66+
req.Header.Set("Content-Type", "application/json")
67+
w := httptest.NewRecorder()
68+
r.ServeHTTP(w, req)
69+
70+
if w.Code != http.StatusOK {
71+
t.Fatalf("setImportMode failed: %d %s", w.Code, w.Body.String())
72+
}
73+
}
74+
5675
func decodeErrorResponse(t *testing.T, w *httptest.ResponseRecorder) types.ErrorResponse {
5776
t.Helper()
5877
var resp types.ErrorResponse
@@ -1389,6 +1408,7 @@ func TestListSchemas_WithSchemas(t *testing.T) {
13891408

13901409
func TestImportSchemas_Success(t *testing.T) {
13911410
h := setupTestHandler(t)
1411+
setImportMode(t, h)
13921412

13931413
r := chi.NewRouter()
13941414
r.Post("/import/schemas", h.ImportSchemas)
@@ -1422,6 +1442,7 @@ func TestImportSchemas_Success(t *testing.T) {
14221442

14231443
func TestImportSchemas_EmptyList(t *testing.T) {
14241444
h := setupTestHandler(t)
1445+
setImportMode(t, h)
14251446

14261447
r := chi.NewRouter()
14271448
r.Post("/import/schemas", h.ImportSchemas)
@@ -1441,6 +1462,7 @@ func TestImportSchemas_EmptyList(t *testing.T) {
14411462

14421463
func TestImportSchemas_InvalidBody(t *testing.T) {
14431464
h := setupTestHandler(t)
1465+
setImportMode(t, h)
14441466

14451467
r := chi.NewRouter()
14461468
r.Post("/import/schemas", h.ImportSchemas)
@@ -1457,6 +1479,7 @@ func TestImportSchemas_InvalidBody(t *testing.T) {
14571479

14581480
func TestImportSchemas_DuplicateID(t *testing.T) {
14591481
h := setupTestHandler(t)
1482+
setImportMode(t, h)
14601483

14611484
r := chi.NewRouter()
14621485
r.Post("/import/schemas", h.ImportSchemas)

internal/api/server_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,9 +558,26 @@ func TestServer_RegisterCompatibleSchema(t *testing.T) {
558558
}
559559
}
560560

561+
// setGlobalMode sets the global mode via the API (e.g., "IMPORT", "READWRITE").
562+
func setGlobalMode(t *testing.T, server *Server, mode string) {
563+
t.Helper()
564+
modeReq := types.ModeRequest{Mode: mode}
565+
modeBytes, _ := json.Marshal(modeReq)
566+
req := httptest.NewRequest("PUT", "/mode?force=true", bytes.NewReader(modeBytes))
567+
req.Header.Set("Content-Type", "application/json")
568+
w := httptest.NewRecorder()
569+
server.ServeHTTP(w, req)
570+
if w.Code != http.StatusOK {
571+
t.Fatalf("Failed to set mode to %s: %d %s", mode, w.Code, w.Body.String())
572+
}
573+
}
574+
561575
func TestServer_ImportSchemas(t *testing.T) {
562576
server := setupTestServer(t)
563577

578+
// Set IMPORT mode (required for import operations)
579+
setGlobalMode(t, server, "IMPORT")
580+
564581
// Import schemas with specific IDs
565582
importReq := types.ImportSchemasRequest{
566583
Schemas: []types.ImportSchemaRequest{
@@ -629,6 +646,9 @@ func TestServer_ImportSchemas(t *testing.T) {
629646
t.Errorf("Expected schema ID 42, got %d", versionResp.ID)
630647
}
631648

649+
// Switch back to READWRITE for normal registration
650+
setGlobalMode(t, server, "READWRITE")
651+
632652
// New schema registration should get an ID after the imported ones
633653
newSchema := `{"type": "record", "name": "Product", "fields": [{"name": "product_id", "type": "long"}]}`
634654
registerBody := types.RegisterSchemaRequest{Schema: newSchema}
@@ -654,6 +674,9 @@ func TestServer_ImportSchemas(t *testing.T) {
654674
func TestServer_ImportSchemas_DuplicateID(t *testing.T) {
655675
server := setupTestServer(t)
656676

677+
// Set IMPORT mode (required for import operations)
678+
setGlobalMode(t, server, "IMPORT")
679+
657680
// Import first schema
658681
importReq := types.ImportSchemasRequest{
659682
Schemas: []types.ImportSchemaRequest{
@@ -715,6 +738,9 @@ func TestServer_ImportSchemas_DuplicateID(t *testing.T) {
715738
func TestServer_ImportSchemas_InvalidSchema(t *testing.T) {
716739
server := setupTestServer(t)
717740

741+
// Set IMPORT mode (required for import operations)
742+
setGlobalMode(t, server, "IMPORT")
743+
718744
// Try to import invalid schema
719745
importReq := types.ImportSchemasRequest{
720746
Schemas: []types.ImportSchemaRequest{
@@ -752,6 +778,9 @@ func TestServer_ImportSchemas_InvalidSchema(t *testing.T) {
752778
func TestServer_ImportSchemas_EmptyRequest(t *testing.T) {
753779
server := setupTestServer(t)
754780

781+
// Set IMPORT mode (required for import operations)
782+
setGlobalMode(t, server, "IMPORT")
783+
755784
// Try to import empty list
756785
importReq := types.ImportSchemasRequest{
757786
Schemas: []types.ImportSchemaRequest{},

internal/api/types/types.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,8 @@ const (
163163
ErrorCodeSubjectCompatNotFound = 40408
164164
ErrorCodeSubjectModeNotFound = 40409
165165
ErrorCodeIncompatibleSchema = 409
166-
ErrorCodeInvalidSchema = 42201
167-
ErrorCodeInvalidSchemaType = 42202
168-
ErrorCodeInvalidVersion = 42202 // Confluent uses 42202 for both invalid schema type and invalid version
166+
ErrorCodeInvalidSchema = 42201
167+
ErrorCodeInvalidVersion = 42202
169168
ErrorCodeInvalidCompatibilityLevel = 42203
170169
ErrorCodeInvalidMode = 42204
171170
ErrorCodeOperationNotPermitted = 42205

0 commit comments

Comments
 (0)