Skip to content

Commit a599798

Browse files
committed
fix: relax fingerprint claim in Cassandra ImportSchema and add cleanup
Import mode preserves external IDs, so the same schema content can legitimately have different IDs across subjects/imports. The fingerprint LWT claim should not reject these — it's for CreateSchema dedup only. Also add schema_fingerprints to BDD Cassandra cleanup truncation list.
1 parent a36a0a7 commit a599798

File tree

2 files changed

+6
-11
lines changed

2 files changed

+6
-11
lines changed

internal/storage/cassandra/store.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -386,16 +386,11 @@ func (s *Store) ImportSchema(ctx context.Context, record *storage.SchemaRecord)
386386

387387
// Insert schema content if this is a new ID
388388
if !idExists {
389-
// Claim fingerprint atomically via LWT — prevents a concurrent CreateSchema
390-
// from allocating a different ID for the same schema content.
391-
applied, existingFpID, fpErr := s.claimFingerprint(ctx, fp, record.ID)
392-
if fpErr != nil {
393-
return fmt.Errorf("failed to claim fingerprint for import: %w", fpErr)
394-
}
395-
if !applied && existingFpID != record.ID {
396-
// Fingerprint already claimed by a different schema_id — conflict
397-
return storage.ErrSchemaIDConflict
398-
}
389+
// For imports, claim fingerprint but don't reject on conflict — import mode
390+
// preserves external IDs, so the same schema content (e.g. {"type":"string"})
391+
// can legitimately appear with different IDs across different subjects/imports.
392+
// The fingerprint table is primarily for CreateSchema dedup, not import enforcement.
393+
_, _, _ = s.claimFingerprint(ctx, fp, record.ID)
399394

400395
if err := s.writeQuery(
401396
fmt.Sprintf(`INSERT INTO %s.schemas_by_id (schema_id, schema_type, fingerprint, schema_text, canonical_text, created_at, metadata, ruleset)

tests/bdd/bdd_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ func cleanCassandra() error {
330330
"id_alloc", "modes", "global_config", "subject_configs",
331331
"references_by_target", "schema_references",
332332
"subject_latest", "subject_versions",
333-
"schemas_by_id",
333+
"schemas_by_id", "schema_fingerprints",
334334
}
335335
for _, t := range tables {
336336
if err := session.Query("TRUNCATE " + t).Exec(); err != nil {

0 commit comments

Comments
 (0)