Skip to content

Commit b6cd812

Browse files
committed
feat(flagd): add eventing with graceperiod fir inprocess resolver
Signed-off-by: Simon Schrottner <[email protected]>
1 parent 4f4f163 commit b6cd812

File tree

6 files changed

+331
-12
lines changed

6 files changed

+331
-12
lines changed

providers/flagd/e2e/inprocess_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func TestInProcessProviderE2E(t *testing.T) {
2626
}
2727

2828
// Run tests with in-process specific tags - exclude connection/event issues we won't tackle
29-
tags := "@in-process && ~@unixsocket && ~@targetURI && ~@metadata && ~@grace && ~@customCert && ~@reconnect && ~@contextEnrichment && ~@sync-payload && ~@events"
29+
tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload"
3030

3131
if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil {
3232
t.Fatalf("Gherkin tests failed: %v", err)

providers/flagd/pkg/configuration.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@ package flagd
33
import (
44
"errors"
55
"fmt"
6-
"os"
7-
"strconv"
8-
"strings"
9-
106
"github.com/go-logr/logr"
117
"github.com/open-feature/flagd/core/pkg/sync"
128
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
139
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/logger"
1410
"google.golang.org/grpc"
11+
"os"
12+
"strconv"
13+
"strings"
14+
"time"
1515
)
1616

1717
type ResolverType string
@@ -26,6 +26,7 @@ const (
2626
defaultCache = cache.LRUValue
2727
defaultHost = "localhost"
2828
defaultResolver = rpc
29+
defaultGracePeriod = 5 * time.Second
2930

3031
rpc ResolverType = "rpc"
3132
inProcess ResolverType = "in-process"
@@ -44,6 +45,7 @@ const (
4445
flagdSourceSelectorEnvironmentVariableName = "FLAGD_SOURCE_SELECTOR"
4546
flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
4647
flagdTargetUriEnvironmentVariableName = "FLAGD_TARGET_URI"
48+
flagdGracePeriodVariableName = "FLAGD_GRACE_PERIOD"
4749
)
4850

4951
type ProviderConfiguration struct {
@@ -64,6 +66,7 @@ type ProviderConfiguration struct {
6466
CustomSyncProvider sync.ISync
6567
CustomSyncProviderUri string
6668
GrpcDialOptionsOverride []grpc.DialOption
69+
GracePeriod time.Duration
6770

6871
log logr.Logger
6972
}
@@ -77,6 +80,7 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration {
7780
MaxCacheSize: defaultMaxCacheSize,
7881
Resolver: defaultResolver,
7982
Tls: defaultTLS,
83+
GracePeriod: defaultGracePeriod,
8084
}
8185

8286
p.updateFromEnvVar()
@@ -224,6 +228,14 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
224228
if targetUri := os.Getenv(flagdTargetUriEnvironmentVariableName); targetUri != "" {
225229
cfg.TargetUri = targetUri
226230
}
231+
if gracePeriod := os.Getenv(flagdGracePeriodVariableName); gracePeriod != "" {
232+
if seconds, err := strconv.Atoi(gracePeriod); err == nil {
233+
cfg.GracePeriod = time.Duration(seconds) * time.Second
234+
} else {
235+
// Handle parsing error
236+
cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s': %v", gracePeriod, err))
237+
}
238+
}
227239

228240
}
229241

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
package process
2+
3+
import (
4+
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
5+
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
6+
"context"
7+
"fmt"
8+
"github.com/open-feature/flagd/core/pkg/logger"
9+
"github.com/open-feature/flagd/core/pkg/sync"
10+
grpccredential "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
11+
of "github.com/open-feature/go-sdk/openfeature"
12+
"google.golang.org/grpc"
13+
"google.golang.org/grpc/connectivity"
14+
"google.golang.org/grpc/keepalive"
15+
msync "sync"
16+
"time"
17+
)
18+
19+
const (
20+
// Prefix for GRPC URL inputs. GRPC does not define a standard prefix. This prefix helps to differentiate remote
21+
// URLs for REST APIs (i.e - HTTP) from GRPC endpoints.
22+
Prefix = "grpc://"
23+
PrefixSecure = "grpcs://"
24+
SupportedScheme = "(envoy|dns|uds|xds)"
25+
)
26+
27+
// type aliases for interfaces required by this component - needed for mock generation with gomock
28+
29+
type FlagSyncServiceClient interface {
30+
syncv1grpc.FlagSyncServiceClient
31+
}
32+
type FlagSyncServiceClientResponse interface {
33+
syncv1grpc.FlagSyncService_SyncFlagsClient
34+
}
35+
36+
var once msync.Once
37+
38+
type Sync struct {
39+
GrpcDialOptionsOverride []grpc.DialOption
40+
CertPath string
41+
CredentialBuilder grpccredential.Builder
42+
Logger *logger.Logger
43+
ProviderID string
44+
Secure bool
45+
Selector string
46+
URI string
47+
MaxMsgSize int
48+
49+
client FlagSyncServiceClient
50+
connection *grpc.ClientConn
51+
ready bool
52+
events chan SyncEvent
53+
}
54+
55+
func (g *Sync) Init(ctx context.Context) error {
56+
var rpcCon *grpc.ClientConn
57+
var err error
58+
59+
g.events = make(chan SyncEvent)
60+
61+
if len(g.GrpcDialOptionsOverride) > 0 {
62+
g.Logger.Debug("GRPC DialOptions override provided")
63+
rpcCon, err = grpc.NewClient(g.URI, g.GrpcDialOptionsOverride...)
64+
} else {
65+
// Build dial options with enhanced features
66+
var dialOptions []grpc.DialOption
67+
68+
// Transport credentials
69+
tCredentials, err := g.CredentialBuilder.Build(g.Secure, g.CertPath)
70+
if err != nil {
71+
err = fmt.Errorf("error building transport credentials: %w", err)
72+
g.Logger.Error(err.Error())
73+
return err
74+
}
75+
dialOptions = append(dialOptions, grpc.WithTransportCredentials(tCredentials))
76+
77+
// Call options
78+
var callOptions []grpc.CallOption
79+
if g.MaxMsgSize > 0 {
80+
callOptions = append(callOptions, grpc.MaxCallRecvMsgSize(g.MaxMsgSize))
81+
g.Logger.Info(fmt.Sprintf("setting max receive message size %d bytes", g.MaxMsgSize))
82+
}
83+
if len(callOptions) > 0 {
84+
dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(callOptions...))
85+
}
86+
87+
// Keepalive settings
88+
keepaliveParams := keepalive.ClientParameters{
89+
Time: 30 * time.Second, // Send ping every 30 seconds
90+
Timeout: 5 * time.Second, // Wait 5 seconds for ping response
91+
PermitWithoutStream: true, // Allow pings when no streams active
92+
}
93+
dialOptions = append(dialOptions, grpc.WithKeepaliveParams(keepaliveParams))
94+
95+
// Create connection
96+
rpcCon, err = grpc.NewClient(g.URI, dialOptions...)
97+
}
98+
99+
if err != nil {
100+
err := fmt.Errorf("error initiating grpc client connection: %w", err)
101+
g.Logger.Error(err.Error())
102+
return err
103+
}
104+
105+
// Store connection for state tracking
106+
g.connection = rpcCon
107+
108+
// Setup service client
109+
g.client = syncv1grpc.NewFlagSyncServiceClient(rpcCon)
110+
111+
// Start connection state monitoring in background
112+
go g.monitorConnectionState(ctx)
113+
114+
g.Logger.Info(fmt.Sprintf("gRPC client initialized for %s", g.URI))
115+
return nil
116+
}
117+
118+
func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
119+
res, err := g.client.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{ProviderId: g.ProviderID, Selector: g.Selector})
120+
if err != nil {
121+
err = fmt.Errorf("error fetching all flags: %w", err)
122+
g.Logger.Error(err.Error())
123+
return err
124+
}
125+
dataSync <- sync.DataSync{
126+
FlagData: res.GetFlagConfiguration(),
127+
Source: g.URI,
128+
}
129+
return nil
130+
}
131+
132+
func (g *Sync) IsReady() bool {
133+
return g.ready
134+
}
135+
136+
func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
137+
for {
138+
g.Logger.Debug("creating sync stream...")
139+
140+
// Create sync stream with wait-for-ready - let gRPC handle the connection waiting
141+
syncClient, err := g.client.SyncFlags(
142+
ctx,
143+
&v1.SyncFlagsRequest{
144+
ProviderId: g.ProviderID,
145+
Selector: g.Selector,
146+
},
147+
grpc.WaitForReady(true), // gRPC will wait for connection to be ready
148+
)
149+
if err != nil {
150+
// Check if context is cancelled
151+
if ctx.Err() != nil {
152+
return ctx.Err()
153+
}
154+
155+
g.Logger.Warn(fmt.Sprintf("failed to create sync stream: %v", err))
156+
157+
// Brief pause before retry
158+
select {
159+
case <-time.After(time.Second):
160+
continue
161+
case <-ctx.Done():
162+
return ctx.Err()
163+
}
164+
}
165+
166+
g.Logger.Info("sync stream established, starting to receive flags...")
167+
168+
// Handle the stream - when it breaks, we'll create a new one
169+
err = g.handleFlagSync(syncClient, dataSync)
170+
if err != nil {
171+
if ctx.Err() != nil {
172+
return ctx.Err()
173+
}
174+
175+
g.Logger.Warn(fmt.Sprintf("stream closed: %v", err))
176+
// Loop will automatically create a new stream with wait-for-ready
177+
}
178+
}
179+
}
180+
181+
// monitorConnectionState monitors connection state changes and logs errors
182+
func (g *Sync) monitorConnectionState(ctx context.Context) {
183+
if g.connection == nil {
184+
return
185+
}
186+
187+
currentState := g.connection.GetState()
188+
g.Logger.Debug(fmt.Sprintf("starting connection state monitoring, initial state: %s", currentState))
189+
190+
for {
191+
// Wait for next state change
192+
if !g.connection.WaitForStateChange(ctx, currentState) {
193+
// Context cancelled, exit monitoring
194+
g.Logger.Debug("connection state monitoring stopped")
195+
return
196+
}
197+
198+
newState := g.connection.GetState()
199+
g.Logger.Debug(fmt.Sprintf("connection state changed: %s -> %s", currentState, newState))
200+
201+
// Log error states
202+
switch newState {
203+
case connectivity.TransientFailure:
204+
g.events <- SyncEvent{event: of.ProviderError}
205+
g.Logger.Error(fmt.Sprintf("gRPC connection entered TransientFailure state for %s", g.URI))
206+
case connectivity.Shutdown:
207+
g.Logger.Error(fmt.Sprintf("gRPC connection shutdown for %s", g.URI))
208+
//return // Exit monitoring on shutdown
209+
case connectivity.Ready:
210+
g.Logger.Info(fmt.Sprintf("gRPC connection ready for %s", g.URI))
211+
case connectivity.Idle:
212+
g.Logger.Debug(fmt.Sprintf("gRPC connection idle for %s", g.URI))
213+
case connectivity.Connecting:
214+
g.Logger.Debug(fmt.Sprintf("gRPC connection attempting to connect to %s", g.URI))
215+
}
216+
217+
currentState = newState
218+
}
219+
}
220+
221+
// handleFlagSync wraps the stream listening and push updates through dataSync channel
222+
func (g *Sync) handleFlagSync(stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error {
223+
once.Do(func() {
224+
g.ready = true
225+
})
226+
227+
// Stream message handling loop - receives each individual message from the stream
228+
for {
229+
data, err := stream.Recv()
230+
if err != nil {
231+
return fmt.Errorf("error receiving payload from stream: %w", err)
232+
}
233+
234+
dataSync <- sync.DataSync{
235+
FlagData: data.FlagConfiguration,
236+
SyncContext: data.SyncContext,
237+
Source: g.URI,
238+
Selector: g.Selector,
239+
}
240+
241+
g.Logger.Debug("received full configuration payload")
242+
}
243+
}
244+
245+
func (g *Sync) Events() chan SyncEvent {
246+
return g.events
247+
}

0 commit comments

Comments
 (0)