Skip to content

Commit 92a4255

Browse files
committed
refactor(cassandra): optimize storage layer with SAI indexes, batch writes, and block-based IDs
Replace RDBMS-style patterns with Cassandra-native approaches: - Add SAI indexes on subject_versions (schema_id, deleted) and schemas_by_id (fingerprint), eliminating schemas_by_fingerprint and subjects tables - Batch reference writes in CreateSchema/ImportSchema with logged batches - Batch soft-deletes in DeleteSubject with unlogged batch (same partition) - Block-based ID allocation (default block size 50) reduces LWT frequency ~50x - IN-clause batch reads in GetSchemasBySubject (2N+1 → 3 queries) - SAI queries replace O(S×V) full-table scans in GetSubjectsBySchemaID, GetVersionsBySchemaID, cleanupOrphanedSchema, findSchemaInSubject, etc. - Propagate errors in cleanup methods via slog.Warn instead of silent discard - Update conformance test to remove dropped tables from truncation list Requires Cassandra 5.0+ for SAI support. Breaking change — drops legacy tables. All 1353 BDD tests pass against Cassandra.
1 parent 073bbf8 commit 92a4255

File tree

3 files changed

+487
-471
lines changed

3 files changed

+487
-471
lines changed

internal/storage/cassandra/migrations.go

Lines changed: 86 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ import (
1111
// Migrate creates/updates the Cassandra schema needed by the registry.
1212
// This is intentionally idempotent (IF NOT EXISTS everywhere).
1313
//
14+
// Requires Cassandra 5.0+ for Storage Attached Index (SAI) support.
15+
//
1416
// Design notes:
1517
// - schema_id is INT to match Confluent wire format (4-byte schema id)
16-
// - Schemas are stored in multiple lookup tables for efficient access:
17-
// - schemas_by_id: fast fetch on deserialize (by global ID)
18-
// - schemas_by_fingerprint: dedup/canonicalization (by fingerprint, global)
19-
// - subject_versions: versions within a subject
20-
// - subject_latest: avoid scanning partitions for latest version
18+
// - Schemas are stored with SAI indexes for efficient secondary lookups:
19+
// - schemas_by_id: primary lookup by global ID, SAI on fingerprint for dedup
20+
// - subject_versions: versions within a subject, SAI on schema_id + deleted
21+
// - subject_latest: track latest version per subject (also used for subject listing)
2122
//
2223
// - Block-based ID allocation reduces LWT contention
2324
// - TimeUUID for timestamps (Cassandra-native)
@@ -29,56 +30,44 @@ func Migrate(session *gocql.Session, keyspace string) error {
2930
AND durable_writes = true`, qident(keyspace)),
3031

3132
// Table 1: schemas_by_id - lookup by global schema ID
32-
// Primary lookup table for deserialization
33+
// Primary lookup table for deserialization.
34+
// SAI index on fingerprint replaces the old schemas_by_fingerprint table.
3335
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.schemas_by_id (
3436
schema_id int PRIMARY KEY,
3537
schema_type text,
3638
fingerprint text,
3739
schema_text text,
3840
canonical_text text,
39-
created_at timeuuid
40-
)`, qident(keyspace)),
41-
42-
// Table 2: schemas_by_fingerprint - global deduplication
43-
// Fingerprint is globally unique, so it's the partition key
44-
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.schemas_by_fingerprint (
45-
fingerprint text PRIMARY KEY,
46-
schema_id int,
47-
schema_type text,
48-
schema_text text,
49-
canonical_text text,
50-
created_at timeuuid
41+
created_at timeuuid,
42+
metadata text,
43+
ruleset text
5144
)`, qident(keyspace)),
5245

53-
// Table 3: subject_versions - versions within a subject
54-
// Partitioned by subject for efficient queries
46+
// Table 2: subject_versions - versions within a subject
47+
// Partitioned by subject for efficient queries.
48+
// SAI indexes on schema_id and deleted enable cross-partition lookups.
5549
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.subject_versions (
5650
subject text,
5751
version int,
5852
schema_id int,
5953
deleted boolean,
6054
created_at timeuuid,
55+
metadata text,
56+
ruleset text,
6157
PRIMARY KEY ((subject), version)
6258
) WITH CLUSTERING ORDER BY (version ASC)`, qident(keyspace)),
6359

