Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 50 additions & 15 deletions providers/flagd/pkg/service/in_process/grpc_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
"context"
"encoding/json"
"fmt"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
Expand All @@ -12,6 +13,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"strings"
msync "sync"
"time"
)
Expand All @@ -35,28 +38,25 @@ const (
"MaxBackoff": "5s",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the java impl, we set this to the maxBackoff param: https://github.com/open-feature/java-sdk-contrib/pull/1590/files#diff-bbef645a236a67bc95a5f8aa30fa5a528c6b2d45b4f4137b4f4b1074af197f26R57

See: https://flagd.dev/providers/rust/?h=backoff#configuration-options

That way, there's consistency between the gRPC RPC-level retries and our stream cycle.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be easier to put this in a small util function in another file along with the nonRetryableCodes var if you do.

"BackoffMultiplier": 2.0,
"RetryableStatusCodes": [
"CANCELLED",
"UNKNOWN",
"INVALID_ARGUMENT",
"NOT_FOUND",
"ALREADY_EXISTS",
"PERMISSION_DENIED",
"RESOURCE_EXHAUSTED",
"FAILED_PRECONDITION",
"ABORTED",
"OUT_OF_RANGE",
"UNIMPLEMENTED",
"INTERNAL",
"UNAVAILABLE",
"DATA_LOSS",
"UNAUTHENTICATED"
"UNAVAILABLE"
]
}
}
]
}`

nonRetryableStatusCodes = `
[
"PermissionDenied",
"Unauthenticated"
]
`
)

// Set of non-retryable gRPC status codes for faster lookup
var nonRetryableCodes map[string]struct{}

// Type aliases for interfaces required by this component - needed for mock generation with gomock
type FlagSyncServiceClient interface {
syncv1grpc.FlagSyncServiceClient
Expand All @@ -78,6 +78,7 @@ type Sync struct {
Selector string
URI string
MaxMsgSize int
RetryGracePeriod int

// Runtime state
client FlagSyncServiceClient
Expand All @@ -92,6 +93,7 @@ type Sync struct {
// Init initializes the gRPC connection and starts background monitoring
func (g *Sync) Init(ctx context.Context) error {
g.Logger.Info(fmt.Sprintf("initializing gRPC client for %s", g.URI))
g.initNonRetryableStatusCodesSet()

// Initialize channels
g.shutdownComplete = make(chan struct{})
Expand Down Expand Up @@ -160,6 +162,20 @@ func (g *Sync) buildDialOptions() ([]grpc.DialOption, error) {
return dialOptions, nil
}

// initNonRetryableStatusCodesSet initializes the set of non-retryable gRPC status codes for quick lookup
func (g *Sync) initNonRetryableStatusCodesSet() {
var codes []string
nonRetryableCodes = make(map[string]struct{})
trimmed := strings.TrimSpace(nonRetryableStatusCodes)
if err := json.Unmarshal([]byte(trimmed), &codes); err == nil {
for _, code := range codes {
nonRetryableCodes[code] = struct{}{}
}
} else {
g.Logger.Debug("parsing non-retryable status codes failed, retrying on all errors")
}
}

// ReSync performs a one-time fetch of all flags
func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
g.Logger.Debug("performing ReSync - fetching all flags")
Expand Down Expand Up @@ -207,12 +223,31 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
}

// Attempt to create sync stream
if err := g.performSyncCycle(ctx, dataSync); err != nil {
err := g.performSyncCycle(ctx, dataSync)
if err != nil {
if ctx.Err() != nil {
g.Logger.Info("sync cycle failed due to context cancellation")
return ctx.Err()
}

// Check if error is a gRPC status error and if code is retryable
st, ok := status.FromError(err)
if ok {
codeStr := st.Code().String()
if _, found := nonRetryableCodes[codeStr]; found {
errStr := fmt.Sprintf("sync cycle failed with non-retryable status: %v, " +
"returning provider fatal.", codeStr)
g.Logger.Error(errStr)
return &of.ProviderInitError{
ErrorCode: of.ProviderFatalCode,
Message: errStr,
}
}
}

// Backoff before retrying
time.Sleep(time.Duration(g.RetryGracePeriod))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use the FLAGD_RETRY_BACKOFF_MAX_MS param here instead, like we did in Java? I think it's a more sensible setting to use here.


g.Logger.Warn(fmt.Sprintf("sync cycle failed: %v, retrying...", err))
g.sendEvent(ctx, SyncEvent{event: of.ProviderError})

Expand Down
1 change: 1 addition & 0 deletions providers/flagd/pkg/service/in_process/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ func createSyncProvider(cfg Configuration, log *logger.Logger) (isync.ISync, str
ProviderID: cfg.ProviderID,
Selector: cfg.Selector,
URI: uri,
RetryGracePeriod: cfg.RetryGracePeriod,
}, uri
}

Expand Down
2 changes: 1 addition & 1 deletion providers/multi-provider/pkg/strategies/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func setFlagMetadata(strategyUsed EvaluationStrategy, successProviderName string
func cleanErrorMessage(msg string) string {
codeRegex := strings.Join([]string{
string(of.ProviderNotReadyCode),
// string(of.ProviderFatalCode), // TODO: not available until go-sdk 14
string(of.ProviderFatalCode),
string(of.FlagNotFoundCode),
string(of.ParseErrorCode),
string(of.TypeMismatchCode),
Expand Down
19 changes: 18 additions & 1 deletion tests/flagd/testframework/provider_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func InitializeProviderSteps(ctx *godog.ScenarioContext) {
// Generic provider step definition - accepts any provider type including "stable"
ctx.Step(`^a (\w+) flagd provider$`,
withState1Arg((*TestState).createSpecializedFlagdProvider))

ctx.Step(`^the client is in (\w+) state$`,
withState1Arg((*TestState).assertClientState))
}

// State methods - these now expect context as first parameter after state
Expand Down Expand Up @@ -111,6 +114,13 @@ func (s *TestState) simulateConnectionLoss(ctx context.Context, seconds int) err
return s.Container.Restart(seconds)
}

func (s *TestState) assertClientState(ctx context.Context, state string) error {
if string(s.Client.State()) == strings.ToUpper(state) {
return nil
}
return fmt.Errorf("expected client state %s but got %s", state, s.Client.State())
}

// createSpecializedFlagdProvider creates specialized flagd providers based on type
func (s *TestState) createSpecializedFlagdProvider(ctx context.Context, providerType string) error {
// Apply specialized configuration based on provider type
Expand All @@ -128,7 +138,7 @@ func (s *TestState) createSpecializedFlagdProvider(ctx context.Context, provider
return fmt.Errorf("failed to create instance for %s provider: %w", providerType, err)
}

if providerType != "unavailable" {
if providerType != "unavailable" && providerType != "forbidden" {
if s.ProviderType == RPC {
// Small delay to allow flagd server to fully load flags after connection
time.Sleep(50 * time.Millisecond)
Expand All @@ -150,6 +160,7 @@ func (s *TestState) applySpecializedConfig(providerType string) error {
return nil
case "unavailable":
return s.configureUnavailableProvider()
case "forbidden": return s.configureForbiddenProvider()
case "socket":
return s.configureSocketProvider()
case "ssl", "tls":
Expand All @@ -173,6 +184,12 @@ func (s *TestState) configureUnavailableProvider() error {
return nil
}

func (s *TestState) configureForbiddenProvider() error {
// Set an Envoy port which always responds with forbidden
s.addProviderOption("port", "Integer", "9212")
return nil
}

func (s *TestState) configureSocketProvider() error {
// Configure for unix socket connection
s.addProviderOption("socketPath", "String", "/tmp/flagd.sock")
Expand Down