Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
510b16f
Introduce eventhub for api-platform
VirajSalaka Dec 23, 2025
5ecb68e
Add organizationID to the topic_states table
VirajSalaka Dec 23, 2025
2595d6b
Organization ID based events
VirajSalaka Dec 23, 2025
34524cb
Add Event Listener: Not tested
VirajSalaka Dec 28, 2025
7e4b8fd
Make Event Listener Generic: Not tested
VirajSalaka Dec 28, 2025
bae76cf
Example Usage for event hub
VirajSalaka Dec 31, 2025
9f22d94
Fix build failure
VirajSalaka Dec 31, 2025
7672acc
Generalize Eventhub Implementation
VirajSalaka Dec 31, 2025
52ec10f
Fix API Deployment via eventhub
VirajSalaka Jan 4, 2026
f498c26
Add todo comments
VirajSalaka Jan 4, 2026
90fb3fb
Update Inmemory Database via Eventhub all the time
VirajSalaka Jan 4, 2026
0e5ed4f
Refactor - rename
VirajSalaka Jan 4, 2026
247e2f7
Start Event Listener
VirajSalaka Jan 5, 2026
e197da5
Refactor - Organization ID
VirajSalaka Jan 5, 2026
ab7ef78
Include correlation ID for tracing event processing
VirajSalaka Jan 5, 2026
e615dd6
Move API Update related logic to api_deployment.go and reuse methods …
VirajSalaka Jan 6, 2026
3ae57d6
Remove todo
VirajSalaka Jan 6, 2026
c58af8c
Merge branch 'refactor-update-deployment' into sync-2-temp
VirajSalaka Jan 7, 2026
95bddd7
Fix test failures
VirajSalaka Jan 7, 2026
8fa1f34
Refactor
VirajSalaka Jan 7, 2026
278c8a9
Fix Database Schema migration related changes
VirajSalaka Jan 7, 2026
c9f1551
Resolve commented issues
VirajSalaka Jan 7, 2026
a4c6a1c
Resolve commented issues
VirajSalaka Jan 7, 2026
1f75156
Resolve review comments
VirajSalaka Jan 9, 2026
48f0f0f
Improve DB connections to optimize querying
VirajSalaka Jan 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions gateway/gateway-controller/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"flag"
"fmt"
"github.com/wso2/api-platform/gateway/gateway-controller/pkg/apikeyxds"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/wso2/api-platform/gateway/gateway-controller/pkg/apikeyxds"

