@@ -37,20 +37,21 @@ import (
37
37
// that any pending analytics events have been delivered.
38
38
//
39
39
type CfClient struct {
40
- mux sync.RWMutex
41
- api rest.ClientWithResponsesInterface
42
- metricsapi metricsclient.ClientWithResponsesInterface
43
- sdkKey string
44
- auth rest.AuthenticationRequest
45
- config * config
46
- environmentID string
47
- token string
48
- persistence cache.Persistence
49
- cancelFunc context.CancelFunc
50
- streamConnected bool
51
- authenticated chan struct {}
52
- initialized chan bool
53
- analyticsService * analyticsservice.AnalyticsService
40
+ mux sync.RWMutex
41
+ api rest.ClientWithResponsesInterface
42
+ metricsapi metricsclient.ClientWithResponsesInterface
43
+ sdkKey string
44
+ auth rest.AuthenticationRequest
45
+ config * config
46
+ environmentID string
47
+ token string
48
+ persistence cache.Persistence
49
+ cancelFunc context.CancelFunc
50
+ streamConnected bool
51
+ authenticated chan struct {}
52
+ initialized chan bool
53
+ analyticsService * analyticsservice.AnalyticsService
54
+ clusterIdentifier string
54
55
}
55
56
56
57
// NewCfClient creates a new client instance that connects to CF with the default configuration.
@@ -71,11 +72,12 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {
71
72
analyticsService := analyticsservice .NewAnalyticsService (time .Minute , config .Logger )
72
73
73
74
client := & CfClient {
74
- sdkKey : sdkKey ,
75
- config : config ,
76
- authenticated : make (chan struct {}),
77
- initialized : make (chan bool ),
78
- analyticsService : analyticsService ,
75
+ sdkKey : sdkKey ,
76
+ config : config ,
77
+ authenticated : make (chan struct {}),
78
+ initialized : make (chan bool ),
79
+ analyticsService : analyticsService ,
80
+ clusterIdentifier : "1" ,
79
81
}
80
82
ctx , client .cancelFunc = context .WithCancel (context .Background ())
81
83
@@ -158,7 +160,7 @@ func (c *CfClient) streamConnect() {
158
160
c .mux .RLock ()
159
161
defer c .mux .RUnlock ()
160
162
c .config .Logger .Info ("Registering SSE consumer" )
161
- sseClient := sse .NewClient (fmt .Sprintf ("%s/stream" , c .config .url ))
163
+ sseClient := sse .NewClient (fmt .Sprintf ("%s/stream?cluster=%s " , c .config .url , c . clusterIdentifier ))
162
164
conn := stream .NewSSEClient (c .sdkKey , c .token , sseClient , c .config .Cache , c .api )
163
165
err := conn .Connect (c .environmentID )
164
166
if err != nil {
@@ -240,6 +242,12 @@ func (c *CfClient) authenticate(ctx context.Context, target evaluation.Target) {
240
242
return
241
243
}
242
244
245
+ c .clusterIdentifier , ok = claims ["clusterIdentifier" ].(string )
246
+ if ! ok {
247
+ c .config .Logger .Error (errors .New ("cluster identifier not present" ))
248
+ c .clusterIdentifier = "1"
249
+ }
250
+
243
251
// network layer setup
244
252
bearerTokenProvider , bearerTokenProviderErr := securityprovider .NewSecurityProviderBearerToken (c .token )
245
253
if bearerTokenProviderErr != nil {
@@ -248,6 +256,7 @@ func (c *CfClient) authenticate(ctx context.Context, target evaluation.Target) {
248
256
}
249
257
restClient , err := rest .NewClientWithResponses (c .config .url ,
250
258
rest .WithRequestEditorFn (bearerTokenProvider .Intercept ),
259
+ rest .WithRequestEditorFn (c .InterceptAddCluster ),
251
260
rest .WithHTTPClient (c .config .httpClient ),
252
261
)
253
262
if err != nil {
@@ -539,6 +548,14 @@ func (c *CfClient) Environment() string {
539
548
return c .environmentID
540
549
}
541
550
551
+ // InterceptAddCluster adds cluster ID to calls
552
+ func (c * CfClient ) InterceptAddCluster (ctx context.Context , req * http.Request ) error {
553
+ q := req .URL .Query ()
554
+ q .Add ("cluster" , c .clusterIdentifier )
555
+ req .URL .RawQuery = q .Encode ()
556
+ return nil
557
+ }
558
+
542
559
// contains determines if the string variation is in the slice of variations.
543
560
// returns true if found, otherwise false.
544
561
func contains (variations []string , variation string ) bool {
0 commit comments