diff --git a/.gitmodules b/.gitmodules index 0e2445c7d..3f69f7394 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,4 @@ [submodule "providers/flagd/flagd-testbed"] path = providers/flagd/flagd-testbed url = https://github.com/open-feature/flagd-testbed.git + branch = v2.11.1 diff --git a/providers/flagd/e2e/inprocess_test.go b/providers/flagd/e2e/inprocess_test.go index fe3d3265e..d4767e4e3 100644 --- a/providers/flagd/e2e/inprocess_test.go +++ b/providers/flagd/e2e/inprocess_test.go @@ -25,8 +25,8 @@ func TestInProcessProviderE2E(t *testing.T) { "./", } - // Run tests with in-process specific tags - exclude connection/event issues we won't tackle - tags := "@in-process && ~@unixsocket && ~@metadata && ~@grace && ~@customCert && ~@reconnect && ~@contextEnrichment && ~@sync-payload && ~@events" + // Run tests with in-process specific tags + tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload" if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil { t.Fatalf("Gherkin tests failed: %v", err) diff --git a/providers/flagd/pkg/configuration.go b/providers/flagd/pkg/configuration.go index 4303d4165..ca758d38d 100644 --- a/providers/flagd/pkg/configuration.go +++ b/providers/flagd/pkg/configuration.go @@ -3,15 +3,14 @@ package flagd import ( "errors" "fmt" - "os" - "strconv" - "strings" - "github.com/go-logr/logr" "github.com/open-feature/flagd/core/pkg/sync" "github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache" "github.com/open-feature/go-sdk-contrib/providers/flagd/internal/logger" "google.golang.org/grpc" + "os" + "strconv" + "strings" ) type ResolverType string @@ -26,6 +25,7 @@ const ( defaultCache = cache.LRUValue defaultHost = "localhost" defaultResolver = rpc + defaultGracePeriod = 5 rpc ResolverType = "rpc" inProcess ResolverType = "in-process" @@ -44,6 +44,7 @@ const ( flagdSourceSelectorEnvironmentVariableName = "FLAGD_SOURCE_SELECTOR" 
flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH" flagdTargetUriEnvironmentVariableName = "FLAGD_TARGET_URI" + flagdGracePeriodVariableName = "FLAGD_RETRY_GRACE_PERIOD" ) type ProviderConfiguration struct { @@ -64,6 +65,7 @@ type ProviderConfiguration struct { CustomSyncProvider sync.ISync CustomSyncProviderUri string GrpcDialOptionsOverride []grpc.DialOption + RetryGracePeriod int log logr.Logger } @@ -77,6 +79,7 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration { MaxCacheSize: defaultMaxCacheSize, Resolver: defaultResolver, Tls: defaultTLS, + RetryGracePeriod: defaultGracePeriod, } p.updateFromEnvVar() @@ -224,6 +227,14 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() { if targetUri := os.Getenv(flagdTargetUriEnvironmentVariableName); targetUri != "" { cfg.TargetUri = targetUri } + if gracePeriod := os.Getenv(flagdGracePeriodVariableName); gracePeriod != "" { + if seconds, err := strconv.Atoi(gracePeriod); err == nil { + cfg.RetryGracePeriod = seconds + } else { + // Handle parsing error + cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s'", gracePeriod)) + } + } } @@ -397,3 +408,10 @@ func WithGrpcDialOptionsOverride(grpcDialOptionsOverride []grpc.DialOption) Prov p.GrpcDialOptionsOverride = grpcDialOptionsOverride } } + +// WithRetryGracePeriod allows to set a time window for the transition from stale to error state +func WithRetryGracePeriod(gracePeriod int) ProviderOption { + return func(p *ProviderConfiguration) { + p.RetryGracePeriod = gracePeriod + } +} diff --git a/providers/flagd/pkg/provider.go b/providers/flagd/pkg/provider.go index d7078de94..1742f2f15 100644 --- a/providers/flagd/pkg/provider.go +++ b/providers/flagd/pkg/provider.go @@ -73,6 +73,7 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) { CustomSyncProvider: provider.providerConfiguration.CustomSyncProvider, CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri, GrpcDialOptionsOverride: 
provider.providerConfiguration.GrpcDialOptionsOverride, + RetryGracePeriod: provider.providerConfiguration.RetryGracePeriod, }) default: service = process.NewInProcessService(process.Configuration{ diff --git a/providers/flagd/pkg/service/in_process/grpc_sync.go b/providers/flagd/pkg/service/in_process/grpc_sync.go new file mode 100644 index 000000000..9b6b93caa --- /dev/null +++ b/providers/flagd/pkg/service/in_process/grpc_sync.go @@ -0,0 +1,432 @@ +package process + +import ( + "buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc" + v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1" + "context" + "fmt" + "github.com/open-feature/flagd/core/pkg/logger" + "github.com/open-feature/flagd/core/pkg/sync" + grpccredential "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials" + of "github.com/open-feature/go-sdk/openfeature" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/keepalive" + msync "sync" + "time" +) + +const ( + // Default timeouts and retry intervals + defaultKeepaliveTime = 30 * time.Second + defaultKeepaliveTimeout = 5 * time.Second + + retryPolicy = `{ + "methodConfig": [ + { + "name": [ + { + "service": "flagd.sync.v1.FlagSyncService" + } + ], + "retryPolicy": { + "MaxAttempts": 3, + "InitialBackoff": "1s", + "MaxBackoff": "5s", + "BackoffMultiplier": 2.0, + "RetryableStatusCodes": [ + "CANCELLED", + "UNKNOWN", + "INVALID_ARGUMENT", + "NOT_FOUND", + "ALREADY_EXISTS", + "PERMISSION_DENIED", + "RESOURCE_EXHAUSTED", + "FAILED_PRECONDITION", + "ABORTED", + "OUT_OF_RANGE", + "UNIMPLEMENTED", + "INTERNAL", + "UNAVAILABLE", + "DATA_LOSS", + "UNAUTHENTICATED" + ] + } + } + ] + }` +) + +// Type aliases for interfaces required by this component - needed for mock generation with gomock +type FlagSyncServiceClient interface { + syncv1grpc.FlagSyncServiceClient +} + +type FlagSyncServiceClientResponse interface { + syncv1grpc.FlagSyncService_SyncFlagsClient +} + +// Sync 
implements gRPC-based flag synchronization with improved context cancellation and error handling +type Sync struct { + // Configuration + GrpcDialOptionsOverride []grpc.DialOption + CertPath string + CredentialBuilder grpccredential.Builder + Logger *logger.Logger + ProviderID string + Secure bool + Selector string + URI string + MaxMsgSize int + + // Runtime state + client FlagSyncServiceClient + connection *grpc.ClientConn + ready bool + events chan SyncEvent + shutdownComplete chan struct{} + shutdownOnce msync.Once + initializer msync.Once +} + +// Init initializes the gRPC connection and starts background monitoring +func (g *Sync) Init(ctx context.Context) error { + g.Logger.Info(fmt.Sprintf("initializing gRPC client for %s", g.URI)) + + // Initialize channels + g.shutdownComplete = make(chan struct{}) + g.events = make(chan SyncEvent, 10) // Buffered to prevent blocking + + // Establish gRPC connection + conn, err := g.createConnection() + if err != nil { + return fmt.Errorf("failed to create gRPC connection: %w", err) + } + + g.connection = conn + g.client = syncv1grpc.NewFlagSyncServiceClient(conn) + + // Start connection state monitoring in background + go g.monitorConnectionState(ctx) + + g.Logger.Info(fmt.Sprintf("gRPC client initialized successfully for %s", g.URI)) + return nil +} + +// createConnection creates and configures the gRPC connection +func (g *Sync) createConnection() (*grpc.ClientConn, error) { + if len(g.GrpcDialOptionsOverride) > 0 { + g.Logger.Debug("using provided gRPC DialOptions override") + return grpc.NewClient(g.URI, g.GrpcDialOptionsOverride...) + } + + // Build standard dial options + dialOptions, err := g.buildDialOptions() + if err != nil { + return nil, fmt.Errorf("failed to build dial options: %w", err) + } + + return grpc.NewClient(g.URI, dialOptions...) 
+} + +// buildDialOptions constructs the standard gRPC dial options +func (g *Sync) buildDialOptions() ([]grpc.DialOption, error) { + var dialOptions []grpc.DialOption + + // Transport credentials + tCredentials, err := g.CredentialBuilder.Build(g.Secure, g.CertPath) + if err != nil { + return nil, fmt.Errorf("failed to build transport credentials: %w", err) + } + dialOptions = append(dialOptions, grpc.WithTransportCredentials(tCredentials)) + + // Call options for message size + if g.MaxMsgSize > 0 { + callOptions := []grpc.CallOption{grpc.MaxCallRecvMsgSize(g.MaxMsgSize)} + dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(callOptions...)) + g.Logger.Info(fmt.Sprintf("setting max receive message size to %d bytes", g.MaxMsgSize)) + } + + // Keepalive settings for connection health + keepaliveParams := keepalive.ClientParameters{ + Time: defaultKeepaliveTime, + Timeout: defaultKeepaliveTimeout, + PermitWithoutStream: true, + } + dialOptions = append(dialOptions, grpc.WithKeepaliveParams(keepaliveParams)) + + dialOptions = append(dialOptions, grpc.WithDefaultServiceConfig(retryPolicy)) + + return dialOptions, nil +} + +// ReSync performs a one-time fetch of all flags +func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { + g.Logger.Debug("performing ReSync - fetching all flags") + + res, err := g.client.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{ + ProviderId: g.ProviderID, + Selector: g.Selector, + }) + if err != nil { + return fmt.Errorf("failed to fetch all flags: %w", err) + } + + select { + case dataSync <- sync.DataSync{ + FlagData: res.GetFlagConfiguration(), + Source: g.URI, + }: + g.Logger.Debug("ReSync completed successfully") + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// IsReady returns whether the sync is ready to serve requests +func (g *Sync) IsReady() bool { + return g.ready +} + +// Sync starts the continuous flag synchronization process with improved context handling +func (g *Sync) 
Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { + g.Logger.Info("starting continuous flag synchronization") + + // Ensure shutdown completion is signaled when THIS method exits + defer g.markShutdownComplete() + + for { + // Check for cancellation before each iteration + select { + case <-ctx.Done(): + g.Logger.Info("sync stopped due to context cancellation") + return ctx.Err() + default: + // Continue with sync logic + } + + // Attempt to create sync stream + if err := g.performSyncCycle(ctx, dataSync); err != nil { + if ctx.Err() != nil { + g.Logger.Info("sync cycle failed due to context cancellation") + return ctx.Err() + } + + g.Logger.Warn(fmt.Sprintf("sync cycle failed: %v, retrying...", err)) + g.sendEvent(ctx, SyncEvent{event: of.ProviderError}) + + if ctx.Err() != nil { + return ctx.Err() + } + } + } +} + +// performSyncCycle handles a single sync cycle (create stream, handle messages, cleanup) +func (g *Sync) performSyncCycle(ctx context.Context, dataSync chan<- sync.DataSync) error { + g.Logger.Debug("creating new sync stream") + + // Create sync stream with wait-for-ready to handle connection issues gracefully + stream, err := g.client.SyncFlags( + ctx, + &v1.SyncFlagsRequest{ + ProviderId: g.ProviderID, + Selector: g.Selector, + }, + grpc.WaitForReady(true), + ) + if err != nil { + return fmt.Errorf("failed to create sync stream: %w", err) + } + + g.Logger.Info("sync stream established, starting to receive flags") + + // Handle the stream with proper context cancellation + return g.handleFlagSync(ctx, stream, dataSync) +} + +// handleFlagSync processes messages from the sync stream with proper context handling +func (g *Sync) handleFlagSync(ctx context.Context, stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error { + // Mark as ready on first successful stream + g.initializer.Do(func() { + g.ready = true + g.Logger.Info("sync service is now ready") + }) + + // Create channels for stream communication 
+ streamChan := make(chan *v1.SyncFlagsResponse, 1) + errChan := make(chan error, 1) + + // Start goroutine to receive from stream + go func() { + defer close(streamChan) + defer close(errChan) + + for { + data, err := stream.Recv() + if err != nil { + select { + case errChan <- err: + case <-ctx.Done(): + } + return + } + + select { + case streamChan <- data: + case <-ctx.Done(): + return + } + } + }() + + // Main message handling loop with proper cancellation support + for { + select { + case data, ok := <-streamChan: + if !ok { + return fmt.Errorf("stream channel closed") + } + + if err := g.processFlagData(ctx, data, dataSync); err != nil { + return err + } + + case err := <-errChan: + return fmt.Errorf("stream error: %w", err) + + case <-ctx.Done(): + g.Logger.Info("handleFlagSync stopped due to context cancellation") + return ctx.Err() + } + } +} + +// processFlagData handles individual flag configuration updates +func (g *Sync) processFlagData(ctx context.Context, data *v1.SyncFlagsResponse, dataSync chan<- sync.DataSync) error { + syncData := sync.DataSync{ + FlagData: data.FlagConfiguration, + SyncContext: data.SyncContext, + Source: g.URI, + Selector: g.Selector, + } + + select { + case dataSync <- syncData: + g.Logger.Debug("successfully processed flag configuration update") + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// monitorConnectionState monitors gRPC connection state changes with improved cancellation handling +func (g *Sync) monitorConnectionState(ctx context.Context) { + if g.connection == nil { + g.Logger.Warn("no connection available for state monitoring") + return + } + + currentState := g.connection.GetState() + g.Logger.Debug(fmt.Sprintf("starting connection state monitoring, initial state: %s", currentState)) + + for { + // Wait for state change with context support + if !g.connection.WaitForStateChange(ctx, currentState) { + g.Logger.Debug("connection state monitoring stopped due to context cancellation") + return + } + 
+ // Check for cancellation + select { + case <-ctx.Done(): + g.Logger.Debug("connection state monitoring stopped due to context cancellation") + return + default: + } + + newState := g.connection.GetState() + g.Logger.Debug(fmt.Sprintf("connection state changed: %s -> %s", currentState, newState)) + + // Handle state-specific logic + g.handleConnectionState(ctx, newState) + currentState = newState + } +} + +// handleConnectionState processes specific connection state changes +func (g *Sync) handleConnectionState(ctx context.Context, state connectivity.State) { + switch state { + case connectivity.TransientFailure: + g.Logger.Error(fmt.Sprintf("gRPC connection entered TransientFailure state for %s", g.URI)) + g.sendEvent(ctx, SyncEvent{event: of.ProviderError}) + + case connectivity.Shutdown: + g.Logger.Error(fmt.Sprintf("gRPC connection shutdown for %s", g.URI)) + + case connectivity.Ready: + g.Logger.Info(fmt.Sprintf("gRPC connection ready for %s", g.URI)) + + case connectivity.Idle: + g.Logger.Debug(fmt.Sprintf("gRPC connection idle for %s", g.URI)) + + case connectivity.Connecting: + g.Logger.Debug(fmt.Sprintf("gRPC connection attempting to connect to %s", g.URI)) + } +} + +// sendEvent safely sends events with cancellation support +func (g *Sync) sendEvent(ctx context.Context, event SyncEvent) { + select { + case g.events <- event: + // Event sent successfully + case <-ctx.Done(): + // Context cancelled, don't block + default: + // Channel full, log warning but don't block + g.Logger.Warn("event channel full, dropping event") + } +} + +// markShutdownComplete signals that shutdown has completed +func (g *Sync) markShutdownComplete() { + g.shutdownOnce.Do(func() { + close(g.shutdownComplete) + g.Logger.Debug("shutdown completion signaled") + }) +} + +// Events returns the channel for sync events +func (g *Sync) Events() chan SyncEvent { + return g.events +} + +// Shutdown gracefully shuts down the sync service +func (g *Sync) Shutdown() error { + 
g.Logger.Info("shutting down gRPC sync service") + + // Wait for shutdown completion with timeout + select { + case <-g.shutdownComplete: + g.Logger.Info("sync operations completed gracefully") + case <-time.After(5 * time.Second): + g.Logger.Warn("shutdown timeout exceeded - forcing close") + } + + // Close events channel + if g.events != nil { + close(g.events) + } + + // Close gRPC connection + if g.connection != nil { + if err := g.connection.Close(); err != nil { + g.Logger.Error(fmt.Sprintf("error closing gRPC connection: %v", err)) + return err + } + } + + g.Logger.Info("gRPC sync service shutdown completed successfully") + return nil +} diff --git a/providers/flagd/pkg/service/in_process/service.go b/providers/flagd/pkg/service/in_process/service.go index 468b1c64b..6cb6d0e1b 100644 --- a/providers/flagd/pkg/service/in_process/service.go +++ b/providers/flagd/pkg/service/in_process/service.go @@ -3,9 +3,10 @@ package process import ( "context" "fmt" "regexp" - parallel "sync" + "sync" + "time" "go.uber.org/zap" googlegrpc "google.golang.org/grpc" @@ -14,7 +14,7 @@ import ( "github.com/open-feature/flagd/core/pkg/logger" "github.com/open-feature/flagd/core/pkg/model" "github.com/open-feature/flagd/core/pkg/store" - "github.com/open-feature/flagd/core/pkg/sync" + isync "github.com/open-feature/flagd/core/pkg/sync" "github.com/open-feature/flagd/core/pkg/sync/file" "github.com/open-feature/flagd/core/pkg/sync/grpc" "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials" @@ -22,19 +22,83 @@ import ( "golang.org/x/exp/maps" ) +const ( + // Channel buffer sizes + eventChannelBuffer = 5 + syncChannelBuffer = 1 + + // Provider name for events + providerName = "flagd" +) + // InProcess service implements flagd flag evaluation in-process. // Flag configurations are obtained from supported sources.
type InProcess struct { - evaluator evaluator.IEvaluator - events chan of.Event - listenerShutdown chan interface{} - logger *logger.Logger - serviceMetadata model.Metadata - sync sync.ISync - syncEnd context.CancelFunc - wg parallel.WaitGroup + // Core components + evaluator evaluator.IEvaluator + syncProvider isync.ISync + logger *logger.Logger + configuration Configuration + serviceMetadata model.Metadata + + // Event handling + events chan of.Event + eventSync EventSync + + // Shutdown coordination + ctx context.Context + cancelFunc context.CancelFunc + shutdownChannels *shutdownChannels + wg sync.WaitGroup + shutdownOnce sync.Once + + // Stateless coordination using sync.Once + initOnce sync.Once + sendReadyOnNextData sync.Once + staleTimer *staleTimer +} + +// shutdownChannels groups all shutdown-related channels +type shutdownChannels struct { + listenerShutdown chan struct{} + syncData chan isync.DataSync + initSuccess chan struct{} + initError chan error +} + +// staleTimer manages the stale connection timer with thread safety +type staleTimer struct { + timer *time.Timer + mu sync.Mutex } +// newStaleTimer creates a new thread-safe stale timer +func newStaleTimer() *staleTimer { + return &staleTimer{} +} + +// start starts or restarts the stale timer +func (st *staleTimer) start(duration time.Duration, callback func()) { + st.mu.Lock() + defer st.mu.Unlock() + + if st.timer == nil { + st.timer = time.AfterFunc(duration, callback) + } +} + +// stop stops the stale timer +func (st *staleTimer) stop() { + st.mu.Lock() + defer st.mu.Unlock() + + if st.timer != nil { + st.timer.Stop() + st.timer = nil + } +} + +// Configuration holds all configuration for the InProcess service type Configuration struct { Host any Port any @@ -43,120 +107,309 @@ type Configuration struct { Selector string TLSEnabled bool OfflineFlagSource string - CustomSyncProvider sync.ISync + CustomSyncProvider isync.ISync CustomSyncProviderUri string GrpcDialOptionsOverride 
[]googlegrpc.DialOption CertificatePath string + RetryGracePeriod int +} + +// EventSync interface for sync providers that support events +type EventSync interface { + isync.ISync + Events() chan SyncEvent +} + +// SyncEvent represents an event from the sync provider +type SyncEvent struct { + event of.EventType } +// Shutdowner interface for graceful shutdown type Shutdowner interface { Shutdown() error } +// NewInProcessService creates a new InProcess service with the given configuration func NewInProcessService(cfg Configuration) *InProcess { log := logger.NewLogger(NewRaw(), false) + syncProvider, uri := createSyncProvider(cfg, log) - iSync, uri := makeSyncProvider(cfg, log) + flagStore := store.NewFlags() + flagStore.FlagSources = append(flagStore.FlagSources, uri) - // service specific metadata - svcMetadata := make(model.Metadata, 2) + return &InProcess{ + evaluator: evaluator.NewJSON(log, flagStore), + syncProvider: syncProvider, + logger: log, + configuration: cfg, + serviceMetadata: createServiceMetadata(cfg), + events: make(chan of.Event, eventChannelBuffer), + staleTimer: newStaleTimer(), + sendReadyOnNextData: sync.Once{}, // Armed and ready to fire on first data + } +} + +// createServiceMetadata builds the service metadata from configuration +func createServiceMetadata(cfg Configuration) model.Metadata { + metadata := make(model.Metadata, 2) if cfg.Selector != "" { - svcMetadata["scope"] = cfg.Selector + metadata["scope"] = cfg.Selector } if cfg.ProviderID != "" { - svcMetadata["providerID"] = cfg.ProviderID + metadata["providerID"] = cfg.ProviderID + } + return metadata +} + +// Init initializes the service and starts all background processes +func (i *InProcess) Init() error { + i.logger.Info("initializing InProcess service") + + // Setup context and shutdown channels + i.setupShutdownInfrastructure() + + // Initialize sync provider + if err := i.syncProvider.Init(i.ctx); err != nil { + return fmt.Errorf("failed to initialize sync provider: %w", 
err) } - flagStore := store.NewFlags() - flagStore.FlagSources = append(flagStore.FlagSources, uri) - return &InProcess{ - evaluator: evaluator.NewJSON(log, flagStore), - events: make(chan of.Event, 5), - logger: log, - listenerShutdown: make(chan interface{}), - serviceMetadata: svcMetadata, - sync: iSync, + // Start background processes + i.startEventSyncMonitor() + i.startDataSyncProcess() + i.startDataSyncListener() + + // Wait for initialization to complete + return i.waitForInitialization() +} + +// setupShutdownInfrastructure initializes context and channels for coordinated shutdown +func (i *InProcess) setupShutdownInfrastructure() { + i.ctx, i.cancelFunc = context.WithCancel(context.Background()) + i.shutdownChannels = &shutdownChannels{ + listenerShutdown: make(chan struct{}), + syncData: make(chan isync.DataSync, syncChannelBuffer), + initSuccess: make(chan struct{}), + initError: make(chan error, 1), } } -func (i *InProcess) Init() error { - var ctx context.Context - ctx, i.syncEnd = context.WithCancel(context.Background()) +// startEventSyncMonitor starts monitoring events from EventSync providers +func (i *InProcess) startEventSyncMonitor() { + eventSync, ok := i.syncProvider.(EventSync) + if !ok { + return // No event monitoring needed + } - err := i.sync.Init(ctx) - if err != nil { - return err + i.eventSync = eventSync + go i.runEventSyncMonitor() +} + +// runEventSyncMonitor handles events from the sync provider +func (i *InProcess) runEventSyncMonitor() { + i.logger.Debug("starting event sync monitor") + defer i.logger.Debug("event sync monitor stopped") + + for { + select { + case <-i.ctx.Done(): + return + case <-i.shutdownChannels.listenerShutdown: + return + case msg := <-i.eventSync.Events(): + i.handleSyncEvent(msg) + } + } +} + +// handleSyncEvent processes individual sync events +func (i *InProcess) handleSyncEvent(event SyncEvent) { + switch event.event { + case of.ProviderError: + i.handleProviderError() + // Reset the sync.Once so it 
can fire again on recovery + i.sendReadyOnNextData = sync.Once{} + case of.ProviderReady: + i.handleProviderReady() + } +} + +// handleProviderError handles provider error events by starting stale timer +func (i *InProcess) handleProviderError() { + i.events <- of.Event{ + ProviderName: providerName, + EventType: of.ProviderStale, + ProviderEventDetails: of.ProviderEventDetails{Message: "connection error"}, } - initOnce := parallel.Once{} - syncInitSuccess := make(chan interface{}) - syncInitErr := make(chan error) + // Start stale timer - when it expires, send error event + i.staleTimer.start(time.Duration(i.configuration.RetryGracePeriod)*time.Second, func() { + i.events <- of.Event{ + ProviderName: providerName, + EventType: of.ProviderError, + ProviderEventDetails: of.ProviderEventDetails{Message: "provider error"}, + } + }) +} - syncChan := make(chan sync.DataSync, 1) +// handleProviderReady handles provider ready events by stopping stale timer +func (i *InProcess) handleProviderReady() { + i.staleTimer.stop() +} - // start data sync +// startDataSyncProcess starts the main data synchronization goroutine +func (i *InProcess) startDataSyncProcess() { i.wg.Add(1) - go func() { - defer i.wg.Done() - err := i.sync.Sync(ctx, syncChan) - if err != nil { - syncInitErr <- err + go i.runDataSyncProcess() +} + +// runDataSyncProcess runs the main sync process and handles errors appropriately +func (i *InProcess) runDataSyncProcess() { + defer i.wg.Done() + i.logger.Debug("starting data sync process") + defer i.logger.Debug("data sync process stopped") + + err := i.syncProvider.Sync(i.ctx, i.shutdownChannels.syncData) + if err != nil && i.ctx.Err() == nil { + // Only report non-cancellation errors + select { + case i.shutdownChannels.initError <- err: + default: + // Don't block if channel is full or no reader } - }() + } +} - // start data sync listener and listen to listener shutdown hook +// startDataSyncListener starts the data sync listener goroutine +func (i 
*InProcess) startDataSyncListener() { i.wg.Add(1) - go func() { - defer i.wg.Done() - for { - select { - case data := <-syncChan: - // re-syncs are ignored as we only support single flag sync source - changes, _, err := i.evaluator.SetState(data) - if err != nil { - i.events <- of.Event{ - ProviderName: "flagd", EventType: of.ProviderError, - ProviderEventDetails: of.ProviderEventDetails{Message: "Error from flag sync " + err.Error()}} - } - initOnce.Do(func() { - i.events <- of.Event{ProviderName: "flagd", EventType: of.ProviderReady} - syncInitSuccess <- nil - }) - i.events <- of.Event{ - ProviderName: "flagd", EventType: of.ProviderConfigChange, - ProviderEventDetails: of.ProviderEventDetails{Message: "New flag sync", FlagChanges: maps.Keys(changes)}} - case <-i.listenerShutdown: - i.logger.Info("Shutting down data sync listener") - if shutdowner, ok := i.sync.(Shutdowner); ok { - err := shutdowner.Shutdown() - if err != nil { - i.logger.Error("Error shutdown sync provider", zap.Error(err)) - } - } - return - } + go i.runDataSyncListener() +} + +// runDataSyncListener processes incoming sync data and handles shutdown +func (i *InProcess) runDataSyncListener() { + defer i.wg.Done() + i.logger.Debug("starting data sync listener") + defer i.logger.Debug("data sync listener stopped") + + for { + select { + case data := <-i.shutdownChannels.syncData: + i.processSyncData(data) + + case <-i.ctx.Done(): + i.logger.Info("data sync listener stopping due to context cancellation") + i.shutdownSyncProvider() + return + + case <-i.shutdownChannels.listenerShutdown: + i.logger.Info("data sync listener stopping due to shutdown signal") + i.shutdownSyncProvider() + return } - }() + } +} - // wait for initialization or error +// processSyncData handles individual sync data updates +func (i *InProcess) processSyncData(data isync.DataSync) { + changes, _, err := i.evaluator.SetState(data) + if err != nil { + i.events <- of.Event{ + ProviderName: providerName, + EventType: 
of.ProviderError, + ProviderEventDetails: of.ProviderEventDetails{Message: "Error from flag sync " + err.Error()}, + } + return + } + + i.logger.Info("staletimer stop") + // Stop stale timer - we've successfully received and processed data + i.staleTimer.stop() + + // Send ready event using sync.Once - handles initial ready and recovery automatically + i.sendReadyOnNextData.Do(func() { + i.events <- of.Event{ProviderName: providerName, EventType: of.ProviderReady} + }) + + // Handle initialization completion (only happens once ever) + i.initOnce.Do(func() { + close(i.shutdownChannels.initSuccess) + }) + + // Send config change event for data updates + if len(changes) > 0 { + i.events <- of.Event{ + ProviderName: providerName, + EventType: of.ProviderConfigChange, + ProviderEventDetails: of.ProviderEventDetails{ + Message: "New flag sync", + FlagChanges: maps.Keys(changes), + }, + } + } +} + +// shutdownSyncProvider gracefully shuts down the sync provider +func (i *InProcess) shutdownSyncProvider() { + if shutdowner, ok := i.syncProvider.(Shutdowner); ok { + if err := shutdowner.Shutdown(); err != nil { + i.logger.Error("error shutting down sync provider", zap.Error(err)) + } + } +} + +// waitForInitialization waits for the service to initialize or fail +func (i *InProcess) waitForInitialization() error { select { - case <-syncInitSuccess: + case <-i.shutdownChannels.initSuccess: + i.logger.Info("InProcess service initialized successfully") return nil - case err := <-syncInitErr: - return err + case err := <-i.shutdownChannels.initError: + return fmt.Errorf("initialization failed: %w", err) } } +// Shutdown gracefully shuts down the service func (i *InProcess) Shutdown() { - i.syncEnd() - close(i.listenerShutdown) - i.wg.Wait() + i.shutdownOnce.Do(func() { + i.logger.Info("starting InProcess service shutdown") + + // Stop stale timer + i.staleTimer.stop() + + // Cancel context to signal all goroutines + if i.cancelFunc != nil { + i.cancelFunc() + } + + // Close 
shutdown channels + if i.shutdownChannels != nil { + close(i.shutdownChannels.listenerShutdown) + } + + i.logger.Info("waiting for background processes to complete") + i.wg.Wait() + i.logger.Info("InProcess service shutdown completed successfully") + }) } -func (i *InProcess) ResolveBoolean(ctx context.Context, key string, defaultValue bool, - evalCtx map[string]interface{}) of.BoolResolutionDetail { +// EventChannel returns the event channel for external consumers +func (i *InProcess) EventChannel() <-chan of.Event { + return i.events +} + +// appendMetadata adds service metadata to evaluation metadata +func (i *InProcess) appendMetadata(evalMetadata model.Metadata) { + for k, v := range i.serviceMetadata { + evalMetadata[k] = v + } +} + +// ResolveBoolean resolves a boolean flag value +func (i *InProcess) ResolveBoolean(ctx context.Context, key string, defaultValue bool, evalCtx map[string]interface{}) of.BoolResolutionDetail { value, variant, reason, metadata, err := i.evaluator.ResolveBooleanValue(ctx, "", key, evalCtx) i.appendMetadata(metadata) + if err != nil { return of.BoolResolutionDetail{ Value: defaultValue, @@ -179,10 +432,11 @@ func (i *InProcess) ResolveBoolean(ctx context.Context, key string, defaultValue } } -func (i *InProcess) ResolveString(ctx context.Context, key string, defaultValue string, - evalCtx map[string]interface{}) of.StringResolutionDetail { +// ResolveString resolves a string flag value +func (i *InProcess) ResolveString(ctx context.Context, key string, defaultValue string, evalCtx map[string]interface{}) of.StringResolutionDetail { value, variant, reason, metadata, err := i.evaluator.ResolveStringValue(ctx, "", key, evalCtx) i.appendMetadata(metadata) + if err != nil { return of.StringResolutionDetail{ Value: defaultValue, @@ -205,10 +459,11 @@ func (i *InProcess) ResolveString(ctx context.Context, key string, defaultValue } } -func (i *InProcess) ResolveFloat(ctx context.Context, key string, defaultValue float64, - evalCtx 
map[string]interface{}) of.FloatResolutionDetail { +// ResolveFloat resolves a float flag value +func (i *InProcess) ResolveFloat(ctx context.Context, key string, defaultValue float64, evalCtx map[string]interface{}) of.FloatResolutionDetail { value, variant, reason, metadata, err := i.evaluator.ResolveFloatValue(ctx, "", key, evalCtx) i.appendMetadata(metadata) + if err != nil { return of.FloatResolutionDetail{ Value: defaultValue, @@ -231,10 +486,11 @@ func (i *InProcess) ResolveFloat(ctx context.Context, key string, defaultValue f } } -func (i *InProcess) ResolveInt(ctx context.Context, key string, defaultValue int64, - evalCtx map[string]interface{}) of.IntResolutionDetail { +// ResolveInt resolves an integer flag value +func (i *InProcess) ResolveInt(ctx context.Context, key string, defaultValue int64, evalCtx map[string]interface{}) of.IntResolutionDetail { value, variant, reason, metadata, err := i.evaluator.ResolveIntValue(ctx, "", key, evalCtx) i.appendMetadata(metadata) + if err != nil { return of.IntResolutionDetail{ Value: defaultValue, @@ -257,10 +513,11 @@ func (i *InProcess) ResolveInt(ctx context.Context, key string, defaultValue int } } -func (i *InProcess) ResolveObject(ctx context.Context, key string, defaultValue interface{}, - evalCtx map[string]interface{}) of.InterfaceResolutionDetail { +// ResolveObject resolves an object flag value +func (i *InProcess) ResolveObject(ctx context.Context, key string, defaultValue interface{}, evalCtx map[string]interface{}) of.InterfaceResolutionDetail { value, variant, reason, metadata, err := i.evaluator.ResolveObjectValue(ctx, "", key, evalCtx) i.appendMetadata(metadata) + if err != nil { return of.InterfaceResolutionDetail{ Value: defaultValue, @@ -283,44 +540,27 @@ func (i *InProcess) ResolveObject(ctx context.Context, key string, defaultValue } } -func (i *InProcess) EventChannel() <-chan of.Event { - return i.events -} - -func (i *InProcess) appendMetadata(evalMetadata model.Metadata) { - // For a nil 
slice, the number of iterations is 0 - for k, v := range i.serviceMetadata { - evalMetadata[k] = v - } -} - -// makeSyncProvider is a helper to create sync.ISync and return the underlying uri used by it to the caller -func makeSyncProvider(cfg Configuration, log *logger.Logger) (sync.ISync, string) { +// createSyncProvider creates the appropriate sync provider based on configuration +func createSyncProvider(cfg Configuration, log *logger.Logger) (isync.ISync, string) { if cfg.CustomSyncProvider != nil { - log.Info("operating in in-process mode with a custom sync provider at " + cfg.CustomSyncProviderUri) + log.Info("using custom sync provider at " + cfg.CustomSyncProviderUri) return cfg.CustomSyncProvider, cfg.CustomSyncProviderUri } if cfg.OfflineFlagSource != "" { - // file sync provider - log.Info("operating in in-process mode with offline flags sourced from " + cfg.OfflineFlagSource) + log.Info("using file sync provider with source: " + cfg.OfflineFlagSource) return &file.Sync{ URI: cfg.OfflineFlagSource, Logger: log, - Mux: &parallel.RWMutex{}, + Mux: &sync.RWMutex{}, }, cfg.OfflineFlagSource } - // grpc sync provider (default uri based on `dns`) - uri := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) - - if cfg.TargetUri != "" && isValidTargetScheme(cfg.TargetUri) { - uri = cfg.TargetUri - } - - log.Info("operating in in-process mode with flags sourced from " + uri) + // Default to gRPC sync provider + uri := buildGrpcUri(cfg) + log.Info("using gRPC sync provider with URI: " + uri) - return &grpc.Sync{ + return &Sync{ CredentialBuilder: &credentials.CredentialBuilder{}, GrpcDialOptionsOverride: cfg.GrpcDialOptionsOverride, Logger: log, @@ -332,7 +572,15 @@ func makeSyncProvider(cfg Configuration, log *logger.Logger) (sync.ISync, string }, uri } -// mapError is a helper to map evaluation errors to OF errors +// buildGrpcUri constructs the gRPC URI from configuration +func buildGrpcUri(cfg Configuration) string { + if cfg.TargetUri != "" && 
isValidTargetScheme(cfg.TargetUri) { + return cfg.TargetUri + } + return fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) +} + +// mapError maps evaluation errors to OpenFeature errors func mapError(flagKey string, err error) of.ResolutionError { switch err.Error() { case model.FlagNotFoundErrorCode: @@ -348,6 +596,7 @@ func mapError(flagKey string, err error) of.ResolutionError { } } +// isValidTargetScheme validates the gRPC target URI scheme func isValidTargetScheme(targetUri string) bool { regx := regexp.MustCompile("^" + grpc.SupportedScheme) return regx.Match([]byte(targetUri)) diff --git a/providers/flagd/pkg/service/in_process/service_custom_sync_provider_test.go b/providers/flagd/pkg/service/in_process/service_custom_sync_provider_test.go index 2be9c1801..5471a6a1d 100644 --- a/providers/flagd/pkg/service/in_process/service_custom_sync_provider_test.go +++ b/providers/flagd/pkg/service/in_process/service_custom_sync_provider_test.go @@ -9,7 +9,7 @@ func TestInProcessWithCustomSyncProvider(t *testing.T) { service := NewInProcessService(Configuration{CustomSyncProvider: customSyncProvider, CustomSyncProviderUri: "not tested here"}) // If custom sync provider is supplied the in-process service should use it. 
- if service.sync != customSyncProvider { - t.Fatalf("Expected service.sync to be the mockCustomSyncProvider, but got %s", service.sync) + if service.syncProvider != customSyncProvider { + t.Fatalf("Expected service.sync to be the mockCustomSyncProvider, but got %s", service.syncProvider) } } diff --git a/tests/flagd/testframework/config_steps.go b/tests/flagd/testframework/config_steps.go index c24f4e8b0..ddf7ac492 100644 --- a/tests/flagd/testframework/config_steps.go +++ b/tests/flagd/testframework/config_steps.go @@ -19,7 +19,6 @@ var ignoredOptions = []string{ "keepAliveTime", "retryBackoffMs", "retryBackoffMaxMs", - "retryGracePeriod", "offlinePollIntervalMs", } diff --git a/tests/flagd/testframework/event_steps.go b/tests/flagd/testframework/event_steps.go index c428192d0..71ce012fc 100644 --- a/tests/flagd/testframework/event_steps.go +++ b/tests/flagd/testframework/event_steps.go @@ -183,7 +183,7 @@ func (s *TestState) waitForEvents(eventType string, maxWait time.Duration) error } } -// assertEventOccurred checks if a specific event occurred (with immediate timeout) +// assertEventOccurred checks if a specific event occurred func (s *TestState) assertEventOccurred(eventType string) error { return s.waitForEvents(eventType, 10*time.Second) } diff --git a/tests/flagd/testframework/provider_steps.go b/tests/flagd/testframework/provider_steps.go index 57159cf5a..165853b2e 100644 --- a/tests/flagd/testframework/provider_steps.go +++ b/tests/flagd/testframework/provider_steps.go @@ -40,6 +40,11 @@ func (s *TestState) createProviderInstance() error { var provider openfeature.FeatureProvider var err error + s.ProviderOptions = append(s.ProviderOptions, ProviderOption{ + Option: "RetryGracePeriod", + ValueType: "Integer", + Value: "1", + }) switch s.ProviderType { case RPC: if RPCProviderSupplier == nil { @@ -82,13 +87,13 @@ func (s *TestState) createProviderInstance() error { } // waitForProviderReady waits for the provider to be in READY state -func (s 
*TestState) waitForProviderReady(timeout time.Duration) error { +func (s *TestState) waitForProviderReady(ctx context.Context, timeout time.Duration) error { if s.Client == nil { return fmt.Errorf("no client available to wait for provider ready") } // Use generic event handler infrastructure - if err := s.addGenericEventHandler(context.Background(), "ready"); err != nil { + if err := s.addGenericEventHandler(ctx, "ready"); err != nil { return fmt.Errorf("failed to add ready event handler: %w", err) } @@ -130,7 +135,7 @@ func (s *TestState) createSpecializedFlagdProvider(ctx context.Context, provider } // Wait for provider to be ready - return s.waitForProviderReady(15 * time.Second) + return s.waitForProviderReady(ctx, 15*time.Second) } return nil } diff --git a/tests/flagd/testframework/step_definitions.go b/tests/flagd/testframework/step_definitions.go index e976562f0..3a76e088e 100644 --- a/tests/flagd/testframework/step_definitions.go +++ b/tests/flagd/testframework/step_definitions.go @@ -47,9 +47,9 @@ func InitializeScenario(ctx *godog.ScenarioContext) { scenarioMutex.Lock() defer scenarioMutex.Unlock() if state, ok := ctx.Value(TestStateKey{}).(*TestState); ok { + state.clearEvents() state.CleanupEnvironmentVariables() state.cleanupProvider() - state.clearEvents() } return ctx, nil }) @@ -95,11 +95,11 @@ func (s *TestState) cleanupProvider() { if s.Provider != nil { // Try to cast to common provider interfaces that might have shutdown methods // This is defensive - not all providers will have explicit shutdown - go func() { - if shutdownable, ok := s.Provider.(interface{ Shutdown() }); ok { - shutdownable.Shutdown() - } - }() + + if shutdownable, ok := s.Provider.(interface{ Shutdown() }); ok { + shutdownable.Shutdown() + } + s.Provider = nil } }