64-
// Table 4: subject_latest - track latest version per subject
65-
// Avoids scanning partitions to find latest
60+
// Table 3: subject_latest - track latest version per subject
61+
// Avoids scanning partitions to find latest. Also serves as subject listing
62+
// (replaces the old bucketed subjects table).
6663
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.subject_latest (
6764
subject text PRIMARY KEY,
6865
latest_version int,
6966
latest_schema_id int,
7067
updated_at timeuuid
7168
)`, qident(keyspace)),
7269

73-
// Table 5: subjects - bucketed subject listing
74-
// Avoids expensive DISTINCT queries
75-
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.subjects (
76-
bucket int,
77-
subject text,
78-
PRIMARY KEY ((bucket), subject)
79-
)`, qident(keyspace)),
80-
81-
// Table 6: schema_references - schema dependencies
70+
// Table 4: schema_references - schema dependencies
8271
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.schema_references (
8372
schema_id int,
8473
name text,
@@ -87,7 +76,7 @@ func Migrate(session *gocql.Session, keyspace string) error {
8776
PRIMARY KEY ((schema_id), name)
8877
)`, qident(keyspace)),
8978

90-
// Table 7: references_by_target - reverse lookup for "referenced by"
79+
// Table 5: references_by_target - reverse lookup for "referenced by"
9180
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.references_by_target (
9281
ref_subject text,
9382
ref_version int,
@@ -96,35 +85,51 @@ func Migrate(session *gocql.Session, keyspace string) error {
9685
PRIMARY KEY ((ref_subject, ref_version), schema_subject, schema_version)
9786
)`, qident(keyspace)),
9887

99-
// Table 8: subject_configs - compatibility configuration per subject
88+
// Table 6: subject_configs - compatibility configuration per subject
10089
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.subject_configs (
101-
subject text PRIMARY KEY,
102-
compatibility text,
103-
updated_at timeuuid
90+
subject text PRIMARY KEY,
91+
compatibility text,
92+
alias text,
93+
normalize boolean,
94+
validate_fields boolean,
95+
default_metadata text,
96+
override_metadata text,
97+
default_ruleset text,
98+
override_ruleset text,
99+
compatibility_group text,
100+
updated_at timeuuid
104101
)`, qident(keyspace)),
105102

106-
// Table 9: global_config - global compatibility configuration
103+
// Table 7: global_config - global compatibility configuration
107104
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.global_config (
108-
key text PRIMARY KEY,
109-
compatibility text,
110-
updated_at timeuuid
105+
key text PRIMARY KEY,
106+
compatibility text,
107+
alias text,
108+
normalize boolean,
109+
validate_fields boolean,
110+
default_metadata text,
111+
override_metadata text,
112+
default_ruleset text,
113+
override_ruleset text,
114+
compatibility_group text,
115+
updated_at timeuuid
111116
)`, qident(keyspace)),
112117

113-
// Table 10: modes - registry running mode (READWRITE/READONLY/etc)
118+
// Table 8: modes - registry running mode (READWRITE/READONLY/etc)
114119
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.modes (
115120
key text PRIMARY KEY,
116121
mode text,
117122
updated_at timeuuid
118123
)`, qident(keyspace)),
119124

120-
// Table 11: id_alloc - block-based ID allocation
125+
// Table 9: id_alloc - block-based ID allocation
121126
// Uses LWT for atomic block reservation
122127
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.id_alloc (
123128
name text PRIMARY KEY,
124129
next_id int
125130
)`, qident(keyspace)),
126131

127-
// Table 12: users_by_id - user records
132+
// Table 10: users_by_id - user records
128133
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.users_by_id (
129134
user_id bigint PRIMARY KEY,
130135
email text,
@@ -136,7 +141,7 @@ func Migrate(session *gocql.Session, keyspace string) error {
136141
updated_at timeuuid
137142
)`, qident(keyspace)),
138143

139-
// Table 13: users_by_email - lookup by email (used as users_by_username)
144+
// Table 11: users_by_email - lookup by email (used as users_by_username)
140145
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.users_by_email (
141146
email text PRIMARY KEY,
142147
user_id bigint,
@@ -148,7 +153,7 @@ func Migrate(session *gocql.Session, keyspace string) error {
148153
updated_at timeuuid
149154
)`, qident(keyspace)),
150155

151-
// Table 14: api_keys_by_id - API key records
156+
// Table 12: api_keys_by_id - API key records
152157
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.api_keys_by_id (
153158
api_key_id bigint PRIMARY KEY,
154159
user_id bigint,
@@ -162,7 +167,7 @@ func Migrate(session *gocql.Session, keyspace string) error {
162167
last_used timestamp
163168
)`, qident(keyspace)),
164169

165-
// Table 15: api_keys_by_user - lookup by user
170+
// Table 13: api_keys_by_user - lookup by user
166171
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.api_keys_by_user (
167172
user_id bigint,
168173
api_key_id bigint,
@@ -177,7 +182,7 @@ func Migrate(session *gocql.Session, keyspace string) error {
177182
PRIMARY KEY ((user_id), api_key_id)
178183
)`, qident(keyspace)),
179184

180-
// Table 16: api_keys_by_hash - lookup by hash for authentication
185+
// Table 14: api_keys_by_hash - lookup by hash for authentication
181186
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.api_keys_by_hash (
182187
api_key_hash text PRIMARY KEY,
183188
api_key_id bigint,
@@ -198,7 +203,7 @@ func Migrate(session *gocql.Session, keyspace string) error {
198203
}
199204
}
200205

