Skip to content

Commit fb0e102

Browse files
committed
refactor
1 parent 3f6a13d commit fb0e102

File tree

3 files changed

+151
-107
lines changed

3 files changed

+151
-107
lines changed

client/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (c *CfClient) retrieve(ctx context.Context) bool {
163163
return ok
164164
}
165165

166-
func (c *CfClient) streamConnect() {
166+
func (c *CfClient) streamConnect(ctx context.Context) {
167167
// we only ever want one stream to be setup - other threads must wait before trying to establish a connection
168168
c.streamConnectedLock.Lock()
169169
defer c.streamConnectedLock.Unlock()
@@ -189,7 +189,7 @@ func (c *CfClient) streamConnect() {
189189
// Connect kicks off a goroutine that attempts to establish a stream connection
190190
// while this is happening we set streamConnected to true - if any errors happen
191191
// in this process streamConnected will be set back to false by the streamErr function
192-
conn.Connect(c.environmentID, c.sdkKey)
192+
conn.Connect(ctx, c.environmentID, c.sdkKey)
193193
c.streamConnected = true
194194
}
195195

@@ -307,7 +307,7 @@ func (c *CfClient) pullCronJob(ctx context.Context) {
307307
if ok && c.config.enableStream {
308308
// here stream is enabled but not connected, so we attempt to reconnect
309309
c.config.Logger.Info("Attempting to start stream")
310-
c.streamConnect()
310+
c.streamConnect(ctx)
311311
}
312312
}
313313
c.mux.RUnlock()

stream/sse.go

Lines changed: 146 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package stream
33
import (
44
"context"
55
"fmt"
6-
"sync"
76
"time"
87

98
"github.com/harness/ff-golang-server-sdk/cache"
@@ -56,124 +55,169 @@ func NewSSEClient(
5655
}
5756

5857
// Connect will subscribe to SSE stream
59-
func (c *SSEClient) Connect(environment string, apiKey string) {
58+
func (c *SSEClient) Connect(ctx context.Context, environment string, apiKey string) {
59+
go func() {
60+
for event := range orDone(ctx, c.subscribe(ctx, environment, apiKey)) {
61+
c.handleEvent(event)
62+
}
63+
}()
64+
}
65+
66+
// Connect will subscribe to SSE stream
67+
func (c *SSEClient) subscribe(ctx context.Context, environment string, apiKey string) <-chan Event {
6068
c.logger.Infof("Start subscribing to Stream")
6169
// don't use the default exponentialBackoff strategy - we have our own disconnect logic
6270
// of polling the service then re-establishing a new stream once we can connect
6371
c.client.ReconnectStrategy = &backoff.StopBackOff{}
6472
// it is blocking operation, it needs to go in go routine
73+
74+
out := make(chan Event)
6575
go func() {
66-
err := c.client.Subscribe("*", func(msg *sse.Event) {
76+
defer close(out)
77+
78+
err := c.client.SubscribeWithContext(ctx, "*", func(msg *sse.Event) {
6779
c.logger.Infof("Event received: %s", msg.Data)
6880

69-
wg := &sync.WaitGroup{}
81+
if len(msg.Data) <= 0 {
82+
return
83+
}
84+
85+
event := Event{
86+
APIKey: apiKey,
87+
Environment: environment,
88+
SSEEvent: msg,
89+
}
90+
91+
select {
92+
case <-ctx.Done():
93+
return
94+
case out <- event:
95+
}
96+
97+
})
98+
if err != nil {
99+
c.logger.Errorf("Error initializing stream: %s", err.Error())
100+
c.onStreamError()
101+
}
102+
}()
103+
104+
return out
105+
}
106+
107+
func (c *SSEClient) handleEvent(event Event) {
108+
cfMsg := Message{}
109+
err := json.Unmarshal(event.SSEEvent.Data, &cfMsg)
110+
if err != nil {
111+
c.logger.Errorf("%s", err.Error())
112+
return
113+
}
114+
115+
switch cfMsg.Domain {
116+
case dto.KeyFeature:
117+
// maybe is better to send event on memory bus that we get new message
118+
// and subscribe to that event
119+
switch cfMsg.Event {
120+
case dto.SseDeleteEvent:
121+
122+
c.cache.Remove(dto.Key{
123+
Type: dto.KeyFeature,
124+
Name: cfMsg.Identifier,
125+
})
126+
127+
case dto.SsePatchEvent, dto.SseCreateEvent:
128+
fallthrough
129+
default:
130+
updateWithTimeout := func() {
131+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
132+
defer cancel()
133+
134+
response, err := c.api.GetFeatureConfigByIdentifierWithResponse(ctx, event.Environment, cfMsg.Identifier)
135+
if err != nil {
136+
c.logger.Errorf("error while pulling flag, err: %s", err.Error())
137+
return
138+
}
139+
140+
if response.JSON200 != nil {
141+
c.cache.Set(dto.Key{
142+
Type: dto.KeyFeature,
143+
Name: cfMsg.Identifier,
144+
}, *response.JSON200.Convert())
145+
}
146+
}
147+
148+
updateWithTimeout()
149+
}
150+
151+
case dto.KeySegment:
152+
// need open client spec change
153+
switch cfMsg.Event {
154+
case dto.SseDeleteEvent:
70155

71-
cfMsg := Message{}
72-
if len(msg.Data) > 0 {
73-
err := json.Unmarshal(msg.Data, &cfMsg)
156+
c.cache.Remove(dto.Key{
157+
Type: dto.KeySegment,
158+
Name: cfMsg.Identifier,
159+
})
160+
161+
case dto.SsePatchEvent, dto.SseCreateEvent:
162+
fallthrough
163+
default:
164+
updateWithTimeout := func() {
165+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
166+
defer cancel()
167+
168+
response, err := c.api.GetSegmentByIdentifierWithResponse(ctx, event.Environment, cfMsg.Identifier)
74169
if err != nil {
75-
c.logger.Errorf("%s", err.Error())
170+
c.logger.Errorf("error while pulling segment, err: %s", err.Error())
76171
return
77172
}
173+
if response.JSON200 != nil {
174+
c.cache.Set(dto.Key{
175+
Type: dto.KeySegment,
176+
Name: cfMsg.Identifier,
177+
}, response.JSON200.Convert())
178+
}
179+
}
180+
updateWithTimeout()
181+
}
182+
}
78183

79-
switch cfMsg.Domain {
80-
case dto.KeyFeature:
81-
// maybe is better to send event on memory bus that we get new message
82-
// and subscribe to that event
83-
switch cfMsg.Event {
84-
case dto.SseDeleteEvent:
85-
wg.Add(1)
86-
87-
go func(identifier string) {
88-
defer wg.Done()
89-
90-
c.cache.Remove(dto.Key{
91-
Type: dto.KeyFeature,
92-
Name: identifier,
93-
})
94-
}(cfMsg.Identifier)
95-
96-
case dto.SsePatchEvent, dto.SseCreateEvent:
97-
fallthrough
98-
default:
99-
wg.Add(1)
100-
101-
go func(env, identifier string) {
102-
defer wg.Done()
103-
104-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
105-
defer cancel()
106-
response, err := c.api.GetFeatureConfigByIdentifierWithResponse(ctx, env, identifier)
107-
if err != nil {
108-
c.logger.Errorf("error while pulling flag, err: %s", err.Error())
109-
return
110-
}
111-
if response.JSON200 != nil {
112-
c.cache.Set(dto.Key{
113-
Type: dto.KeyFeature,
114-
Name: identifier,
115-
}, *response.JSON200.Convert())
116-
}
117-
}(environment, cfMsg.Identifier)
118-
}
119-
120-
case dto.KeySegment:
121-
// need open client spec change
122-
switch cfMsg.Event {
123-
case dto.SseDeleteEvent:
124-
wg.Add(1)
125-
126-
go func(identifier string) {
127-
defer wg.Done()
128-
129-
c.cache.Remove(dto.Key{
130-
Type: dto.KeySegment,
131-
Name: identifier,
132-
})
133-
}(cfMsg.Identifier)
134-
135-
case dto.SsePatchEvent, dto.SseCreateEvent:
136-
fallthrough
137-
default:
138-
wg.Add(1)
139-
140-
go func(env, identifier string) {
141-
defer wg.Done()
142-
143-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
144-
defer cancel()
145-
response, err := c.api.GetSegmentByIdentifierWithResponse(ctx, env, identifier)
146-
if err != nil {
147-
c.logger.Errorf("error while pulling segment, err: %s", err.Error())
148-
return
149-
}
150-
if response.JSON200 != nil {
151-
c.cache.Set(dto.Key{
152-
Type: dto.KeySegment,
153-
Name: identifier,
154-
}, response.JSON200.Convert())
155-
}
156-
}(environment, cfMsg.Identifier)
157-
}
184+
if c.eventStreamListener != nil {
185+
sendWithTimeout := func() error {
186+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
187+
defer cancel()
188+
return c.eventStreamListener.Pub(ctx, Event{APIKey: event.APIKey, Environment: event.Environment, SSEEvent: event.SSEEvent})
189+
}
190+
191+
if err := sendWithTimeout(); err != nil {
192+
c.logger.Errorf("error while forwarding SSE Event to change stream: %s", err)
193+
}
194+
}
195+
}
196+
197+
// orDone is a helper that encapsulates the logic for reading from a channel
198+
// whilst waiting for a cancellation.
199+
func orDone(ctx context.Context, c <-chan Event) <-chan Event {
200+
out := make(chan Event)
201+
202+
go func() {
203+
defer close(out)
204+
205+
for {
206+
select {
207+
case <-ctx.Done():
208+
return
209+
case cp, ok := <-c:
210+
if !ok {
211+
return
158212
}
159213

160-
if c.eventStreamListener != nil {
161-
sendWithTimeout := func() error {
162-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
163-
defer cancel()
164-
return c.eventStreamListener.Pub(ctx, Event{APIKey: apiKey, Environment: environment, Event: msg})
165-
}
166-
167-
wg.Wait()
168-
if err := sendWithTimeout(); err != nil {
169-
c.logger.Errorf("error while forwarding SSE Event to change stream: %s", err)
170-
}
214+
select {
215+
case <-ctx.Done():
216+
case out <- cp:
171217
}
172218
}
173-
})
174-
if err != nil {
175-
c.logger.Errorf("Error initializing stream: %s", err.Error())
176-
c.onStreamError()
177219
}
178220
}()
221+
222+
return out
179223
}

stream/stream.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,6 @@ type Event struct {
2626
APIKey string
2727
// Environment is the ID of the environment that the event occured for
2828
Environment string
29-
// Event is the SSEEvent that was sent from the FeatureFlags server to the SDK
30-
Event *sse.Event
29+
// SSEEvent is the SSEEvent that was sent from the FeatureFlags server to the SDK
30+
SSEEvent *sse.Event
3131
}

0 commit comments

Comments
 (0)