Skip to content

Commit a551d0c

Browse files
authored
FFM-9485 Improve SSE Reconnection Logic (#133)
1 parent d3d1765 commit a551d0c

File tree

7 files changed

+178
-98
lines changed

7 files changed

+178
-98
lines changed

client/client.go

Lines changed: 112 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import (
66
"errors"
77
"fmt"
88
"github.com/harness/ff-golang-server-sdk/sdk_codes"
9+
"golang.org/x/sync/errgroup"
910
"log"
1011
"math/rand"
1112
"net/http"
13+
"strconv"
1214
"strings"
1315
"sync"
1416
"sync/atomic"
@@ -53,6 +55,7 @@ type CfClient struct {
5355
token string
5456
streamConnected bool
5557
streamConnectedLock sync.RWMutex
58+
streamDisconnected chan struct{}
5659
authenticated chan struct{}
5760
postEvalChan chan evaluation.PostEvalData
5861
initializedBool bool
@@ -78,16 +81,17 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {
7881
analyticsService := analyticsservice.NewAnalyticsService(time.Minute, config.Logger)
7982

8083
client := &CfClient{
81-
sdkKey: sdkKey,
82-
config: config,
83-
authenticated: make(chan struct{}),
84-
analyticsService: analyticsService,
85-
clusterIdentifier: "1",
86-
postEvalChan: make(chan evaluation.PostEvalData),
87-
stop: make(chan struct{}),
88-
stopped: newAtomicBool(false),
89-
initialized: make(chan struct{}),
90-
initializedErr: make(chan error),
84+
sdkKey: sdkKey,
85+
config: config,
86+
authenticated: make(chan struct{}),
87+
analyticsService: analyticsService,
88+
clusterIdentifier: "1",
89+
postEvalChan: make(chan evaluation.PostEvalData),
90+
stop: make(chan struct{}),
91+
stopped: newAtomicBool(false),
92+
initialized: make(chan struct{}),
93+
initializedErr: make(chan error),
94+
streamDisconnected: make(chan struct{}),
9195
}
9296

9397
if sdkKey == "" {
@@ -154,6 +158,9 @@ func (c *CfClient) start() {
154158
}()
155159
go c.setAnalyticsServiceClient(ctx)
156160
go c.pullCronJob(ctx)
161+
if c.config.enableStream {
162+
go c.stream(ctx)
163+
}
157164
}
158165

159166
// PostEvaluateProcessor push the data to the analytics service
@@ -186,47 +193,52 @@ func (c *CfClient) IsInitialized() (bool, error) {
186193
return false, InitializeTimeoutError{}
187194
}
188195

189-
func (c *CfClient) retrieve(ctx context.Context) bool {
190-
ok := true
191-
var wg sync.WaitGroup
192-
wg.Add(2)
193-
go func() {
194-
defer wg.Done()
195-
rCtx, cancel := context.WithTimeout(ctx, time.Minute)
196-
defer cancel()
196+
func (c *CfClient) retrieve(ctx context.Context) {
197+
var g errgroup.Group
198+
199+
rCtx, cancel := context.WithTimeout(ctx, time.Minute)
200+
defer cancel()
201+
202+
// First goroutine for retrieving flags.
203+
g.Go(func() error {
197204
err := c.retrieveFlags(rCtx)
198205
if err != nil {
199-
ok = false
200-
c.config.Logger.Errorf("error while retrieving flags: %v", err.Error())
206+
c.config.Logger.Errorf("error while retrieving flags: %v", err)
207+
return err
201208
}
202-
}()
209+
return nil
210+
})
203211

204-
go func() {
205-
defer wg.Done()
206-
rCtx, cancel := context.WithTimeout(ctx, time.Minute)
207-
defer cancel()
212+
// Second goroutine for retrieving segments.
213+
g.Go(func() error {
208214
err := c.retrieveSegments(rCtx)
209215
if err != nil {
210-
ok = false
211-
c.config.Logger.Errorf("error while retrieving segments: %v", err.Error())
216+
c.config.Logger.Errorf("error while retrieving segments: %v", err)
217+
return err
212218
}
213-
}()
214-
wg.Wait()
215-
if ok {
216-
c.config.Logger.Info("Data poll finished successfully")
219+
return nil
220+
})
221+
222+
err := g.Wait()
223+
224+
if err != nil {
225+
// We just log the error and continue. In the case of initialization, this means we mark the client as initialized
226+
// if we can't poll for initial state, and default evaluations are likely to be returned.
227+
c.config.Logger.Errorf("Data poll finished with errors: %s", err)
217228
} else {
218-
c.config.Logger.Error("Data poll finished with errors")
229+
c.config.Logger.Info("Data poll finished successfully")
219230
}
220231

221-
if ok {
222-
// This flag is used by `IsInitialized` so set to true.
223-
c.initializedBoolLock.Lock()
224-
c.initializedBool = true
225-
c.initializedBoolLock.Unlock()
232+
c.initializedBoolLock.Lock()
233+
defer c.initializedBoolLock.Unlock()
226234

235+
// This function is used to mark the client as "initialized" once flags and segments have been loaded,
236+
// but it's also used for the polling thread, so we check if the client is already initialized before
237+
// marking it as such.
238+
if !c.initializedBool {
239+
c.initializedBool = true
227240
close(c.initialized)
228241
}
229-
return ok
230242
}
231243

232244
func (c *CfClient) streamConnect(ctx context.Context) {
@@ -247,28 +259,12 @@ func (c *CfClient) streamConnect(ctx context.Context) {
247259
// Use the SDKs http client
248260
sseClient.Connection = c.config.httpClient
249261

250-
streamErr := func() {
251-
c.config.Logger.Warnf("%s Stream disconnected. Swapping to polling mode", sdk_codes.StreamDisconnected)
252-
c.mux.RLock()
253-
defer c.mux.RUnlock()
254-
c.streamConnected = false
255-
256-
// If an eventStreamListener has been passed to the Proxy lets notify it of the disconnected
257-
// to let it know something is up with the stream it has been listening to
258-
if c.config.eventStreamListener != nil {
259-
c.config.eventStreamListener.Pub(context.Background(), stream.Event{
260-
APIKey: c.sdkKey,
261-
Environment: c.environmentID,
262-
Err: stream.ErrStreamDisconnect,
263-
})
264-
}
265-
}
266-
conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.repository, c.api, c.config.Logger, streamErr,
267-
c.config.eventStreamListener, c.config.proxyMode)
262+
conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.repository, c.api, c.config.Logger,
263+
c.config.eventStreamListener, c.config.proxyMode, c.streamDisconnected)
268264

269265
// Connect kicks off a goroutine that attempts to establish a stream connection
270266
// while this is happening we set streamConnected to true - if any errors happen
271-
// in this process streamConnected will be set back to false by the streamErr function
267+
// in this process streamConnected will be set back to false by the streamDisconnected function
272268
conn.Connect(ctx, c.environmentID, c.sdkKey)
273269
c.streamConnected = true
274270
}
@@ -303,7 +299,14 @@ func (c *CfClient) initAuthentication(ctx context.Context) error {
303299
jitter := time.Duration(rand.Float64() * float64(currentDelay))
304300
delayWithJitter := currentDelay + jitter
305301

306-
c.config.Logger.Errorf("%s Authentication failed with error: '%s'. Retrying in %v.", sdk_codes.AuthAttempt, err, delayWithJitter)
302+
maxAttemptLog := ""
303+
if c.config.maxAuthRetries == -1 {
304+
maxAttemptLog = "∞"
305+
} else {
306+
maxAttemptLog = strconv.Itoa(c.config.maxAuthRetries)
307+
}
308+
309+
c.config.Logger.Errorf("%s Authentication attempt %d of %s failed with error: '%s'. Retrying in %v.", sdk_codes.AuthAttempt, attempts, maxAttemptLog, err, delayWithJitter)
307310
c.config.sleeper.Sleep(delayWithJitter)
308311

309312
currentDelay *= time.Duration(factor)
@@ -321,7 +324,7 @@ func (c *CfClient) authenticate(ctx context.Context) error {
321324
defer c.mux.RUnlock()
322325

323326
// dont check err just retry
324-
httpClient, err := rest.NewClientWithResponses(c.config.url, rest.WithHTTPClient(c.config.httpClient))
327+
httpClient, err := rest.NewClientWithResponses(c.config.url, rest.WithHTTPClient(c.config.authHttpClient))
325328
if err != nil {
326329
return err
327330
}
@@ -420,25 +423,64 @@ func (c *CfClient) makeTicker(interval uint) *time.Ticker {
420423
return time.NewTicker(time.Second * time.Duration(interval))
421424
}
422425

426+
func (c *CfClient) stream(ctx context.Context) {
427+
// wait until initialized with initial state
428+
<-c.initialized
429+
c.config.Logger.Infof("%s Polling Stopped", sdk_codes.PollStop)
430+
c.config.Logger.Info("Attempting to start stream")
431+
c.streamConnect(ctx)
432+
433+
const maxBackoffDuration = 2 * time.Minute
434+
backoffDuration := 2 * time.Second
435+
for {
436+
select {
437+
case <-ctx.Done():
438+
c.config.Logger.Infof("%s Stream stopped", sdk_codes.StreamStop)
439+
return
440+
case <-c.streamDisconnected:
441+
c.config.Logger.Warnf("%s Stream disconnected. Swapping to polling mode", sdk_codes.StreamDisconnected)
442+
c.mux.RLock()
443+
c.streamConnected = false
444+
c.mux.RUnlock()
445+
446+
// If an eventStreamListener has been passed to the Proxy lets notify it of the disconnected
447+
// to let it know something is up with the stream it has been listening to
448+
if c.config.eventStreamListener != nil {
449+
c.config.eventStreamListener.Pub(context.Background(), stream.Event{
450+
APIKey: c.sdkKey,
451+
Environment: c.environmentID,
452+
Err: stream.ErrStreamDisconnect,
453+
})
454+
}
455+
456+
time.Sleep(backoffDuration)
457+
458+
c.config.Logger.Info("Attempting to restart stream")
459+
c.streamConnect(ctx)
460+
461+
if backoffDuration > maxBackoffDuration {
462+
backoffDuration = maxBackoffDuration
463+
return
464+
}
465+
466+
backoffDuration *= 2
467+
468+
}
469+
}
470+
}
471+
423472
func (c *CfClient) pullCronJob(ctx context.Context) {
424473
poll := func() {
425474
c.mux.RLock()
426-
c.config.Logger.Infof("%s Polling started, interval: %v", sdk_codes.PollStart, c.config.pullInterval)
475+
defer c.mux.RUnlock()
427476
if !c.streamConnected {
428-
ok := c.retrieve(ctx)
429-
// we should only try and start the stream after the poll succeeded to make sure we get the latest changes
430-
if ok && c.config.enableStream {
431-
c.config.Logger.Infof("%s Polling Stopped", sdk_codes.PollStop)
432-
// here stream is enabled but not connected, so we attempt to reconnect
433-
c.config.Logger.Info("Attempting to start stream")
434-
c.streamConnect(ctx)
435-
}
477+
c.retrieve(ctx)
436478
}
437-
c.mux.RUnlock()
438479
}
439480
// wait until authenticated
440481
<-c.authenticated
441482

483+
c.config.Logger.Infof("%s Polling started, interval: %v seconds", sdk_codes.PollStart, c.config.pullInterval)
442484
// pull initial data
443485
poll()
444486

@@ -449,7 +491,6 @@ func (c *CfClient) pullCronJob(ctx context.Context) {
449491
case <-ctx.Done():
450492
pullingTicker.Stop()
451493
c.config.Logger.Infof("%s Polling stopped", sdk_codes.PollStop)
452-
c.config.Logger.Infof("%s Stream stopped", sdk_codes.StreamStop)
453494
return
454495
case <-pullingTicker.C:
455496
poll()
@@ -471,7 +512,7 @@ func (c *CfClient) retrieveFlags(ctx context.Context) error {
471512
}
472513

473514
if flags.JSON200 == nil {
474-
return nil
515+
return fmt.Errorf("%w: `%v`", FetchFlagsError, flags.HTTPResponse.Status)
475516
}
476517

477518
c.repository.SetFlags(true, c.environmentID, *flags.JSON200...)

client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ var FeatureConfigsResponse = func(req *http.Request) (*http.Response, error) {
747747
}
748748

749749
func TestCfClient_Close(t *testing.T) {
750-
registerResponders(AuthResponse(200, ValidAuthToken), TargetSegmentsResponse, TargetSegmentsResponse)
750+
registerResponders(AuthResponse(200, ValidAuthToken), TargetSegmentsResponse, FeatureConfigsResponse)
751751
client, err := newClient(&http.Client{}, ValidSDKKey, WithWaitForInitialized(true))
752752
if err != nil {
753753
t.Error(err)

client/config.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
package client
22

33
import (
4+
"fmt"
5+
"github.com/harness/ff-golang-server-sdk/cache"
46
"github.com/harness/ff-golang-server-sdk/evaluation"
7+
"github.com/harness/ff-golang-server-sdk/logger"
8+
"github.com/harness/ff-golang-server-sdk/storage"
59
"github.com/harness/ff-golang-server-sdk/stream"
610
"github.com/harness/ff-golang-server-sdk/types"
11+
"github.com/hashicorp/go-retryablehttp"
712
"net/http"
813
"os"
9-
10-
"github.com/harness/ff-golang-server-sdk/cache"
11-
"github.com/harness/ff-golang-server-sdk/logger"
12-
"github.com/harness/ff-golang-server-sdk/storage"
13-
"github.com/hashicorp/go-retryablehttp"
14+
"time"
1415
)
1516

1617
type config struct {
@@ -21,6 +22,7 @@ type config struct {
2122
Store storage.Storage
2223
Logger logger.Logger
2324
httpClient *http.Client
25+
authHttpClient *http.Client
2426
enableStream bool
2527
enableStore bool
2628
target evaluation.Target
@@ -39,9 +41,33 @@ func newDefaultConfig(log logger.Logger) *config {
3941
defaultStore = storage.NewFileStore("defaultProject", storage.GetHarnessDir(log), log)
4042
}
4143

42-
retryClient := retryablehttp.NewClient()
43-
retryClient.RetryMax = 10
44-
retryClient.Logger = logger.NewRetryableLogger(log)
44+
const requestTimeout = time.Second * 30
45+
46+
// Authentication uses a default http client + timeout as we have our own custom retry logic for authentication.
47+
authHttpClient := &http.Client{}
48+
authHttpClient.Timeout = requestTimeout
49+
50+
// Remaining requests use a go-retryablehttp client to handle retries.
51+
requestHttpClient := retryablehttp.NewClient()
52+
requestHttpClient.Logger = logger.NewRetryableLogger(log)
53+
requestHttpClient.RetryMax = 10
54+
55+
// Assign a custom ErrorHandler. By default, the go-retryablehttp library doesn't return the final
56+
// network error from the server but instead reports that it has exhausted all retry attempts.
57+
requestHttpClient.ErrorHandler = func(resp *http.Response, err error, numTries int) (*http.Response, error) {
58+
message := ""
59+
if resp != nil {
60+
message = fmt.Sprintf("Error after '%d' connection attempts: '%s'", numTries, resp.Status)
61+
}
62+
63+
// In practice, the error is usually nil and the response is used, but include this for any
64+
// edge cases.
65+
if err != nil {
66+
message = fmt.Sprintf("Error after %d connection attempts: %v\n", numTries, err)
67+
}
68+
69+
return resp, fmt.Errorf(message)
70+
}
4571

4672
return &config{
4773
url: "https://config.ff.harness.io/api/1.0",
@@ -50,7 +76,8 @@ func newDefaultConfig(log logger.Logger) *config {
5076
Cache: defaultCache,
5177
Store: defaultStore,
5278
Logger: log,
53-
httpClient: retryClient.StandardClient(),
79+
authHttpClient: authHttpClient,
80+
httpClient: requestHttpClient.StandardClient(),
5481
enableStream: true,
5582
enableStore: true,
5683
enableAnalytics: true,

client/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
var (
99
EmptySDKKeyError = errors.New("default variation was returned")
1010
DefaultVariationReturnedError = errors.New("default variation was returned")
11+
FetchFlagsError = errors.New("fetching flags failed")
1112
)
1213

1314
type NonRetryableAuthError struct {

client/options.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,10 @@ func WithStoreEnabled(val bool) ConfigOption {
8383
}
8484
}
8585

86-
// WithHTTPClient set http client for use in interactions with ff server
86+
// WithHTTPClient set auth and http client for use in interactions with ff server
8787
func WithHTTPClient(client *http.Client) ConfigOption {
8888
return func(config *config) {
89+
config.authHttpClient = client
8990
config.httpClient = client
9091
}
9192
}

0 commit comments

Comments
 (0)