diff --git a/gateway/gateway-controller/cmd/controller/main.go b/gateway/gateway-controller/cmd/controller/main.go index 84f9ca684..a444f20b0 100644 --- a/gateway/gateway-controller/cmd/controller/main.go +++ b/gateway/gateway-controller/cmd/controller/main.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "github.com/wso2/api-platform/gateway/gateway-controller/pkg/apikeyxds" "net/http" "os" "os/signal" @@ -12,6 +11,8 @@ import ( "syscall" "time" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/apikeyxds" + "github.com/gin-gonic/gin" "github.com/wso2/api-platform/common/authenticators" commonmodels "github.com/wso2/api-platform/common/models" @@ -20,6 +21,8 @@ import ( "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/middleware" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/controlplane" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/eventhub" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/eventlistener" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/logger" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" @@ -107,6 +110,26 @@ func main() { // Initialize in-memory API key store for xDS apiKeyStore := storage.NewAPIKeyStore(log) + // Initialize EventHub if multi-replica mode is enabled + var eventHub eventhub.EventHub + if cfg.GatewayController.Server.EnableReplicaSync { + if cfg.IsPersistentMode() && db != nil { + log.Info("Initializing EventHub for multi-replica mode") + eventHub = eventhub.New(db.GetDB(), log, eventhub.DefaultConfig()) + ctx := context.Background() + if err := eventHub.Initialize(ctx); err != nil { + log.Fatal("Failed to initialize EventHub", zap.Error(err)) + } + if err := eventHub.RegisterOrganization("default"); err != nil { + log.Error("Failed to register default organization", 
zap.Error(err)) + } else { + log.Info("EventHub initialized successfully") + } + } else { + log.Fatal("EventHub requires persistent storage. Multi-replica mode will not function correctly in memory-only mode.") + } + } + // Load configurations from database on startup (if persistent mode) if cfg.IsPersistentMode() && db != nil { log.Info("Loading configurations from database") @@ -246,6 +269,26 @@ func main() { log.Info("Policy xDS server is disabled") } + // Initialize and start EventListener if EventHub is available + var evtListener *eventlistener.EventListener + if eventHub != nil { + log.Info("Initializing EventListener") + eventSource := eventlistener.NewEventHubAdapter(eventHub, log) + evtListener = eventlistener.NewEventListener( + eventSource, + configStore, + db, + snapshotManager, + policyManager, // Can be nil if policy server is disabled + &cfg.GatewayController.Router, + log, + ) + if err := evtListener.Start(context.Background()); err != nil { + log.Fatal("Failed to start EventListener", zap.Error(err)) + } + log.Info("EventListener started successfully") + } + // Load policy definitions from files (must be done before creating validator) policyLoader := utils.NewPolicyLoader(log) policyDir := cfg.GatewayController.Policies.DefinitionsPath @@ -272,7 +315,7 @@ func main() { validator.SetPolicyValidator(policyValidator) // Initialize and start control plane client with dependencies for API creation - cpClient := controlplane.NewClient(cfg.GatewayController.ControlPlane, log, configStore, db, snapshotManager, validator, &cfg.GatewayController.Router) + cpClient := controlplane.NewClient(cfg.GatewayController.ControlPlane, log, configStore, db, snapshotManager, policyManager, validator, &cfg.GatewayController.Router) if err := cpClient.Start(); err != nil { log.Error("Failed to start control plane client", zap.Error(err)) // Don't fail startup - gateway can run in degraded mode without control plane @@ -301,7 +344,7 @@ func main() { // Initialize API 
server with the configured validator and API key manager apiServer := handlers.NewAPIServer(configStore, db, snapshotManager, policyManager, log, cpClient, - policyDefinitions, templateDefinitions, validator, &cfg.GatewayController.Router, apiKeyXDSManager) + policyDefinitions, templateDefinitions, validator, &cfg.GatewayController.Router, apiKeyXDSManager, eventHub, cfg.GatewayController.Server.EnableReplicaSync) // Register API routes (includes certificate management endpoints from OpenAPI spec) api.RegisterHandlers(router, apiServer) @@ -347,6 +390,11 @@ func main() { policyXDSServer.Stop() } + // Stop EventListener if it was started + if evtListener != nil { + evtListener.Stop() + } + log.Info("Gateway-Controller stopped") } diff --git a/gateway/gateway-controller/pkg/api/handlers/handlers.go b/gateway/gateway-controller/pkg/api/handlers/handlers.go index 0634bd4ab..bcb7f8f1f 100644 --- a/gateway/gateway-controller/pkg/api/handlers/handlers.go +++ b/gateway/gateway-controller/pkg/api/handlers/handlers.go @@ -22,9 +22,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/wso2/api-platform/common/constants" - "github.com/wso2/api-platform/gateway/gateway-controller/pkg/apikeyxds" - "io" "net/http" "sort" @@ -34,17 +31,19 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/wso2/api-platform/common/constants" commonmodels "github.com/wso2/api-platform/common/models" api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/generated" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/middleware" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/apikeyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/controlplane" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/eventhub" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" 
"github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/xds" - policyenginev1 "github.com/wso2/api-platform/sdk/gateway/policyengine/v1" "go.uber.org/zap" ) @@ -82,8 +81,10 @@ func NewAPIServer( validator config.Validator, routerConfig *config.RouterConfig, apiKeyXDSManager *apikeyxds.APIKeyStateManager, + eventHub eventhub.EventHub, + enableMultiReplicaMode bool, ) *APIServer { - deploymentService := utils.NewAPIDeploymentService(store, db, snapshotManager, validator, routerConfig) + deploymentService := utils.NewAPIDeploymentService(store, db, snapshotManager, policyManager, validator, routerConfig, eventHub, enableMultiReplicaMode) server := &APIServer{ store: store, db: db, @@ -225,30 +226,7 @@ func (s *APIServer) CreateAPI(c *gin.Context) { Id: stringPtr(result.StoredConfig.GetHandle()), CreatedAt: timePtr(result.StoredConfig.CreatedAt), }) - - // Build and add policy config derived from API configuration if policies are present - if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(result.StoredConfig) - if storedPolicy != nil { - if err := s.policyManager.AddPolicy(storedPolicy); err != nil { - log.Error("Failed to add derived policy configuration", zap.Error(err)) - } else { - log.Info("Derived policy configuration added", - zap.String("policy_id", storedPolicy.ID), - zap.Int("route_count", len(storedPolicy.Configuration.Routes))) - } - } else if result.IsUpdate { - // API was updated and no longer has policies, remove the existing policy configuration - policyID := result.StoredConfig.ID + "-policies" - if err := s.policyManager.RemovePolicy(policyID); err != nil { - // Log at debug level since policy may not exist if API never had policies - log.Debug("No policy configuration to remove", zap.String("policy_id", policyID)) - } else { - log.Info("Derived policy configuration removed (API no longer 
has policies)", - zap.String("policy_id", policyID)) - } - } - } + // Policy management is now handled by the deployment service } // ListAPIs implements ServerInterface.ListAPIs @@ -487,7 +465,6 @@ func (s *APIServer) GetAPIById(c *gin.Context, id string) { func (s *APIServer) UpdateAPI(c *gin.Context, id string) { // Get correlation-aware logger from context log := middleware.GetLogger(c, s.logger) - handle := id // Read request body body, err := io.ReadAll(c.Request.Body) @@ -500,249 +477,82 @@ func (s *APIServer) UpdateAPI(c *gin.Context, id string) { return } - // Parse configuration - contentType := c.GetHeader("Content-Type") - var apiConfig api.APIConfiguration - err = s.parser.Parse(body, contentType, &apiConfig) - if err != nil { - log.Error("Failed to parse configuration", zap.Error(err)) - c.JSON(http.StatusBadRequest, api.ErrorResponse{ - Status: "error", - Message: "Failed to parse configuration", - }) - return - } + // Get correlation ID from context + correlationID := middleware.GetCorrelationID(c) - // Validate that the handle in the YAML matches the path parameter - if apiConfig.Metadata.Name != "" { - if apiConfig.Metadata.Name != handle { - log.Warn("Handle mismatch between path and YAML metadata", - zap.String("path_handle", handle), - zap.String("yaml_handle", apiConfig.Metadata.Name)) + // Update API configuration using the utility service + result, err := s.deploymentService.UpdateAPIConfiguration(utils.APIUpdateParams{ + Handle: id, + Data: body, + ContentType: c.GetHeader("Content-Type"), + CorrelationID: correlationID, + Logger: log, + }) + + if err != nil { + // Map error types to HTTP status codes + switch e := err.(type) { + case *utils.NotFoundError: + c.JSON(http.StatusNotFound, api.ErrorResponse{ + Status: "error", + Message: e.Error(), + }) + case *utils.HandleMismatchError: c.JSON(http.StatusBadRequest, api.ErrorResponse{ Status: "error", - Message: fmt.Sprintf("Handle mismatch: path has '%s' but YAML metadata.name has '%s'", 
handle, apiConfig.Metadata.Name), + Message: e.Error(), }) - return - } - } - - // Validate configuration - validationErrors := s.validator.Validate(&apiConfig) - if len(validationErrors) > 0 { - log.Warn("Configuration validation failed", - zap.String("handle", handle), - zap.Int("num_errors", len(validationErrors))) - - errors := make([]api.ValidationError, len(validationErrors)) - for i, e := range validationErrors { - errors[i] = api.ValidationError{ - Field: stringPtr(e.Field), - Message: stringPtr(e.Message), - } - } - - c.JSON(http.StatusBadRequest, api.ErrorResponse{ - Status: "error", - Message: "Configuration validation failed", - Errors: &errors, - }) - return - } - - if s.db == nil { - c.JSON(http.StatusServiceUnavailable, api.ErrorResponse{ - Status: "error", - Message: "Database storage not available", - }) - return - } - - // Check if config exists - existing, err := s.db.GetConfigByHandle(handle) - if err != nil { - log.Warn("API configuration not found", - zap.String("handle", handle)) - c.JSON(http.StatusNotFound, api.ErrorResponse{ - Status: "error", - Message: fmt.Sprintf("API configuration with handle '%s' not found", handle), - }) - return - } - - // Update stored configuration - now := time.Now() - existing.Configuration = apiConfig - existing.Status = models.StatusPending - existing.UpdatedAt = now - existing.DeployedAt = nil - existing.DeployedVersion = 0 - - if apiConfig.Kind == api.Asyncwebsub { - topicsToRegister, topicsToUnregister := s.deploymentService.GetTopicsForUpdate(*existing) - // TODO: Pre configure the dynamic forward proxy rules for event gw - // This was communication bridge will be created on the gw startup - // Can perform internal communication with websub hub without relying on the dynamic rules - // Execute topic operations with wait group and errors tracking - var wg2 sync.WaitGroup - var regErrs int32 - var deregErrs int32 - - if len(topicsToRegister) > 0 { - wg2.Add(1) - go func(list []string) { - defer wg2.Done() - 
log.Info("Starting topic registration", zap.Int("total_topics", len(list)), zap.String("api_id", existing.ID)) - //fmt.Println("Topics Registering Started") - var childWg sync.WaitGroup - for _, topic := range list { - childWg.Add(1) - go func(topic string) { - defer childWg.Done() - if err := s.deploymentService.RegisterTopicWithHub(s.httpClient, topic, "localhost", 8083, log); err != nil { - log.Error("Failed to register topic with WebSubHub", - zap.Error(err), - zap.String("topic", topic), - zap.String("api_id", existing.ID)) - atomic.AddInt32(®Errs, 1) - } else { - log.Info("Successfully registered topic with WebSubHub", - zap.String("topic", topic), - zap.String("api_id", existing.ID)) - } - }(topic) - } - childWg.Wait() - }(topicsToRegister) - } - - if len(topicsToUnregister) > 0 { - wg2.Add(1) - go func(list []string) { - defer wg2.Done() - log.Info("Starting topic deregistration", zap.Int("total_topics", len(list)), zap.String("api_id", existing.ID)) - var childWg sync.WaitGroup - for _, topic := range list { - childWg.Add(1) - go func(topic string) { - defer childWg.Done() - if err := s.deploymentService.UnregisterTopicWithHub(s.httpClient, topic, "localhost", 8083, log); err != nil { - log.Error("Failed to deregister topic from WebSubHub", - zap.Error(err), - zap.String("topic", topic), - zap.String("api_id", existing.ID)) - atomic.AddInt32(&deregErrs, 1) - } else { - log.Info("Successfully deregistered topic from WebSubHub", - zap.String("topic", topic), - zap.String("api_id", existing.ID)) - } - }(topic) + case *utils.APIValidationError: + errors := make([]api.ValidationError, len(e.Errors)) + for i, ve := range e.Errors { + errors[i] = api.ValidationError{ + Field: stringPtr(ve.Field), + Message: stringPtr(ve.Message), } - childWg.Wait() - }(topicsToUnregister) - } - wg2.Wait() - - log.Info("Topic lifecycle operations completed", - zap.String("api_id", existing.ID), - zap.Int("registered", len(topicsToRegister)), - zap.Int("deregistered", 
len(topicsToUnregister)), - zap.Int("register_errors", int(regErrs)), - zap.Int("deregister_errors", int(deregErrs))) - - // Check if topic operations failed and return error - if regErrs > 0 || deregErrs > 0 { - log.Error("Failed to register & deregister topics", zap.Error(err)) - c.JSON(http.StatusInternalServerError, api.ErrorResponse{ + } + c.JSON(http.StatusBadRequest, api.ErrorResponse{ Status: "error", - Message: "Topic lifecycle operations failed", + Message: "Configuration validation failed", + Errors: &errors, }) - return - } - } - - // Atomic dual-write: database + in-memory - // Update database first (only if persistent mode) - if s.db != nil { - if err := s.db.UpdateConfig(existing); err != nil { - log.Error("Failed to update config in database", zap.Error(err)) + case *utils.ParseError: + c.JSON(http.StatusBadRequest, api.ErrorResponse{ + Status: "error", + Message: e.Error(), + }) + case *utils.TopicOperationError: c.JSON(http.StatusInternalServerError, api.ErrorResponse{ Status: "error", - Message: "Failed to persist configuration update", + Message: e.Error(), }) - return - } - } - - if err := s.store.Update(existing); err != nil { - // Log conflict errors at info level, other errors at error level - if storage.IsConflictError(err) { - log.Info("API configuration handle already exists", - zap.String("id", existing.ID), - zap.String("handle", handle)) + case *utils.ConflictError: c.JSON(http.StatusConflict, api.ErrorResponse{ Status: "error", - Message: err.Error(), + Message: e.Error(), }) - } else { - log.Error("Failed to update config in memory store", zap.Error(err)) + case *utils.DatabaseUnavailableError: + c.JSON(http.StatusServiceUnavailable, api.ErrorResponse{ + Status: "error", + Message: e.Error(), + }) + default: + log.Error("Failed to update API configuration", zap.Error(err)) c.JSON(http.StatusInternalServerError, api.ErrorResponse{ Status: "error", - Message: "Failed to update configuration in memory store", + Message: err.Error(), }) 
} return } - // Get correlation ID from context - correlationID := middleware.GetCorrelationID(c) - - // Update xDS snapshot asynchronously - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - if err := s.snapshotManager.UpdateSnapshot(ctx, correlationID); err != nil { - log.Error("Failed to update xDS snapshot", zap.Error(err)) - } - }() - - log.Info("API configuration updated", - zap.String("id", existing.ID), - zap.String("handle", handle)) - // Return success response (id is the handle) c.JSON(http.StatusOK, api.APIUpdateResponse{ Status: stringPtr("success"), Message: stringPtr("API configuration updated successfully"), - Id: stringPtr(existing.GetHandle()), - UpdatedAt: timePtr(existing.UpdatedAt), + Id: stringPtr(result.StoredConfig.GetHandle()), + UpdatedAt: timePtr(result.StoredConfig.UpdatedAt), }) - - // Rebuild and update derived policy configuration - if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(existing) - if storedPolicy != nil { - if err := s.policyManager.AddPolicy(storedPolicy); err != nil { - log.Error("Failed to update derived policy configuration", zap.Error(err)) - } else { - log.Info("Derived policy configuration updated", - zap.String("policy_id", storedPolicy.ID), - zap.Int("route_count", len(storedPolicy.Configuration.Routes))) - } - } else { - // API no longer has policies, remove the existing policy configuration - policyID := existing.ID + "-policies" - if err := s.policyManager.RemovePolicy(policyID); err != nil { - // Log at debug level since policy may not exist if API never had policies - log.Debug("No policy configuration to remove", zap.String("policy_id", policyID)) - } else { - log.Info("Derived policy configuration removed (API no longer has policies)", - zap.String("policy_id", policyID)) - } - } - } + // Policy management is now handled by the deployment service } // DeleteAPI implements ServerInterface.DeleteAPI @@ -898,6 +708,7 @@ func (s 
*APIServer) DeleteAPI(c *gin.Context, id string) { correlationID := middleware.GetCorrelationID(c) // Update xDS snapshot asynchronously + // TODO: (VirajSalaka) Fix to work with eventhub go func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -918,6 +729,7 @@ func (s *APIServer) DeleteAPI(c *gin.Context, id string) { }) // Remove derived policy configuration + // TODO: (VirajSalaka) Fix to work with eventhub if s.policyManager != nil { policyID := cfg.ID + "-policies" if err := s.policyManager.RemovePolicy(policyID); err != nil { @@ -1175,7 +987,7 @@ func (s *APIServer) CreateLLMProvider(c *gin.Context) { // Build and add policy config derived from API configuration if policies are present if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(stored) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(stored) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to add derived policy configuration", zap.Error(err)) @@ -1265,7 +1077,7 @@ func (s *APIServer) UpdateLLMProvider(c *gin.Context, id string) { // Rebuild and update derived policy configuration if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(updated) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(updated) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to update derived policy configuration", zap.Error(err)) @@ -1412,7 +1224,7 @@ func (s *APIServer) CreateLLMProxy(c *gin.Context) { // Build and add policy config derived from API configuration if policies are present if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(stored) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(stored) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to add derived policy configuration", zap.Error(err)) @@ 
-1502,7 +1314,7 @@ func (s *APIServer) UpdateLLMProxy(c *gin.Context, id string) { // Rebuild and update derived policy configuration if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(updated) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(updated) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to update derived policy configuration", zap.Error(err)) @@ -1593,182 +1405,6 @@ func (s *APIServer) ListPolicies(c *gin.Context) { c.JSON(http.StatusOK, resp) } -// buildStoredPolicyFromAPI constructs a StoredPolicyConfig from an API config -// Merging rules: When operation has policies, they define the order (can reorder, override, or extend API policies). -// Remaining API-level policies not mentioned in operation policies are appended at the end. -// When operation has no policies, API-level policies are used in their declared order. -// RouteKey uses the fully qualified route path (context + operation path) and must match the route name format -// used by the xDS translator for consistency. -func (s *APIServer) buildStoredPolicyFromAPI(cfg *models.StoredConfig) *models.StoredPolicyConfig { - // TODO: (renuka) duplicate buildStoredPolicyFromAPI funcs. Refactor this. 
- apiCfg := &cfg.Configuration - - // Collect API-level policies - apiPolicies := make(map[string]policyenginev1.PolicyInstance) // name -> policy - if cfg.GetPolicies() != nil { - for _, p := range *cfg.GetPolicies() { - apiPolicies[p.Name] = convertAPIPolicy(p) - } - } - - routes := make([]policyenginev1.PolicyChain, 0) - switch apiCfg.Kind { - case api.Asyncwebsub: - // Build routes with merged policies - apiData, err := apiCfg.Spec.AsWebhookAPIData() - if err != nil { - // Handle error appropriately (e.g., log or return) - return nil - } - for _, ch := range apiData.Channels { - var finalPolicies []policyenginev1.PolicyInstance - - if ch.Policies != nil && len(*ch.Policies) > 0 { - // Operation has policies: use operation policy order as authoritative - // This allows operations to reorder, override, or extend API-level policies - finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*ch.Policies)) - addedNames := make(map[string]struct{}) - - for _, opPolicy := range *ch.Policies { - finalPolicies = append(finalPolicies, convertAPIPolicy(opPolicy)) - addedNames[opPolicy.Name] = struct{}{} - } - - // Add any API-level policies not mentioned in operation policies (append at end) - if apiData.Policies != nil { - for _, apiPolicy := range *apiData.Policies { - if _, exists := addedNames[apiPolicy.Name]; !exists { - finalPolicies = append(finalPolicies, apiPolicies[apiPolicy.Name]) - } - } - } - } else { - // No operation policies: use API-level policies in their declared order - if apiData.Policies != nil { - finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*apiData.Policies)) - for _, p := range *apiData.Policies { - finalPolicies = append(finalPolicies, apiPolicies[p.Name]) - } - } - } - - routeKey := xds.GenerateRouteName("POST", apiData.Context, apiData.Version, ch.Path, s.routerConfig.GatewayHost) - routes = append(routes, policyenginev1.PolicyChain{ - RouteKey: routeKey, - Policies: finalPolicies, - }) - } - case api.RestApi: - // Build 
routes with merged policies - apiData, err := apiCfg.Spec.AsAPIConfigData() - if err != nil { - // Handle error appropriately (e.g., log or return) - return nil - } - for _, op := range apiData.Operations { - var finalPolicies []policyenginev1.PolicyInstance - - if op.Policies != nil && len(*op.Policies) > 0 { - // Operation has policies: use operation policy order as authoritative - // This allows operations to reorder, override, or extend API-level policies - finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*op.Policies)) - addedNames := make(map[string]struct{}) - - for _, opPolicy := range *op.Policies { - finalPolicies = append(finalPolicies, convertAPIPolicy(opPolicy)) - addedNames[opPolicy.Name] = struct{}{} - } - - // Add any API-level policies not mentioned in operation policies (append at end) - if apiData.Policies != nil { - for _, apiPolicy := range *apiData.Policies { - if _, exists := addedNames[apiPolicy.Name]; !exists { - finalPolicies = append(finalPolicies, apiPolicies[apiPolicy.Name]) - } - } - } - } else { - // No operation policies: use API-level policies in their declared order - if apiData.Policies != nil { - finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*apiData.Policies)) - for _, p := range *apiData.Policies { - finalPolicies = append(finalPolicies, apiPolicies[p.Name]) - } - } - } - - // Determine effective vhosts (fallback to global router defaults when not provided) - effectiveMainVHost := s.routerConfig.VHosts.Main.Default - effectiveSandboxVHost := s.routerConfig.VHosts.Sandbox.Default - if apiData.Vhosts != nil { - if strings.TrimSpace(apiData.Vhosts.Main) != "" { - effectiveMainVHost = apiData.Vhosts.Main - } - if apiData.Vhosts.Sandbox != nil && strings.TrimSpace(*apiData.Vhosts.Sandbox) != "" { - effectiveSandboxVHost = *apiData.Vhosts.Sandbox - } - } - - vhosts := []string{effectiveMainVHost} - if apiData.Upstream.Sandbox != nil && apiData.Upstream.Sandbox.Url != nil && - 
strings.TrimSpace(*apiData.Upstream.Sandbox.Url) != "" { - vhosts = append(vhosts, effectiveSandboxVHost) - } - - for _, vhost := range vhosts { - routes = append(routes, policyenginev1.PolicyChain{ - RouteKey: xds.GenerateRouteName(string(op.Method), apiData.Context, apiData.Version, op.Path, vhost), - Policies: finalPolicies, - }) - } - } - } - - // If there are no policies at all, return nil (skip creation) - policyCount := 0 - for _, r := range routes { - policyCount += len(r.Policies) - } - if policyCount == 0 { - return nil - } - - now := time.Now().Unix() - stored := &models.StoredPolicyConfig{ - ID: cfg.ID + "-policies", - Configuration: policyenginev1.Configuration{ - Routes: routes, - Metadata: policyenginev1.Metadata{ - CreatedAt: now, - UpdatedAt: now, - ResourceVersion: 0, - APIName: cfg.GetDisplayName(), - Version: cfg.GetVersion(), - Context: cfg.GetContext(), - }, - }, - Version: 0, - } - return stored -} - -// convertAPIPolicy converts generated api.Policy to policyenginev1.PolicyInstance -func convertAPIPolicy(p api.Policy) policyenginev1.PolicyInstance { - paramsMap := make(map[string]interface{}) - if p.Params != nil { - for k, v := range *p.Params { - paramsMap[k] = v - } - } - return policyenginev1.PolicyInstance{ - Name: p.Name, - Version: p.Version, - Enabled: true, // Default to enabled - ExecutionCondition: p.ExecutionCondition, - Parameters: paramsMap, - } -} - // CreateMCPProxy implements ServerInterface.CreateMCPProxy // (POST /mcp-proxies) func (s *APIServer) CreateMCPProxy(c *gin.Context) { @@ -1830,7 +1466,7 @@ func (s *APIServer) CreateMCPProxy(c *gin.Context) { // Build and add policy config derived from API configuration if policies are present if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(cfg) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(cfg) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to add derived policy configuration", 
zap.Error(err)) @@ -2007,7 +1643,7 @@ func (s *APIServer) UpdateMCPProxy(c *gin.Context, id string) { // Rebuild and update derived policy configuration if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(updated) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(updated) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to update derived policy configuration", zap.Error(err)) diff --git a/gateway/gateway-controller/pkg/api/handlers/policy_ordering_test.go b/gateway/gateway-controller/pkg/api/handlers/policy_ordering_test.go index 1009978a6..3b706e5ca 100644 --- a/gateway/gateway-controller/pkg/api/handlers/policy_ordering_test.go +++ b/gateway/gateway-controller/pkg/api/handlers/policy_ordering_test.go @@ -26,6 +26,8 @@ import ( api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/generated" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" ) // newTestAPIServer creates a minimal APIServer instance for testing @@ -34,11 +36,15 @@ func newTestAPIServer() *APIServer { Main: config.VHostEntry{Default: "localhost"}, Sandbox: config.VHostEntry{Default: "sandbox-*"}, } + routerConfig := &config.RouterConfig{ + GatewayHost: "localhost", + VHosts: *vhosts, + } + configStore := storage.NewConfigStore() + deploymentService := utils.NewAPIDeploymentService(configStore, nil, nil, nil, nil, routerConfig, nil, false) return &APIServer{ - routerConfig: &config.RouterConfig{ - GatewayHost: "localhost", - VHosts: *vhosts, - }, + routerConfig: routerConfig, + deploymentService: deploymentService, } } @@ -194,7 +200,7 @@ func TestPolicyOrderingDeterministic(t *testing.T) { // Call the function server := newTestAPIServer() - result := 
server.buildStoredPolicyFromAPI(cfg) // Verify result is not nil when policies exist + result := server.deploymentService.BuildStoredPolicyFromAPI(cfg) // Verify result is not nil when policies exist if len(tt.expectedOrder) > 0 { require.NotNil(t, result, tt.description) require.Len(t, result.Configuration.Routes, 1, "Should have one route") @@ -308,7 +314,7 @@ func TestMultipleOperationsIndependentPolicies(t *testing.T) { } server := newTestAPIServer() - result := server.buildStoredPolicyFromAPI(cfg) + result := server.deploymentService.BuildStoredPolicyFromAPI(cfg) require.NotNil(t, result) require.Len(t, result.Configuration.Routes, 5, "Should have 5 routes") @@ -435,7 +441,7 @@ func TestPolicyOrderingConsistency(t *testing.T) { var firstOrder []string server := newTestAPIServer() for i := 0; i < 100; i++ { - result := server.buildStoredPolicyFromAPI(cfg) + result := server.deploymentService.BuildStoredPolicyFromAPI(cfg) require.NotNil(t, result) require.Len(t, result.Configuration.Routes, 1) diff --git a/gateway/gateway-controller/pkg/config/config.go b/gateway/gateway-controller/pkg/config/config.go index 90fe7a1f8..766281b8d 100644 --- a/gateway/gateway-controller/pkg/config/config.go +++ b/gateway/gateway-controller/pkg/config/config.go @@ -117,9 +117,10 @@ type TracingConfig struct { // ServerConfig holds server-related configuration type ServerConfig struct { - APIPort int `koanf:"api_port"` - XDSPort int `koanf:"xds_port"` - ShutdownTimeout time.Duration `koanf:"shutdown_timeout"` + APIPort int `koanf:"api_port"` + XDSPort int `koanf:"xds_port"` + ShutdownTimeout time.Duration `koanf:"shutdown_timeout"` + EnableReplicaSync bool `koanf:"enable_replica_sync"` } // PolicyServerConfig holds policy xDS server-related configuration @@ -359,9 +360,10 @@ func defaultConfig() *Config { return &Config{ GatewayController: GatewayController{ Server: ServerConfig{ - APIPort: 9090, - XDSPort: 18000, - ShutdownTimeout: 15 * time.Second, + APIPort: 9090, + XDSPort: 
18000, + ShutdownTimeout: 15 * time.Second, + EnableReplicaSync: false, }, PolicyServer: PolicyServerConfig{ Enabled: true, diff --git a/gateway/gateway-controller/pkg/controlplane/client.go b/gateway/gateway-controller/pkg/controlplane/client.go index ad82be485..3d7f7c399 100644 --- a/gateway/gateway-controller/pkg/controlplane/client.go +++ b/gateway/gateway-controller/pkg/controlplane/client.go @@ -32,6 +32,7 @@ import ( "github.com/gorilla/websocket" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/xds" @@ -113,6 +114,7 @@ func NewClient( store *storage.ConfigStore, db storage.Storage, snapshotManager *xds.SnapshotManager, + policyManager *policyxds.PolicyManager, validator config.Validator, routerConfig *config.RouterConfig, ) *Client { @@ -126,7 +128,8 @@ func NewClient( snapshotManager: snapshotManager, parser: config.NewParser(), validator: validator, - deploymentService: utils.NewAPIDeploymentService(store, db, snapshotManager, validator, routerConfig), + // TODO: (VirajSalaka) Decide on behavior when controlplane is involved. + deploymentService: utils.NewAPIDeploymentService(store, db, snapshotManager, policyManager, validator, routerConfig, nil, false), state: &ConnectionState{ Current: Disconnected, Conn: nil, diff --git a/gateway/gateway-controller/pkg/eventhub/backend.go b/gateway/gateway-controller/pkg/eventhub/backend.go new file mode 100644 index 000000000..108d66ece --- /dev/null +++ b/gateway/gateway-controller/pkg/eventhub/backend.go @@ -0,0 +1,92 @@ +package eventhub + +import ( + "context" + "time" +) + +// EventhubImpl is the interface that different message broker implementations must satisfy. 
+// Implementations can include SQLite (polling-based), NATS, Azure Service Bus, Kafka, etc. +type EventhubImpl interface { + // Initialize sets up the backend connection and resources. + // For SQLite: opens database, creates tables + // For NATS: connects to server, sets up streams + // For Azure Service Bus: connects to namespace, creates topics + Initialize(ctx context.Context) error + + // RegisterOrganization creates the necessary resources for tracking an organization. + // For SQLite: creates entry in organization_states table + // For NATS: creates subject/stream for the organization + // For Azure Service Bus: creates topic for the organization + RegisterOrganization(ctx context.Context, orgID string) error + + // Publish publishes an event for an organization. + // The implementation should ensure delivery semantics appropriate for the broker. + Publish(ctx context.Context, orgID string, eventType EventType, + action, entityID, correlationID string, eventData []byte) error + + // Subscribe registers a channel to receive events for an organization. + // Events are delivered as batches (slices) when available. + // The subscriber receives ALL event types and should filter if needed. + Subscribe(orgID string, eventChan chan<- []Event) error + + // Unsubscribe removes a subscription channel for an organization. + Unsubscribe(orgID string, eventChan chan<- []Event) error + + // Cleanup removes old events based on retention policy. + // For SQLite: deletes events older than specified time + // For message brokers: may be a no-op if broker handles retention + Cleanup(ctx context.Context, olderThan time.Time) error + + // CleanupRange removes events within a specific time range. + CleanupRange(ctx context.Context, from, to time.Time) error + + // Close gracefully shuts down the backend. 
+ Close() error +} + +// BackendType represents the type of message broker backend +type BackendType string + +const ( + // BackendTypeSQLite uses SQLite with polling for event delivery + BackendTypeSQLite BackendType = "sqlite" + // BackendTypeNATS uses NATS JetStream for event delivery + BackendTypeNATS BackendType = "nats" + // BackendTypeAzureServiceBus uses Azure Service Bus for event delivery + BackendTypeAzureServiceBus BackendType = "azure-servicebus" +) + +// BackendConfig holds common configuration for all backends +type BackendConfig struct { + // Type specifies which backend implementation to use + Type BackendType + + // SQLite-specific configuration + SQLite *SQLiteBackendConfig + + // NATS-specific configuration (for future use) + // NATS *NATSBackendConfig + + // Azure Service Bus configuration (for future use) + // AzureServiceBus *AzureServiceBusConfig +} + +// SQLiteBackendConfig holds SQLite-specific configuration +type SQLiteBackendConfig struct { + // PollInterval is how often to poll for state changes + PollInterval time.Duration + // RetentionPeriod is how long to keep events + RetentionPeriod time.Duration + // CleanupInterval is how often to run automatic cleanup + CleanupInterval time.Duration +} + +// DefaultSQLiteBackendConfig returns sensible defaults for SQLite backend +func DefaultSQLiteBackendConfig() *SQLiteBackendConfig { + return &SQLiteBackendConfig{ + PollInterval: time.Second * 5, + CleanupInterval: time.Minute * 10, + RetentionPeriod: time.Hour, + } +} diff --git a/gateway/gateway-controller/pkg/eventhub/eventhub.go b/gateway/gateway-controller/pkg/eventhub/eventhub.go new file mode 100644 index 000000000..cf40eabee --- /dev/null +++ b/gateway/gateway-controller/pkg/eventhub/eventhub.go @@ -0,0 +1,106 @@ +package eventhub + +import ( + "context" + "database/sql" + "sync" + "time" + + "go.uber.org/zap" +) + +// eventHub is the main implementation of EventHub interface +// It delegates to a Backend implementation for actual 
message broker operations +type eventHub struct { + backend EventhubImpl + logger *zap.Logger + + initialized bool + mu sync.RWMutex +} + +// New creates a new EventHub instance with SQLite backend (default) +// This maintains backward compatibility with existing code +func New(db *sql.DB, logger *zap.Logger, config Config) EventHub { + sqliteConfig := &SQLiteBackendConfig{ + PollInterval: config.PollInterval, + CleanupInterval: config.CleanupInterval, + RetentionPeriod: config.RetentionPeriod, + } + backend := NewSQLiteBackend(db, logger, sqliteConfig) + return &eventHub{ + backend: backend, + logger: logger, + } +} + +// NewWithBackend creates a new EventHub instance with a custom backend +// Use this to provide alternative message broker implementations (NATS, Azure Service Bus, etc.) +func NewWithBackend(backend EventhubImpl, logger *zap.Logger) EventHub { + return &eventHub{ + backend: backend, + logger: logger, + } +} + +// Initialize sets up the EventHub and starts background workers +func (eh *eventHub) Initialize(ctx context.Context) error { + eh.mu.Lock() + defer eh.mu.Unlock() + + if eh.initialized { + return nil + } + + eh.logger.Info("Initializing EventHub") + + if err := eh.backend.Initialize(ctx); err != nil { + return err + } + + eh.initialized = true + eh.logger.Info("EventHub initialized successfully") + return nil +} + +// RegisterOrganization registers a new organization with the EventHub +func (eh *eventHub) RegisterOrganization(organizationID string) error { + ctx := context.Background() + return eh.backend.RegisterOrganization(ctx, organizationID) +} + +// PublishEvent publishes an event for an organization +func (eh *eventHub) PublishEvent(ctx context.Context, organizationID string, + eventType EventType, action, entityID, correlationID string, eventData []byte) error { + return eh.backend.Publish(ctx, organizationID, eventType, action, entityID, correlationID, eventData) +} + +// Subscribe registers a channel to receive events for an 
organization +func (eh *eventHub) Subscribe(organizationID string, eventChan chan<- []Event) error { + return eh.backend.Subscribe(organizationID, eventChan) +} + +// CleanUpEvents removes events from the unified events table within the specified time range +func (eh *eventHub) CleanUpEvents(ctx context.Context, timeFrom, timeEnd time.Time) error { + return eh.backend.CleanupRange(ctx, timeFrom, timeEnd) +} + +// Close gracefully shuts down the EventHub +func (eh *eventHub) Close() error { + eh.mu.Lock() + defer eh.mu.Unlock() + + if !eh.initialized { + return nil + } + + eh.logger.Info("Shutting down EventHub") + + if err := eh.backend.Close(); err != nil { + return err + } + + eh.initialized = false + eh.logger.Info("EventHub shutdown complete") + return nil +} diff --git a/gateway/gateway-controller/pkg/eventhub/eventhub_test.go b/gateway/gateway-controller/pkg/eventhub/eventhub_test.go new file mode 100644 index 000000000..54eaaf909 --- /dev/null +++ b/gateway/gateway-controller/pkg/eventhub/eventhub_test.go @@ -0,0 +1,346 @@ +package eventhub + +import ( + "context" + "database/sql" + "encoding/json" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func setupTestDB(t *testing.T) *sql.DB { + db, err := sql.Open("sqlite3", ":memory:") + require.NoError(t, err) + + // Create organization_states table + _, err = db.Exec(` + CREATE TABLE organization_states ( + organization TEXT PRIMARY KEY, + version_id TEXT NOT NULL DEFAULT '', + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + `) + require.NoError(t, err) + + // Create unified events table + _, err = db.Exec(` + CREATE TABLE events ( + organization_id TEXT NOT NULL, + processed_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + originated_timestamp TIMESTAMP NOT NULL, + event_type TEXT NOT NULL, + action TEXT NOT NULL CHECK(action IN ('CREATE', 'UPDATE', 'DELETE')), + entity_id 
TEXT NOT NULL, + correlation_id TEXT NOT NULL DEFAULT '', + event_data TEXT NOT NULL, + PRIMARY KEY (organization_id, processed_timestamp) + ) + `) + require.NoError(t, err) + + return db +} + +func TestEventHub_RegisterOrganization(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + logger := zap.NewNop() + hub := New(db, logger, DefaultConfig()) + + err := hub.Initialize(context.Background()) + require.NoError(t, err) + defer hub.Close() + + // Test successful registration + err = hub.RegisterOrganization("test-org") + assert.NoError(t, err) + + // Test duplicate registration + err = hub.RegisterOrganization("test-org") + assert.ErrorIs(t, err, ErrOrganizationAlreadyExists) +} + +func TestEventHub_PublishAndSubscribe(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + logger := zap.NewNop() + config := DefaultConfig() + config.PollInterval = 100 * time.Millisecond // Fast polling for test + hub := New(db, logger, config) + + err := hub.Initialize(context.Background()) + require.NoError(t, err) + defer hub.Close() + + err = hub.RegisterOrganization("test-org") + require.NoError(t, err) + + // Register subscription + eventChan := make(chan []Event, 10) + err = hub.Subscribe("test-org", eventChan) + require.NoError(t, err) + + // Publish event + data, _ := json.Marshal(map[string]string{"key": "value"}) + err = hub.PublishEvent(context.Background(), "test-org", EventTypeAPI, "CREATE", "api-1", "test-correlation-id", data) + require.NoError(t, err) + + // Wait for event delivery via polling + select { + case events := <-eventChan: + assert.GreaterOrEqual(t, len(events), 1) + assert.Equal(t, "test-org", events[0].OrganizationID) + assert.Equal(t, EventTypeAPI, events[0].EventType) + assert.Equal(t, "CREATE", events[0].Action) + assert.Equal(t, "api-1", events[0].EntityID) + case <-time.After(time.Second): + t.Fatal("Timeout waiting for event") + } +} + +func TestEventHub_CleanUpEvents(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + 
logger := zap.NewNop() + hub := New(db, logger, DefaultConfig()) + + err := hub.Initialize(context.Background()) + require.NoError(t, err) + defer hub.Close() + + err = hub.RegisterOrganization("test-org") + require.NoError(t, err) + + // Publish events + for i := 0; i < 5; i++ { + data, _ := json.Marshal(map[string]int{"index": i}) + err = hub.PublishEvent(context.Background(), "test-org", EventTypeAPI, "CREATE", "api-1", "test-correlation-id", data) + require.NoError(t, err) + } + + // Cleanup all events + err = hub.CleanUpEvents(context.Background(), time.Time{}, time.Now().Add(time.Hour)) + require.NoError(t, err) + + // Verify events are deleted + var count int + err = db.QueryRow("SELECT COUNT(*) FROM events").Scan(&count) + require.NoError(t, err) + assert.Equal(t, 0, count) +} + +func TestEventHub_PollerDetectsChanges(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + logger := zap.NewNop() + config := DefaultConfig() + config.PollInterval = 50 * time.Millisecond + hub := New(db, logger, config) + + err := hub.Initialize(context.Background()) + require.NoError(t, err) + defer hub.Close() + + err = hub.RegisterOrganization("test-org") + require.NoError(t, err) + + eventChan := make(chan []Event, 10) + err = hub.Subscribe("test-org", eventChan) + require.NoError(t, err) + + // Publish multiple events + for i := 0; i < 3; i++ { + data, _ := json.Marshal(map[string]int{"index": i}) + err = hub.PublishEvent(context.Background(), "test-org", EventTypeAPI, "CREATE", "api-1", "test-correlation-id", data) + require.NoError(t, err) + time.Sleep(10 * time.Millisecond) + } + + // Wait for events to be delivered + var receivedEvents []Event + timeout := time.After(500 * time.Millisecond) + + for { + select { + case events := <-eventChan: + receivedEvents = append(receivedEvents, events...) 
+ if len(receivedEvents) >= 3 { + assert.Len(t, receivedEvents, 3) + return + } + case <-timeout: + t.Fatalf("Timeout: received only %d events", len(receivedEvents)) + } + } +} + +func TestEventHub_AtomicPublish(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + logger := zap.NewNop() + hub := New(db, logger, DefaultConfig()) + + err := hub.Initialize(context.Background()) + require.NoError(t, err) + defer hub.Close() + + err = hub.RegisterOrganization("test-org") + require.NoError(t, err) + + // Publish event + data, _ := json.Marshal(map[string]string{"test": "data"}) + err = hub.PublishEvent(context.Background(), "test-org", EventTypeAPI, "CREATE", "api-1", "test-correlation-id", data) + require.NoError(t, err) + + // Verify event was recorded in unified table + var eventCount int + err = db.QueryRow("SELECT COUNT(*) FROM events WHERE organization_id = ?", "test-org").Scan(&eventCount) + require.NoError(t, err) + assert.Equal(t, 1, eventCount) + + // Verify state was updated + var versionID string + err = db.QueryRow("SELECT version_id FROM organization_states WHERE organization = ?", "test-org").Scan(&versionID) + require.NoError(t, err) + assert.NotEmpty(t, versionID) +} + +func TestEventHub_MultipleSubscribers(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + logger := zap.NewNop() + config := DefaultConfig() + config.PollInterval = 50 * time.Millisecond + hub := New(db, logger, config) + + err := hub.Initialize(context.Background()) + require.NoError(t, err) + defer hub.Close() + + err = hub.RegisterOrganization("test-org") + require.NoError(t, err) + + // Register multiple subscribers + eventChan1 := make(chan []Event, 10) + eventChan2 := make(chan []Event, 10) + err = hub.Subscribe("test-org", eventChan1) + require.NoError(t, err) + err = hub.Subscribe("test-org", eventChan2) + require.NoError(t, err) + + // Publish event + data, _ := json.Marshal(map[string]string{"test": "multi"}) + err = hub.PublishEvent(context.Background(), 
"test-org", EventTypeAPI, "CREATE", "api-1", "test-correlation-id", data) + require.NoError(t, err) + + // Both subscribers should receive the event + timeout := time.After(time.Second) + + select { + case events := <-eventChan1: + assert.Len(t, events, 1) + case <-timeout: + t.Fatal("Timeout waiting for event on subscriber 1") + } + + select { + case events := <-eventChan2: + assert.Len(t, events, 1) + case <-timeout: + t.Fatal("Timeout waiting for event on subscriber 2") + } +} + +func TestEventHub_GracefulShutdown(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + logger := zap.NewNop() + hub := New(db, logger, DefaultConfig()) + + err := hub.Initialize(context.Background()) + require.NoError(t, err) + + err = hub.RegisterOrganization("test-org") + require.NoError(t, err) + + // Close should complete without hanging + err = hub.Close() + assert.NoError(t, err) + + // Calling Close again should be safe + err = hub.Close() + assert.NoError(t, err) +} + +func TestEventHub_MultipleEventTypes(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + logger := zap.NewNop() + config := DefaultConfig() + config.PollInterval = 50 * time.Millisecond + hub := New(db, logger, config) + + err := hub.Initialize(context.Background()) + require.NoError(t, err) + defer hub.Close() + + err = hub.RegisterOrganization("test-org") + require.NoError(t, err) + + eventChan := make(chan []Event, 10) + err = hub.Subscribe("test-org", eventChan) + require.NoError(t, err) + + // Publish different event types + data1, _ := json.Marshal(map[string]string{"type": "api"}) + err = hub.PublishEvent(context.Background(), "test-org", EventTypeAPI, "CREATE", "api-1", "test-correlation-id-1", data1) + require.NoError(t, err) + + data2, _ := json.Marshal(map[string]string{"type": "cert"}) + err = hub.PublishEvent(context.Background(), "test-org", EventTypeCertificate, "UPDATE", "cert-1", "test-correlation-id-2", data2) + require.NoError(t, err) + + data3, _ := 
json.Marshal(map[string]string{"type": "llm"}) + err = hub.PublishEvent(context.Background(), "test-org", EventTypeLLMTemplate, "DELETE", "template-1", "test-correlation-id-3", data3) + require.NoError(t, err) + + // Wait for events to be delivered (all types should come through) + var receivedEvents []Event + timeout := time.After(time.Second) + + for { + select { + case events := <-eventChan: + receivedEvents = append(receivedEvents, events...) + if len(receivedEvents) >= 3 { + // Verify all event types were received + assert.Len(t, receivedEvents, 3) + + eventTypeMap := make(map[EventType]bool) + for _, e := range receivedEvents { + eventTypeMap[e.EventType] = true + } + + assert.True(t, eventTypeMap[EventTypeAPI]) + assert.True(t, eventTypeMap[EventTypeCertificate]) + assert.True(t, eventTypeMap[EventTypeLLMTemplate]) + return + } + case <-timeout: + t.Fatalf("Timeout: received only %d events", len(receivedEvents)) + } + } +} diff --git a/gateway/gateway-controller/pkg/eventhub/sqlite_backend.go b/gateway/gateway-controller/pkg/eventhub/sqlite_backend.go new file mode 100644 index 000000000..83b230c66 --- /dev/null +++ b/gateway/gateway-controller/pkg/eventhub/sqlite_backend.go @@ -0,0 +1,752 @@ +package eventhub + +import ( + "context" + "database/sql" + "fmt" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "go.uber.org/zap" +) + +// statementKey identifies a prepared statement for re-preparation +type statementKey int + +const ( + stmtKeyGetAllStates statementKey = iota + stmtKeyGetEventsSince + stmtKeyInsertEvent + stmtKeyUpdateOrgState + stmtKeyInsertOrgState + stmtKeyCleanup + stmtKeyCleanupRange +) + +// SQLiteBackend implements the Backend interface using SQLite with polling +type SQLiteBackend struct { + db *sql.DB + config *SQLiteBackendConfig + logger *zap.Logger + registry *organizationRegistry + + // Polling state + pollerCtx context.Context + pollerCancel context.CancelFunc + cleanupCtx context.Context + cleanupCancel 
context.CancelFunc + wg sync.WaitGroup + + // Prepared statements for performance + stmtGetAllStates *sql.Stmt + stmtGetEventsSince *sql.Stmt + stmtInsertEvent *sql.Stmt + stmtUpdateOrgState *sql.Stmt + stmtInsertOrgState *sql.Stmt + stmtCleanup *sql.Stmt + stmtCleanupRange *sql.Stmt + + initialized bool + mu sync.RWMutex +} + +// NewSQLiteBackend creates a new SQLite-based backend +func NewSQLiteBackend(db *sql.DB, logger *zap.Logger, config *SQLiteBackendConfig) *SQLiteBackend { + if config == nil { + config = DefaultSQLiteBackendConfig() + } + return &SQLiteBackend{ + db: db, + config: config, + logger: logger, + registry: newOrganizationRegistry(), + } +} + +// Initialize sets up the SQLite backend and starts background workers +func (b *SQLiteBackend) Initialize(ctx context.Context) error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.initialized { + return nil + } + + b.logger.Info("Initializing SQLite backend") + + // Prepare statements for performance + if err := b.prepareStatements(); err != nil { + return fmt.Errorf("failed to prepare statements: %w", err) + } + + // Start poller + b.pollerCtx, b.pollerCancel = context.WithCancel(ctx) + b.wg.Add(1) + go b.pollLoop() + + // Start cleanup goroutine + b.cleanupCtx, b.cleanupCancel = context.WithCancel(ctx) + b.wg.Add(1) + go b.cleanupLoop() + + b.initialized = true + b.logger.Info("SQLite backend initialized", + zap.Duration("pollInterval", b.config.PollInterval), + zap.Duration("cleanupInterval", b.config.CleanupInterval), + zap.Duration("retentionPeriod", b.config.RetentionPeriod), + ) + return nil +} + +// ensureInitialized checks if the backend is initialized and returns an error if not +func (b *SQLiteBackend) ensureInitialized() error { + b.mu.RLock() + defer b.mu.RUnlock() + + if !b.initialized { + return fmt.Errorf("SQLite backend not initialized") + } + return nil +} + +// getStatement returns the prepared statement for the given key (caller must hold at least RLock) +func (b *SQLiteBackend) 
getStatement(key statementKey) *sql.Stmt { + switch key { + case stmtKeyGetAllStates: + return b.stmtGetAllStates + case stmtKeyGetEventsSince: + return b.stmtGetEventsSince + case stmtKeyInsertEvent: + return b.stmtInsertEvent + case stmtKeyUpdateOrgState: + return b.stmtUpdateOrgState + case stmtKeyInsertOrgState: + return b.stmtInsertOrgState + case stmtKeyCleanup: + return b.stmtCleanup + case stmtKeyCleanupRange: + return b.stmtCleanupRange + default: + return nil + } +} + +// setStatement sets the prepared statement for the given key (caller must hold Lock) +func (b *SQLiteBackend) setStatement(key statementKey, stmt *sql.Stmt) { + switch key { + case stmtKeyGetAllStates: + b.stmtGetAllStates = stmt + case stmtKeyGetEventsSince: + b.stmtGetEventsSince = stmt + case stmtKeyInsertEvent: + b.stmtInsertEvent = stmt + case stmtKeyUpdateOrgState: + b.stmtUpdateOrgState = stmt + case stmtKeyInsertOrgState: + b.stmtInsertOrgState = stmt + case stmtKeyCleanup: + b.stmtCleanup = stmt + case stmtKeyCleanupRange: + b.stmtCleanupRange = stmt + } +} + +// prepareStatement prepares a single statement by key +func (b *SQLiteBackend) prepareStatement(key statementKey) (*sql.Stmt, error) { + var query string + switch key { + case stmtKeyGetAllStates: + query = ` + SELECT organization, version_id, updated_at + FROM organization_states + ORDER BY organization + ` + case stmtKeyGetEventsSince: + query = ` + SELECT processed_timestamp, originated_timestamp, event_type, + action, entity_id, correlation_id, event_data + FROM events + WHERE organization_id = ? AND processed_timestamp > ? + ORDER BY processed_timestamp ASC + ` + case stmtKeyInsertEvent: + query = ` + INSERT INTO events (organization_id, processed_timestamp, originated_timestamp, + event_type, action, entity_id, correlation_id, event_data) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ` + case stmtKeyUpdateOrgState: + query = ` + INSERT INTO organization_states (organization, version_id, updated_at) + VALUES (?, ?, ?) 
+ ON CONFLICT(organization) + DO UPDATE SET version_id = excluded.version_id, updated_at = excluded.updated_at + ` + case stmtKeyInsertOrgState: + query = ` + INSERT INTO organization_states (organization, version_id, updated_at) + VALUES (?, ?, ?) + ON CONFLICT(organization) + DO NOTHING + ` + case stmtKeyCleanup: + query = `DELETE FROM events WHERE processed_timestamp < ?` + case stmtKeyCleanupRange: + query = `DELETE FROM events WHERE processed_timestamp >= ? AND processed_timestamp <= ?` + default: + return nil, fmt.Errorf("unknown statement key: %d", key) + } + + stmt, err := b.db.Prepare(query) + if err != nil { + return nil, fmt.Errorf("failed to prepare statement (key=%d): %w", key, err) + } + return stmt, nil +} + +// isRecoverableError checks if an error indicates a statement needs re-preparation +func (b *SQLiteBackend) isRecoverableError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + // SQLite schema change errors indicate statements need re-preparation + return strings.Contains(errStr, "schema") || + strings.Contains(errStr, "SQLITE_SCHEMA") +} + +// execWithRetry executes a prepared statement with automatic re-preparation on recoverable errors +func (b *SQLiteBackend) execWithRetry(ctx context.Context, key statementKey, args ...any) (sql.Result, error) { + b.mu.RLock() + stmt := b.getStatement(key) + b.mu.RUnlock() + + if stmt == nil { + return nil, fmt.Errorf("statement not initialized (key=%d)", key) + } + + result, err := stmt.ExecContext(ctx, args...) 
+ if err != nil && b.isRecoverableError(err) { + // Re-prepare and retry once + b.logger.Warn("Statement execution failed with recoverable error, re-preparing", + zap.Int("statementKey", int(key)), + zap.Error(err)) + + b.mu.Lock() + newStmt, prepErr := b.prepareStatement(key) + if prepErr == nil { + // Close old statement + if oldStmt := b.getStatement(key); oldStmt != nil { + _ = oldStmt.Close() + } + b.setStatement(key, newStmt) + stmt = newStmt + } + b.mu.Unlock() + + if prepErr != nil { + return nil, fmt.Errorf("re-preparation failed after recoverable error: %w (original: %v)", prepErr, err) + } + + // Retry with new statement + result, err = stmt.ExecContext(ctx, args...) + } + return result, err +} + +// queryWithRetry executes a prepared query with automatic re-preparation on recoverable errors +func (b *SQLiteBackend) queryWithRetry(ctx context.Context, key statementKey, args ...any) (*sql.Rows, error) { + b.mu.RLock() + stmt := b.getStatement(key) + b.mu.RUnlock() + + if stmt == nil { + return nil, fmt.Errorf("statement not initialized (key=%d)", key) + } + + rows, err := stmt.QueryContext(ctx, args...) + if err != nil && b.isRecoverableError(err) { + // Re-prepare and retry once + b.logger.Warn("Statement query failed with recoverable error, re-preparing", + zap.Int("statementKey", int(key)), + zap.Error(err)) + + b.mu.Lock() + newStmt, prepErr := b.prepareStatement(key) + if prepErr == nil { + // Close old statement + if oldStmt := b.getStatement(key); oldStmt != nil { + _ = oldStmt.Close() + } + b.setStatement(key, newStmt) + stmt = newStmt + } + b.mu.Unlock() + + if prepErr != nil { + return nil, fmt.Errorf("re-preparation failed after recoverable error: %w (original: %v)", prepErr, err) + } + + // Retry with new statement + rows, err = stmt.QueryContext(ctx, args...) 
+ } + return rows, err +} + +// prepareStatements prepares all frequently-used SQL statements for performance +func (b *SQLiteBackend) prepareStatements() (err error) { + // Clean up any successfully prepared statements if we fail partway through + defer func() { + if err != nil { + b.closeStatements() + } + }() + + // Prepare getAllStates query + b.stmtGetAllStates, err = b.db.Prepare(` + SELECT organization, version_id, updated_at + FROM organization_states + ORDER by organization + `) + if err != nil { + return fmt.Errorf("failed to prepare getAllStates: %w", err) + } + + // Prepare getEventsSince query + b.stmtGetEventsSince, err = b.db.Prepare(` + SELECT processed_timestamp, originated_timestamp, event_type, + action, entity_id, correlation_id, event_data + FROM events + WHERE organization_id = ? AND processed_timestamp > ? + ORDER BY processed_timestamp ASC + `) + if err != nil { + return fmt.Errorf("failed to prepare getEventsSince: %w", err) + } + + // Prepare insertEvent query + b.stmtInsertEvent, err = b.db.Prepare(` + INSERT INTO events (organization_id, processed_timestamp, originated_timestamp, + event_type, action, entity_id, correlation_id, event_data) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `) + if err != nil { + return fmt.Errorf("failed to prepare insertEvent: %w", err) + } + + // Prepare updateOrgState query + b.stmtUpdateOrgState, err = b.db.Prepare(` + INSERT INTO organization_states (organization, version_id, updated_at) + VALUES (?, ?, ?) + ON CONFLICT(organization) + DO UPDATE SET version_id = excluded.version_id, updated_at = excluded.updated_at + `) + if err != nil { + return fmt.Errorf("failed to prepare updateOrgState: %w", err) + } + + // Prepare insertOrgState query (for RegisterOrganization) + b.stmtInsertOrgState, err = b.db.Prepare(` + INSERT INTO organization_states (organization, version_id, updated_at) + VALUES (?, ?, ?) 
+ ON CONFLICT(organization) + DO NOTHING + `) + if err != nil { + return fmt.Errorf("failed to prepare insertOrgState: %w", err) + } + + // Prepare cleanup query + b.stmtCleanup, err = b.db.Prepare(`DELETE FROM events WHERE processed_timestamp < ?`) + if err != nil { + return fmt.Errorf("failed to prepare cleanup: %w", err) + } + + // Prepare cleanupRange query + b.stmtCleanupRange, err = b.db.Prepare(`DELETE FROM events WHERE processed_timestamp >= ? AND processed_timestamp <= ?`) + if err != nil { + return fmt.Errorf("failed to prepare cleanupRange: %w", err) + } + + b.logger.Info("Prepared statements initialized successfully") + return nil +} + +// RegisterOrganization creates the necessary resources for an organization +func (b *SQLiteBackend) RegisterOrganization(ctx context.Context, orgID string) error { + if err := b.ensureInitialized(); err != nil { + return err + } + + // Register in local registry + if err := b.registry.register(orgID); err != nil { + return err + } + + // Initialize state in database using prepared statement with retry + _, err := b.execWithRetry(ctx, stmtKeyInsertOrgState, string(orgID), "", time.Now()) + if err != nil { + return fmt.Errorf("failed to initialize organization state: %w", err) + } + + b.logger.Info("Organization registered in SQLite backend", + zap.String("organization", string(orgID)), + ) + return nil +} + +// Publish publishes an event for an organization +func (b *SQLiteBackend) Publish(ctx context.Context, orgID string, + eventType EventType, action, entityID, correlationID string, eventData []byte) error { + + if err := b.ensureInitialized(); err != nil { + return err + } + + // Verify organization is registered + _, err := b.registry.get(orgID) + if err != nil { + return err + } + + // Publish atomically (event + state update in transaction) + tx, err := b.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + + now := time.Now() + + // Use 
prepared statements within transaction + // Note: For transaction-bound statements, we use tx.Stmt() to get transaction-specific handles + // Retry logic doesn't apply within transactions as the transaction would need to be restarted + b.mu.RLock() + txStmtInsertEvent := tx.Stmt(b.stmtInsertEvent) + txStmtUpdateOrgState := tx.Stmt(b.stmtUpdateOrgState) + b.mu.RUnlock() + + // Insert event using prepared statement + _, err = txStmtInsertEvent.ExecContext(ctx, + string(orgID), now, now, string(eventType), action, entityID, correlationID, eventData) + if err != nil { + return fmt.Errorf("failed to record event: %w", err) + } + + // Update organization state version using prepared statement + newVersion := uuid.New().String() + _, err = txStmtUpdateOrgState.ExecContext(ctx, string(orgID), newVersion, now) + if err != nil { + return fmt.Errorf("failed to update state: %w", err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + b.logger.Debug("Event published", + zap.String("organization", string(orgID)), + zap.String("eventType", string(eventType)), + zap.String("action", action), + zap.String("entityID", entityID), + zap.String("correlationID", correlationID), + zap.String("version", newVersion), + ) + + return nil +} + +// Subscribe registers a channel to receive events for an organization +func (b *SQLiteBackend) Subscribe(orgID string, eventChan chan<- []Event) error { + if err := b.registry.addSubscriber(orgID, eventChan); err != nil { + return err + } + + b.logger.Info("Subscription registered", + zap.String("organization", string(orgID)), + ) + return nil +} + +// Unsubscribe removes a subscription channel for an organization +func (b *SQLiteBackend) Unsubscribe(orgID string, eventChan chan<- []Event) error { + if err := b.registry.removeSubscriber(orgID, eventChan); err != nil { + return err + } + + b.logger.Info("Subscription removed", + zap.String("organization", string(orgID)), + ) + return nil +} 
+ +// Cleanup removes old events based on retention policy +func (b *SQLiteBackend) Cleanup(ctx context.Context, olderThan time.Time) error { + if err := b.ensureInitialized(); err != nil { + return err + } + + result, err := b.execWithRetry(ctx, stmtKeyCleanup, olderThan) + if err != nil { + return fmt.Errorf("failed to cleanup old events: %w", err) + } + + deleted, _ := result.RowsAffected() + b.logger.Info("Cleaned up old events", + zap.Int64("deleted", deleted), + zap.Time("olderThan", olderThan), + ) + return nil +} + +// CleanupRange removes events within a specific time range +func (b *SQLiteBackend) CleanupRange(ctx context.Context, from, to time.Time) error { + if err := b.ensureInitialized(); err != nil { + return err + } + + result, err := b.execWithRetry(ctx, stmtKeyCleanupRange, from, to) + if err != nil { + return fmt.Errorf("failed to cleanup events: %w", err) + } + + deleted, _ := result.RowsAffected() + b.logger.Info("Cleaned up events in range", + zap.Int64("deleted", deleted), + zap.Time("from", from), + zap.Time("to", to), + ) + return nil +} + +// Close gracefully shuts down the SQLite backend +func (b *SQLiteBackend) Close() error { + + b.mu.Lock() + defer b.mu.Unlock() + if !b.initialized { + return nil + } + + b.logger.Info("Shutting down SQLite backend") + + // Stop poller + if b.pollerCancel != nil { + b.pollerCancel() + } + + // Stop cleanup loop + if b.cleanupCancel != nil { + b.cleanupCancel() + } + + // Wait for goroutines + b.wg.Wait() + + // Close all prepared statements + b.closeStatements() + + b.initialized = false + b.logger.Info("SQLite backend shutdown complete") + return nil +} + +// closeStatements closes all prepared statements +func (b *SQLiteBackend) closeStatements() { + statements := []*sql.Stmt{ + b.stmtGetAllStates, + b.stmtGetEventsSince, + b.stmtInsertEvent, + b.stmtUpdateOrgState, + b.stmtInsertOrgState, + b.stmtCleanup, + b.stmtCleanupRange, + } + + for _, stmt := range statements { + if stmt != nil { + if err := 
stmt.Close(); err != nil { + b.logger.Warn("Failed to close prepared statement", zap.Error(err)) + } + } + } + b.logger.Info("Prepared statements closed successfully") +} + +// pollLoop runs the main polling loop for state changes +func (b *SQLiteBackend) pollLoop() { + defer b.wg.Done() + + ticker := time.NewTicker(b.config.PollInterval) + defer ticker.Stop() + + b.logger.Info("SQLite poller started", zap.Duration("interval", b.config.PollInterval)) + + for { + select { + case <-b.pollerCtx.Done(): + b.logger.Info("SQLite poller stopped") + return + case <-ticker.C: + b.pollAllOrganizations() + } + } +} + +// pollAllOrganizations checks all organizations for state changes +func (b *SQLiteBackend) pollAllOrganizations() { + ctx := b.pollerCtx + + // Single query for ALL organization states + states, err := b.getAllStates(ctx) + if err != nil { + b.logger.Error("Failed to fetch all states", zap.Error(err)) + return + } + + // Check each organization for changes + for _, state := range states { + orgID := state.Organization + + org, err := b.registry.get(orgID) + if err != nil { + // Organization not registered with subscribers, skip + continue + } + + // Check if version changed + if state.VersionID == org.knownVersion { + continue + } + + b.logger.Debug("State change detected", + zap.String("organization", string(orgID)), + zap.String("oldVersion", org.knownVersion), + zap.String("newVersion", state.VersionID), + ) + + // Fetch events since last poll + events, err := b.getEventsSince(ctx, orgID, org.lastPolled) + if err != nil { + b.logger.Error("Failed to fetch events", + zap.String("organization", string(orgID)), + zap.Error(err)) + continue + } + + if len(events) > 0 { + if b.deliverEvents(org, events) == nil { + org.updatePollState(state.VersionID, time.Now()) + } + // If delivery failed (channel full), don't update timestamp + // so events will be retried on next poll + } else { + org.updatePollState(state.VersionID, time.Now()) + } + } +} + +// getAllStates 
retrieves all organization states +func (b *SQLiteBackend) getAllStates(ctx context.Context) ([]OrganizationState, error) { + if err := b.ensureInitialized(); err != nil { + return nil, err + } + + rows, err := b.queryWithRetry(ctx, stmtKeyGetAllStates) + if err != nil { + return nil, fmt.Errorf("failed to query all states: %w", err) + } + defer rows.Close() + + var states []OrganizationState + for rows.Next() { + var state OrganizationState + if err := rows.Scan(&state.Organization, &state.VersionID, &state.UpdatedAt); err != nil { + return nil, fmt.Errorf("failed to scan state: %w", err) + } + states = append(states, state) + } + return states, rows.Err() +} + +// getEventsSince retrieves events for an organization after a given timestamp +func (b *SQLiteBackend) getEventsSince(ctx context.Context, orgID string, since time.Time) ([]Event, error) { + if err := b.ensureInitialized(); err != nil { + return nil, err + } + + // TODO: (VirajSalaka) Implement pagination if large number of events + rows, err := b.queryWithRetry(ctx, stmtKeyGetEventsSince, string(orgID), since) + if err != nil { + return nil, fmt.Errorf("failed to query events: %w", err) + } + defer rows.Close() + + var events []Event + for rows.Next() { + var e Event + var eventTypeStr string + e.OrganizationID = orgID + + if err := rows.Scan(&e.ProcessedTimestamp, &e.OriginatedTimestamp, + &eventTypeStr, &e.Action, &e.EntityID, &e.CorrelationID, &e.EventData); err != nil { + return nil, fmt.Errorf("failed to scan event: %w", err) + } + e.EventType = EventType(eventTypeStr) + events = append(events, e) + } + return events, rows.Err() +} + +// deliverEvents sends events to all subscribers of an organization +func (b *SQLiteBackend) deliverEvents(org *organization, events []Event) error { + subscribers := org.getSubscribers() + + if len(subscribers) == 0 { + b.logger.Debug("No subscribers for organization", + zap.String("organization", string(org.id)), + zap.Int("events", len(events)), + ) + return nil + } 
+ + // TODO: (VirajSalaka) One subscriber is considered here. Handle multiple subscribers properly. + for _, ch := range subscribers { + select { + case ch <- events: + b.logger.Debug("Delivered events to subscriber", + zap.String("organization", string(org.id)), + zap.Int("events", len(events)), + ) + default: + b.logger.Error("Subscriber channel full, dropping events", + zap.String("organization", string(org.id)), + zap.Int("events", len(events)), + ) + return fmt.Errorf("subscriber channel full") + } + } + return nil +} + +// cleanupLoop runs periodic cleanup of old events +func (b *SQLiteBackend) cleanupLoop() { + defer b.wg.Done() + + ticker := time.NewTicker(b.config.CleanupInterval) + defer ticker.Stop() + + for { + select { + case <-b.cleanupCtx.Done(): + return + case <-ticker.C: + cutoff := time.Now().Add(-b.config.RetentionPeriod) + if err := b.Cleanup(b.cleanupCtx, cutoff); err != nil { + b.logger.Error("Periodic cleanup failed", zap.Error(err)) + } + } + } +} diff --git a/gateway/gateway-controller/pkg/eventhub/topic.go b/gateway/gateway-controller/pkg/eventhub/topic.go new file mode 100644 index 000000000..459a0db93 --- /dev/null +++ b/gateway/gateway-controller/pkg/eventhub/topic.go @@ -0,0 +1,133 @@ +package eventhub + +import ( + "errors" + "sync" + "time" +) + +var ( + ErrOrganizationNotFound = errors.New("organization not found") + ErrOrganizationAlreadyExists = errors.New("organization already registered") +) + +// organization represents an internal organization with its subscriptions and poll state +type organization struct { + id string + subscribers []chan<- []Event // Registered subscription channels + subscriberMu sync.RWMutex + + // Polling state + knownVersion string // Last known version from organization_states table + lastPolled time.Time // Timestamp of last successful poll +} + +// organizationRegistry manages all registered organizations +type organizationRegistry struct { + orgs map[string]*organization + mu sync.RWMutex +} + +// 
newOrganizationRegistry creates a new organization registry +func newOrganizationRegistry() *organizationRegistry { + return &organizationRegistry{ + orgs: make(map[string]*organization), + } +} + +// register adds a new organization to the registry +func (r *organizationRegistry) register(id string) error { + r.mu.Lock() + defer r.mu.Unlock() + + if _, exists := r.orgs[id]; exists { + return ErrOrganizationAlreadyExists + } + + r.orgs[id] = &organization{ + id: id, + subscribers: make([]chan<- []Event, 0), + lastPolled: time.Now(), // Start from now, don't replay old events + } + + return nil +} + +// get retrieves an organization by ID +func (r *organizationRegistry) get(id string) (*organization, error) { + r.mu.RLock() + defer r.mu.RUnlock() + + org, exists := r.orgs[id] + if !exists { + return nil, ErrOrganizationNotFound + } + return org, nil +} + +// addSubscriber adds a subscription channel to an organization +func (r *organizationRegistry) addSubscriber(id string, ch chan<- []Event) error { + r.mu.RLock() + org, exists := r.orgs[id] + r.mu.RUnlock() + + if !exists { + return ErrOrganizationNotFound + } + + org.subscriberMu.Lock() + defer org.subscriberMu.Unlock() + org.subscribers = append(org.subscribers, ch) + return nil +} + +// removeSubscriber removes a subscription channel from an organization +func (r *organizationRegistry) removeSubscriber(id string, ch chan<- []Event) error { + r.mu.RLock() + org, exists := r.orgs[id] + r.mu.RUnlock() + + if !exists { + return ErrOrganizationNotFound + } + + org.subscriberMu.Lock() + defer org.subscriberMu.Unlock() + + // Find and remove the subscriber + for i, sub := range org.subscribers { + if sub == ch { + org.subscribers = append(org.subscribers[:i], org.subscribers[i+1:]...) 
+ return nil + } + } + return nil // Not found is not an error +} + +// getAll returns all registered organizations +func (r *organizationRegistry) getAll() []*organization { + r.mu.RLock() + defer r.mu.RUnlock() + + orgs := make([]*organization, 0, len(r.orgs)) + for _, org := range r.orgs { + orgs = append(orgs, org) + } + return orgs +} + +// updatePollState updates the polling state for an organization +func (org *organization) updatePollState(version string, polledAt time.Time) { + org.knownVersion = version + org.lastPolled = polledAt +} + +// getSubscribers returns a copy of the subscribers list +func (org *organization) getSubscribers() []chan<- []Event { + org.subscriberMu.RLock() + defer org.subscriberMu.RUnlock() + + subs := make([]chan<- []Event, len(org.subscribers)) + copy(subs, org.subscribers) + return subs +} diff --git a/gateway/gateway-controller/pkg/eventhub/types.go b/gateway/gateway-controller/pkg/eventhub/types.go new file mode 100644 index 000000000..283d41558 --- /dev/null +++ b/gateway/gateway-controller/pkg/eventhub/types.go @@ -0,0 +1,81 @@ +package eventhub + +import ( + "context" + "time" +) + +// EventType represents the type of event +type EventType string + +// Event type constants +const ( + EventTypeAPI EventType = "API" + EventTypeCertificate EventType = "CERTIFICATE" + EventTypeLLMTemplate EventType = "LLM_TEMPLATE" +) + +// Event represents a single event in the hub +type Event struct { + OrganizationID string // Organization this event belongs to + ProcessedTimestamp time.Time // When event was recorded in DB + OriginatedTimestamp time.Time // When event was created + EventType EventType // Type of event (API, CERTIFICATE, etc.) 
+ Action string // CREATE, UPDATE, or DELETE + EntityID string // ID of the affected entity + CorrelationID string // Correlation ID for request tracing + EventData []byte // JSON serialized payload +} + +// OrganizationState represents the version state for an organization +type OrganizationState struct { + Organization string // Organization ID + VersionID string // UUID that changes on every modification + UpdatedAt time.Time // Last update timestamp +} + +// EventHub is the main interface for the message broker +type EventHub interface { + // Initialize sets up database connections and starts background poller + Initialize(ctx context.Context) error + + // RegisterOrganization registers an organization for event tracking + // Creates entry in organization_states table with empty version + RegisterOrganization(organizationID string) error + + // PublishEvent publishes an event for an organization + // Updates the organization_states and events tables atomically + PublishEvent(ctx context.Context, organizationID string, eventType EventType, + action, entityID, correlationID string, eventData []byte) error + + // Subscribe registers a channel to receive events for an organization + // Events are delivered as batches (arrays) based on poll cycle + // Subscriber receives ALL event types and should filter by EventType if needed + Subscribe(organizationID string, eventChan chan<- []Event) error + + // CleanUpEvents removes events within the specified time range + CleanUpEvents(ctx context.Context, timeFrom, timeEnd time.Time) error + + // Close gracefully shuts down the EventHub + Close() error +} + + +// Config holds EventHub configuration +type Config struct { + // PollInterval is how often to poll for state changes + PollInterval time.Duration + // CleanupInterval is how often to run automatic cleanup + CleanupInterval time.Duration + // RetentionPeriod is how long to keep events (default 1 hour) + RetentionPeriod time.Duration +} + +// DefaultConfig returns 
sensible defaults +func DefaultConfig() Config { + return Config{ + PollInterval: time.Second * 5, + CleanupInterval: time.Minute * 10, + RetentionPeriod: time.Hour, + } +} diff --git a/gateway/gateway-controller/pkg/eventlistener/api_processor.go b/gateway/gateway-controller/pkg/eventlistener/api_processor.go new file mode 100644 index 000000000..60a16b9ba --- /dev/null +++ b/gateway/gateway-controller/pkg/eventlistener/api_processor.go @@ -0,0 +1,322 @@ +/* + * Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package eventlistener + +import ( + "context" + "strings" + "time" + + api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/generated" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/xds" + policyenginev1 "github.com/wso2/api-platform/sdk/gateway/policyengine/v1" + "go.uber.org/zap" +) + +// processAPIEvents handles API events based on action type. +// Works with the generic Event type from any EventSource implementation. 
+func (el *EventListener) processAPIEvents(event Event) { + log := el.logger.With( + zap.String("api_id", event.EntityID), + zap.String("action", event.Action), + zap.String("correlation_id", event.CorrelationID), + ) + + apiID := event.EntityID + // TODO: (VirajSalaka) Use Context to propagate correlationID + switch event.Action { + case "CREATE", "UPDATE": + el.handleAPICreateOrUpdate(apiID, event.CorrelationID, log) + case "DELETE": + el.handleAPIDelete(apiID, event.CorrelationID, log) + default: + log.Warn("Unknown action type") + } +} + +// handleAPICreateOrUpdate fetches the API from DB and updates XDS +func (el *EventListener) handleAPICreateOrUpdate(apiID string, correlationID string, log *zap.Logger) { + // 1. Fetch API configuration from database + config, err := el.db.GetConfig(apiID) + if err != nil { + log.Error("Failed to fetch API config from database", zap.Error(err)) + return + } + + // 2. Update in-memory store (add or update) + _, err = el.store.Get(apiID) + if err != nil { + // Config doesn't exist in memory - add it + if err := el.store.Add(config); err != nil { + log.Error("Failed to add config to store", zap.Error(err)) + return + } + log.Info("Added API config to in-memory store") + } else { + // Config exists - update it + if err := el.store.Update(config); err != nil { + log.Error("Failed to update config in store", zap.Error(err)) + return + } + log.Info("Updated API config in in-memory store") + } + + var storedPolicy *models.StoredPolicyConfig + + if el.policyManager != nil { + storedPolicy = el.buildStoredPolicyFromAPI(config) + } + + // TODO: (VirajSalaka) Handle failures in policy addition properly (rollback) + // TODO: (VirajSalaka) Use ErrGroup to parallelize XDS update and policy addition + // 3. 
Trigger async XDS snapshot update + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := el.snapshotManager.UpdateSnapshot(ctx, correlationID); err != nil { + log.Error("Failed to update xDS snapshot", zap.Error(err)) + } else { + log.Info("xDS snapshot updated successfully") + } + }() + + // 4. Update PolicyManager (if configured) + if storedPolicy != nil { + if err := el.policyManager.AddPolicy(storedPolicy); err != nil { + log.Warn("Failed to add policy to PolicyManager", zap.Error(err)) + } else { + log.Info("Added policy to PolicyManager") + } + } +} + +// handleAPIDelete removes the API from in-memory store and updates XDS +func (el *EventListener) handleAPIDelete(apiID string, correlationID string, log *zap.Logger) { + // 1. Check if config exists in store (for logging/policy removal) + config, err := el.store.Get(apiID) + if err != nil { + log.Warn("Config not found in store, may already be deleted") + // Continue anyway to ensure cleanup + } + + // 2. Remove from in-memory store + if err := el.store.Delete(apiID); err != nil { + log.Warn("Failed to delete config from store", zap.Error(err)) + // Continue - config may not exist + } else { + log.Info("Removed API config from in-memory store") + } + + // 3. Trigger async XDS snapshot update + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := el.snapshotManager.UpdateSnapshot(ctx, correlationID); err != nil { + log.Error("Failed to update xDS snapshot after delete", zap.Error(err)) + } else { + log.Info("xDS snapshot updated after delete") + } + }() + + // 4. 
Remove from PolicyManager (if configured) + if el.policyManager != nil && config != nil { + policyID := apiID + "-policies" + if err := el.policyManager.RemovePolicy(policyID); err != nil { + log.Warn("Failed to remove policy from PolicyManager", zap.Error(err)) + } else { + log.Info("Removed policy from PolicyManager") + } + } +} + +// buildStoredPolicyFromAPI constructs a StoredPolicyConfig from an API config +// Merging rules: When operation has policies, they define the order (can reorder, override, or extend API policies). +// Remaining API-level policies not mentioned in operation policies are appended at the end. +// When operation has no policies, API-level policies are used in their declared order. +// RouteKey uses the fully qualified route path (context + operation path) and must match the route name format +// used by the xDS translator for consistency. +func (el *EventListener) buildStoredPolicyFromAPI(cfg *models.StoredConfig) *models.StoredPolicyConfig { + apiCfg := &cfg.Configuration + + // Collect API-level policies + apiPolicies := make(map[string]policyenginev1.PolicyInstance) // name -> policy + if cfg.GetPolicies() != nil { + for _, p := range *cfg.GetPolicies() { + apiPolicies[p.Name] = convertAPIPolicy(p) + } + } + + routes := make([]policyenginev1.PolicyChain, 0) + switch apiCfg.Kind { + case api.Asyncwebsub: + // Build routes with merged policies for WebSub + apiData, err := apiCfg.Spec.AsWebhookAPIData() + if err != nil { + el.logger.Error("Failed to parse WebSub API data", zap.Error(err)) + return nil + } + for _, ch := range apiData.Channels { + var finalPolicies []policyenginev1.PolicyInstance + + if ch.Policies != nil && len(*ch.Policies) > 0 { + // Operation has policies: use operation policy order as authoritative + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*ch.Policies)) + addedNames := make(map[string]struct{}) + + for _, opPolicy := range *ch.Policies { + finalPolicies = append(finalPolicies, 
convertAPIPolicy(opPolicy)) + addedNames[opPolicy.Name] = struct{}{} + } + + // Add any API-level policies not mentioned in operation policies (append at end) + if apiData.Policies != nil { + for _, apiPolicy := range *apiData.Policies { + if _, exists := addedNames[apiPolicy.Name]; !exists { + finalPolicies = append(finalPolicies, apiPolicies[apiPolicy.Name]) + } + } + } + } else { + // No operation policies: use API-level policies in their declared order + if apiData.Policies != nil { + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*apiData.Policies)) + for _, p := range *apiData.Policies { + finalPolicies = append(finalPolicies, apiPolicies[p.Name]) + } + } + } + + routeKey := xds.GenerateRouteName("POST", apiData.Context, apiData.Version, ch.Path, el.routerConfig.GatewayHost) + routes = append(routes, policyenginev1.PolicyChain{ + RouteKey: routeKey, + Policies: finalPolicies, + }) + } + case api.RestApi: + // Build routes with merged policies for REST API + apiData, err := apiCfg.Spec.AsAPIConfigData() + if err != nil { + el.logger.Error("Failed to parse REST API data", zap.Error(err)) + return nil + } + for _, op := range apiData.Operations { + var finalPolicies []policyenginev1.PolicyInstance + + if op.Policies != nil && len(*op.Policies) > 0 { + // Operation has policies: use operation policy order as authoritative + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*op.Policies)) + addedNames := make(map[string]struct{}) + + for _, opPolicy := range *op.Policies { + finalPolicies = append(finalPolicies, convertAPIPolicy(opPolicy)) + addedNames[opPolicy.Name] = struct{}{} + } + + // Add any API-level policies not mentioned in operation policies (append at end) + if apiData.Policies != nil { + for _, apiPolicy := range *apiData.Policies { + if _, exists := addedNames[apiPolicy.Name]; !exists { + finalPolicies = append(finalPolicies, apiPolicies[apiPolicy.Name]) + } + } + } + } else { + // No operation policies: use API-level 
policies in their declared order + if apiData.Policies != nil { + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*apiData.Policies)) + for _, p := range *apiData.Policies { + finalPolicies = append(finalPolicies, apiPolicies[p.Name]) + } + } + } + + // Determine effective vhosts (fallback to global router defaults when not provided) + effectiveMainVHost := el.routerConfig.VHosts.Main.Default + effectiveSandboxVHost := el.routerConfig.VHosts.Sandbox.Default + if apiData.Vhosts != nil { + if strings.TrimSpace(apiData.Vhosts.Main) != "" { + effectiveMainVHost = apiData.Vhosts.Main + } + if apiData.Vhosts.Sandbox != nil && strings.TrimSpace(*apiData.Vhosts.Sandbox) != "" { + effectiveSandboxVHost = *apiData.Vhosts.Sandbox + } + } + + vhosts := []string{effectiveMainVHost} + if apiData.Upstream.Sandbox != nil && apiData.Upstream.Sandbox.Url != nil && + strings.TrimSpace(*apiData.Upstream.Sandbox.Url) != "" { + vhosts = append(vhosts, effectiveSandboxVHost) + } + + for _, vhost := range vhosts { + routes = append(routes, policyenginev1.PolicyChain{ + RouteKey: xds.GenerateRouteName(string(op.Method), apiData.Context, apiData.Version, op.Path, vhost), + Policies: finalPolicies, + }) + } + } + } + + // If there are no policies at all, return nil (skip creation) + policyCount := 0 + for _, r := range routes { + policyCount += len(r.Policies) + } + if policyCount == 0 { + return nil + } + + now := time.Now().Unix() + stored := &models.StoredPolicyConfig{ + ID: cfg.ID + "-policies", + Configuration: policyenginev1.Configuration{ + Routes: routes, + Metadata: policyenginev1.Metadata{ + CreatedAt: now, + UpdatedAt: now, + ResourceVersion: 0, + APIName: cfg.GetDisplayName(), + Version: cfg.GetVersion(), + Context: cfg.GetContext(), + }, + }, + Version: 0, + } + return stored +} + +// convertAPIPolicy converts generated api.Policy to policyenginev1.PolicyInstance +func convertAPIPolicy(p api.Policy) policyenginev1.PolicyInstance { + paramsMap := 
make(map[string]interface{}) + if p.Params != nil { + for k, v := range *p.Params { + paramsMap[k] = v + } + } + return policyenginev1.PolicyInstance{ + Name: p.Name, + Version: p.Version, + Enabled: true, // Default to enabled + ExecutionCondition: p.ExecutionCondition, + Parameters: paramsMap, + } +} diff --git a/gateway/gateway-controller/pkg/eventlistener/event_source.go b/gateway/gateway-controller/pkg/eventlistener/event_source.go new file mode 100644 index 000000000..0dfbbc759 --- /dev/null +++ b/gateway/gateway-controller/pkg/eventlistener/event_source.go @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package eventlistener + +import ( + "context" + "time" +) + +// Event represents a generic event from any event source. +// This is a simplified, agnostic event structure that can be populated +// by different event source implementations (EventHub, Kafka, RabbitMQ, etc.) 
+type Event struct { + // OrganizationID identifies which organization this event belongs to + OrganizationID string + + // EventType describes the kind of event (e.g., "API", "CERTIFICATE", "LLM_TEMPLATE") + EventType string + + // Action describes what happened (e.g., "CREATE", "UPDATE", "DELETE") + Action string + + // EntityID identifies the specific entity affected by the event + EntityID string + + // CorrelationID is used to trace requests across services + CorrelationID string + + // EventData contains the serialized event payload (typically JSON) + EventData []byte + + // Timestamp indicates when the event occurred + Timestamp time.Time +} + +// EventSource defines the interface for any event delivery mechanism. +// Implementations can use EventHub, message brokers (Kafka, RabbitMQ, NATS), +// or any other pub/sub system. +// +// Design principles: +// - Simple and focused: only what EventListener needs +// - Technology agnostic: no assumptions about the underlying system +// - Testable: easy to mock for unit tests +type EventSource interface { + // Subscribe registers to receive events for a specific organization. + // Events are delivered as batches via the provided channel. + // + // Parameters: + // - ctx: Context for cancellation and timeout + // - organizationID: The organization to subscribe to + // - eventChan: Channel where event batches will be sent + // + // Returns: + // - error if subscription fails + // + // Notes: + // - The implementation is responsible for managing the subscription lifecycle + // - Events should be delivered until Unsubscribe is called or ctx is cancelled + // - The channel should NOT be closed by the EventSource + Subscribe(ctx context.Context, organizationID string, eventChan chan<- []Event) error + + // Unsubscribe stops receiving events for a specific organization. 
+ // + // Parameters: + // - organizationID: The organization to unsubscribe from + // + // Returns: + // - error if unsubscribe fails + // + // Notes: + // - After calling Unsubscribe, no more events should be sent to the channel + // - It's safe to call Unsubscribe multiple times + Unsubscribe(organizationID string) error + + // Close gracefully shuts down the event source and cleans up resources. + // + // Returns: + // - error if shutdown fails + // + // Notes: + // - Should unsubscribe all active subscriptions + // - Should wait for in-flight events to be delivered + // - Should be idempotent (safe to call multiple times) + Close() error +} diff --git a/gateway/gateway-controller/pkg/eventlistener/eventhub_adapter.go b/gateway/gateway-controller/pkg/eventlistener/eventhub_adapter.go new file mode 100644 index 000000000..7e5f00593 --- /dev/null +++ b/gateway/gateway-controller/pkg/eventlistener/eventhub_adapter.go @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package eventlistener + +import ( + "context" + "fmt" + "sync" + + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/eventhub" + "go.uber.org/zap" +) + +// EventHubAdapter adapts the eventhub.EventHub interface to the generic EventSource interface. 
+// This allows EventListener to work with the existing EventHub implementation while maintaining +// abstraction and testability. +type EventHubAdapter struct { + eventHub eventhub.EventHub + logger *zap.Logger + + // activeSubscriptions tracks which organizations have active subscriptions + // This is used to ensure proper cleanup and prevent duplicate subscriptions + activeSubscriptions sync.Map // map[string]chan<- []Event +} + +// NewEventHubAdapter creates a new adapter that wraps an EventHub instance. +// +// Parameters: +// - eventHub: The EventHub instance to wrap +// - logger: Logger for debugging and error reporting +// +// Returns: +// - EventSource implementation backed by EventHub +func NewEventHubAdapter(eventHub eventhub.EventHub, logger *zap.Logger) EventSource { + return &EventHubAdapter{ + eventHub: eventHub, + logger: logger, + // activeSubscriptions sync.Map zero value is ready to use + } +} + +// Subscribe implements EventSource.Subscribe by delegating to EventHub. +// It handles organization registration and event conversion. 
+func (a *EventHubAdapter) Subscribe(ctx context.Context, organizationID string, eventChan chan<- []Event) error { + // Check if already subscribed + if _, exists := a.activeSubscriptions.Load(organizationID); exists { + return fmt.Errorf("already subscribed to organization: %s", organizationID) + } + + // Register organization with EventHub (idempotent operation) + if err := a.eventHub.RegisterOrganization(organizationID); err != nil { + a.logger.Debug("Organization may already be registered", + zap.String("organization", organizationID), + zap.Error(err), + ) + // Continue - registration errors are usually not fatal + } + + // Create a bridge channel that receives eventhub.Event and converts to generic Event + bridgeChan := make(chan []eventhub.Event, 10) + + // Subscribe to EventHub + if err := a.eventHub.Subscribe(organizationID, bridgeChan); err != nil { + close(bridgeChan) + return fmt.Errorf("failed to subscribe to eventhub: %w", err) + } + + // Track this subscription + a.activeSubscriptions.Store(organizationID, eventChan) + + // Start goroutine to convert and forward events + go a.bridgeEvents(ctx, organizationID, bridgeChan, eventChan) + + a.logger.Info("Subscribed to event source", + zap.String("organization", organizationID), + zap.String("source", "eventhub"), + ) + + return nil +} + +// bridgeEvents converts eventhub.Event to generic Event and forwards to the listener. +// This goroutine runs until the context is cancelled or the bridge channel is closed. 
+func (a *EventHubAdapter) bridgeEvents( + ctx context.Context, + organizationID string, + from <-chan []eventhub.Event, + to chan<- []Event, +) { + defer func() { + // Clean up subscription tracking + a.activeSubscriptions.Delete(organizationID) + a.logger.Debug("Bridge goroutine exiting", + zap.String("organization", organizationID), + ) + }() + + for { + select { + case <-ctx.Done(): + return + + case hubEvents, ok := <-from: + if !ok { + // EventHub channel closed + a.logger.Warn("EventHub channel closed unexpectedly", + zap.String("organization", organizationID), + ) + return + } + + // Convert eventhub.Event to generic Event + genericEvents := make([]Event, len(hubEvents)) + for i, hubEvent := range hubEvents { + eventCorrelationID := "event-" + hubEvent.CorrelationID + genericEvents[i] = Event{ + OrganizationID: string(hubEvent.OrganizationID), + EventType: string(hubEvent.EventType), + Action: hubEvent.Action, + EntityID: hubEvent.EntityID, + EventData: hubEvent.EventData, + CorrelationID: eventCorrelationID, + Timestamp: hubEvent.ProcessedTimestamp, + } + } + + // Forward to listener + select { + case to <- genericEvents: + // Successfully forwarded + case <-ctx.Done(): + return + } + } + } +} + +// Unsubscribe implements EventSource.Unsubscribe. +// Note: The current EventHub implementation doesn't have an explicit unsubscribe method, +// so we just stop the bridge goroutine by removing the subscription tracking. 
+func (a *EventHubAdapter) Unsubscribe(organizationID string) error { + if _, exists := a.activeSubscriptions.Load(organizationID); !exists { + // Not subscribed - this is fine, make it idempotent + return nil + } + + // Remove from tracking - the bridge goroutine will detect context cancellation + // when the listener stops + a.activeSubscriptions.Delete(organizationID) + + a.logger.Info("Unsubscribed from event source", + zap.String("organization", organizationID), + ) + + return nil +} + +// Close implements EventSource.Close by delegating to EventHub.Close. +func (a *EventHubAdapter) Close() error { + // Clean up all subscriptions + a.activeSubscriptions.Clear() + + // Close the underlying EventHub + if err := a.eventHub.Close(); err != nil { + return fmt.Errorf("failed to close eventhub: %w", err) + } + + a.logger.Info("Event source closed", zap.String("source", "eventhub")) + return nil +} diff --git a/gateway/gateway-controller/pkg/eventlistener/listener.go b/gateway/gateway-controller/pkg/eventlistener/listener.go new file mode 100644 index 000000000..db9925e70 --- /dev/null +++ b/gateway/gateway-controller/pkg/eventlistener/listener.go @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package eventlistener + +import ( + "context" + "sync" + + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/xds" + "go.uber.org/zap" +) + +// EventListener subscribes to an EventSource and processes events to update XDS. +// It uses the generic EventSource interface, allowing it to work with different +// event delivery mechanisms (EventHub, Kafka, RabbitMQ, etc.) and enabling easy mocking for tests. +type EventListener struct { + eventSource EventSource // Generic event source (EventHub, Kafka, etc.) + store *storage.ConfigStore // In-memory config store + db storage.Storage // Persistent storage (SQLite) + snapshotManager *xds.SnapshotManager // XDS snapshot manager + policyManager *policyxds.PolicyManager // Optional: policy manager + routerConfig *config.RouterConfig // Router configuration for vhosts + logger *zap.Logger + + eventChan chan []Event // Buffered channel (size 10) for generic events + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +// NewEventListener creates a new EventListener instance. 
+// +// Parameters: +// - eventSource: The event source to subscribe to (can be EventHubAdapter, MockEventSource, or any EventSource implementation) +// - store: In-memory configuration store +// - db: Persistent storage (SQLite) +// - snapshotManager: xDS snapshot manager for updating Envoy configuration +// - policyManager: Optional policy manager (can be nil) +// - routerConfig: Router configuration for vhosts +// - logger: Structured logger +// +// Returns: +// - *EventListener ready to be started +func NewEventListener( + eventSource EventSource, + store *storage.ConfigStore, + db storage.Storage, + snapshotManager *xds.SnapshotManager, + policyManager *policyxds.PolicyManager, // Can be nil + routerConfig *config.RouterConfig, + logger *zap.Logger, +) *EventListener { + return &EventListener{ + eventSource: eventSource, + store: store, + db: db, + snapshotManager: snapshotManager, + policyManager: policyManager, + routerConfig: routerConfig, + logger: logger, + } +} + +// Start initializes the event listener and starts processing events. +// Subscribes to the "default" organization and starts the event processing goroutine. 
+func (el *EventListener) Start(ctx context.Context) error { + el.ctx, el.cancel = context.WithCancel(ctx) + + // Create buffered channel with size 10 for generic events + el.eventChan = make(chan []Event, 10) + + // Subscribe to "default" organization events via the EventSource + organizationID := "default" + if err := el.eventSource.Subscribe(ctx, organizationID, el.eventChan); err != nil { + return err // Let the EventSource adapter handle details + } + + // Start processing goroutine + el.wg.Add(1) + // TODO: (VirajSalaka) Should recover in case of panics + go el.processEvents() + + el.logger.Info("EventListener started", zap.String("organization", organizationID)) + return nil +} + +// processEvents is a goroutine that continuously processes events from the channel +func (el *EventListener) processEvents() { + defer el.wg.Done() + + for { + select { + case <-el.ctx.Done(): + el.logger.Info("EventListener stopping") + return + + case events := <-el.eventChan: + for _, event := range events { + el.handleEvent(event) + } + } + } +} + +// handleEvent processes a single event and delegates based on event type. +// Uses the generic Event type that works with any EventSource implementation. 
+func (el *EventListener) handleEvent(event Event) { + log := el.logger.With( + zap.String("event_type", event.EventType), + zap.String("action", event.Action), + zap.String("entity_id", event.EntityID), + zap.String("correlation_id", event.CorrelationID), + ) + + switch event.EventType { + case "API": // EventTypeAPI constant + el.processAPIEvents(event) + default: + log.Debug("Ignoring non-API event") + } +} + +// Stop gracefully shuts down the event listener +func (el *EventListener) Stop() { + if el.cancel != nil { + el.cancel() + } + el.wg.Wait() + + if el.eventChan != nil { + close(el.eventChan) + } + + el.logger.Info("EventListener stopped") +} diff --git a/gateway/gateway-controller/pkg/eventlistener/mock_event_source.go b/gateway/gateway-controller/pkg/eventlistener/mock_event_source.go new file mode 100644 index 000000000..4e326c4b2 --- /dev/null +++ b/gateway/gateway-controller/pkg/eventlistener/mock_event_source.go @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package eventlistener + +import ( + "context" + "fmt" + "sync" +) + +// MockEventSource is a mock implementation of EventSource for testing. +// It allows tests to control event delivery and verify subscription behavior. +// +// Usage example: +// +// mock := NewMockEventSource() +// listener := NewEventListener(mock, store, db, ...) 
+// listener.Start(ctx) +// +// // Publish test events +// mock.PublishEvent("default", Event{...}) +// +// // Verify subscription +// if !mock.IsSubscribed("default") { +// t.Error("Expected subscription to default org") +// } +type MockEventSource struct { + mu sync.RWMutex + subscriptions map[string]chan<- []Event + closed bool + + // PublishedEvents tracks all events published through this mock + PublishedEvents []Event + + // SubscribeCalls tracks Subscribe method invocations + SubscribeCalls []SubscribeCall + + // UnsubscribeCalls tracks Unsubscribe method invocations + UnsubscribeCalls []string + + // Errors can be set to simulate failures + SubscribeError error + UnsubscribeError error + CloseError error +} + +// SubscribeCall records a Subscribe method invocation +type SubscribeCall struct { + OrganizationID string + Context context.Context +} + +// NewMockEventSource creates a new mock event source for testing. +func NewMockEventSource() *MockEventSource { + return &MockEventSource{ + subscriptions: make(map[string]chan<- []Event), + PublishedEvents: make([]Event, 0), + SubscribeCalls: make([]SubscribeCall, 0), + UnsubscribeCalls: make([]string, 0), + } +} + +// Subscribe implements EventSource.Subscribe for testing. +// Records the call and sets up event delivery. 
+func (m *MockEventSource) Subscribe(ctx context.Context, organizationID string, eventChan chan<- []Event) error { + m.mu.Lock() + defer m.mu.Unlock() + + // Record the call + m.SubscribeCalls = append(m.SubscribeCalls, SubscribeCall{ + OrganizationID: organizationID, + Context: ctx, + }) + + // Return configured error if set + if m.SubscribeError != nil { + return m.SubscribeError + } + + // Check if already subscribed + if _, exists := m.subscriptions[organizationID]; exists { + return fmt.Errorf("already subscribed to organization: %s", organizationID) + } + + // Store subscription + m.subscriptions[organizationID] = eventChan + + return nil +} + +// Unsubscribe implements EventSource.Unsubscribe for testing. +// Records the call and removes the subscription. +func (m *MockEventSource) Unsubscribe(organizationID string) error { + m.mu.Lock() + defer m.mu.Unlock() + + // Record the call + m.UnsubscribeCalls = append(m.UnsubscribeCalls, organizationID) + + // Return configured error if set + if m.UnsubscribeError != nil { + return m.UnsubscribeError + } + + // Remove subscription + delete(m.subscriptions, organizationID) + + return nil +} + +// Close implements EventSource.Close for testing. +func (m *MockEventSource) Close() error { + m.mu.Lock() + defer m.mu.Unlock() + + // Return configured error if set + if m.CloseError != nil { + return m.CloseError + } + + m.closed = true + m.subscriptions = make(map[string]chan<- []Event) + + return nil +} + +// PublishEvent publishes an event to subscribers (test helper). +// This simulates an event being delivered from the event source. 
+// +// Parameters: +// - organizationID: The organization to publish to +// - events: One or more events to publish as a batch +// +// Returns: +// - error if no subscription exists for the organization +func (m *MockEventSource) PublishEvent(organizationID string, events ...Event) error { + m.mu.RLock() + eventChan, exists := m.subscriptions[organizationID] + m.mu.RUnlock() + + if !exists { + return fmt.Errorf("no subscription for organization: %s", organizationID) + } + + // Track published events + m.mu.Lock() + m.PublishedEvents = append(m.PublishedEvents, events...) + m.mu.Unlock() + + // Send events to subscriber + eventChan <- events + + return nil +} + +// IsSubscribed checks if an organization has an active subscription (test helper). +func (m *MockEventSource) IsSubscribed(organizationID string) bool { + m.mu.RLock() + defer m.mu.RUnlock() + + _, exists := m.subscriptions[organizationID] + return exists +} + +// IsClosed checks if Close has been called (test helper). +func (m *MockEventSource) IsClosed() bool { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.closed +} + +// Reset clears all recorded calls and state (test helper). +// Useful for resetting state between test cases. +func (m *MockEventSource) Reset() { + m.mu.Lock() + defer m.mu.Unlock() + + m.subscriptions = make(map[string]chan<- []Event) + m.closed = false + m.PublishedEvents = make([]Event, 0) + m.SubscribeCalls = make([]SubscribeCall, 0) + m.UnsubscribeCalls = make([]string, 0) + m.SubscribeError = nil + m.UnsubscribeError = nil + m.CloseError = nil +} + +// GetSubscriptionCount returns the number of active subscriptions (test helper). 
+func (m *MockEventSource) GetSubscriptionCount() int { + m.mu.RLock() + defer m.mu.RUnlock() + + return len(m.subscriptions) +} diff --git a/gateway/gateway-controller/pkg/storage/gateway-controller-db.sql b/gateway/gateway-controller/pkg/storage/gateway-controller-db.sql index 9b9badac0..ac842aa0e 100644 --- a/gateway/gateway-controller/pkg/storage/gateway-controller-db.sql +++ b/gateway/gateway-controller/pkg/storage/gateway-controller-db.sql @@ -150,5 +150,31 @@ CREATE INDEX IF NOT EXISTS idx_api_key_status ON api_keys(status); CREATE INDEX IF NOT EXISTS idx_api_key_expiry ON api_keys(expires_at); CREATE INDEX IF NOT EXISTS idx_created_by ON api_keys(created_by); --- Set schema version to 5 -PRAGMA user_version = 5; +-- EventHub: Organization States Table (added in schema version 6) +-- Tracks version information per organization for change detection +CREATE TABLE IF NOT EXISTS organization_states ( + organization TEXT PRIMARY KEY, + version_id TEXT NOT NULL DEFAULT '', + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_organization_states_updated ON organization_states(updated_at); + +-- EventHub: Unified Events Table (added in schema version 6) +-- Stores all entity change events (APIs, certificates, LLM templates, etc.) 
+CREATE TABLE IF NOT EXISTS events ( + organization_id TEXT NOT NULL, + processed_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + originated_timestamp TIMESTAMP NOT NULL, + event_type TEXT NOT NULL, + action TEXT NOT NULL CHECK(action IN ('CREATE', 'UPDATE', 'DELETE')), + entity_id TEXT NOT NULL, + correlation_id TEXT NOT NULL, + event_data TEXT NOT NULL, + PRIMARY KEY (correlation_id) +); + +CREATE INDEX IF NOT EXISTS idx_events_org_time ON events(organization_id, processed_timestamp); + +-- Set schema version to 6 +PRAGMA user_version = 6; diff --git a/gateway/gateway-controller/pkg/storage/interface.go b/gateway/gateway-controller/pkg/storage/interface.go index fcbc80ef1..844491f48 100644 --- a/gateway/gateway-controller/pkg/storage/interface.go +++ b/gateway/gateway-controller/pkg/storage/interface.go @@ -19,6 +19,8 @@ package storage import ( + "database/sql" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" ) @@ -227,4 +229,7 @@ type Storage interface { // Should be called during graceful shutdown. // Implementations should ensure all pending writes are flushed. Close() error + + // GetDB exposes the underlying *sql.DB for advanced operations. 
+ GetDB() *sql.DB } diff --git a/gateway/gateway-controller/pkg/storage/sqlite.go b/gateway/gateway-controller/pkg/storage/sqlite.go index df4a7168e..94998e873 100644 --- a/gateway/gateway-controller/pkg/storage/sqlite.go +++ b/gateway/gateway-controller/pkg/storage/sqlite.go @@ -214,6 +214,50 @@ func (s *SQLiteStorage) initSchema() error { version = 5 } + if version == 5 { + // Add EventHub tables for multi-replica synchronization + s.logger.Info("Migrating to EventHub schema (version 6)") + + // Create organization_states table + if _, err := s.db.Exec(`CREATE TABLE organization_states ( + organization TEXT PRIMARY KEY, + version_id TEXT NOT NULL DEFAULT '', + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + );`); err != nil { + return fmt.Errorf("failed to create organization_states table: %w", err) + } + + if _, err := s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_organization_states_updated ON organization_states(updated_at);`); err != nil { + return fmt.Errorf("failed to create organization_states index: %w", err) + } + + // Create unified events table + if _, err := s.db.Exec(`CREATE TABLE events ( + organization_id TEXT NOT NULL, + processed_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + originated_timestamp TIMESTAMP NOT NULL, + event_type TEXT NOT NULL, + action TEXT NOT NULL CHECK(action IN ('CREATE', 'UPDATE', 'DELETE')), + entity_id TEXT NOT NULL, + correlation_id TEXT NOT NULL, + event_data TEXT NOT NULL, + PRIMARY KEY (correlation_id) + );`); err != nil { + return fmt.Errorf("failed to create events table: %w", err) + } + + if _, err := s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_events_org_time ON events(organization_id, processed_timestamp);`); err != nil { + return fmt.Errorf("failed to create events organization-time index: %w", err) + } + + if _, err := s.db.Exec("PRAGMA user_version = 6"); err != nil { + return fmt.Errorf("failed to set schema version to 6: %w", err) + } + + s.logger.Info("Schema migrated to version 6 (EventHub 
tables)") + version = 6 + } + s.logger.Info("Database schema up to date", zap.Int("version", version)) } @@ -1591,3 +1635,9 @@ func LoadAPIKeysFromDatabase(storage Storage, configStore *ConfigStore, apiKeySt return nil } + +// GetDB returns the underlying *sql.DB instance +// This is used for eventhub initialization +func (s *SQLiteStorage) GetDB() *sql.DB { + return s.db +} diff --git a/gateway/gateway-controller/pkg/utils/api_deployment.go b/gateway/gateway-controller/pkg/utils/api_deployment.go index 46af53f27..09ab2fb02 100644 --- a/gateway/gateway-controller/pkg/utils/api_deployment.go +++ b/gateway/gateway-controller/pkg/utils/api_deployment.go @@ -31,9 +31,12 @@ import ( "github.com/google/uuid" api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/generated" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/eventhub" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/xds" + policyenginev1 "github.com/wso2/api-platform/sdk/gateway/policyengine/v1" "go.uber.org/zap" ) @@ -52,15 +55,91 @@ type APIDeploymentResult struct { IsUpdate bool } +// APIUpdateParams contains parameters for API update operations +type APIUpdateParams struct { + Handle string // API handle from URL path + Data []byte // Raw configuration data (YAML/JSON) + ContentType string // Content type for parsing + CorrelationID string // Correlation ID for tracking + Logger *zap.Logger // Logger instance +} + +// Custom error types for HTTP status code mapping + +// NotFoundError indicates the requested resource was not found +type NotFoundError struct { + Handle string +} + +func (e *NotFoundError) Error() string { + return fmt.Sprintf("API configuration with handle '%s' not found", 
e.Handle) +} + +// HandleMismatchError indicates the handle in the URL doesn't match the YAML metadata +type HandleMismatchError struct { + PathHandle string + YamlHandle string +} + +func (e *HandleMismatchError) Error() string { + return fmt.Sprintf("Handle mismatch: path has '%s' but YAML metadata.name has '%s'", e.PathHandle, e.YamlHandle) +} + +// APIValidationError wraps validation errors for API configurations +type APIValidationError struct { + Errors []config.ValidationError +} + +func (e *APIValidationError) Error() string { + return fmt.Sprintf("configuration validation failed with %d errors", len(e.Errors)) +} + +// TopicOperationError indicates WebSub topic operations failed +type TopicOperationError struct { + Message string +} + +func (e *TopicOperationError) Error() string { + return e.Message +} + +// ConflictError indicates a resource conflict (e.g., handle already exists) +type ConflictError struct { + Message string +} + +func (e *ConflictError) Error() string { + return e.Message +} + +// ParseError indicates configuration parsing failed +type ParseError struct { + Message string +} + +func (e *ParseError) Error() string { + return e.Message +} + +// DatabaseUnavailableError indicates the database is not available +type DatabaseUnavailableError struct{} + +func (e *DatabaseUnavailableError) Error() string { + return "Database storage not available" +} + // APIDeploymentService provides utilities for API configuration deployment type APIDeploymentService struct { - store *storage.ConfigStore - db storage.Storage - snapshotManager *xds.SnapshotManager - parser *config.Parser - validator config.Validator - routerConfig *config.RouterConfig - httpClient *http.Client + store *storage.ConfigStore + db storage.Storage + snapshotManager *xds.SnapshotManager + policyManager *policyxds.PolicyManager + parser *config.Parser + validator config.Validator + routerConfig *config.RouterConfig + httpClient *http.Client + eventHub eventhub.EventHub + 
enableReplicaSync bool } // NewAPIDeploymentService creates a new API deployment service @@ -68,17 +147,23 @@ func NewAPIDeploymentService( store *storage.ConfigStore, db storage.Storage, snapshotManager *xds.SnapshotManager, + policyManager *policyxds.PolicyManager, validator config.Validator, routerConfig *config.RouterConfig, + eventHub eventhub.EventHub, + enableReplicaSync bool, ) *APIDeploymentService { return &APIDeploymentService{ - store: store, - db: db, - snapshotManager: snapshotManager, - parser: config.NewParser(), - validator: validator, - httpClient: &http.Client{Timeout: 10 * time.Second}, - routerConfig: routerConfig, + store: store, + db: db, + snapshotManager: snapshotManager, + policyManager: policyManager, + parser: config.NewParser(), + validator: validator, + httpClient: &http.Client{Timeout: 10 * time.Second}, + routerConfig: routerConfig, + eventHub: eventHub, + enableReplicaSync: enableReplicaSync, } } @@ -164,86 +249,10 @@ func (s *APIDeploymentService) DeployAPIConfiguration(params APIDeploymentParams DeployedVersion: 0, } + // Handle AsyncWebSub topic lifecycle management if apiConfig.Kind == api.Asyncwebsub { - topicsToRegister, topicsToUnregister := s.GetTopicsForUpdate(*storedCfg) - // TODO: Pre configure the dynamic forward proxy rules for event gw - // This was communication bridge will be created on the gw startup - // Can perform internal communication with websub hub without relying on the dynamic rules - // Execute topic operations with wait group and errors tracking - var wg2 sync.WaitGroup - var regErrs int32 - var deregErrs int32 - - if len(topicsToRegister) > 0 { - wg2.Add(1) - go func(list []string) { - defer wg2.Done() - params.Logger.Info("Starting topic registration", zap.Int("total_topics", len(list)), zap.String("api_id", apiID)) - var childWg sync.WaitGroup - for _, topic := range list { - childWg.Add(1) - go func(topic string) { - defer childWg.Done() - if err := s.RegisterTopicWithHub(s.httpClient, topic, 
"localhost", 8083, params.Logger); err != nil { - params.Logger.Error("Failed to register topic with WebSubHub", - zap.Error(err), - zap.String("topic", topic), - zap.String("api_id", apiID)) - atomic.AddInt32(®Errs, 1) - return - } else { - params.Logger.Info("Successfully registered topic with WebSubHub", - zap.String("topic", topic), - zap.String("api_id", apiID)) - } - }(topic) - } - childWg.Wait() - }(topicsToRegister) - } - - if len(topicsToUnregister) > 0 { - wg2.Add(1) - go func(list []string) { - defer wg2.Done() - var childWg sync.WaitGroup - params.Logger.Info("Starting topic deregistration", zap.Int("total_topics", len(list)), zap.String("api_id", apiID)) - for _, topic := range list { - childWg.Add(1) - go func(topic string) { - defer childWg.Done() - if err := s.UnregisterTopicWithHub(s.httpClient, topic, "localhost", 8083, params.Logger); err != nil { - params.Logger.Error("Failed to deregister topic from WebSubHub", - zap.Error(err), - zap.String("topic", topic), - zap.String("api_id", apiID)) - atomic.AddInt32(&deregErrs, 1) - return - } else { - params.Logger.Info("Successfully deregistered topic from WebSubHub", - zap.String("topic", topic), - zap.String("api_id", apiID)) - } - }(topic) - } - childWg.Wait() - }(topicsToUnregister) - } - - wg2.Wait() - params.Logger.Info("Topic lifecycle operations completed", - zap.String("api_id", apiID), - zap.Int("registered", len(topicsToRegister)), - zap.Int("deregistered", len(topicsToUnregister)), - zap.Int("register_errors", int(regErrs)), - zap.Int("deregister_errors", int(deregErrs))) - - // Check if topic operations failed and return error - if regErrs > 0 || deregErrs > 0 { - params.Logger.Error("Topic lifecycle operations failed", - zap.Int("register_errors", int(regErrs)), - zap.Int("deregister_errors", int(deregErrs))) - return nil, fmt.Errorf("failed to complete topic operations: %d registration error(s), %d deregistration error(s)", regErrs, deregErrs) + if err := 
s.handleTopicLifecycle(storedCfg, params.Logger); err != nil { + return nil, err } } @@ -269,17 +278,42 @@ func (s *APIDeploymentService) DeployAPIConfiguration(params APIDeploymentParams } // Update xDS snapshot asynchronously - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + if s.enableReplicaSync { + // Multi-replica mode: Publish event to eventhub + if s.eventHub != nil { + // Determine action based on whether it's an update or create + action := "CREATE" + if isUpdate { + action = "UPDATE" + } - if err := s.snapshotManager.UpdateSnapshot(ctx, params.CorrelationID); err != nil { - params.Logger.Error("Failed to update xDS snapshot", - zap.Error(err), - zap.String("api_id", apiID), - zap.String("correlation_id", params.CorrelationID)) + // Use default organization ID (can be made configurable in future) + organizationID := "default" + + // Publish event with empty payload as per requirements + ctx := context.Background() + if err := s.eventHub.PublishEvent(ctx, organizationID, eventhub.EventTypeAPI, action, apiID, params.CorrelationID, []byte{}); err != nil { + params.Logger.Error("Failed to publish event to eventhub", + zap.Error(err), + zap.String("api_id", apiID), + zap.String("action", action), + zap.String("organization_id", string(organizationID)), + zap.String("correlation_id", params.CorrelationID)) + } else { + params.Logger.Info("Event published to eventhub", + zap.String("api_id", apiID), + zap.String("action", action), + zap.String("organization_id", string(organizationID)), + zap.String("correlation_id", params.CorrelationID)) + } + } else { + params.Logger.Warn("Multi-replica mode enabled but eventhub is not initialized", + zap.String("api_id", apiID)) } - }() + } else { + s.triggerXDSSnapshotUpdate(apiID, params.CorrelationID, params.Logger) + s.updatePolicyConfiguration(storedCfg, params.Logger) + } return &APIDeploymentResult{ StoredConfig: storedCfg, @@ -287,6 +321,188 @@ func (s 
*APIDeploymentService) DeployAPIConfiguration(params APIDeploymentParams }, nil } +// UpdateAPIConfiguration handles the complete API configuration update process +func (s *APIDeploymentService) UpdateAPIConfiguration(params APIUpdateParams) (*APIDeploymentResult, error) { + handle := params.Handle + + // Parse configuration + var apiConfig api.APIConfiguration + err := s.parser.Parse(params.Data, params.ContentType, &apiConfig) + if err != nil { + params.Logger.Error("Failed to parse configuration", zap.Error(err)) + return nil, &ParseError{Message: "Failed to parse configuration"} + } + + // Validate that the handle in the YAML matches the path parameter + if apiConfig.Metadata.Name != "" { + if apiConfig.Metadata.Name != handle { + params.Logger.Warn("Handle mismatch between path and YAML metadata", + zap.String("path_handle", handle), + zap.String("yaml_handle", apiConfig.Metadata.Name)) + return nil, &HandleMismatchError{ + PathHandle: handle, + YamlHandle: apiConfig.Metadata.Name, + } + } + } + + // Validate configuration + validationErrors := s.validator.Validate(&apiConfig) + if len(validationErrors) > 0 { + params.Logger.Warn("Configuration validation failed", + zap.String("handle", handle), + zap.Int("num_errors", len(validationErrors))) + return nil, &APIValidationError{Errors: validationErrors} + } + + if s.db == nil { + return nil, &DatabaseUnavailableError{} + } + + // Check if config exists + existing, err := s.db.GetConfigByHandle(handle) + if err != nil { + params.Logger.Warn("API configuration not found", zap.String("handle", handle)) + return nil, &NotFoundError{Handle: handle} + } + + // Update stored configuration + now := time.Now() + existing.Configuration = apiConfig + existing.Status = models.StatusPending + existing.UpdatedAt = now + existing.DeployedAt = nil + existing.DeployedVersion = 0 + + // Handle AsyncWebSub topic lifecycle management + if apiConfig.Kind == api.Asyncwebsub { + if err := s.handleTopicLifecycle(existing, 
params.Logger); err != nil { + return nil, err + } + } + + // Update database first (only if persistent mode) + if s.db != nil { + if err := s.db.UpdateConfig(existing); err != nil { + params.Logger.Error("Failed to update config in database", zap.Error(err)) + return nil, fmt.Errorf("failed to persist configuration update: %w", err) + } + } + + // Update in-memory store + if err := s.store.Update(existing); err != nil { + if storage.IsConflictError(err) { + params.Logger.Info("API configuration handle already exists", + zap.String("id", existing.ID), + zap.String("handle", handle)) + return nil, &ConflictError{Message: err.Error()} + } + params.Logger.Error("Failed to update config in memory store", zap.Error(err)) + return nil, fmt.Errorf("failed to update configuration in memory store: %w", err) + } + + params.Logger.Info("API configuration updated", + zap.String("id", existing.ID), + zap.String("handle", handle)) + + // Update xDS snapshot asynchronously + s.triggerXDSSnapshotUpdate(existing.ID, params.CorrelationID, params.Logger) + + // Update derived policy configuration + s.updatePolicyConfiguration(existing, params.Logger) + + return &APIDeploymentResult{ + StoredConfig: existing, + IsUpdate: true, + }, nil +} + +// handleTopicLifecycle manages WebSub topic registration/deregistration for AsyncWebSub APIs +func (s *APIDeploymentService) handleTopicLifecycle(storedCfg *models.StoredConfig, logger *zap.Logger) error { + topicsToRegister, topicsToUnregister := s.GetTopicsForUpdate(*storedCfg) + + var wg sync.WaitGroup + var regErrs int32 + var deregErrs int32 + + // Register new topics + if len(topicsToRegister) > 0 { + wg.Add(1) + go func(list []string) { + defer wg.Done() + logger.Info("Starting topic registration", + zap.Int("total_topics", len(list)), + zap.String("api_id", storedCfg.ID)) + var childWg sync.WaitGroup + for _, topic := range list { + childWg.Add(1) + go func(topic string) { + defer childWg.Done() + if err := 
s.RegisterTopicWithHub(s.httpClient, topic, "localhost", 8083, logger); err != nil { + logger.Error("Failed to register topic with WebSubHub", + zap.Error(err), + zap.String("topic", topic), + zap.String("api_id", storedCfg.ID)) + atomic.AddInt32(®Errs, 1) + } else { + logger.Info("Successfully registered topic with WebSubHub", + zap.String("topic", topic), + zap.String("api_id", storedCfg.ID)) + } + }(topic) + } + childWg.Wait() + }(topicsToRegister) + } + + // Deregister removed topics + if len(topicsToUnregister) > 0 { + wg.Add(1) + go func(list []string) { + defer wg.Done() + logger.Info("Starting topic deregistration", + zap.Int("total_topics", len(list)), + zap.String("api_id", storedCfg.ID)) + var childWg sync.WaitGroup + for _, topic := range list { + childWg.Add(1) + go func(topic string) { + defer childWg.Done() + if err := s.UnregisterTopicWithHub(s.httpClient, topic, "localhost", 8083, logger); err != nil { + logger.Error("Failed to deregister topic from WebSubHub", + zap.Error(err), + zap.String("topic", topic), + zap.String("api_id", storedCfg.ID)) + atomic.AddInt32(&deregErrs, 1) + } else { + logger.Info("Successfully deregistered topic from WebSubHub", + zap.String("topic", topic), + zap.String("api_id", storedCfg.ID)) + } + }(topic) + } + childWg.Wait() + }(topicsToUnregister) + } + + wg.Wait() + + logger.Info("Topic lifecycle operations completed", + zap.String("api_id", storedCfg.ID), + zap.Int("registered", len(topicsToRegister)), + zap.Int("deregistered", len(topicsToUnregister)), + zap.Int("register_errors", int(regErrs)), + zap.Int("deregister_errors", int(deregErrs))) + + if regErrs > 0 || deregErrs > 0 { + return &TopicOperationError{ + Message: fmt.Sprintf("Topic lifecycle operations failed: %d registration error(s), %d deregistration error(s)", regErrs, deregErrs), + } + } + + return nil +} + func (s *APIDeploymentService) GetTopicsForUpdate(apiConfig models.StoredConfig) ([]string, []string) { topics := 
s.store.TopicManager.GetAllByConfig(apiConfig.ID) topicsToRegister := []string{} @@ -351,22 +567,26 @@ func (s *APIDeploymentService) saveOrUpdateConfig(storedCfg *models.StoredConfig } // Try to add to in-memory store - if err := s.store.Add(storedCfg); err != nil { - // Check if it's a conflict (API already exists) - if storage.IsConflictError(err) { - logger.Info("API configuration already exists in memory, updating instead", - zap.String("api_id", storedCfg.ID), - zap.String("displayName", storedCfg.GetDisplayName()), - zap.String("version", storedCfg.GetVersion())) + // Multi-replica mode: in-memory store will be updated via EventListener + // after event processing to ensure consistency across all replicas + if !s.enableReplicaSync { + if err := s.store.Add(storedCfg); err != nil { + // Check if it's a conflict (API already exists) + if storage.IsConflictError(err) { + logger.Info("API configuration already exists in memory, updating instead", + zap.String("api_id", storedCfg.ID), + zap.String("displayName", storedCfg.GetDisplayName()), + zap.String("version", storedCfg.GetVersion())) - // Try to update instead - return s.updateExistingConfig(storedCfg, logger) - } else { - // Rollback database write (only if persistent mode) - if s.db != nil { - _ = s.db.DeleteConfig(storedCfg.ID) + // Try to update instead + return s.updateExistingConfig(storedCfg, logger) + } else { + // Rollback database write (only if persistent mode) + if s.db != nil { + _ = s.db.DeleteConfig(storedCfg.ID) + } + return false, fmt.Errorf("failed to add config to memory store: %w", err) } - return false, fmt.Errorf("failed to add config to memory store: %w", err) } } @@ -496,3 +716,216 @@ func (s *APIDeploymentService) sendTopicRequestToHub(httpClient *http.Client, to func generateUUID() string { return uuid.New().String() } + +// BuildStoredPolicyFromAPI builds a StoredPolicyConfig from an API configuration. 
+// This builds policy chains for each route based on API-level and operation-level policies. +// RouteKey uses the fully qualified route path (context + operation path) and must match +// the route name format used by the xDS translator for consistency. +func (s *APIDeploymentService) BuildStoredPolicyFromAPI(cfg *models.StoredConfig) *models.StoredPolicyConfig { + apiCfg := &cfg.Configuration + + // Collect API-level policies + apiPolicies := make(map[string]policyenginev1.PolicyInstance) // name -> policy + if cfg.GetPolicies() != nil { + for _, p := range *cfg.GetPolicies() { + apiPolicies[p.Name] = convertAPIPolicy(p) + } + } + + routes := make([]policyenginev1.PolicyChain, 0) + switch apiCfg.Kind { + case api.Asyncwebsub: + // Build routes with merged policies + apiData, err := apiCfg.Spec.AsWebhookAPIData() + if err != nil { + return nil + } + for _, ch := range apiData.Channels { + var finalPolicies []policyenginev1.PolicyInstance + + if ch.Policies != nil && len(*ch.Policies) > 0 { + // Operation has policies: use operation policy order as authoritative + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*ch.Policies)) + addedNames := make(map[string]struct{}) + + for _, opPolicy := range *ch.Policies { + finalPolicies = append(finalPolicies, convertAPIPolicy(opPolicy)) + addedNames[opPolicy.Name] = struct{}{} + } + + // Add any API-level policies not mentioned in operation policies (append at end) + if apiData.Policies != nil { + for _, apiPolicy := range *apiData.Policies { + if _, exists := addedNames[apiPolicy.Name]; !exists { + finalPolicies = append(finalPolicies, apiPolicies[apiPolicy.Name]) + } + } + } + } else { + // No operation policies: use API-level policies in their declared order + if apiData.Policies != nil { + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*apiData.Policies)) + for _, p := range *apiData.Policies { + finalPolicies = append(finalPolicies, apiPolicies[p.Name]) + } + } + } + + routeKey := 
xds.GenerateRouteName("POST", apiData.Context, apiData.Version, ch.Path, s.routerConfig.GatewayHost) + routes = append(routes, policyenginev1.PolicyChain{ + RouteKey: routeKey, + Policies: finalPolicies, + }) + } + case api.RestApi: + // Build routes with merged policies + apiData, err := apiCfg.Spec.AsAPIConfigData() + if err != nil { + return nil + } + for _, op := range apiData.Operations { + var finalPolicies []policyenginev1.PolicyInstance + + if op.Policies != nil && len(*op.Policies) > 0 { + // Operation has policies: use operation policy order as authoritative + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*op.Policies)) + addedNames := make(map[string]struct{}) + + for _, opPolicy := range *op.Policies { + finalPolicies = append(finalPolicies, convertAPIPolicy(opPolicy)) + addedNames[opPolicy.Name] = struct{}{} + } + + // Add any API-level policies not mentioned in operation policies (append at end) + if apiData.Policies != nil { + for _, apiPolicy := range *apiData.Policies { + if _, exists := addedNames[apiPolicy.Name]; !exists { + finalPolicies = append(finalPolicies, apiPolicies[apiPolicy.Name]) + } + } + } + } else { + // No operation policies: use API-level policies in their declared order + if apiData.Policies != nil { + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*apiData.Policies)) + for _, p := range *apiData.Policies { + finalPolicies = append(finalPolicies, apiPolicies[p.Name]) + } + } + } + + // Determine effective vhosts (fallback to global router defaults when not provided) + effectiveMainVHost := s.routerConfig.VHosts.Main.Default + effectiveSandboxVHost := s.routerConfig.VHosts.Sandbox.Default + if apiData.Vhosts != nil { + if strings.TrimSpace(apiData.Vhosts.Main) != "" { + effectiveMainVHost = apiData.Vhosts.Main + } + if apiData.Vhosts.Sandbox != nil && strings.TrimSpace(*apiData.Vhosts.Sandbox) != "" { + effectiveSandboxVHost = *apiData.Vhosts.Sandbox + } + } + + vhosts := []string{effectiveMainVHost} + 
if apiData.Upstream.Sandbox != nil && apiData.Upstream.Sandbox.Url != nil && + strings.TrimSpace(*apiData.Upstream.Sandbox.Url) != "" { + vhosts = append(vhosts, effectiveSandboxVHost) + } + + for _, vhost := range vhosts { + routes = append(routes, policyenginev1.PolicyChain{ + RouteKey: xds.GenerateRouteName(string(op.Method), apiData.Context, apiData.Version, op.Path, vhost), + Policies: finalPolicies, + }) + } + } + } + + // If there are no policies at all, return nil (skip creation) + policyCount := 0 + for _, r := range routes { + policyCount += len(r.Policies) + } + if policyCount == 0 { + return nil + } + + now := time.Now().Unix() + stored := &models.StoredPolicyConfig{ + ID: cfg.ID + "-policies", + Configuration: policyenginev1.Configuration{ + Routes: routes, + Metadata: policyenginev1.Metadata{ + CreatedAt: now, + UpdatedAt: now, + ResourceVersion: 0, + APIName: cfg.GetDisplayName(), + Version: cfg.GetVersion(), + Context: cfg.GetContext(), + }, + }, + Version: 0, + } + return stored +} + +// convertAPIPolicy converts generated api.Policy to policyenginev1.PolicyInstance +func convertAPIPolicy(p api.Policy) policyenginev1.PolicyInstance { + paramsMap := make(map[string]interface{}) + if p.Params != nil { + for k, v := range *p.Params { + paramsMap[k] = v + } + } + return policyenginev1.PolicyInstance{ + Name: p.Name, + Version: p.Version, + Enabled: true, // Default to enabled + ExecutionCondition: p.ExecutionCondition, + Parameters: paramsMap, + } +} + +// TODO: (VirajSalaka) Fix working in multi-replica mode +// updatePolicyConfiguration builds and updates/removes derived policy config for an API +func (s *APIDeploymentService) updatePolicyConfiguration(storedCfg *models.StoredConfig, logger *zap.Logger) { + if s.policyManager == nil { + return + } + + storedPolicy := s.BuildStoredPolicyFromAPI(storedCfg) + if storedPolicy != nil { + if err := s.policyManager.AddPolicy(storedPolicy); err != nil { + logger.Error("Failed to update derived policy 
configuration", zap.Error(err)) + } else { + logger.Info("Derived policy configuration updated", + zap.String("policy_id", storedPolicy.ID), + zap.Int("route_count", len(storedPolicy.Configuration.Routes))) + } + } else { + // API no longer has policies, remove the existing policy configuration + policyID := storedCfg.ID + "-policies" + if err := s.policyManager.RemovePolicy(policyID); err != nil { + // Log at debug level since policy may not exist if API never had policies + logger.Debug("No policy configuration to remove", zap.String("policy_id", policyID)) + } else { + logger.Info("Derived policy configuration removed (API no longer has policies)", + zap.String("policy_id", policyID)) + } + } +} + +// triggerXDSSnapshotUpdate asynchronously updates xDS snapshot +func (s *APIDeploymentService) triggerXDSSnapshotUpdate(apiID, correlationID string, logger *zap.Logger) { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := s.snapshotManager.UpdateSnapshot(ctx, correlationID); err != nil { + logger.Error("Failed to update xDS snapshot", + zap.Error(err), + zap.String("api_id", apiID), + zap.String("correlation_id", correlationID)) + } + }() +} diff --git a/gateway/gateway-controller/pkg/utils/websub_topic_registration_test.go b/gateway/gateway-controller/pkg/utils/websub_topic_registration_test.go index 2297d9fa1..03875e3a9 100644 --- a/gateway/gateway-controller/pkg/utils/websub_topic_registration_test.go +++ b/gateway/gateway-controller/pkg/utils/websub_topic_registration_test.go @@ -18,7 +18,7 @@ func TestDeployAPIConfigurationWebSubKindTopicRegistration(t *testing.T) { db := &storage.SQLiteStorage{} snapshotManager := &xds.SnapshotManager{} validator := config.NewAPIValidator() - service := NewAPIDeploymentService(configStore, db, snapshotManager, validator, nil) + service := NewAPIDeploymentService(configStore, db, snapshotManager, nil, validator, nil, nil, false) // Inline YAML config similar to 
websubhub.yaml yamlConfig := `kind: async/websub @@ -63,7 +63,7 @@ spec: func TestDeployAPIConfigurationWebSubKindRevisionDeployment(t *testing.T) { configStore := storage.NewConfigStore() validator := config.NewAPIValidator() - service := NewAPIDeploymentService(configStore, nil, nil, validator, nil) + service := NewAPIDeploymentService(configStore, nil, nil, nil, validator, nil, nil, false) // Inline YAML config similar to websubhub.yaml yamlConfig := `kind: async/websub @@ -145,7 +145,7 @@ spec: func TestTopicRegistrationForConcurrentAPIConfigs(t *testing.T) { configStore := storage.NewConfigStore() validator := config.NewAPIValidator() - service := NewAPIDeploymentService(configStore, nil, nil, validator, nil) + service := NewAPIDeploymentService(configStore, nil, nil, nil, validator, nil, nil, false) // Two different API YAMLs yamlA := `kind: async/websub @@ -249,7 +249,7 @@ spec: func TestTopicDeregistrationOnConfigDeletion(t *testing.T) { configStore := storage.NewConfigStore() validator := config.NewAPIValidator() - service := NewAPIDeploymentService(configStore, nil, nil, validator, nil) + service := NewAPIDeploymentService(configStore, nil, nil, nil, validator, nil, nil, false) // Inline YAML config similar to websubhub.yaml yamlConfig := `kind: async/websub diff --git a/gateway/gateway-controller/tests/integration/schema_test.go b/gateway/gateway-controller/tests/integration/schema_test.go index f964e61d1..237505f87 100644 --- a/gateway/gateway-controller/tests/integration/schema_test.go +++ b/gateway/gateway-controller/tests/integration/schema_test.go @@ -95,7 +95,7 @@ func TestSchemaInitialization(t *testing.T) { var version int err := rawDB.QueryRow("PRAGMA user_version").Scan(&version) assert.NoError(t, err) - assert.Equal(t, 5, version, "Schema version should be 5") + assert.Equal(t, 6, version, "Schema version should be 6") }) // Verify deployments table exists