Skip to content

Commit 3d733ef

Browse files
return fatal only on initialization aka first stream sycle, use grpc numeric code representation for lookup of fatal status codes
Signed-off-by: Alexandra Oberaigner <[email protected]>
1 parent 0f4ea57 commit 3d733ef

File tree

3 files changed

+101
-92
lines changed

3 files changed

+101
-92
lines changed

providers/flagd/pkg/service/in_process/grpc_config.go

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package process
22

33
import (
44
"encoding/json"
5-
"strings"
5+
"fmt"
6+
"google.golang.org/grpc/codes"
67
"time"
78
)
89

@@ -29,10 +30,10 @@ func (g *Sync) buildRetryPolicy() string {
2930
},
3031
"retryPolicy": RetryPolicy{
3132
MaxAttempts: 3,
32-
InitialBackoff: (time.Duration(g.RetryBackOffMs) * time.Millisecond).String(),
33-
MaxBackoff: (time.Duration(g.RetryBackOffMaxMs) * time.Millisecond).String(),
33+
InitialBackoff: (time.Duration(g.RetryBackOffMs) * time.Millisecond).String(),
34+
MaxBackoff: (time.Duration(g.RetryBackOffMaxMs) * time.Millisecond).String(),
3435
BackoffMultiplier: 2.0,
35-
RetryableStatusCodes: []string{"UNKNOWN","UNAVAILABLE"},
36+
RetryableStatusCodes: []string{"UNKNOWN", "UNAVAILABLE"},
3637
},
3738
},
3839
},
@@ -44,24 +45,22 @@ func (g *Sync) buildRetryPolicy() string {
4445
}
4546

4647
// Set of non-retryable gRPC status codes for faster lookup
47-
var nonRetryableCodes map[string]struct{}
48+
var nonRetryableCodes map[codes.Code]struct{}
4849