201-
// ALTER TABLE migrations — add new columns for metadata, ruleset, and config fields.
206+
// ALTER TABLE migrations — add new columns to tables that may already exist.
202207
// Each ALTER is executed individually because Cassandra returns an error if the
203208
// column already exists. We silently ignore "already exist" errors to stay idempotent.
204209
alterStmts := []string{
@@ -217,6 +222,8 @@ func Migrate(session *gocql.Session, keyspace string) error {
217222
fmt.Sprintf(`ALTER TABLE %s.subject_configs ADD override_metadata text`, qident(keyspace)),
218223
fmt.Sprintf(`ALTER TABLE %s.subject_configs ADD default_ruleset text`, qident(keyspace)),
219224
fmt.Sprintf(`ALTER TABLE %s.subject_configs ADD override_ruleset text`, qident(keyspace)),
225+
fmt.Sprintf(`ALTER TABLE %s.subject_configs ADD compatibility_group text`, qident(keyspace)),
226+
fmt.Sprintf(`ALTER TABLE %s.subject_configs ADD validate_fields boolean`, qident(keyspace)),
220227

221228
// global_config: same config fields
222229
fmt.Sprintf(`ALTER TABLE %s.global_config ADD normalize boolean`, qident(keyspace)),
@@ -225,13 +232,7 @@ func Migrate(session *gocql.Session, keyspace string) error {
225232
fmt.Sprintf(`ALTER TABLE %s.global_config ADD override_metadata text`, qident(keyspace)),
226233
fmt.Sprintf(`ALTER TABLE %s.global_config ADD default_ruleset text`, qident(keyspace)),
227234
fmt.Sprintf(`ALTER TABLE %s.global_config ADD override_ruleset text`, qident(keyspace)),
228-
229-
// subject_configs and global_config: compatibility_group
230-
fmt.Sprintf(`ALTER TABLE %s.subject_configs ADD compatibility_group text`, qident(keyspace)),
231235
fmt.Sprintf(`ALTER TABLE %s.global_config ADD compatibility_group text`, qident(keyspace)),
232-
233-
// subject_configs and global_config: validate_fields
234-
fmt.Sprintf(`ALTER TABLE %s.subject_configs ADD validate_fields boolean`, qident(keyspace)),
235236
fmt.Sprintf(`ALTER TABLE %s.global_config ADD validate_fields boolean`, qident(keyspace)),
236237
}
237238
for _, stmt := range alterStmts {
@@ -244,6 +245,37 @@ func Migrate(session *gocql.Session, keyspace string) error {
244245
}
245246
}
246247

248+
// SAI indexes (Cassandra 5.0+ required) — replace the old schemas_by_fingerprint
249+
// and subjects tables with efficient secondary lookups.
250+
saiStmts := []string{
251+
// Fingerprint lookup on schemas_by_id — eliminates schemas_by_fingerprint table
252+
fmt.Sprintf(`CREATE CUSTOM INDEX IF NOT EXISTS idx_schemas_fingerprint ON %s.schemas_by_id (fingerprint) USING 'StorageAttachedIndex'`, qident(keyspace)),
253+
// Schema ID lookup on subject_versions — enables O(1) GetSubjectsBySchemaID/GetVersionsBySchemaID
254+
fmt.Sprintf(`CREATE CUSTOM INDEX IF NOT EXISTS idx_sv_schema_id ON %s.subject_versions (schema_id) USING 'StorageAttachedIndex'`, qident(keyspace)),
255+
// Deleted flag lookup on subject_versions — enables efficient non-deleted filtering
256+
fmt.Sprintf(`CREATE CUSTOM INDEX IF NOT EXISTS idx_sv_deleted ON %s.subject_versions (deleted) USING 'StorageAttachedIndex'`, qident(keyspace)),
257+
}
258+
for _, stmt := range saiStmts {
259+
if err := session.Query(stmt).Exec(); err != nil {
260+
// Ignore "already exists" for idempotency
261+
if !strings.Contains(err.Error(), "already exist") {
262+
return fmt.Errorf("cassandra migrate failed: %w (stmt=%s)", err, oneLine(stmt))
263+
}
264+
}
265+
}
266+
267+
// Drop legacy tables replaced by SAI indexes (breaking change).
268+
// These tables are no longer used by the storage layer.
269+
dropStmts := []string{
270+
fmt.Sprintf(`DROP TABLE IF EXISTS %s.schemas_by_fingerprint`, qident(keyspace)),
271+
fmt.Sprintf(`DROP TABLE IF EXISTS %s.subjects`, qident(keyspace)),
272+
}
273+
for _, stmt := range dropStmts {
274+
if err := session.Query(stmt).Exec(); err != nil {
275+
return fmt.Errorf("cassandra migrate failed: %w (stmt=%s)", err, oneLine(stmt))
276+
}
277+
}
278+
247279
// Initialize allocator row
248280
if err := session.Query(
249281
fmt.Sprintf(`INSERT INTO %s.id_alloc (name, next_id) VALUES (?, ?) IF NOT EXISTS`, qident(keyspace)),

0 commit comments

Comments
 (0)