Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion providers/flagd/e2e/inprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestInProcessProviderE2E(t *testing.T) {
}

// Run tests with in-process specific tags
tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload"
tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload && ~@sync-port"

if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil {
t.Fatalf("Gherkin tests failed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/e2e/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestRPCProviderE2E(t *testing.T) {
}

// Run tests with RPC-specific tags - exclude unimplemented scenarios
tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching"
tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching && ~@forbidden"

if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil {
t.Fatalf("Gherkin tests failed: %v", err)
Expand Down
74 changes: 45 additions & 29 deletions providers/flagd/pkg/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package flagd
import (
"errors"
"fmt"
"os"
"strconv"
"strings"

"github.com/go-logr/logr"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/logger"
"google.golang.org/grpc"
"os"
"strconv"
"strings"
)

type ResolverType string
Expand All @@ -26,6 +27,7 @@ const (
defaultHost = "localhost"
defaultResolver = rpc
defaultGracePeriod = 5
defaultFatalStatusCodes = ""

rpc ResolverType = "rpc"
inProcess ResolverType = "in-process"
Expand All @@ -45,6 +47,7 @@ const (
flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
flagdTargetUriEnvironmentVariableName = "FLAGD_TARGET_URI"
flagdGracePeriodVariableName = "FLAGD_RETRY_GRACE_PERIOD"
flagdFatalStatusCodesVariableName = "FLAGD_FATAL_STATUS_CODES"
)

type ProviderConfiguration struct {
Expand All @@ -66,6 +69,7 @@ type ProviderConfiguration struct {
CustomSyncProviderUri string
GrpcDialOptionsOverride []grpc.DialOption
RetryGracePeriod int
FatalStatusCodes []string

log logr.Logger
}
Expand All @@ -80,6 +84,7 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration {
Resolver: defaultResolver,
Tls: defaultTLS,
RetryGracePeriod: defaultGracePeriod,
FatalStatusCodes: strings.Split(defaultFatalStatusCodes, ","),
}

p.updateFromEnvVar()
Expand Down Expand Up @@ -130,6 +135,7 @@ func validateProviderConfiguration(p *ProviderConfiguration) error {

// updateFromEnvVar is a utility to update configurations based on current environment variables
func (cfg *ProviderConfiguration) updateFromEnvVar() {

portS := os.Getenv(flagdPortEnvironmentVariableName)
if portS != "" {
port, err := strconv.Atoi(portS)
Expand Down Expand Up @@ -159,17 +165,7 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
cfg.CertPath = certificatePath
}

if maxCacheSizeS := os.Getenv(flagdMaxCacheSizeEnvironmentVariableName); maxCacheSizeS != "" {
maxCacheSizeFromEnv, err := strconv.Atoi(maxCacheSizeS)
if err != nil {
cfg.log.Error(err,
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize,
))
} else {
cfg.MaxCacheSize = maxCacheSizeFromEnv
}
}
cfg.MaxCacheSize = getIntFromEnvVarOrDefault(flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize, cfg.log)

if cacheValue := os.Getenv(flagdCacheEnvironmentVariableName); cacheValue != "" {
switch cache.Type(cacheValue) {
Expand All @@ -185,18 +181,8 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
}
}

if maxEventStreamRetriesS := os.Getenv(
flagdMaxEventStreamRetriesEnvironmentVariableName); maxEventStreamRetriesS != "" {

maxEventStreamRetries, err := strconv.Atoi(maxEventStreamRetriesS)
if err != nil {
cfg.log.Error(err,
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries))
} else {
cfg.EventStreamConnectionMaxAttempts = maxEventStreamRetries
}
}
cfg.EventStreamConnectionMaxAttempts = getIntFromEnvVarOrDefault(
flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries, cfg.log)

if resolver := os.Getenv(flagdResolverEnvironmentVariableName); resolver != "" {
switch strings.ToLower(resolver) {
Expand Down Expand Up @@ -230,12 +216,34 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
if gracePeriod := os.Getenv(flagdGracePeriodVariableName); gracePeriod != "" {
if seconds, err := strconv.Atoi(gracePeriod); err == nil {
cfg.RetryGracePeriod = seconds
} else {
// Handle parsing error
cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s'", gracePeriod))
cfg.RetryGracePeriod = getIntFromEnvVarOrDefault(flagdGracePeriodVariableName, defaultGracePeriod, cfg.log)
}
}

if fatalStatusCodes := os.Getenv(flagdFatalStatusCodesVariableName); fatalStatusCodes != "" {
fatalStatusCodesArr := strings.Split(fatalStatusCodes, ",")
for i, fatalStatusCode := range fatalStatusCodesArr {
fatalStatusCodesArr[i] = strings.TrimSpace(fatalStatusCode)
}
cfg.FatalStatusCodes = fatalStatusCodesArr
}
}

// Helper

func getIntFromEnvVarOrDefault(envVarName string, defaultValue int, log logr.Logger) int {
if valueFromEnv := os.Getenv(envVarName); valueFromEnv != "" {
intValue, err := strconv.Atoi(valueFromEnv)
if err != nil {
log.Error(err,
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
envVarName, defaultValue,
))
} else {
return intValue
}
}
return defaultValue
}

// ProviderOptions
Expand Down Expand Up @@ -415,3 +423,11 @@ func WithRetryGracePeriod(gracePeriod int) ProviderOption {
p.RetryGracePeriod = gracePeriod
}
}

// WithFatalStatusCodes allows to set a list of gRPC status codes, which will cause streams to give up
// and put the provider in a PROVIDER_FATAL state
func WithFatalStatusCodes(fatalStatusCodes []string) ProviderOption {
return func(p *ProviderConfiguration) {
p.FatalStatusCodes = fatalStatusCodes
}
}
1 change: 1 addition & 0 deletions providers/flagd/pkg/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) {
CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri,
GrpcDialOptionsOverride: provider.providerConfiguration.GrpcDialOptionsOverride,
RetryGracePeriod: provider.providerConfiguration.RetryGracePeriod,
FatalStatusCodes: provider.providerConfiguration.FatalStatusCodes,
})
default:
service = process.NewInProcessService(process.Configuration{
Expand Down
66 changes: 66 additions & 0 deletions providers/flagd/pkg/service/in_process/grpc_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package process

import (
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
"time"
)

const (
// Default timeouts and retry intervals
defaultKeepaliveTime = 30 * time.Second
defaultKeepaliveTimeout = 5 * time.Second
)

type RetryPolicy struct {
MaxAttempts int `json:"MaxAttempts"`
InitialBackoff string `json:"InitialBackoff"`
MaxBackoff string `json:"MaxBackoff"`
BackoffMultiplier float64 `json:"BackoffMultiplier"`
RetryableStatusCodes []string `json:"RetryableStatusCodes"`
}

func (g *Sync) buildRetryPolicy() string {
var policy = map[string]interface{}{
"methodConfig": []map[string]interface{}{
{
"name": []map[string]string{
{"service": "flagd.sync.v1.FlagSyncService"},
},
"retryPolicy": RetryPolicy{
MaxAttempts: 3,
InitialBackoff: "1s",
MaxBackoff: "5s",
BackoffMultiplier: 2.0,
RetryableStatusCodes: []string{"UNKNOWN", "UNAVAILABLE"},
},
},
},
}
retryPolicyBytes, _ := json.Marshal(policy)
retryPolicy := string(retryPolicyBytes)

return retryPolicy
}

// Set of non-retryable gRPC status codes for faster lookup
var nonRetryableCodes map[codes.Code]struct{}

// initNonRetryableStatusCodesSet initializes the set of non-retryable gRPC status codes for quick lookup
func (g *Sync) initNonRetryableStatusCodesSet() {
nonRetryableCodes = make(map[codes.Code]struct{})

for _, codeStr := range g.FatalStatusCodes {
// Wrap the string in quotes to match the expected JSON format
jsonStr := fmt.Sprintf(`"%s"`, codeStr)

var code codes.Code
if err := code.UnmarshalJSON([]byte(jsonStr)); err != nil {
g.Logger.Warn(fmt.Sprintf("unknown status code: %s, error: %v", codeStr, err))
continue
}

nonRetryableCodes[code] = struct{}{}
}
}
77 changes: 77 additions & 0 deletions providers/flagd/pkg/service/in_process/grpc_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package process

import (
"github.com/open-feature/flagd/core/pkg/logger"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"testing"
)

func TestSync_initNonRetryableStatusCodesSet(t *testing.T) {
tests := []struct {
name string
fatalStatusCodes []string
expectedCodes []codes.Code
notExpectedCodes []codes.Code
}{
{
name: "valid status codes",
fatalStatusCodes: []string{"UNAVAILABLE", "INTERNAL", "DEADLINE_EXCEEDED"},
expectedCodes: []codes.Code{codes.Unavailable, codes.Internal, codes.DeadlineExceeded},
notExpectedCodes: []codes.Code{codes.OK, codes.Unknown},
},
{
name: "empty array",
fatalStatusCodes: []string{},
expectedCodes: []codes.Code{},
notExpectedCodes: []codes.Code{codes.Unavailable, codes.Internal},
},
{
name: "invalid status codes",
fatalStatusCodes: []string{"INVALID_CODE", "UNKNOWN_STATUS"},
expectedCodes: []codes.Code{},
notExpectedCodes: []codes.Code{codes.Unavailable, codes.Internal},
},
{
name: "mixed valid and invalid codes",
fatalStatusCodes: []string{"UNAVAILABLE", "INVALID_CODE", "INTERNAL"},
expectedCodes: []codes.Code{codes.Unavailable, codes.Internal},
notExpectedCodes: []codes.Code{codes.OK, codes.Unknown},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Reset the global map before each test
nonRetryableCodes = nil

s := &Sync{
FatalStatusCodes: tt.fatalStatusCodes,
Logger: &logger.Logger{
Logger: zap.NewNop(),
},
}

s.initNonRetryableStatusCodesSet()

// Verify expected codes are present
for _, code := range tt.expectedCodes {
if _, exists := nonRetryableCodes[code]; !exists {
t.Errorf("expected code %v to be in nonRetryableCodes, but it was not found", code)
}
}

// Verify not expected codes are absent
for _, code := range tt.notExpectedCodes {
if _, exists := nonRetryableCodes[code]; exists {
t.Errorf("did not expect code %v to be in nonRetryableCodes, but it was found", code)
}
}

// Verify the map size matches expected
if len(nonRetryableCodes) != len(tt.expectedCodes) {
t.Errorf("expected map size %d, got %d", len(tt.expectedCodes), len(nonRetryableCodes))
}
})
}
}
Loading
Loading