@@ -3,6 +3,7 @@ package stream
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ "sync"
6
7
"time"
7
8
8
9
"github.com/harness/ff-golang-server-sdk/cache"
@@ -17,11 +18,12 @@ import (
17
18
18
19
// SSEClient is Server Send Event object
19
20
type SSEClient struct {
20
- api rest.ClientWithResponsesInterface
21
- client * sse.Client
22
- cache cache.Cache
23
- logger logger.Logger
24
- onStreamError func ()
21
+ api rest.ClientWithResponsesInterface
22
+ client * sse.Client
23
+ cache cache.Cache
24
+ logger logger.Logger
25
+ onStreamError func ()
26
+ eventStreamListener EventStreamListener
25
27
}
26
28
27
29
var json = jsoniter .ConfigCompatibleWithStandardLibrary
@@ -35,24 +37,26 @@ func NewSSEClient(
35
37
api rest.ClientWithResponsesInterface ,
36
38
logger logger.Logger ,
37
39
onStreamError func (),
40
+ eventStreamListener EventStreamListener ,
38
41
) * SSEClient {
39
42
client .Headers ["Authorization" ] = fmt .Sprintf ("Bearer %s" , token )
40
43
client .Headers ["API-Key" ] = apiKey
41
44
client .OnDisconnect (func (client * sse.Client ) {
42
45
onStreamError ()
43
46
})
44
47
sseClient := & SSEClient {
45
- client : client ,
46
- cache : cache ,
47
- api : api ,
48
- logger : logger ,
49
- onStreamError : onStreamError ,
48
+ client : client ,
49
+ cache : cache ,
50
+ api : api ,
51
+ logger : logger ,
52
+ onStreamError : onStreamError ,
53
+ eventStreamListener : eventStreamListener ,
50
54
}
51
55
return sseClient
52
56
}
53
57
54
58
// Connect will subscribe to SSE stream
55
- func (c * SSEClient ) Connect (environment string ) {
59
+ func (c * SSEClient ) Connect (environment string , apiKey string ) {
56
60
c .logger .Infof ("Start subscribing to Stream" )
57
61
// don't use the default exponentialBackoff strategy - we have our own disconnect logic
58
62
// of polling the service then re-establishing a new stream once we can connect
@@ -62,6 +66,8 @@ func (c *SSEClient) Connect(environment string) {
62
66
err := c .client .Subscribe ("*" , func (msg * sse.Event ) {
63
67
c .logger .Infof ("Event received: %s" , msg .Data )
64
68
69
+ wg := & sync.WaitGroup {}
70
+
65
71
cfMsg := Message {}
66
72
if len (msg .Data ) > 0 {
67
73
err := json .Unmarshal (msg .Data , & cfMsg )
@@ -76,16 +82,25 @@ func (c *SSEClient) Connect(environment string) {
76
82
// and subscribe to that event
77
83
switch cfMsg .Event {
78
84
case dto .SseDeleteEvent :
85
+ wg .Add (1 )
86
+
79
87
go func (identifier string ) {
88
+ defer wg .Done ()
89
+
80
90
c .cache .Remove (dto.Key {
81
91
Type : dto .KeyFeature ,
82
92
Name : identifier ,
83
93
})
84
94
}(cfMsg .Identifier )
95
+
85
96
case dto .SsePatchEvent , dto .SseCreateEvent :
86
97
fallthrough
87
98
default :
99
+ wg .Add (1 )
100
+
88
101
go func (env , identifier string ) {
102
+ defer wg .Done ()
103
+
89
104
ctx , cancel := context .WithTimeout (context .Background (), time .Second * 60 )
90
105
defer cancel ()
91
106
response , err := c .api .GetFeatureConfigByIdentifierWithResponse (ctx , env , identifier )
@@ -101,20 +116,30 @@ func (c *SSEClient) Connect(environment string) {
101
116
}
102
117
}(environment , cfMsg .Identifier )
103
118
}
119
+
104
120
case dto .KeySegment :
105
121
// need open client spec change
106
122
switch cfMsg .Event {
107
123
case dto .SseDeleteEvent :
124
+ wg .Add (1 )
125
+
108
126
go func (identifier string ) {
127
+ defer wg .Done ()
128
+
109
129
c .cache .Remove (dto.Key {
110
130
Type : dto .KeySegment ,
111
131
Name : identifier ,
112
132
})
113
133
}(cfMsg .Identifier )
134
+
114
135
case dto .SsePatchEvent , dto .SseCreateEvent :
115
136
fallthrough
116
137
default :
138
+ wg .Add (1 )
139
+
117
140
go func (env , identifier string ) {
141
+ defer wg .Done ()
142
+
118
143
ctx , cancel := context .WithTimeout (context .Background (), time .Second * 60 )
119
144
defer cancel ()
120
145
response , err := c .api .GetSegmentByIdentifierWithResponse (ctx , env , identifier )
@@ -131,6 +156,19 @@ func (c *SSEClient) Connect(environment string) {
131
156
}(environment , cfMsg .Identifier )
132
157
}
133
158
}
159
+
160
+ if c .eventStreamListener != nil {
161
+ sendWithTimeout := func () error {
162
+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
163
+ defer cancel ()
164
+ return c .eventStreamListener .Pub (ctx , Event {APIKey : apiKey , Environment : environment , Event : msg })
165
+ }
166
+
167
+ wg .Wait ()
168
+ if err := sendWithTimeout (); err != nil {
169
+ c .logger .Errorf ("error while forwarding SSE Event to change stream: %s" , err )
170
+ }
171
+ }
134
172
}
135
173
})
136
174
if err != nil {
0 commit comments