"github.com/gin-gonic/gin"
"github.com/wso2/api-platform/common/authenticators"
commonmodels "github.com/wso2/api-platform/common/models"
Expand All @@ -20,6 +21,8 @@ import (
"github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/middleware"
"github.com/wso2/api-platform/gateway/gateway-controller/pkg/config"
"github.com/wso2/api-platform/gateway/gateway-controller/pkg/controlplane"
"github.com/wso2/api-platform/gateway/gateway-controller/pkg/eventhub"
"github.com/wso2/api-platform/gateway/gateway-controller/pkg/eventlistener"
"github.com/wso2/api-platform/gateway/gateway-controller/pkg/logger"
"github.com/wso2/api-platform/gateway/gateway-controller/pkg/models"
"github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds"
Expand Down Expand Up @@ -107,6 +110,26 @@ func main() {
// Initialize in-memory API key store for xDS
apiKeyStore := storage.NewAPIKeyStore(log)

// Initialize EventHub if multi-replica mode is enabled
var eventHub eventhub.EventHub
if cfg.GatewayController.Server.EnableReplicaSync {
if cfg.IsPersistentMode() && db != nil {
log.Info("Initializing EventHub for multi-replica mode")
eventHub = eventhub.New(db.GetDB(), log, eventhub.DefaultConfig())
ctx := context.Background()
if err := eventHub.Initialize(ctx); err != nil {
log.Fatal("Failed to initialize EventHub", zap.Error(err))
}
if err := eventHub.RegisterOrganization("default"); err != nil {
log.Error("Failed to register default organization", zap.Error(err))
} else {
log.Info("EventHub initialized successfully")
}
} else {
log.Fatal("EventHub requires persistent storage. Multi-replica mode will not function correctly in memory-only mode.")
}
}

// Load configurations from database on startup (if persistent mode)
if cfg.IsPersistentMode() && db != nil {
log.Info("Loading configurations from database")
Expand Down Expand Up @@ -246,6 +269,26 @@ func main() {
log.Info("Policy xDS server is disabled")
}

// Initialize and start EventListener if EventHub is available
var evtListener *eventlistener.EventListener
if eventHub != nil {
log.Info("Initializing EventListener")
eventSource := eventlistener.NewEventHubAdapter(eventHub, log)
evtListener = eventlistener.NewEventListener(
eventSource,
configStore,
db,
snapshotManager,
policyManager, // Can be nil if policy server is disabled
&cfg.GatewayController.Router,
log,
)
if err := evtListener.Start(context.Background()); err != nil {
log.Fatal("Failed to start EventListener", zap.Error(err))
}
Comment on lines +286 to +288
Copy link
Contributor

@coderabbitai coderabbitai bot Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Search for EventListener.Start method implementation
ast-grep --pattern $'func ($_ *EventListener) Start($_ context.Context) error {
  $$$
}'

Repository: wso2/api-platform

Length of output: 1940


🏁 Script executed:

# Find the Subscribe method implementation
ast-grep --pattern $'func ($_ *$_) Subscribe($_ context.Context, $_ string, $_ chan $_) error {
  $$$
}'

Repository: wso2/api-platform

Length of output: 43


🏁 Script executed:

# Search for Subscribe method definitions
rg "func.*Subscribe.*context\.Context" -A 15

Repository: wso2/api-platform

Length of output: 3289


🏁 Script executed:

# Get the full Subscribe method from EventHubAdapter
sed -n '/^func (a \*EventHubAdapter) Subscribe(ctx context.Context, organizationID string, eventChan chan<- \[\]Event) error/,/^func/p' gateway/gateway-controller/pkg/eventlistener/eventhub_adapter.go | head -100

Repository: wso2/api-platform

Length of output: 1567


🏁 Script executed:

# Search for EventHub interface definition and Subscribe/RegisterOrganization methods
rg "type.*EventHub.*interface|func.*Subscribe\(.*string.*chan|func.*RegisterOrganization" -A 5 gateway/gateway-controller/pkg/eventlistener/

Repository: wso2/api-platform

Length of output: 1376


🏁 Script executed:

# Search for EventHub interface
rg "type EventHub interface" -A 20 gateway/gateway-controller/

Repository: wso2/api-platform

Length of output: 2106


🏁 Script executed:

# Search for RegisterOrganization and Subscribe implementations in eventhub package
rg "func.*RegisterOrganization|func.*Subscribe\(organizationID" gateway/gateway-controller/pkg/eventhub/ -A 15

Repository: wso2/api-platform

Length of output: 5382


🏁 Script executed:

# Search for Subscribe implementation in backend
rg "func \(b \*SQLiteBackend\) Subscribe" gateway/gateway-controller/pkg/eventhub/ -A 20

Repository: wso2/api-platform

Length of output: 1887


EventListener.Start blocks on database operations and should use a timeout or spawn initialization in a goroutine.

The Start method calls eventSource.Subscribe(), which in turn calls eventHub.RegisterOrganization(). RegisterOrganization performs a synchronous database operation (db.ExecContext()) that can block the main goroutine indefinitely until database I/O completes. Although goroutines are spawned afterward, this initial blocking call prevents the main goroutine from proceeding to start the HTTP server and other components. Either wrap the initialization in a goroutine or provide a timeout context to prevent indefinite blocking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the startup phase, so I think it is okay: it delays API processing until the event listener is properly configured.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@VirajSalaka, that makes sense. Blocking during the startup phase to ensure all critical components are properly initialized before accepting traffic is a reasonable approach, especially with the fail-fast pattern (using log.Fatal() on failure). This ensures the application doesn't start in a partially initialized state where it could accept requests before the EventListener is ready to process events.

The synchronous initialization here acts as a gate to prevent race conditions, which aligns well with the multi-replica synchronization goals of this PR.


🧠 Learnings used
Learnt from: VirajSalaka
Repo: wso2/api-platform PR: 642
File: gateway/gateway-controller/pkg/utils/api_deployment.go:570-589
Timestamp: 2026-01-07T17:41:01.083Z
Learning: In gateway/gateway-controller/pkg/utils/api_deployment.go, when enableReplicaSync is true (multi-replica mode), the in-memory store (s.store) is intentionally not updated immediately during DeployAPIConfiguration. Instead, all replicas (including the originating replica) update their in-memory stores through the event-driven path via the EventListener to ensure consistent state synchronization across the cluster. This means immediate reads may not find the config until the EventListener processes the event, which is expected behavior.

log.Info("EventListener started successfully")
}
Comment on lines +272 to +290
Copy link
Contributor

@coderabbitai coderabbitai bot Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

EventListener.Start blocks the main goroutine on database operations.

evtListener.Start() is called synchronously on Line 286, which can block indefinitely if the underlying EventHub performs database operations (such as RegisterOrganization). This prevents the HTTP server and other components from starting until initialization completes, potentially causing startup hangs if the database is slow or locked.

🔧 Proposed fix: Add timeout or use goroutine

Option 1: Add timeout context

-	if err := evtListener.Start(context.Background()); err != nil {
-		log.Fatal("Failed to start EventListener", zap.Error(err))
-	}
+	startCtx, startCancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer startCancel()
+	if err := evtListener.Start(startCtx); err != nil {
+		log.Fatal("Failed to start EventListener", zap.Error(err))
+	}

Option 2: Initialize in goroutine (if non-blocking startup is acceptable)

+	go func() {
+		if err := evtListener.Start(context.Background()); err != nil {
+			log.Error("Failed to start EventListener", zap.Error(err))
+		}
+	}()
-	if err := evtListener.Start(context.Background()); err != nil {
-		log.Fatal("Failed to start EventListener", zap.Error(err))
-	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Initialize and start EventListener if EventHub is available
var evtListener *eventlistener.EventListener
if eventHub != nil {
log.Info("Initializing EventListener")
eventSource := eventlistener.NewEventHubAdapter(eventHub, log)
evtListener = eventlistener.NewEventListener(
eventSource,
configStore,
db,
snapshotManager,
policyManager, // Can be nil if policy server is disabled
&cfg.GatewayController.Router,
log,
)
if err := evtListener.Start(context.Background()); err != nil {
log.Fatal("Failed to start EventListener", zap.Error(err))
}
log.Info("EventListener started successfully")
}
// Initialize and start EventListener if EventHub is available
var evtListener *eventlistener.EventListener
if eventHub != nil {
log.Info("Initializing EventListener")
eventSource := eventlistener.NewEventHubAdapter(eventHub, log)
evtListener = eventlistener.NewEventListener(
eventSource,
configStore,
db,
snapshotManager,
policyManager, // Can be nil if policy server is disabled
&cfg.GatewayController.Router,
log,
)
startCtx, startCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer startCancel()
if err := evtListener.Start(startCtx); err != nil {
log.Fatal("Failed to start EventListener", zap.Error(err))
}
log.Info("EventListener started successfully")
}
Suggested change
// Initialize and start EventListener if EventHub is available
var evtListener *eventlistener.EventListener
if eventHub != nil {
log.Info("Initializing EventListener")
eventSource := eventlistener.NewEventHubAdapter(eventHub, log)
evtListener = eventlistener.NewEventListener(
eventSource,
configStore,
db,
snapshotManager,
policyManager, // Can be nil if policy server is disabled
&cfg.GatewayController.Router,
log,
)
if err := evtListener.Start(context.Background()); err != nil {
log.Fatal("Failed to start EventListener", zap.Error(err))
}
log.Info("EventListener started successfully")
}
// Initialize and start EventListener if EventHub is available
var evtListener *eventlistener.EventListener
if eventHub != nil {
log.Info("Initializing EventListener")
eventSource := eventlistener.NewEventHubAdapter(eventHub, log)
evtListener = eventlistener.NewEventListener(
eventSource,
configStore,
db,
snapshotManager,
policyManager, // Can be nil if policy server is disabled
&cfg.GatewayController.Router,
log,
)
go func() {
if err := evtListener.Start(context.Background()); err != nil {
log.Error("Failed to start EventListener", zap.Error(err))
}
}()
log.Info("EventListener started successfully")
}
🤖 Prompt for AI Agents
In @gateway/gateway-controller/cmd/controller/main.go around lines 272 - 290,
EventListener.Start is called synchronously and can block startup; change the
initialization to avoid blocking by either (A) creating a cancellable timeout
context and passing that to evtListener.Start to bound startup DB ops, or (B)
start evtListener in a background goroutine so it does not block HTTP/server
startup (ensure Start errors are logged and propagated via a channel or health
check). Locate the code that constructs evtListener
(eventlistener.NewEventListener) and the call
evtListener.Start(context.Background()), replace context.Background() with a
context with timeout when using option A, or call Start(...) inside a goroutine
and signal failures through a dedicated error channel/health probe when using
option B.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The REST API server must not start until the event listener has started properly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@VirajSalaka, understood! The current implementation already ensures that the REST API server won't start until the EventListener is successfully initialized. The evtListener.Start() call on line 286 is synchronous and will call log.Fatal() if it fails, which prevents any subsequent initialization including the REST API server.

The blocking behavior you want is already in place:

if err := evtListener.Start(context.Background()); err != nil {
    log.Fatal("Failed to start EventListener", zap.Error(err))  // ← Stops everything
}
log.Info("EventListener started successfully")  // ← Only reached on success

The REST API server setup happens much later in main(), so it will only start after EventListener has successfully completed its initialization.

Optional improvement: To prevent indefinite hangs during startup (e.g., if database operations are slow), you could add a timeout while maintaining the blocking behavior:

startCtx, startCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer startCancel()
if err := evtListener.Start(startCtx); err != nil {
    log.Fatal("Failed to start EventListener", zap.Error(err))
}

This keeps your desired startup ordering while guarding against indefinite hangs during startup.


✏️ Learnings added
Learnt from: VirajSalaka
Repo: wso2/api-platform PR: 642
File: gateway/gateway-controller/cmd/controller/main.go:272-290
Timestamp: 2026-01-09T08:31:32.669Z
Learning: In gateway/gateway-controller/cmd/controller/main.go, the EventListener.Start() call must be synchronous (blocking) to ensure the EventListener is fully initialized before the REST API server starts. This blocking behavior is intentional to maintain proper startup ordering in multi-replica mode.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: VirajSalaka
Repo: wso2/api-platform PR: 642
File: gateway/gateway-controller/pkg/utils/api_deployment.go:570-589
Timestamp: 2026-01-07T17:41:01.083Z
Learning: In gateway/gateway-controller/pkg/utils/api_deployment.go, when enableReplicaSync is true (multi-replica mode), the in-memory store (s.store) is intentionally not updated immediately during DeployAPIConfiguration. Instead, all replicas (including the originating replica) update their in-memory stores through the event-driven path via the EventListener to ensure consistent state synchronization across the cluster. This means immediate reads may not find the config until the EventListener processes the event, which is expected behavior.


// Load policy definitions from files (must be done before creating validator)
policyLoader := utils.NewPolicyLoader(log)
policyDir := cfg.GatewayController.Policies.DefinitionsPath
Expand All @@ -272,7 +315,7 @@ func main() {
validator.SetPolicyValidator(policyValidator)

// Initialize and start control plane client with dependencies for API creation
cpClient := controlplane.NewClient(cfg.GatewayController.ControlPlane, log, configStore, db, snapshotManager, validator, &cfg.GatewayController.Router)
cpClient := controlplane.NewClient(cfg.GatewayController.ControlPlane, log, configStore, db, snapshotManager, policyManager, validator, &cfg.GatewayController.Router)
if err := cpClient.Start(); err != nil {
log.Error("Failed to start control plane client", zap.Error(err))
// Don't fail startup - gateway can run in degraded mode without control plane
Expand Down Expand Up @@ -301,7 +344,7 @@ func main() {

// Initialize API server with the configured validator and API key manager
apiServer := handlers.NewAPIServer(configStore, db, snapshotManager, policyManager, log, cpClient,
policyDefinitions, templateDefinitions, validator, &cfg.GatewayController.Router, apiKeyXDSManager)
policyDefinitions, templateDefinitions, validator, &cfg.GatewayController.Router, apiKeyXDSManager, eventHub, cfg.GatewayController.Server.EnableReplicaSync)

// Register API routes (includes certificate management endpoints from OpenAPI spec)
api.RegisterHandlers(router, apiServer)
Expand Down Expand Up @@ -347,6 +390,11 @@ func main() {
policyXDSServer.Stop()
}

// Stop EventListener if it was started
if evtListener != nil {
evtListener.Stop()
}

log.Info("Gateway-Controller stopped")
}

Expand Down
Loading