Skip to content

Commit c9f1551

Browse files
committed
Resolve commented issues
1 parent 278c8a9 commit c9f1551

File tree

9 files changed

+44
-416
lines changed

9 files changed

+44
-416
lines changed

gateway/gateway-controller/cmd/controller/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ func main() {
120120
if err := eventHub.Initialize(ctx); err != nil {
121121
log.Fatal("Failed to initialize EventHub", zap.Error(err))
122122
}
123-
eventHub.RegisterOrganization("default")
123+
if err := eventHub.RegisterOrganization("default"); err != nil {
124+
log.Fatal("Failed to register default organization", zap.Error(err))
125+
}
124126
log.Info("EventHub initialized successfully")
125127
} else {
126128
log.Fatal("EventHub requires persistent storage. Multi-replica mode will not function correctly in memory-only mode.")

gateway/gateway-controller/pkg/eventhub/eventhub.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package eventhub
33
import (
44
"context"
55
"database/sql"
6+
"sync"
67
"time"
78

89
"go.uber.org/zap"
@@ -15,6 +16,7 @@ type eventHub struct {
1516
logger *zap.Logger
1617

1718
initialized bool
19+
mu sync.RWMutex
1820
}
1921

2022
// New creates a new EventHub instance with SQLite backend (default)
@@ -43,6 +45,9 @@ func NewWithBackend(backend EventhubImpl, logger *zap.Logger) EventHub {
4345

4446
// Initialize sets up the EventHub and starts background workers
4547
func (eh *eventHub) Initialize(ctx context.Context) error {
48+
eh.mu.Lock()
49+
defer eh.mu.Unlock()
50+
4651
if eh.initialized {
4752
return nil
4853
}
@@ -82,6 +87,9 @@ func (eh *eventHub) CleanUpEvents(ctx context.Context, timeFrom, timeEnd time.Ti
8287

8388
// Close gracefully shuts down the EventHub
8489
func (eh *eventHub) Close() error {
90+
eh.mu.Lock()
91+
defer eh.mu.Unlock()
92+
8593
if !eh.initialized {
8694
return nil
8795
}

gateway/gateway-controller/pkg/eventhub/sqlite_backend.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type SQLiteBackend struct {
2626
wg sync.WaitGroup
2727

2828
initialized bool
29-
mu sync.RWMutex
29+
3030
}
3131

3232
// NewSQLiteBackend creates a new SQLite-based backend
@@ -44,8 +44,6 @@ func NewSQLiteBackend(db *sql.DB, logger *zap.Logger, config *SQLiteBackendConfi
4444

4545
// Initialize sets up the SQLite backend and starts background workers
4646
func (b *SQLiteBackend) Initialize(ctx context.Context) error {
47-
b.mu.Lock()
48-
defer b.mu.Unlock()
4947

5048
if b.initialized {
5149
return nil
@@ -216,8 +214,6 @@ func (b *SQLiteBackend) CleanupRange(ctx context.Context, from, to time.Time) er
216214

217215
// Close gracefully shuts down the SQLite backend
218216
func (b *SQLiteBackend) Close() error {
219-
b.mu.Lock()
220-
defer b.mu.Unlock()
221217

222218
if !b.initialized {
223219
return nil
@@ -305,8 +301,16 @@ func (b *SQLiteBackend) pollAllOrganizations() {
305301
}
306302

307303
if len(events) > 0 {
308-
b.deliverEvents(org, events)
304+
if len(events) > 0 {
305+
if b.deliverEvents(org, events) != nil{
306+
org.updatePollState(state.VersionID, time.Now())
307+
}
308+
// If delivery failed (channel full), don't update timestamp
309+
// so events will be retried on next poll
310+
} else {
311+
org.updatePollState(state.VersionID, time.Now())
309312
}
313+
}
310314

311315
org.updatePollState(state.VersionID, time.Now())
312316
}
@@ -339,6 +343,7 @@ func (b *SQLiteBackend) getAllStates(ctx context.Context) ([]OrganizationState,
339343

340344
// getEventsSince retrieves events for an organization after a given timestamp
341345
func (b *SQLiteBackend) getEventsSince(ctx context.Context, orgID string, since time.Time) ([]Event, error) {
346+
// TODO: (VirajSalaka) Implement pagination if large number of events
342347
query := `
343348
SELECT processed_timestamp, originated_timestamp, event_type,
344349
action, entity_id, correlation_id, event_data
@@ -370,17 +375,18 @@ func (b *SQLiteBackend) getEventsSince(ctx context.Context, orgID string, since
370375
}
371376

372377
// deliverEvents sends events to all subscribers of an organization
373-
func (b *SQLiteBackend) deliverEvents(org *organization, events []Event) {
378+
func (b *SQLiteBackend) deliverEvents(org *organization, events []Event) error {
374379
subscribers := org.getSubscribers()
375380

376381
if len(subscribers) == 0 {
377382
b.logger.Debug("No subscribers for organization",
378383
zap.String("organization", string(org.id)),
379384
zap.Int("events", len(events)),
380385
)
381-
return
386+
return nil
382387
}
383388

389+
// TODO: (VirajSalaka) One subscriber is considered here. Handle multiple subscribers properly.
384390
for _, ch := range subscribers {
385391
select {
386392
case ch <- events:
@@ -389,12 +395,14 @@ func (b *SQLiteBackend) deliverEvents(org *organization, events []Event) {
389395
zap.Int("events", len(events)),
390396
)
391397
default:
392-
b.logger.Warn("Subscriber channel full, dropping events",
398+
b.logger.Error("Subscriber channel full, dropping events",
393399
zap.String("organization", string(org.id)),
394400
zap.Int("events", len(events)),
395401
)
402+
return fmt.Errorf("subscriber channel full")
396403
}
397404
}
405+
return nil
398406
}
399407

400408
// cleanupLoop runs periodic cleanup of old events

0 commit comments

Comments
 (0)