Skip to content

Commit 0767148

Browse files
authored
FFM-7044 SSE Fix Race Condition + Add usage headers to requests (#135)
1 parent 3408b8a commit 0767148

File tree

16 files changed

+138
-1593
lines changed

16 files changed

+138
-1593
lines changed

analyticsservice/analytics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ const (
2525
variationValueAttribute string = "featureValue"
2626
targetAttribute string = "target"
2727
sdkVersionAttribute string = "SDK_VERSION"
28-
sdkVersion string = "0.1.14"
28+
SdkVersion string = "0.1.16"
2929
sdkTypeAttribute string = "SDK_TYPE"
3030
sdkType string = "server"
3131
sdkLanguageAttribute string = "SDK_LANGUAGE"
@@ -219,7 +219,7 @@ func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) {
219219
},
220220
{
221221
Key: sdkVersionAttribute,
222-
Value: sdkVersion,
222+
Value: SdkVersion,
223223
},
224224
}
225225

client/client.go

Lines changed: 64 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,6 @@ func (c *CfClient) streamConnect(ctx context.Context) {
266266
// while this is happening we set streamConnectedBool to true - if any errors happen
267267
// in this process streamConnectedBool will be set back to false by the streamDisconnected function
268268
conn.Connect(ctx, c.environmentID, c.sdkKey)
269-
c.streamConnectedBool = true
270269
}
271270

