Skip to content

Commit 71745fc

Browse files
fix(flagd): Add forbidden provider support and improve inprocess sync non-retry error handling (#756)
Signed-off-by: Alexandra Oberaigner <[email protected]>
1 parent 10e72b9 commit 71745fc

File tree

4 files changed: 41 additions and 23 deletions.

providers/flagd/pkg/service/in_process/grpc_sync.go

Lines changed: 21 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -4,8 +4,8 @@ import (
44
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
55
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
66
"context"
7+
"encoding/json"
78
"fmt"
8-
"github.com/goccy/go-json"
99
"github.com/open-feature/flagd/core/pkg/logger"
1010
"github.com/open-feature/flagd/core/pkg/sync"
1111
grpccredential "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
@@ -14,6 +14,7 @@ import (
1414
"google.golang.org/grpc/connectivity"
1515
"google.golang.org/grpc/keepalive"
1616
"google.golang.org/grpc/status"
17+
"strings"
1718
msync "sync"
1819
"time"
1920
)
@@ -37,21 +38,8 @@ const (
3738
"MaxBackoff": "5s",
3839
"BackoffMultiplier": 2.0,
3940
"RetryableStatusCodes": [
40-
"CANCELLED",
4141
"UNKNOWN",
42-
"INVALID_ARGUMENT",
43-
"NOT_FOUND",
44-
"ALREADY_EXISTS",
45-
"PERMISSION_DENIED",
46-
"RESOURCE_EXHAUSTED",
47-
"FAILED_PRECONDITION",
48-
"ABORTED",
49-
"OUT_OF_RANGE",
50-
"UNIMPLEMENTED",
51-
"INTERNAL",
52-
"UNAVAILABLE",
53-
"DATA_LOSS",
54-
"UNAUTHENTICATED"
42+
"UNAVAILABLE"
5543
]
5644
}
5745
}
@@ -61,7 +49,7 @@ const (
6149
nonRetryableStatusCodes = `
6250
[
6351
"PermissionDenied",
64-
"Unauthenticated",
52+
"Unauthenticated"
6553
]
6654
`
6755
)
@@ -90,6 +78,7 @@ type Sync struct {
9078
Selector string
9179
URI string
9280
MaxMsgSize int
81+
RetryGracePeriod int
9382

9483
// Runtime state
9584
client FlagSyncServiceClient
@@ -104,7 +93,7 @@ type Sync struct {
10493
// Init initializes the gRPC connection and starts background monitoring
10594
func (g *Sync) Init(ctx context.Context) error {
10695
g.Logger.Info(fmt.Sprintf("initializing gRPC client for %s", g.URI))
107-
initNonRetryableStatusCodesSet()
96+
g.initNonRetryableStatusCodesSet()
10897

10998
// Initialize channels
11099
g.shutdownComplete = make(chan struct{})
@@ -174,13 +163,16 @@ func (g *Sync) buildDialOptions() ([]grpc.DialOption, error) {
174163
}
175164

176165
// initNonRetryableStatusCodesSet initializes the set of non-retryable gRPC status codes for quick lookup
177-
func initNonRetryableStatusCodesSet() {
166+
func (g *Sync) initNonRetryableStatusCodesSet() {
178167
var codes []string
179168
nonRetryableCodes = make(map[string]struct{})
180-
if err := json.Unmarshal([]byte(nonRetryableStatusCodes), &codes); err == nil {
169+
trimmed := strings.TrimSpace(nonRetryableStatusCodes)
170+
if err := json.Unmarshal([]byte(trimmed), &codes); err == nil {
181171
for _, code := range codes {
182172
nonRetryableCodes[code] = struct{}{}
183173
}
174+
} else {
175+
g.Logger.Debug("parsing non-retryable status codes failed, retrying on all errors")
184176
}
185177
}
186178

@@ -243,11 +235,19 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
243235
if ok {
244236
codeStr := st.Code().String()
245237
if _, found := nonRetryableCodes[codeStr]; found {
246-
g.Logger.Error(fmt.Sprintf("sync cycle failed with non-retryable code: %v", codeStr))
247-
return err
238+
errStr := fmt.Sprintf("sync cycle failed with non-retryable status: %v, " +
239+
"returning provider fatal.", codeStr)
240+
g.Logger.Error(errStr)
241+
return &of.ProviderInitError{
242+
ErrorCode: of.ProviderFatalCode,
243+
Message: errStr,
244+
}
248245
}
249246
}
250247

248+
// Backoff before retrying
249+
time.Sleep(time.Duration(g.RetryGracePeriod))
250+
251251
g.Logger.Warn(fmt.Sprintf("sync cycle failed: %v, retrying...", err))
252252
g.sendEvent(ctx, SyncEvent{event: of.ProviderError})
253253

providers/flagd/pkg/service/in_process/service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,7 @@ func createSyncProvider(cfg Configuration, log *logger.Logger) (isync.ISync, str
569569
ProviderID: cfg.ProviderID,
570570
Selector: cfg.Selector,
571571
URI: uri,
572+
RetryGracePeriod: cfg.RetryGracePeriod,
572573
}, uri
573574
}
574575

providers/multi-provider/pkg/strategies/strategies.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -112,7 +112,7 @@ func setFlagMetadata(strategyUsed EvaluationStrategy, successProviderName string
112112
func cleanErrorMessage(msg string) string {
113113
codeRegex := strings.Join([]string{
114114
string(of.ProviderNotReadyCode),
115-
// string(of.ProviderFatalCode), // TODO: not available until go-sdk 14
115+
string(of.ProviderFatalCode),
116116
string(of.FlagNotFoundCode),
117117
string(of.ParseErrorCode),
118118
string(of.TypeMismatchCode),

tests/flagd/testframework/provider_steps.go

Lines changed: 18 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,9 @@ func InitializeProviderSteps(ctx *godog.ScenarioContext) {
2727
// Generic provider step definition - accepts any provider type including "stable"
2828
ctx.Step(`^a (\w+) flagd provider$`,
2929
withState1Arg((*TestState).createSpecializedFlagdProvider))
30+
31+
ctx.Step(`^the client is in (\w+) state$`,
32+
withState1Arg((*TestState).assertClientState))
3033
}
3134

3235
// State methods - these now expect context as first parameter after state
@@ -111,6 +114,13 @@ func (s *TestState) simulateConnectionLoss(ctx context.Context, seconds int) err
111114
return s.Container.Restart(seconds)
112115
}
113116

117+
func (s *TestState) assertClientState(ctx context.Context, state string) error {
118+
if string(s.Client.State()) == strings.ToUpper(state) {
119+
return nil
120+
}
121+
return fmt.Errorf("expected client state %s but got %s", state, s.Client.State())
122+
}
123+
114124
// createSpecializedFlagdProvider creates specialized flagd providers based on type
115125
func (s *TestState) createSpecializedFlagdProvider(ctx context.Context, providerType string) error {
116126
// Apply specialized configuration based on provider type
@@ -128,7 +138,7 @@ func (s *TestState) createSpecializedFlagdProvider(ctx context.Context, provider
128138
return fmt.Errorf("failed to create instance for %s provider: %w", providerType, err)
129139
}
130140

131-
if providerType != "unavailable" {
141+
if providerType != "unavailable" && providerType != "forbidden" {
132142
if s.ProviderType == RPC {
133143
// Small delay to allow flagd server to fully load flags after connection
134144
time.Sleep(50 * time.Millisecond)
@@ -150,6 +160,7 @@ func (s *TestState) applySpecializedConfig(providerType string) error {
150160
return nil
151161
case "unavailable":
152162
return s.configureUnavailableProvider()
163+
case "forbidden": return s.configureForbiddenProvider()
153164
case "socket":
154165
return s.configureSocketProvider()
155166
case "ssl", "tls":
@@ -173,6 +184,12 @@ func (s *TestState) configureUnavailableProvider() error {
173184
return nil
174185
}
175186

187+
func (s *TestState) configureForbiddenProvider() error {
188+
// Set an Envoy port which always responds with forbidden
189+
s.addProviderOption("port", "Integer", "9212")
190+
return nil
191+
}
192+
176193
func (s *TestState) configureSocketProvider() error {
177194
// Configure for unix socket connection
178195
s.addProviderOption("socketPath", "String", "/tmp/flagd.sock")

0 commit comments

Comments
 (0)