Skip to content

Commit 278c8a9

Browse files
committed
Fix Database Schema migration related changes
1 parent 8fa1f34 commit 278c8a9

File tree

3 files changed

+13
-136
lines changed

3 files changed

+13
-136
lines changed

gateway/gateway-controller/pkg/storage/gateway-controller-db.sql

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ CREATE INDEX IF NOT EXISTS idx_api_key_status ON api_keys(status);
150150
CREATE INDEX IF NOT EXISTS idx_api_key_expiry ON api_keys(expires_at);
151151
CREATE INDEX IF NOT EXISTS idx_created_by ON api_keys(created_by);
152152

153-
-- EventHub: Organization States Table (added in schema version 9)
153+
-- EventHub: Organization States Table (added in schema version 6)
154154
-- Tracks version information per organization for change detection
155155
CREATE TABLE IF NOT EXISTS organization_states (
156156
organization TEXT PRIMARY KEY,
@@ -160,7 +160,7 @@ CREATE TABLE IF NOT EXISTS organization_states (
160160

161161
CREATE INDEX IF NOT EXISTS idx_organization_states_updated ON organization_states(updated_at);
162162

163-
-- EventHub: Unified Events Table (added in schema version 9)
163+
-- EventHub: Unified Events Table (added in schema version 6)
164164
-- Stores all entity change events (APIs, certificates, LLM templates, etc.)
165165
CREATE TABLE IF NOT EXISTS events (
166166
organization_id TEXT NOT NULL,
@@ -177,5 +177,5 @@ CREATE TABLE IF NOT EXISTS events (
177177
CREATE INDEX IF NOT EXISTS idx_events_lookup ON events(organization_id, processed_timestamp);
178178
CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type);
179179

180-
-- Set schema version to 9
181-
PRAGMA user_version = 9;
180+
-- Set schema version to 6
181+
PRAGMA user_version = 6;

gateway/gateway-controller/pkg/storage/sqlite.go

Lines changed: 8 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -215,120 +215,10 @@ func (s *SQLiteStorage) initSchema() error {
215215
}
216216

217217
if version == 5 {
218-
// Add event tables for EventHub
219-
if _, err := s.db.Exec(`CREATE TABLE IF NOT EXISTS api_events (
220-
id INTEGER PRIMARY KEY AUTOINCREMENT,
221-
organization_id TEXT NOT NULL,
222-
processed_timestamp TIMESTAMP NOT NULL,
223-
originated_timestamp TIMESTAMP NOT NULL,
224-
event_data TEXT NOT NULL
225-
);`); err != nil {
226-
return fmt.Errorf("failed to migrate schema to version 6 (api_events): %w", err)
227-
}
228-
if _, err := s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_api_events_lookup ON api_events(organization_id, processed_timestamp);`); err != nil {
229-
return fmt.Errorf("failed to create api_events index: %w", err)
230-
}
231-
232-
if _, err := s.db.Exec(`CREATE TABLE IF NOT EXISTS certificate_events (
233-
id INTEGER PRIMARY KEY AUTOINCREMENT,
234-
organization_id TEXT NOT NULL,
235-
processed_timestamp TIMESTAMP NOT NULL,
236-
originated_timestamp TIMESTAMP NOT NULL,
237-
event_data TEXT NOT NULL
238-
);`); err != nil {
239-
return fmt.Errorf("failed to migrate schema to version 6 (certificate_events): %w", err)
240-
}
241-
if _, err := s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_cert_events_lookup ON certificate_events(organization_id, processed_timestamp);`); err != nil {
242-
return fmt.Errorf("failed to create certificate_events index: %w", err)
243-
}
244-
245-
if _, err := s.db.Exec(`CREATE TABLE IF NOT EXISTS llm_template_events (
246-
id INTEGER PRIMARY KEY AUTOINCREMENT,
247-
organization_id TEXT NOT NULL,
248-
processed_timestamp TIMESTAMP NOT NULL,
249-
originated_timestamp TIMESTAMP NOT NULL,
250-
event_data TEXT NOT NULL
251-
);`); err != nil {
252-
return fmt.Errorf("failed to migrate schema to version 6 (llm_template_events): %w", err)
253-
}
254-
if _, err := s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_llm_events_lookup ON llm_template_events(organization_id, processed_timestamp);`); err != nil {
255-
return fmt.Errorf("failed to create llm_template_events index: %w", err)
256-
}
257-
258-
if _, err := s.db.Exec("PRAGMA user_version = 6"); err != nil {
259-
return fmt.Errorf("failed to set schema version to 6: %w", err)
260-
}
261-
s.logger.Info("Schema migrated to version 6 (event tables)")
262-
version = 6
263-
}
264-
265-
if version == 6 {
266-
// Add topic_states table
267-
if _, err := s.db.Exec(`CREATE TABLE IF NOT EXISTS topic_states (
268-
topic_name TEXT PRIMARY KEY,
269-
version_id TEXT NOT NULL DEFAULT '',
270-
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
271-
);`); err != nil {
272-
return fmt.Errorf("failed to migrate schema to version 7 (topic_states): %w", err)
273-
}
274-
if _, err := s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_topic_states_updated ON topic_states(updated_at);`); err != nil {
275-
return fmt.Errorf("failed to create topic_states index: %w", err)
276-
}
277-
if _, err := s.db.Exec("PRAGMA user_version = 7"); err != nil {
278-
return fmt.Errorf("failed to set schema version to 7: %w", err)
279-
}
280-
s.logger.Info("Schema migrated to version 7 (topic_states table)")
281-
version = 7
282-
}
283-
284-
if version == 7 {
285-
// Migrate topic_states to include organization as part of primary key
286-
s.logger.Info("Migrating topic_states table to include organization (version 8)")
287-
288-
// Step 1: Rename old table
289-
if _, err := s.db.Exec(`ALTER TABLE topic_states RENAME TO topic_states_old;`); err != nil {
290-
return fmt.Errorf("failed to rename topic_states table: %w", err)
291-
}
292-
293-
// Step 2: Create new table with organization
294-
if _, err := s.db.Exec(`CREATE TABLE topic_states (
295-
organization TEXT NOT NULL,
296-
topic_name TEXT NOT NULL,
297-
version_id TEXT NOT NULL DEFAULT '',
298-
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
299-
PRIMARY KEY (organization, topic_name)
300-
);`); err != nil {
301-
return fmt.Errorf("failed to create new topic_states table: %w", err)
302-
}
303-
304-
// Step 3: Migrate data (set organization to empty string for existing rows)
305-
if _, err := s.db.Exec(`INSERT INTO topic_states (organization, topic_name, version_id, updated_at)
306-
SELECT '', topic_name, version_id, updated_at FROM topic_states_old;`); err != nil {
307-
return fmt.Errorf("failed to migrate topic_states data: %w", err)
308-
}
309-
310-
// Step 4: Drop old table
311-
if _, err := s.db.Exec(`DROP TABLE topic_states_old;`); err != nil {
312-
return fmt.Errorf("failed to drop old topic_states table: %w", err)
313-
}
314-
315-
// Step 5: Recreate index
316-
if _, err := s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_topic_states_updated ON topic_states(updated_at);`); err != nil {
317-
return fmt.Errorf("failed to create topic_states index: %w", err)
318-
}
319-
320-
if _, err := s.db.Exec("PRAGMA user_version = 8"); err != nil {
321-
return fmt.Errorf("failed to set schema version to 8: %w", err)
322-
}
323-
s.logger.Info("Schema migrated to version 8 (topic_states with organization)")
324-
version = 8
325-
}
326-
327-
if version == 8 {
328-
// Migrate to organization-centric Event Hub architecture
329-
s.logger.Info("Migrating to organization-centric Event Hub (version 9)")
218+
// Add EventHub tables for multi-replica synchronization
219+
s.logger.Info("Migrating to EventHub schema (version 6)")
330220

331-
// Step 1: Create new organization_states table
221+
// Create organization_states table
332222
if _, err := s.db.Exec(`CREATE TABLE organization_states (
333223
organization TEXT PRIMARY KEY,
334224
version_id TEXT NOT NULL DEFAULT '',
@@ -341,15 +231,7 @@ func (s *SQLiteStorage) initSchema() error {
341231
return fmt.Errorf("failed to create organization_states index: %w", err)
342232
}
343233

344-
// Step 2: Migrate state data (consolidate per organization)
345-
if _, err := s.db.Exec(`INSERT INTO organization_states (organization, version_id, updated_at)
346-
SELECT organization, MAX(version_id), MAX(updated_at)
347-
FROM topic_states
348-
GROUP BY organization;`); err != nil {
349-
return fmt.Errorf("failed to migrate state data: %w", err)
350-
}
351-
352-
// Step 3: Create unified events table
234+
// Create unified events table
353235
if _, err := s.db.Exec(`CREATE TABLE events (
354236
organization_id TEXT NOT NULL,
355237
processed_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -372,17 +254,12 @@ func (s *SQLiteStorage) initSchema() error {
372254
return fmt.Errorf("failed to create events type index: %w", err)
373255
}
374256

375-
// Step 4: Drop old topic_states table (data already migrated)
376-
if _, err := s.db.Exec(`DROP TABLE topic_states;`); err != nil {
377-
return fmt.Errorf("failed to drop topic_states table: %w", err)
378-
}
379-
380-
if _, err := s.db.Exec("PRAGMA user_version = 9"); err != nil {
381-
return fmt.Errorf("failed to set schema version to 9: %w", err)
257+
if _, err := s.db.Exec("PRAGMA user_version = 6"); err != nil {
258+
return fmt.Errorf("failed to set schema version to 6: %w", err)
382259
}
383260

384-
s.logger.Info("Schema migrated to version 9 (organization-centric Event Hub)")
385-
version = 9
261+
s.logger.Info("Schema migrated to version 6 (EventHub tables)")
262+
version = 6
386263
}
387264

388265
s.logger.Info("Database schema up to date", zap.Int("version", version))

gateway/gateway-controller/tests/integration/schema_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestSchemaInitialization(t *testing.T) {
9595
var version int
9696
err := rawDB.QueryRow("PRAGMA user_version").Scan(&version)
9797
assert.NoError(t, err)
98-
assert.Equal(t, 9, version, "Schema version should be 5")
98+
assert.Equal(t, 6, version, "Schema version should be 6")
9999
})
100100

101101
// Verify deployments table exists

0 commit comments

Comments
 (0)