272271
func (c *CfClient) initAuthentication(ctx context.Context) error {
@@ -390,6 +389,31 @@ func (c *CfClient) authenticate(ctx context.Context) error {
390389
if bearerTokenProviderErr != nil {
391390
return bearerTokenProviderErr
392391
}
392+
393+
// Use a custom transport which adds headers for tracking usage
394+
// The `WithRequestEditorFn` cannot be used for SSE requests, so we need to provide a custom transport to the
395+
// http client so that these headers can be added to all requests.
396+
getHeadersFn := func() (map[string]string, error) {
397+
return map[string]string{
398+
"User-Agent": "GoSDK/" + analyticsservice.SdkVersion,
399+
"Harness-SDK-Info": fmt.Sprintf("Go %s Server", analyticsservice.SdkVersion),
400+
"Harness-EnvironmentID": c.environmentID,
401+
}, nil
402+
}
403+
404+
// Wrap the httpClient's transport with our own custom transport, which currently just adds extra headers
405+
// for analytics purposes.
406+
// If the httpClient doesn't have a Transport we can honour, then just use a default transport.
407+
var baseTransport http.RoundTripper
408+
if c.config.httpClient.Transport != nil {
409+
baseTransport = c.config.httpClient.Transport
410+
} else {
411+
baseTransport = http.DefaultTransport
412+
}
413+
customTrans := NewCustomTransport(baseTransport, getHeadersFn)
414+
415+
c.config.httpClient.Transport = customTrans
416+
393417
restClient, err := rest.NewClientWithResponses(c.config.url,
394418
rest.WithRequestEditorFn(bearerTokenProvider.Intercept),
395419
rest.WithRequestEditorFn(c.InterceptAddCluster),
@@ -425,70 +449,68 @@ func (c *CfClient) stream(ctx context.Context) {
425449
c.streamConnect(ctx)
426450

427451
streamingRetryStrategy := c.config.streamingRetryStrategy
428-
// Create an initial ticker to handle the case where we don't open a succesful connection on our first attempt
429-
ticker := backoff.NewTicker(streamingRetryStrategy)
430452

431-
defer ticker.Stop()
432453
reconnectionAttempt := 1
454+
433455
for {
434456
select {
435457
case <-ctx.Done():
436458
c.config.Logger.Infof("%s Stream stopped", sdk_codes.StreamStop)
437-
if ticker != nil {
438-
ticker.Stop()
439-
}
440459
return
441460

442461
case <-c.streamConnectedChan:
443462
c.config.Logger.Infof("%s Stream successfully connected", sdk_codes.StreamStarted)
444463
c.config.Logger.Infof("%s Polling Stopped", sdk_codes.PollStop)
445464

446-
// Reset the reconnection attempt and ticker
447-
reconnectionAttempt = 1
465+
// Ensure reconnection strategy is reset
448466
streamingRetryStrategy.Reset()
449-
ticker.Stop()
450-
ticker = nil
467+
reconnectionAttempt = 1
451468

452-
case err := <-c.streamDisconnectedChan:
453-
c.config.Logger.Warnf("%s Stream disconnected: %s", sdk_codes.StreamDisconnected, err)
454-
c.config.Logger.Infof("%s Polling started, interval: %v seconds", sdk_codes.PollStart, c.config.pullInterval)
455469
c.mux.RLock()
456-
c.streamConnectedBool = false
470+
c.streamConnectedBool = true
457471
c.mux.RUnlock()
458472

459-
// If an eventStreamListener has been passed to the Proxy lets notify it of the disconnected
460-
// to let it know something is up with the stream it has been listening to
461-
if c.config.eventStreamListener != nil {
462-
c.config.eventStreamListener.Pub(context.Background(), stream.Event{
463-
APIKey: c.sdkKey,
464-
Environment: c.environmentID,
465-
Err: stream.ErrStreamDisconnect,
466-
})
467-
}
468-
469-
if ticker == nil {
470-
ticker = backoff.NewTicker(streamingRetryStrategy)
471-
}
472-
473-
// Note, the retry interval logged here is just an approximate value.
474-
// NextBackOff() will not be exactly the same value when used by the underlying ticker.
475-
// There is currently no way to get the interval that the ticker has used.
476-
c.config.Logger.Infof("%s Retrying stream connection in %fs (attempt %d)", sdk_codes.StreamRetry, streamingRetryStrategy.NextBackOff().Seconds(), reconnectionAttempt)
473+
case err := <-c.streamDisconnectedChan:
474+
c.notifyStreamDisconnect(err)
475+
476+
nextBackOff := streamingRetryStrategy.NextBackOff()
477+
c.config.Logger.Infof("%s Retrying stream connection in %fs (attempt %d)", sdk_codes.StreamRetry, nextBackOff.Seconds(), reconnectionAttempt)
478+
c.handleStreamDisconnect(ctx, nextBackOff)
479+
477480
reconnectionAttempt += 1
478481

479-
// Backoff before retrying
480-
select {
481-
case <-ticker.C:
482-
case <-ctx.Done():
483-
c.config.Logger.Infof("%s Stream stopped during reconnection", sdk_codes.StreamStop)
484-
ticker.Stop()
485-
return
486-
}
487-
c.streamConnect(ctx)
488482
}
489483
}
490484
}
491485

486+
func (c *CfClient) handleStreamDisconnect(ctx context.Context, nextBackOff time.Duration) {
487+
select {
488+
case <-time.After(nextBackOff):
489+
c.streamConnect(ctx)
490+
case <-ctx.Done():
491+
// Context was cancelled, stop trying to reconnect
492+
c.config.Logger.Infof("%s Stream stopped during reconnection", sdk_codes.StreamStop)
493+
return
494+
}
495+
}
496+
497+
func (c *CfClient) notifyStreamDisconnect(err error) {
498+
c.mux.RLock()
499+
c.streamConnectedBool = false
500+
c.mux.RUnlock()
501+
// If an eventStreamListener has been passed to the Proxy lets notify it of the disconnected
502+
// to let it know something is up with the stream it has been listening to
503+
if c.config.eventStreamListener != nil {
504+
c.config.eventStreamListener.Pub(context.Background(), stream.Event{
505+
APIKey: c.sdkKey,
506+
Environment: c.environmentID,
507+
Err: stream.ErrStreamDisconnect,
508+
})
509+
}
510+
c.config.Logger.Warnf("%s Stream disconnected: %s", sdk_codes.StreamDisconnected, err)
511+
c.config.Logger.Infof("%s Polling started, interval: %v seconds", sdk_codes.PollStart, c.config.pullInterval)
512+
}
513+
492514
func (c *CfClient) pullCronJob(ctx context.Context) {
493515
poll := func() {
494516
c.mux.RLock()

client/transport.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package client
2+
3+
import "net/http"
4+
5+
// HeadersFn is a function type that provides headers dynamically.
6+
type HeadersFn func() (map[string]string, error)
7+
8+
// customTransport wraps an http.RoundTripper and allows adding headers dynamically. This means we can still use
9+
// the goretryable-http client's transport, or a user's transport if they've provided their own http client.
10+
type customTransport struct {
11+
baseTransport http.RoundTripper
12+
getHeaders HeadersFn
13+
}
14+
15+
func NewCustomTransport(baseTransport http.RoundTripper, getHeaderFn HeadersFn) *customTransport {
16+
customTransport := &customTransport{
17+
baseTransport: baseTransport,
18+
getHeaders: getHeaderFn,
19+
}
20+
return customTransport
21+
}
22+
23+
func (t *customTransport) RoundTrip(req *http.Request) (*http.Response, error) {
24+
// Retrieve the headers using the provided function.
25+
headers, err := t.getHeaders()
26+
if err != nil {
27+
return nil, err
28+
}
29+
30+
// Add the headers to the request.
31+
for key, value := range headers {
32+
req.Header.Set(key, value)
33+
}
34+
35+
// Call the base transport's RoundTrip method.
36+
return t.baseTransport.RoundTrip(req)
37+
}

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ require (
3535
github.com/pmezard/go-difflib v1.0.0 // indirect
3636
go.uber.org/atomic v1.7.0 // indirect
3737
go.uber.org/multierr v1.6.0 // indirect
38-
golang.org/x/net v0.7.0 // indirect
38+
golang.org/x/net v0.17.0 // indirect
3939
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
4040
gopkg.in/yaml.v2 v2.4.0 // indirect
41-
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
41+
gopkg.in/yaml.v3 v3.0.1 // indirect
4242
)

go.sum

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5y
153153
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
154154
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
155155
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
156-
golang.org/x/crypto v0.0.0-20220513210258-46612604a0f9 h1:NUzdAbFtCJSXU20AOXgeqaUwg8Ypg4MPYmL+d+rsB5c=
157156
golang.org/x/crypto v0.0.0-20220513210258-46612604a0f9/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
157+
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
158158
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
159159
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
160160
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
@@ -170,8 +170,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
170170
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
171171
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
172172
golang.org/x/net v0.0.0-20220513224357-95641704303c/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
173-
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
174-
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
173+
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
174+
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
175175
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
176176
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
177177
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -188,15 +188,15 @@ golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBc
188188
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
189189
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
190190
golang.org/x/sys v0.0.0-20220513210249-45d2b4557a2a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
191-
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
191+
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
192192
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
193193
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
194194
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
195195
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
196196
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
197197
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
198198
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
199-
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
199+
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
200200
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
201201
golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
202202
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -230,7 +230,8 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
230230
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
231231
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
232232
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
233-
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
234233
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
234+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
235+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
235236
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
236237
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=

stream/sse.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package stream
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"github.com/harness/ff-golang-server-sdk/sdk_codes"
78
"time"
@@ -77,38 +78,41 @@ func (c *SSEClient) subscribe(ctx context.Context, environment string, apiKey st
7778
// of polling the service then re-establishing a new stream once we can connect
7879
c.client.ReconnectStrategy = &backoff.StopBackOff{}
7980

80-
onConnect := func(s *sse.Client) {
81-
c.streamConnected <- struct{}{}
82-
}
83-
c.client.OnConnect(onConnect)
84-
8581
// If we haven't received a change event or heartbeat in 30 seconds, we consider the stream to be "dead" and force a
8682
// reconnection
8783
const timeout = 30 * time.Second
8884
deadStreamTimer := time.NewTimer(timeout)
85+
// Stop the timer immediately, it will only start when the connection is established
86+
deadStreamTimer.Stop()
8987

88+
onConnect := func(s *sse.Client) {
89+
// Start the dead stream timer
90+
deadStreamTimer.Reset(timeout)
91+
c.streamConnected <- struct{}{}
92+
}
93+
c.client.OnConnect(onConnect)
9094
out := make(chan Event)
9195
go func() {
9296
defer close(out)
9397

98+
// Create another context off of the main SDK context, so we can close dead streams.
99+
deadStreamCtx, deadStreamCancel := context.WithCancel(ctx)
100+
defer deadStreamCancel()
101+
94102
go func() {
95103
select {
96104
case <-ctx.Done():
97105
return
98106
case <-deadStreamTimer.C:
99-
// Just stop the timer, no need to drain its channel here.
100107
deadStreamTimer.Stop()
101-
c.streamDisconnected <- fmt.Errorf("no SSE events received for 30 seconds. Assuming stream is dead and restarting")
108+
deadStreamCancel()
102109
return
103110
}
104111
}()
105112

106-
err := c.client.SubscribeWithContext(ctx, "*", func(msg *sse.Event) {
113+
err := c.client.SubscribeWithContext(deadStreamCtx, "*", func(msg *sse.Event) {
107114

108-
// if timer already expired, drain the channel
109-
if !deadStreamTimer.Stop() {
110-
<-deadStreamTimer.C
111-
}
115+
deadStreamTimer.Stop()
112116
deadStreamTimer.Reset(timeout)
113117

114118
// Heartbeat event
@@ -134,22 +138,25 @@ func (c *SSEClient) subscribe(ctx context.Context, environment string, apiKey st
134138

135139
})
136140
if err != nil {
137-
if !deadStreamTimer.Stop() {
138-
<-deadStreamTimer.C
139-
}
141+
deadStreamTimer.Stop()
140142
c.streamDisconnected <- err
141143
return
142144
}
143145

144-
if !deadStreamTimer.Stop() {
145-
<-deadStreamTimer.C
146+
deadStreamTimer.Stop()
147+
148+
// When we cancel the deadStreamContext, we exit the `SubscribeWithContext` function with a nil error.
149+
// So we need an explicit check to see if the reason was
150+
if errors.Is(deadStreamCtx.Err(), context.Canceled) {
151+
c.streamDisconnected <- fmt.Errorf("no SSE events received for 30 seconds. Assuming stream is dead and restarting")
152+
return
146153
}
147154

148-
// Even though we handle the error above, The SSE library we use currently returns a nil error for io.EOF errors, which can
155+
// The SSE library we use currently returns a nil error for io.EOF errors, which can
149156
// happen if ff-server closes the connection at the 24 hours point.
150157
// So we need to signal the stream disconnected channel any time we've exited SubscribeWithContext.
151158
// If we don't do this and the server closes the connection the Go SDK will still think it's connected to the stream
152-
// even though it isn't
159+
// even though it isn't.
153160
c.streamDisconnected <- fmt.Errorf("server closed the connection")
154161
}()
155162

test_wrapper/Makefile

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)