Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ test:

# call with TESTCONTAINERS_RYUK_DISABLED="true" to avoid problems with podman on Macs
e2e:
go clean -testcache && go list -f '{{.Dir}}/...' -m | xargs -I{} go test -timeout=2m -tags=e2e {}
go clean -testcache && go list -f '{{.Dir}}/...' -m | xargs -I{} go test -timeout=3m -tags=e2e {}

lint:
go install -v github.com/golangci/golangci-lint/v2/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION)
Expand Down
8 changes: 4 additions & 4 deletions providers/flagd/e2e/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ func TestConfiguration(t *testing.T) {
testCases := []configTestCase{
{
name: "All",
tags: "",
tags: "~@sync-port",
},
{
name: "RPC",
tags: "@rpc",
tags: "@rpc && ~@sync-port",
},
{
name: "InProcess",
tags: "@in-process",
tags: "@in-process && ~@sync-port",
},
{
name: "File",
tags: "@file",
tags: "@file && ~@sync-port",
},
}

Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/e2e/inprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestInProcessProviderE2E(t *testing.T) {
}

// Run tests with in-process specific tags
tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload"
tags := "@in-process && ~@unixsocket && ~@metadata && ~@customCert && ~@contextEnrichment && ~@sync-payload && ~@sync-port"

if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil {
t.Fatalf("Gherkin tests failed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/e2e/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestRPCProviderE2E(t *testing.T) {
}

// Run tests with RPC-specific tags - exclude unimplemented scenarios
tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching"
tags := "@rpc && ~@unixsocket && ~@targetURI && ~@sync && ~@metadata && ~@grace && ~@customCert && ~@caching && ~@forbidden && ~@sync-port"

if err := runner.RunGherkinTestsWithSubtests(t, featurePaths, tags); err != nil {
t.Fatalf("Gherkin tests failed: %v", err)
Expand Down
74 changes: 45 additions & 29 deletions providers/flagd/pkg/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package flagd
import (
"errors"
"fmt"
"os"
"strconv"
"strings"

"github.com/go-logr/logr"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/logger"
"google.golang.org/grpc"
"os"
"strconv"
"strings"
)

type ResolverType string
Expand All @@ -26,6 +27,7 @@ const (
defaultHost = "localhost"
defaultResolver = rpc
defaultGracePeriod = 5
defaultFatalStatusCodes = ""

rpc ResolverType = "rpc"
inProcess ResolverType = "in-process"
Expand All @@ -45,6 +47,7 @@ const (
flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
flagdTargetUriEnvironmentVariableName = "FLAGD_TARGET_URI"
flagdGracePeriodVariableName = "FLAGD_RETRY_GRACE_PERIOD"
flagdFatalStatusCodesVariableName = "FLAGD_FATAL_STATUS_CODES"
)

type ProviderConfiguration struct {
Expand All @@ -66,6 +69,7 @@ type ProviderConfiguration struct {
CustomSyncProviderUri string
GrpcDialOptionsOverride []grpc.DialOption
RetryGracePeriod int
FatalStatusCodes []string

log logr.Logger
}
Expand All @@ -80,6 +84,7 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration {
Resolver: defaultResolver,
Tls: defaultTLS,
RetryGracePeriod: defaultGracePeriod,
FatalStatusCodes: strings.Split(defaultFatalStatusCodes, ","),
}

p.updateFromEnvVar()
Expand Down Expand Up @@ -130,6 +135,7 @@ func validateProviderConfiguration(p *ProviderConfiguration) error {

// updateFromEnvVar is a utility to update configurations based on current environment variables
func (cfg *ProviderConfiguration) updateFromEnvVar() {

portS := os.Getenv(flagdPortEnvironmentVariableName)
if portS != "" {
port, err := strconv.Atoi(portS)
Expand Down Expand Up @@ -159,17 +165,7 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
cfg.CertPath = certificatePath
}

if maxCacheSizeS := os.Getenv(flagdMaxCacheSizeEnvironmentVariableName); maxCacheSizeS != "" {
maxCacheSizeFromEnv, err := strconv.Atoi(maxCacheSizeS)
if err != nil {
cfg.log.Error(err,
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize,
))
} else {
cfg.MaxCacheSize = maxCacheSizeFromEnv
}
}
cfg.MaxCacheSize = getIntFromEnvVarOrDefault(flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize, cfg.log)

if cacheValue := os.Getenv(flagdCacheEnvironmentVariableName); cacheValue != "" {
switch cache.Type(cacheValue) {
Expand All @@ -185,18 +181,8 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
}
}

if maxEventStreamRetriesS := os.Getenv(
flagdMaxEventStreamRetriesEnvironmentVariableName); maxEventStreamRetriesS != "" {

maxEventStreamRetries, err := strconv.Atoi(maxEventStreamRetriesS)
if err != nil {
cfg.log.Error(err,
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries))
} else {
cfg.EventStreamConnectionMaxAttempts = maxEventStreamRetries
}
}
cfg.EventStreamConnectionMaxAttempts = getIntFromEnvVarOrDefault(
flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries, cfg.log)

if resolver := os.Getenv(flagdResolverEnvironmentVariableName); resolver != "" {
switch strings.ToLower(resolver) {
Expand Down Expand Up @@ -230,12 +216,34 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
if gracePeriod := os.Getenv(flagdGracePeriodVariableName); gracePeriod != "" {
if seconds, err := strconv.Atoi(gracePeriod); err == nil {
cfg.RetryGracePeriod = seconds
} else {
// Handle parsing error
cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s'", gracePeriod))
cfg.RetryGracePeriod = getIntFromEnvVarOrDefault(flagdGracePeriodVariableName, defaultGracePeriod, cfg.log)
}
}

if fatalStatusCodes := os.Getenv(flagdFatalStatusCodesVariableName); fatalStatusCodes != "" {
fatalStatusCodesArr := strings.Split(fatalStatusCodes, ",")
for i, fatalStatusCode := range fatalStatusCodesArr {
fatalStatusCodesArr[i] = strings.TrimSpace(fatalStatusCode)
}
cfg.FatalStatusCodes = fatalStatusCodesArr
}
}

// Helper

func getIntFromEnvVarOrDefault(envVarName string, defaultValue int, log logr.Logger) int {
if valueFromEnv := os.Getenv(envVarName); valueFromEnv != "" {
intValue, err := strconv.Atoi(valueFromEnv)
if err != nil {
log.Error(err,
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
envVarName, defaultValue,
))
} else {
return intValue
}
}
return defaultValue
}

// ProviderOptions
Expand Down Expand Up @@ -415,3 +423,11 @@ func WithRetryGracePeriod(gracePeriod int) ProviderOption {
p.RetryGracePeriod = gracePeriod
}
}

// WithFatalStatusCodes allows to set a list of gRPC status codes, which will cause streams to give up
// and put the provider in a PROVIDER_FATAL state
func WithFatalStatusCodes(fatalStatusCodes []string) ProviderOption {
return func(p *ProviderConfiguration) {
p.FatalStatusCodes = fatalStatusCodes
}
}
1 change: 1 addition & 0 deletions providers/flagd/pkg/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) {
CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri,
GrpcDialOptionsOverride: provider.providerConfiguration.GrpcDialOptionsOverride,
RetryGracePeriod: provider.providerConfiguration.RetryGracePeriod,
FatalStatusCodes: provider.providerConfiguration.FatalStatusCodes,
})
default:
service = process.NewInProcessService(process.Configuration{
Expand Down
66 changes: 66 additions & 0 deletions providers/flagd/pkg/service/in_process/grpc_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package process

import (
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
"time"
)

const (
// Default timeouts and retry intervals
defaultKeepaliveTime = 30 * time.Second
defaultKeepaliveTimeout = 5 * time.Second
)

type RetryPolicy struct {
MaxAttempts int `json:"MaxAttempts"`
InitialBackoff string `json:"InitialBackoff"`
MaxBackoff string `json:"MaxBackoff"`
BackoffMultiplier float64 `json:"BackoffMultiplier"`
RetryableStatusCodes []string `json:"RetryableStatusCodes"`
}

func (g *Sync) buildRetryPolicy() string {
var policy = map[string]interface{}{
"methodConfig": []map[string]interface{}{
{
"name": []map[string]string{
{"service": "flagd.sync.v1.FlagSyncService"},
},
"retryPolicy": RetryPolicy{
MaxAttempts: 3,
InitialBackoff: "1s",
MaxBackoff: "5s",
BackoffMultiplier: 2.0,
RetryableStatusCodes: []string{"UNKNOWN", "UNAVAILABLE"},
},
},
},
}
retryPolicyBytes, _ := json.Marshal(policy)
retryPolicy := string(retryPolicyBytes)

return retryPolicy
}

// Set of non-retryable gRPC status codes for faster lookup
var nonRetryableCodes map[codes.Code]struct{}

// initNonRetryableStatusCodesSet initializes the set of non-retryable gRPC status codes for quick lookup
func (g *Sync) initNonRetryableStatusCodesSet() {
nonRetryableCodes = make(map[codes.Code]struct{})

for _, codeStr := range g.FatalStatusCodes {
// Wrap the string in quotes to match the expected JSON format
jsonStr := fmt.Sprintf(`"%s"`, codeStr)

var code codes.Code
if err := code.UnmarshalJSON([]byte(jsonStr)); err != nil {
g.Logger.Warn(fmt.Sprintf("unknown status code: %s, error: %v", codeStr, err))
continue
}

nonRetryableCodes[code] = struct{}{}
}
}
77 changes: 77 additions & 0 deletions providers/flagd/pkg/service/in_process/grpc_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package process

import (
"github.com/open-feature/flagd/core/pkg/logger"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"testing"
)

func TestSync_initNonRetryableStatusCodesSet(t *testing.T) {
tests := []struct {
name string
fatalStatusCodes []string
expectedCodes []codes.Code
notExpectedCodes []codes.Code
}{
{
name: "valid status codes",
fatalStatusCodes: []string{"UNAVAILABLE", "INTERNAL", "DEADLINE_EXCEEDED"},
expectedCodes: []codes.Code{codes.Unavailable, codes.Internal, codes.DeadlineExceeded},
notExpectedCodes: []codes.Code{codes.OK, codes.Unknown},
},
{
name: "empty array",
fatalStatusCodes: []string{},
expectedCodes: []codes.Code{},
notExpectedCodes: []codes.Code{codes.Unavailable, codes.Internal},
},
{
name: "invalid status codes",
fatalStatusCodes: []string{"INVALID_CODE", "UNKNOWN_STATUS"},
expectedCodes: []codes.Code{},
notExpectedCodes: []codes.Code{codes.Unavailable, codes.Internal},
},
{
name: "mixed valid and invalid codes",
fatalStatusCodes: []string{"UNAVAILABLE", "INVALID_CODE", "INTERNAL"},
expectedCodes: []codes.Code{codes.Unavailable, codes.Internal},
notExpectedCodes: []codes.Code{codes.OK, codes.Unknown},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Reset the global map before each test
nonRetryableCodes = nil

s := &Sync{
FatalStatusCodes: tt.fatalStatusCodes,
Logger: &logger.Logger{
Logger: zap.NewNop(),
},
}

s.initNonRetryableStatusCodesSet()

// Verify expected codes are present
for _, code := range tt.expectedCodes {
if _, exists := nonRetryableCodes[code]; !exists {
t.Errorf("expected code %v to be in nonRetryableCodes, but it was not found", code)
}
}

// Verify not expected codes are absent
for _, code := range tt.notExpectedCodes {
if _, exists := nonRetryableCodes[code]; exists {
t.Errorf("did not expect code %v to be in nonRetryableCodes, but it was found", code)
}
}

// Verify the map size matches expected
if len(nonRetryableCodes) != len(tt.expectedCodes) {
t.Errorf("expected map size %d, got %d", len(tt.expectedCodes), len(nonRetryableCodes))
}
})
}
}
Loading
Loading