Skip to content

Commit 9d50c65

Browse files
committed
fix: resolve DB backend issues for metadata, references, and JSON serialization
- PostgreSQL: return *string from marshalJSONNullable so pq driver sends text (not bytea) for JSONB columns, fixing "invalid input syntax for type json" errors - Cassandra: handle typed nil pointers in marshalJSONText to prevent storing "null" text for nil metadata/ruleSet; read metadata/ruleSet in GetSchemasBySubject for compatibility group filtering; add cleanupReferencesByTarget on permanent delete - MySQL: handle typed nil pointers in marshalJSON
1 parent c37d5dd commit 9d50c65

File tree

3 files changed

+58
-8
lines changed

3 files changed

+58
-8
lines changed

internal/storage/cassandra/store.go

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"encoding/json"
99
"errors"
1010
"fmt"
11+
"reflect"
1112
"sort"
1213
"strings"
1314
"time"
@@ -771,20 +772,22 @@ func (s *Store) GetSchemaBySubjectVersion(ctx context.Context, subject string, v
771772
// GetSchemasBySubject retrieves all schemas for a subject.
772773
func (s *Store) GetSchemasBySubject(ctx context.Context, subject string, includeDeleted bool) ([]*storage.SchemaRecord, error) {
773774
iter := s.readQuery(
774-
fmt.Sprintf(`SELECT version, schema_id, deleted, created_at FROM %s.subject_versions WHERE subject = ?`, qident(s.cfg.Keyspace)),
775+
fmt.Sprintf(`SELECT version, schema_id, deleted, created_at, metadata, ruleset FROM %s.subject_versions WHERE subject = ?`, qident(s.cfg.Keyspace)),
775776
subject,
776777
).WithContext(ctx).Iter()
777778

778779
type versionInfo struct {
779-
version int
780-
schemaID int
781-
deleted bool
782-
createdAt gocql.UUID
780+
version int
781+
schemaID int
782+
deleted bool
783+
createdAt gocql.UUID
784+
metadataStr string
785+
rulesetStr string
783786
}
784787
var entries []versionInfo
785788
var vi versionInfo
786789
hasAnyRows := false
787-
for iter.Scan(&vi.version, &vi.schemaID, &vi.deleted, &vi.createdAt) {
790+
for iter.Scan(&vi.version, &vi.schemaID, &vi.deleted, &vi.createdAt, &vi.metadataStr, &vi.rulesetStr) {
788791
hasAnyRows = true
789792
if includeDeleted || !vi.deleted {
790793
entries = append(entries, vi)
@@ -813,6 +816,13 @@ func (s *Store) GetSchemasBySubject(ctx context.Context, subject string, include
813816
rec.Version = e.version
814817
rec.Deleted = e.deleted
815818
rec.CreatedAt = e.createdAt.Time()
819+
// Overlay per-version metadata/ruleset from subject_versions
820+
if m := unmarshalJSONText[storage.Metadata](e.metadataStr); m != nil {
821+
rec.Metadata = m
822+
}
823+
if r := unmarshalJSONText[storage.RuleSet](e.rulesetStr); r != nil {
824+
rec.RuleSet = r
825+
}
816826
out = append(out, rec)
817827
}
818828
return out, nil
@@ -982,6 +992,8 @@ func (s *Store) DeleteSchema(ctx context.Context, subject string, version int, p
982992
if !deleted {
983993
return storage.ErrVersionNotSoftDeleted
984994
}
995+
// Clean up references_by_target for any schemas this version references
996+
s.cleanupReferencesByTarget(ctx, existingSchemaID, subject, version)
985997
if err := s.writeQuery(
986998
fmt.Sprintf(`DELETE FROM %s.subject_versions WHERE subject = ? AND version = ?`, qident(s.cfg.Keyspace)),
987999
subject, version,
@@ -1044,6 +1056,27 @@ func (s *Store) cleanupOrphanedSchema(ctx context.Context, schemaID int) {
10441056
).WithContext(ctx).Exec()
10451057
}
10461058

1059+
// cleanupReferencesByTarget removes references_by_target entries for a schema
1060+
// that is being permanently deleted. This ensures other schemas can be deleted
1061+
// after their referrers are removed.
1062+
func (s *Store) cleanupReferencesByTarget(ctx context.Context, schemaID int, subject string, version int) {
1063+
// Read all references this schema has
1064+
refIter := s.readQuery(
1065+
fmt.Sprintf(`SELECT ref_subject, ref_version FROM %s.schema_references WHERE schema_id = ?`, qident(s.cfg.Keyspace)),
1066+
schemaID,
1067+
).WithContext(ctx).Iter()
1068+
var refSubject string
1069+
var refVersion int
1070+
for refIter.Scan(&refSubject, &refVersion) {
1071+
// Delete the reverse lookup entry
1072+
_ = s.writeQuery(
1073+
fmt.Sprintf(`DELETE FROM %s.references_by_target WHERE ref_subject = ? AND ref_version = ? AND schema_subject = ? AND schema_version = ?`, qident(s.cfg.Keyspace)),
1074+
refSubject, refVersion, subject, version,
1075+
).WithContext(ctx).Exec()
1076+
}
1077+
refIter.Close()
1078+
}
1079+
10471080
// ---------- Subject Operations ----------
10481081

10491082
// ListSubjects returns all subjects.
@@ -2105,6 +2138,11 @@ func marshalJSONText(v interface{}) string {
21052138
if v == nil {
21062139
return ""
21072140
}
2141+
// Handle typed nil pointers (e.g., (*storage.Metadata)(nil) passed as interface{})
2142+
rv := reflect.ValueOf(v)
2143+
if rv.Kind() == reflect.Ptr && rv.IsNil() {
2144+
return ""
2145+
}
21082146
data, err := json.Marshal(v)
21092147
if err != nil {
21102148
return ""

internal/storage/mysql/store.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"fmt"
88
"math"
9+
"reflect"
910
"time"
1011

1112
_ "github.com/go-sql-driver/mysql"
@@ -1895,6 +1896,11 @@ func marshalJSON(v interface{}) ([]byte, error) {
18951896
if v == nil {
18961897
return nil, nil
18971898
}
1899+
// Handle typed nil pointers (e.g., (*storage.Metadata)(nil) passed as interface{})
1900+
rv := reflect.ValueOf(v)
1901+
if rv.Kind() == reflect.Ptr && rv.IsNil() {
1902+
return nil, nil
1903+
}
18981904
return json.Marshal(v)
18991905
}
19001906

internal/storage/postgres/store.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1883,7 +1883,8 @@ func (s *Store) scanAPIKeys(rows *sql.Rows) ([]*storage.APIKeyRecord, error) {
18831883

18841884
// marshalJSONNullable marshals a value to JSON, returning nil if the input is nil.
18851885
// It handles both untyped nil and typed nil pointers (e.g., (*Metadata)(nil)).
1886-
func marshalJSONNullable(v interface{}) ([]byte, error) {
1886+
// Returns *string so the pq driver sends it as text (not bytea) for JSONB columns.
1887+
func marshalJSONNullable(v interface{}) (*string, error) {
18871888
if v == nil {
18881889
return nil, nil
18891890
}
@@ -1892,7 +1893,12 @@ func marshalJSONNullable(v interface{}) ([]byte, error) {
18921893
if rv.Kind() == reflect.Ptr && rv.IsNil() {
18931894
return nil, nil
18941895
}
1895-
return json.Marshal(v)
1896+
data, err := json.Marshal(v)
1897+
if err != nil {
1898+
return nil, err
1899+
}
1900+
s := string(data)
1901+
return &s, nil
18961902
}
18971903

18981904
// unmarshalMetadata unmarshals a nullable JSON byte slice into a *storage.Metadata.

0 commit comments

Comments
 (0)