Skip to content

Commit 5ab34ba

Browse files
committed
Resolve commented issues
1 parent c9f1551 commit 5ab34ba

File tree

4 files changed

+15
-11
lines changed

4 files changed

+15
-11
lines changed

gateway/gateway-controller/cmd/controller/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,10 @@ func main() {
121121
log.Fatal("Failed to initialize EventHub", zap.Error(err))
122122
}
123123
if err := eventHub.RegisterOrganization("default"); err != nil {
124-
log.Fatal("Failed to register default organization", zap.Error(err))
124+
log.Error("Failed to register default organization", zap.Error(err))
125+
} else {
126+
log.Info("EventHub initialized successfully")
125127
}
126-
log.Info("EventHub initialized successfully")
127128
} else {
128129
log.Fatal("EventHub requires persistent storage. Multi-replica mode will not function correctly in memory-only mode.")
129130
}

gateway/gateway-controller/pkg/eventhub/sqlite_backend.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,7 @@ func (b *SQLiteBackend) pollAllOrganizations() {
301301
}
302302

303303
if len(events) > 0 {
304-
if len(events) > 0 {
305-
if b.deliverEvents(org, events) != nil{
304+
if b.deliverEvents(org, events) == nil{
306305
org.updatePollState(state.VersionID, time.Now())
307306
}
308307
// If delivery failed (channel full), don't update timestamp
@@ -311,9 +310,6 @@ func (b *SQLiteBackend) pollAllOrganizations() {
311310
org.updatePollState(state.VersionID, time.Now())
312311
}
313312
}
314-
315-
org.updatePollState(state.VersionID, time.Now())
316-
}
317313
}
318314

319315
// getAllStates retrieves all organization states

gateway/gateway-controller/pkg/storage/gateway-controller-db.sql

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,17 @@ CREATE INDEX IF NOT EXISTS idx_organization_states_updated ON organization_state
165165
CREATE TABLE IF NOT EXISTS events (
166166
organization_id TEXT NOT NULL,
167167
processed_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
168+
processed_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
168169
originated_timestamp TIMESTAMP NOT NULL,
169170
event_type TEXT NOT NULL,
170171
action TEXT NOT NULL CHECK(action IN ('CREATE', 'UPDATE', 'DELETE')),
171172
entity_id TEXT NOT NULL,
172-
correlation_id TEXT NOT NULL DEFAULT '',
173+
correlation_id TEXT NOT NULL,
173174
event_data TEXT NOT NULL,
174-
PRIMARY KEY (organization_id, processed_timestamp)
175+
PRIMARY KEY (correlation_id)
175176
);
176177

178+
CREATE INDEX IF NOT EXISTS idx_events_org_time ON events(organization_id, processed_timestamp);
179+
177180
-- Set schema version to 6
178181
PRAGMA user_version = 6;

gateway/gateway-controller/pkg/storage/sqlite.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,17 @@ func (s *SQLiteStorage) initSchema() error {
239239
event_type TEXT NOT NULL,
240240
action TEXT NOT NULL CHECK(action IN ('CREATE', 'UPDATE', 'DELETE')),
241241
entity_id TEXT NOT NULL,
242-
correlation_id TEXT NOT NULL DEFAULT '',
242+
correlation_id TEXT NOT NULL,
243243
event_data TEXT NOT NULL,
244-
PRIMARY KEY (organization_id, processed_timestamp)
244+
PRIMARY KEY (correlation_id)
245245
);`); err != nil {
246246
return fmt.Errorf("failed to create events table: %w", err)
247247
}
248248

249+
if _, err := s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_events_org_time ON events(organization_id, processed_timestamp);`); err != nil {
250+
return fmt.Errorf("failed to create events organization-time index: %w", err)
251+
}
252+
249253
if _, err := s.db.Exec("PRAGMA user_version = 6"); err != nil {
250254
return fmt.Errorf("failed to set schema version to 6: %w", err)
251255
}

0 commit comments

Comments
 (0)