package process

import (
	"context"
	"fmt"
	msync "sync"
	"time"

	"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
	v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
	"github.com/open-feature/flagd/core/pkg/logger"
	"github.com/open-feature/flagd/core/pkg/sync"
	grpccredential "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
	of "github.com/open-feature/go-sdk/openfeature"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/keepalive"
)

const (
	// Prefixes for gRPC URL inputs. gRPC does not define a standard URL prefix; these prefixes
	// differentiate gRPC endpoints from remote REST (HTTP) endpoints.
	Prefix          = "grpc://"
	PrefixSecure    = "grpcs://"
	SupportedScheme = "(envoy|dns|uds|xds)"
)

// Type aliases for the interfaces required by this component - needed for mock generation with gomock.

type FlagSyncServiceClient interface {
	syncv1grpc.FlagSyncServiceClient
}
type FlagSyncServiceClientResponse interface {
	syncv1grpc.FlagSyncService_SyncFlagsClient
}

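// once guards the one-time transition to ready; note it is package-scoped and therefore shared
// across Sync instances.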
var once msync.Once

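// Sync is a gRPC flag sync source: it connects to a flagd-compatible sync service, streams flag
// configuration changes, and forwards them over a sync.DataSync channel.
//
// Minimal usage sketch (illustrative only - log, builder, ctx, and dataSyncChan are assumed to be
// provided by the caller, and the URI is an example value):
//
//	s := &Sync{URI: "grpc://localhost:8015", ProviderID: "flagd", Logger: log, CredentialBuilder: builder}
//	if err := s.Init(ctx); err != nil {
//		// handle initialization error
//	}
//	go func() { _ = s.Sync(ctx, dataSyncChan) }()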
type Sync struct {
	GrpcDialOptionsOverride []grpc.DialOption
	CertPath                string
	CredentialBuilder       grpccredential.Builder
	Logger                  *logger.Logger
	ProviderID              string
	Secure                  bool
	Selector                string
	URI                     string
	MaxMsgSize              int

	client     FlagSyncServiceClient
	connection *grpc.ClientConn
	ready      bool
	events     chan SyncEvent
}
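// Init establishes the gRPC client connection. Unless GrpcDialOptionsOverride is provided, it assembles
// dial options from the configured transport credentials, an optional maximum receive message size, and
// keepalive parameters, then starts connection-state monitoring in the background.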
func (g *Sync) Init(ctx context.Context) error {
	var rpcCon *grpc.ClientConn
	var err error

	g.events = make(chan SyncEvent)

	if len(g.GrpcDialOptionsOverride) > 0 {
		g.Logger.Debug("GRPC DialOptions override provided")
		rpcCon, err = grpc.NewClient(g.URI, g.GrpcDialOptionsOverride...)
	} else {
		// Build dial options with enhanced features
		var dialOptions []grpc.DialOption

		// Transport credentials; use a separately named error to avoid shadowing the outer err
		tCredentials, credErr := g.CredentialBuilder.Build(g.Secure, g.CertPath)
		if credErr != nil {
			credErr = fmt.Errorf("error building transport credentials: %w", credErr)
			g.Logger.Error(credErr.Error())
			return credErr
		}
		dialOptions = append(dialOptions, grpc.WithTransportCredentials(tCredentials))

		// Call options
		var callOptions []grpc.CallOption
		if g.MaxMsgSize > 0 {
			callOptions = append(callOptions, grpc.MaxCallRecvMsgSize(g.MaxMsgSize))
			g.Logger.Info(fmt.Sprintf("setting max receive message size %d bytes", g.MaxMsgSize))
		}
		if len(callOptions) > 0 {
			dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(callOptions...))
		}

		// Keepalive settings
		keepaliveParams := keepalive.ClientParameters{
			Time:                30 * time.Second, // send a ping every 30 seconds
			Timeout:             5 * time.Second,  // wait 5 seconds for a ping response
			PermitWithoutStream: true,             // allow pings when no streams are active
		}
		dialOptions = append(dialOptions, grpc.WithKeepaliveParams(keepaliveParams))

		// Create the connection, assigning to the outer err so the check below sees failures from this branch
		rpcCon, err = grpc.NewClient(g.URI, dialOptions...)
	}

	if err != nil {
		err = fmt.Errorf("error initiating grpc client connection: %w", err)
		g.Logger.Error(err.Error())
		return err
	}

	// Store the connection for state tracking
	g.connection = rpcCon

	// Set up the service client
	g.client = syncv1grpc.NewFlagSyncServiceClient(rpcCon)

	// Start connection state monitoring in the background
	go g.monitorConnectionState(ctx)

	g.Logger.Info(fmt.Sprintf("gRPC client initialized for %s", g.URI))
	return nil
}
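// ReSync fetches the full flag configuration once via FetchAllFlags and pushes it to dataSync.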
func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
	res, err := g.client.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{ProviderId: g.ProviderID, Selector: g.Selector})
	if err != nil {
		err = fmt.Errorf("error fetching all flags: %w", err)
		g.Logger.Error(err.Error())
		return err
	}
	dataSync <- sync.DataSync{
		FlagData: res.GetFlagConfiguration(),
		Source:   g.URI,
	}
	return nil
}
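// IsReady reports whether at least one sync stream has been established.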
func (g *Sync) IsReady() bool {
	return g.ready
}
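// Sync maintains the flag sync stream until ctx is cancelled, recreating the stream with
// wait-for-ready semantics whenever it breaks and forwarding received payloads to dataSync.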
func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
	for {
		g.Logger.Debug("creating sync stream...")

		// Create the sync stream with wait-for-ready - let gRPC handle the connection waiting
		syncClient, err := g.client.SyncFlags(
			ctx,
			&v1.SyncFlagsRequest{
				ProviderId: g.ProviderID,
				Selector:   g.Selector,
			},
			grpc.WaitForReady(true), // gRPC will wait for the connection to be ready
		)
		if err != nil {
			// Check if the context has been cancelled
			if ctx.Err() != nil {
				return ctx.Err()
			}

			g.Logger.Warn(fmt.Sprintf("failed to create sync stream: %v", err))

			// Brief pause before retrying
			select {
			case <-time.After(time.Second):
				continue
			case <-ctx.Done():
				return ctx.Err()
			}
		}

		g.Logger.Info("sync stream established, starting to receive flags...")

		// Handle the stream - when it breaks, a new one is created on the next iteration
		err = g.handleFlagSync(syncClient, dataSync)
		if err != nil {
			if ctx.Err() != nil {
				return ctx.Err()
			}

			g.Logger.Warn(fmt.Sprintf("stream closed: %v", err))
			// The loop will automatically create a new stream with wait-for-ready
		}
	}
}
// monitorConnectionState watches connection state changes, logs them, and emits a provider error
// event when the connection enters TransientFailure.
func (g *Sync) monitorConnectionState(ctx context.Context) {
	if g.connection == nil {
		return
	}

	currentState := g.connection.GetState()
	g.Logger.Debug(fmt.Sprintf("starting connection state monitoring, initial state: %s", currentState))

	for {
		// Wait for the next state change
		if !g.connection.WaitForStateChange(ctx, currentState) {
			// Context cancelled, exit monitoring
			g.Logger.Debug("connection state monitoring stopped")
			return
		}

		newState := g.connection.GetState()
		g.Logger.Debug(fmt.Sprintf("connection state changed: %s -> %s", currentState, newState))

		// Log error states
		switch newState {
		case connectivity.TransientFailure:
			// g.events is unbuffered, so this send blocks until a consumer reads from Events()
			g.events <- SyncEvent{event: of.ProviderError}
			g.Logger.Error(fmt.Sprintf("gRPC connection entered TransientFailure state for %s", g.URI))
		case connectivity.Shutdown:
			g.Logger.Error(fmt.Sprintf("gRPC connection shutdown for %s", g.URI))
			// return // exit monitoring on shutdown
		case connectivity.Ready:
			g.Logger.Info(fmt.Sprintf("gRPC connection ready for %s", g.URI))
		case connectivity.Idle:
			g.Logger.Debug(fmt.Sprintf("gRPC connection idle for %s", g.URI))
		case connectivity.Connecting:
			g.Logger.Debug(fmt.Sprintf("gRPC connection attempting to connect to %s", g.URI))
		}

		currentState = newState
	}
}
// handleFlagSync listens on the sync stream and pushes configuration updates through the dataSync
// channel. It returns when the stream errors, leaving reconnection to the caller.
func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error {
	once.Do(func() {
		g.ready = true
	})

	// Stream message handling loop - receives each individual message from the stream
	for {
		data, err := stream.Recv()
		if err != nil {
			return fmt.Errorf("error receiving payload from stream: %w", err)
		}

		dataSync <- sync.DataSync{
			FlagData:    data.FlagConfiguration,
			SyncContext: data.SyncContext,
			Source:      g.URI,
			Selector:    g.Selector,
		}

		g.Logger.Debug("received full configuration payload")
	}
}
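// Events returns the channel on which connection-related sync events (e.g. provider errors) are emitted.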
func (g *Sync) Events() chan SyncEvent {
	return g.events
}