4950
// initNonRetryableStatusCodesSet initializes the set of non-retryable gRPC status codes for quick lookup
50-
func (g *Sync) initNonRetryableStatusCodesSet() {
51-
nonRetryableCodes = make(map[string]struct{})
52-
for _, code := range g.FatalStatusCodes {
53-
normalized := toCamelCase(code)
54-
nonRetryableCodes[normalized] = struct{}{}
55-
}
56-
}
51+
func (g *Sync) initNonRetryableStatusCodesSet() {
52+
nonRetryableCodes = make(map[codes.Code]struct{})
53+
54+
for _, codeStr := range g.FatalStatusCodes {
55+
// Wrap the string in quotes to match the expected JSON format
56+
jsonStr := fmt.Sprintf(`"%s"`, codeStr)
5757

58-
// toCamelCase converts a SNAKE_CASE string to CamelCase
59-
func toCamelCase(s string) string {
60-
parts := strings.Split(strings.ToLower(s), "_")
61-
for i, part := range parts {
62-
if len(part) > 0 {
63-
parts[i] = strings.ToUpper(part[:1]) + part[1:]
58+
var code codes.Code
59+
if err := code.UnmarshalJSON([]byte(jsonStr)); err != nil {
60+
g.Logger.Warn(fmt.Sprintf("unknown status code: %s, error: %v", codeStr, err))
61+
continue
6462
}
63+
64+
nonRetryableCodes[code] = struct{}{}
6565
}
66-
return strings.Join(parts, "")
67-
}
66+
}

providers/flagd/pkg/service/in_process/grpc_config_test.go

Lines changed: 61 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ package process
22

33
import (
44
"encoding/json"
5-
"reflect"
5+
"github.com/open-feature/flagd/core/pkg/logger"
6+
"go.uber.org/zap"
7+
"google.golang.org/grpc/codes"
68
"strings"
79
"testing"
810
)
911

10-
1112
func TestBuildRetryPolicy(t *testing.T) {
1213
g := &Sync{
1314
RetryBackOffMs: 100,
@@ -68,58 +69,71 @@ func TestBuildRetryPolicy(t *testing.T) {
6869
}
6970
}
7071

71-
type syncTestCase struct {
72-
input []string
73-
expected map[string]struct{}
74-
}
75-
76-
func TestInitNonRetryableStatusCodesSet(t *testing.T) {
77-
testCases := []syncTestCase{
72+
func TestSync_initNonRetryableStatusCodesSet(t *testing.T) {
73+
tests := []struct {
74+
name string
75+
fatalStatusCodes []string
76+
expectedCodes []codes.Code
77+
notExpectedCodes []codes.Code
78+
}{
7879
{
79-
input: []string{"PERMISSION_DENIED", "UNKNOWN"},
80-
expected: map[string]struct{}{"PermissionDenied": {}, "Unknown": {}},
80+
name: "valid status codes",
81+
fatalStatusCodes: []string{"UNAVAILABLE", "INTERNAL", "DEADLINE_EXCEEDED"},
82+
expectedCodes: []codes.Code{codes.Unavailable, codes.Internal, codes.DeadlineExceeded},
83+
notExpectedCodes: []codes.Code{codes.OK, codes.Unknown},
8184
},
8285
{
83-
input: []string{"ALREADY_EXISTS"},
84-
expected: map[string]struct{}{"AlreadyExists": {}},
86+
name: "empty array",
87+
fatalStatusCodes: []string{},
88+
expectedCodes: []codes.Code{},
89+
notExpectedCodes: []codes.Code{codes.Unavailable, codes.Internal},
8590
},
8691
{
87-
input: []string{},
88-
expected: map[string]struct{}{},
92+
name: "invalid status codes",
93+
fatalStatusCodes: []string{"INVALID_CODE", "UNKNOWN_STATUS"},
94+
expectedCodes: []codes.Code{},
95+
notExpectedCodes: []codes.Code{codes.Unavailable, codes.Internal},
96+
},
97+
{
98+
name: "mixed valid and invalid codes",
99+
fatalStatusCodes: []string{"UNAVAILABLE", "INVALID_CODE", "INTERNAL"},
100+
expectedCodes: []codes.Code{codes.Unavailable, codes.Internal},
101+
notExpectedCodes: []codes.Code{codes.OK, codes.Unknown},
89102
},
90103
}
91104

92-
for _, tc := range testCases {
93-
g := &Sync{FatalStatusCodes: tc.input}
94-
nonRetryableCodes = nil // reset global
95-
g.initNonRetryableStatusCodesSet()
96-
if !reflect.DeepEqual(nonRetryableCodes, tc.expected) {
97-
t.Errorf("input: %v, got: %v, want: %v", tc.input, nonRetryableCodes, tc.expected)
98-
}
99-
}
100-
}
105+
for _, tt := range tests {
106+
t.Run(tt.name, func(t *testing.T) {
107+
// Reset the global map before each test
108+
nonRetryableCodes = nil
109+
110+
s := &Sync{
111+
FatalStatusCodes: tt.fatalStatusCodes,
112+
Logger: &logger.Logger{
113+
Logger: zap.NewNop(),
114+
},
115+
}
116+
117+
s.initNonRetryableStatusCodesSet()
101118

102-
func TestToCamelCase(t *testing.T) {
103-
testCases := []struct {
104-
input string
105-
expected string
106-
}{
107-
{"INVALID_ARGUMENT", "InvalidArgument"},
108-
{"NOT_FOUND", "NotFound"},
109-
{"ALREADY_EXISTS", "AlreadyExists"},
110-
{"UNKNOWN", "Unknown"},
111-
{"", ""},
112-
{"SINGLE", "Single"},
113-
{"MULTI_WORD_EXAMPLE", "MultiWordExample"},
114-
{"_LEADING_UNDERSCORE", "LeadingUnderscore"},
115-
{"TRAILING_UNDERSCORE_", "TrailingUnderscore"},
116-
{"__DOUBLE__UNDERSCORES__", "DoubleUnderscores"},
117-
}
119+
// Verify expected codes are present
120+
for _, code := range tt.expectedCodes {
121+
if _, exists := nonRetryableCodes[code]; !exists {
122+
t.Errorf("expected code %v to be in nonRetryableCodes, but it was not found", code)
123+
}
124+
}
118125

119-
for _, tc := range testCases {
120-
got := toCamelCase(tc.input)
121-
if got != tc.expected {
122-
t.Errorf("toCamelCase(%q) = %q; want %q", tc.input, got, tc.expected)
123-
}
124-
}
125-
}
126+
// Verify not expected codes are absent
127+
for _, code := range tt.notExpectedCodes {
128+
if _, exists := nonRetryableCodes[code]; exists {
129+
t.Errorf("did not expect code %v to be in nonRetryableCodes, but it was found", code)
130+
}
131+
}
132+
133+
// Verify the map size matches expected
134+
if len(nonRetryableCodes) != len(tt.expectedCodes) {
135+
t.Errorf("expected map size %d, got %d", len(tt.expectedCodes), len(nonRetryableCodes))
136+
}
137+
})
138+
}
139+
}

providers/flagd/pkg/service/in_process/grpc_sync.go

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ type Sync struct {
3838
Selector string
3939
URI string
4040
MaxMsgSize int
41-
RetryGracePeriod int
42-
RetryBackOffMs int
43-
RetryBackOffMaxMs int
44-
FatalStatusCodes []string
41+
RetryGracePeriod int
42+
RetryBackOffMs int
43+
RetryBackOffMaxMs int
44+
FatalStatusCodes []string
4545

4646
// Runtime state
4747
client FlagSyncServiceClient
@@ -125,8 +125,6 @@ func (g *Sync) buildDialOptions() ([]grpc.DialOption, error) {
125125
return dialOptions, nil
126126
}
127127

128-
129-
130128
// ReSync performs a one-time fetch of all flags
131129
func (g *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
132130
g.Logger.Debug("performing ReSync - fetching all flags")
@@ -160,8 +158,6 @@ func (g *Sync) IsReady() bool {
160158
func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
161159
g.Logger.Info("starting continuous flag synchronization")
162160

163-
time.Sleep(500 * time.Millisecond)
164-
165161
// Ensure shutdown completion is signaled when THIS method exits
166162
defer g.markShutdownComplete()
167163

@@ -183,17 +179,18 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
183179
return ctx.Err()
184180
}
185181

186-
// Check if error is a gRPC status error and if code is retryable
187-
st, ok := status.FromError(err)
188-
if ok {
189-
codeStr := st.Code().String()
190-
if _, found := nonRetryableCodes[codeStr]; found {
191-
errStr := fmt.Sprintf("sync cycle failed with non-retryable status: %v, " +
192-
"returning provider fatal.", codeStr)
193-
g.Logger.Error(errStr)
194-
return &of.ProviderInitError{
195-
ErrorCode: of.ProviderFatalCode,
196-
Message: errStr,
182+
// check for non-retryable errors during initialization, if found return with FATAL
183+
if !g.IsReady() {
184+
st, ok := status.FromError(err)
185+
if ok {
186+
if _, found := nonRetryableCodes[st.Code()]; found {
187+
errStr := fmt.Sprintf("first sync cycle failed with non-retryable status: %v, "+
188+
"returning provider fatal.", st.Code().String())
189+
g.Logger.Error(errStr)
190+
return &of.ProviderInitError{
191+
ErrorCode: of.ProviderFatalCode,
192+
Message: errStr,
193+
}
197194
}
198195
}
199196
}
@@ -236,12 +233,6 @@ func (g *Sync) performSyncCycle(ctx context.Context, dataSync chan<- sync.DataSy
236233

237234
// handleFlagSync processes messages from the sync stream with proper context handling
238235
func (g *Sync) handleFlagSync(ctx context.Context, stream syncv1grpc.FlagSyncService_SyncFlagsClient, dataSync chan<- sync.DataSync) error {
239-
// Mark as ready on first successful stream
240-
g.initializer.Do(func() {
241-
g.ready = true
242-
g.Logger.Info("sync service is now ready")
243-
})
244-
245236
// Create channels for stream communication
246237
streamChan := make(chan *v1.SyncFlagsResponse, 1)
247238
errChan := make(chan error, 1)
@@ -281,6 +272,11 @@ func (g *Sync) handleFlagSync(ctx context.Context, stream syncv1grpc.FlagSyncSer
281272
return err
282273
}
283274

275+
// Mark as ready on first successful stream
276+
g.initializer.Do(func() {
277+
g.ready = true
278+
g.Logger.Info("sync service is now ready")
279+
})
284280
case err := <-errChan:
285281
return fmt.Errorf("stream error: %w", err)
286282

0 commit comments

Comments
 (0)