Skip to content

Commit a06246f

Browse files
authored
[FSSDK-9631] feat: add datafile syncer to synchronize datafile across agent nodes for webhook API (#405)
* feat: add redis syncer for webhook * Modify syncer * fix bug * add various fix * refactor code * refactor code * add unit test * improve logging * add unit test for pubsub * add unit test * add unit test * add unit test * add unit test * refactor code * cleanup * update config * add review changes * update config doc
1 parent 9825f2d commit a06246f

File tree

16 files changed

+1107
-132
lines changed

16 files changed

+1107
-132
lines changed

cmd/optimizely/main.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/spf13/viper"
3434

3535
"github.com/optimizely/agent/config"
36+
"github.com/optimizely/agent/pkg/handlers"
3637
"github.com/optimizely/agent/pkg/metrics"
3738
"github.com/optimizely/agent/pkg/optimizely"
3839
"github.com/optimizely/agent/pkg/routers"
@@ -266,7 +267,10 @@ func main() {
266267
sdkMetricsRegistry := optimizely.NewRegistry(agentMetricsRegistry)
267268

268269
ctx, cancel := context.WithCancel(context.Background()) // Create default service context
269-
sg := server.NewGroup(ctx, conf.Server) // Create a new server group to manage the individual http listeners
270+
defer cancel()
271+
ctx = context.WithValue(ctx, handlers.LoggerKey, &log.Logger)
272+
273+
sg := server.NewGroup(ctx, conf.Server) // Create a new server group to manage the individual http listeners
270274
optlyCache := optimizely.NewCache(ctx, *conf, sdkMetricsRegistry)
271275
optlyCache.Init(conf.SDKKeys)
272276

@@ -286,7 +290,7 @@ func main() {
286290

287291
log.Info().Str("version", conf.Version).Msg("Starting services.")
288292
sg.GoListenAndServe("api", conf.API.Port, apiRouter)
289-
sg.GoListenAndServe("webhook", conf.Webhook.Port, routers.NewWebhookRouter(optlyCache, conf.Webhook))
293+
sg.GoListenAndServe("webhook", conf.Webhook.Port, routers.NewWebhookRouter(ctx, optlyCache, *conf))
290294
sg.GoListenAndServe("admin", conf.Admin.Port, adminRouter) // Admin should be added last.
291295

292296
// wait for server group to shutdown

config.yaml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,16 +244,20 @@ runtime:
244244
## (For n>1 the details of sampling may change.)
245245
mutexProfileFraction: 0
246246

247-
## synchronization should be enabled when multiple replicas of agent is deployed
248-
## if notification synchronization is enabled, then the active notification event-stream API
249-
## will get the notifications from multiple replicas
247+
## synchronization should be enabled when features for multiple nodes like notification streaming are deployed
250248
synchronization:
251249
pubsub:
252250
redis:
253251
host: "redis.demo.svc:6379"
254252
password: ""
255253
database: 0
256-
channel: "optimizely-sync"
254+
## if notification synchronization is enabled, then the active notification event-stream API
255+
## will get the notifications from available replicas
257256
notification:
258257
enable: false
259258
default: "redis"
259+
## if datafile synchronization is enabled, then for each webhook API call
260+
## the datafile will be sent to all available replicas to achieve better eventual consistency
261+
datafile:
262+
enable: false
263+
default: "redis"

config/config.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func NewDefaultConfig() *AgentConfig {
135135
"channel": "optimizely-notifications",
136136
},
137137
},
138-
Notification: NotificationConfig{
138+
Notification: FeatureSyncConfig{
139139
Enable: false,
140140
Default: "redis",
141141
},
@@ -167,11 +167,12 @@ type AgentConfig struct {
167167
// SyncConfig contains Synchronization configuration for the multiple Agent nodes
168168
type SyncConfig struct {
169169
Pubsub map[string]interface{} `json:"pubsub"`
170-
Notification NotificationConfig `json:"notification"`
170+
Notification FeatureSyncConfig `json:"notification"`
171+
Datafile FeatureSyncConfig `json:"datafile"`
171172
}
172173

173-
// NotificationConfig contains Notification Synchronization configuration for the multiple Agent nodes
174-
type NotificationConfig struct {
174+
// FeatureSyncConfig contains Notification Synchronization configuration for the multiple Agent nodes
175+
type FeatureSyncConfig struct {
175176
Enable bool `json:"enable"`
176177
Default string `json:"default"`
177178
}

pkg/handlers/notification.go

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"net/http"
2626
"strings"
2727

28-
"github.com/go-redis/redis/v8"
2928
"github.com/optimizely/agent/config"
3029
"github.com/optimizely/agent/pkg/middleware"
3130
"github.com/optimizely/agent/pkg/syncer"
@@ -212,26 +211,22 @@ func DefaultNotificationReceiver(ctx context.Context) (<-chan syncer.Event, erro
212211
return messageChan, nil
213212
}
214213

215-
func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc {
214+
func SyncedNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc {
216215
return func(ctx context.Context) (<-chan syncer.Event, error) {
217216
sdkKey, ok := ctx.Value(SDKKey).(string)
218217
if !ok || sdkKey == "" {
219218
return nil, errors.New("sdk key not found")
220219
}
221220

222-
redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, conf, sdkKey)
221+
ncSyncer, err := syncer.NewSyncedNotificationCenter(ctx, sdkKey, conf)
223222
if err != nil {
224223
return nil, err
225224
}
226225

227-
client := redis.NewClient(&redis.Options{
228-
Addr: redisSyncer.Host,
229-
Password: redisSyncer.Password,
230-
DB: redisSyncer.Database,
231-
})
232-
233-
// Subscribe to a Redis channel
234-
pubsub := client.Subscribe(ctx, syncer.GetChannelForSDKKey(redisSyncer.Channel, sdkKey))
226+
eventCh, err := ncSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey(syncer.PubSubDefaultChan, sdkKey))
227+
if err != nil {
228+
return nil, err
229+
}
235230

236231
dataChan := make(chan syncer.Event)
237232

@@ -244,19 +239,12 @@ func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc
244239
for {
245240
select {
246241
case <-ctx.Done():
247-
client.Close()
248-
pubsub.Close()
242+
close(dataChan)
249243
logger.Debug().Msg("context canceled, redis notification receiver is closed")
250244
return
251-
default:
252-
msg, err := pubsub.ReceiveMessage(ctx)
253-
if err != nil {
254-
logger.Err(err).Msg("failed to receive message from redis")
255-
continue
256-
}
257-
245+
case msg := <-eventCh:
258246
var event syncer.Event
259-
if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
247+
if err := json.Unmarshal([]byte(msg), &event); err != nil {
260248
logger.Err(err).Msg("failed to unmarshal redis message")
261249
continue
262250
}

pkg/handlers/notification_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func (suite *NotificationTestSuite) TestTrackAndProjectConfigWithSynchronization
216216
"database": 0,
217217
},
218218
},
219-
Notification: config.NotificationConfig{
219+
Notification: config.FeatureSyncConfig{
220220
Enable: true,
221221
Default: "redis",
222222
},
@@ -370,7 +370,7 @@ func TestRedisNotificationReceiver(t *testing.T) {
370370
"database": 0,
371371
},
372372
},
373-
Notification: config.NotificationConfig{
373+
Notification: config.FeatureSyncConfig{
374374
Enable: true,
375375
Default: "redis",
376376
},
@@ -407,7 +407,7 @@ func TestRedisNotificationReceiver(t *testing.T) {
407407
}
408408
for _, tt := range tests {
409409
t.Run(tt.name, func(t *testing.T) {
410-
got := RedisNotificationReceiver(tt.args.conf)
410+
got := SyncedNotificationReceiver(tt.args.conf)
411411
if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
412412
t.Errorf("RedisNotificationReceiver() = %v, want %v", got, tt.want)
413413
}

pkg/handlers/webhook.go

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,25 @@
1818
package handlers
1919

2020
import (
21+
"context"
2122
"crypto/hmac"
2223
"crypto/sha1"
2324
"crypto/subtle"
2425
"encoding/hex"
2526
"encoding/json"
27+
"fmt"
2628
"io"
2729
"net/http"
2830
"strconv"
2931

3032
"github.com/optimizely/agent/config"
3133

3234
"github.com/go-chi/render"
35+
"github.com/rs/zerolog"
3336
"github.com/rs/zerolog/log"
3437

3538
"github.com/optimizely/agent/pkg/optimizely"
39+
"github.com/optimizely/agent/pkg/syncer"
3640
)
3741

3842
const signatureHeader = "X-Hub-Signature"
@@ -56,15 +60,19 @@ type OptlyMessage struct {
5660

5761
// OptlyWebhookHandler handles incoming messages from Optimizely
5862
type OptlyWebhookHandler struct {
59-
optlyCache optimizely.Cache
60-
ProjectMap map[int64]config.WebhookProject
63+
optlyCache optimizely.Cache
64+
ProjectMap map[int64]config.WebhookProject
65+
configSyncer syncer.Syncer
66+
syncEnabled bool
6167
}
6268

6369
// NewWebhookHandler returns a new instance of OptlyWebhookHandler
64-
func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject) *OptlyWebhookHandler {
70+
func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject, configSyncer syncer.Syncer) *OptlyWebhookHandler {
6571
return &OptlyWebhookHandler{
66-
optlyCache: optlyCache,
67-
ProjectMap: projectMap,
72+
optlyCache: optlyCache,
73+
ProjectMap: projectMap,
74+
syncEnabled: configSyncer != nil,
75+
configSyncer: configSyncer,
6876
}
6977
}
7078

@@ -140,7 +148,47 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque
140148

141149
// Iterate through all SDK keys and update config
142150
for _, sdkKey := range webhookConfig.SDKKeys {
143-
h.optlyCache.UpdateConfigs(sdkKey)
151+
if h.syncEnabled {
152+
if err := h.configSyncer.Sync(r.Context(), syncer.GetDatafileSyncChannel(), sdkKey); err != nil {
153+
errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error())
154+
log.Error().Msg(errMsg)
155+
h.optlyCache.UpdateConfigs(sdkKey)
156+
}
157+
} else {
158+
h.optlyCache.UpdateConfigs(sdkKey)
159+
}
144160
}
145161
w.WriteHeader(http.StatusNoContent)
146162
}
163+
164+
func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error {
165+
logger, ok := ctx.Value(LoggerKey).(*zerolog.Logger)
166+
if !ok {
167+
logger = &log.Logger
168+
}
169+
170+
if !h.syncEnabled {
171+
logger.Debug().Msg("datafile syncer is not enabled")
172+
return nil
173+
}
174+
175+
dataCh, err := h.configSyncer.Subscribe(ctx, syncer.GetDatafileSyncChannel())
176+
if err != nil {
177+
return err
178+
}
179+
180+
go func() {
181+
for {
182+
select {
183+
case <-ctx.Done():
184+
logger.Debug().Msg("context canceled, syncer is stopped")
185+
return
186+
case key := <-dataCh:
187+
h.optlyCache.UpdateConfigs(key)
188+
logger.Info().Msgf("datafile synced successfully for sdkKey: %s", key)
189+
}
190+
}
191+
}()
192+
logger.Debug().Msg("datafile syncer is started")
193+
return nil
194+
}

pkg/handlers/webhook_test.go

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package handlers
1919

2020
import (
2121
"bytes"
22+
"context"
2223
"encoding/json"
2324
"net/http"
2425
"net/http/httptest"
@@ -46,6 +47,28 @@ func NewCache() *TestCache {
4647
}
4748
}
4849

50+
type TestDFSyncer struct {
51+
syncCalled bool
52+
subscribeCalled bool
53+
}
54+
55+
func NewTestDFSyncer() *TestDFSyncer {
56+
return &TestDFSyncer{
57+
syncCalled: false,
58+
subscribeCalled: false,
59+
}
60+
}
61+
62+
func (t *TestDFSyncer) Sync(_ context.Context, _ string, _ string) error {
63+
t.syncCalled = true
64+
return nil
65+
}
66+
67+
func (t *TestDFSyncer) Subscribe(_ context.Context, _ string) (chan string, error) {
68+
t.subscribeCalled = true
69+
return make(chan string), nil
70+
}
71+
4972
// GetClient returns a default OptlyClient for testing
5073
func (tc *TestCache) GetClient(sdkKey string) (*optimizely.OptlyClient, error) {
5174
return &optimizely.OptlyClient{
@@ -111,7 +134,7 @@ func TestHandleWebhookValidMessageInvalidSignature(t *testing.T) {
111134
Secret: "I am secret",
112135
},
113136
}
114-
optlyHandler := NewWebhookHandler(nil, testWebhookConfigs)
137+
optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, nil)
115138
webhookMsg := OptlyMessage{
116139
ProjectID: 42,
117140
Timestamp: 42424242,
@@ -146,7 +169,7 @@ func TestHandleWebhookSkippedCheckInvalidSignature(t *testing.T) {
146169
SkipSignatureCheck: true,
147170
},
148171
}
149-
optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs)
172+
optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, nil)
150173
webhookMsg := OptlyMessage{
151174
ProjectID: 42,
152175
Timestamp: 42424242,
@@ -181,7 +204,7 @@ func TestHandleWebhookValidMessage(t *testing.T) {
181204
Secret: "I am secret",
182205
},
183206
}
184-
optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs)
207+
optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, nil)
185208
webhookMsg := OptlyMessage{
186209
ProjectID: 42,
187210
Timestamp: 42424242,
@@ -208,3 +231,55 @@ func TestHandleWebhookValidMessage(t *testing.T) {
208231
assert.Equal(t, http.StatusNoContent, rec.Code)
209232
assert.Equal(t, true, testCache.updateConfigsCalled)
210233
}
234+
235+
func TestHandleWebhookWithDatafileSyncer(t *testing.T) {
236+
var testWebhookConfigs = map[int64]config.WebhookProject{
237+
42: {
238+
SDKKeys: []string{"myDatafile"},
239+
Secret: "I am secret",
240+
},
241+
}
242+
syncer := NewTestDFSyncer()
243+
244+
optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, syncer)
245+
webhookMsg := OptlyMessage{
246+
ProjectID: 42,
247+
Timestamp: 42424242,
248+
Event: "project.datafile_updated",
249+
Data: DatafileUpdateData{
250+
Revision: 101,
251+
OriginURL: "origin.optimizely.com/datafiles/myDatafile",
252+
CDNUrl: "cdn.optimizely.com/datafiles/myDatafile",
253+
Environment: "Production",
254+
},
255+
}
256+
257+
validWebhookMessage, _ := json.Marshal(webhookMsg)
258+
259+
req := httptest.NewRequest("POST", "/webhooks/optimizely", bytes.NewBuffer(validWebhookMessage))
260+
261+
// This sha1 has been computed from the Optimizely application
262+
req.Header.Set(signatureHeader, "sha1=e0199de63fb7192634f52136d4ceb7dc6f191da3")
263+
264+
rec := httptest.NewRecorder()
265+
handler := http.HandlerFunc(optlyHandler.HandleWebhook)
266+
handler.ServeHTTP(rec, req)
267+
268+
assert.Equal(t, http.StatusNoContent, rec.Code)
269+
assert.Equal(t, true, syncer.syncCalled)
270+
}
271+
272+
func TestWebhookStartSyncer(t *testing.T) {
273+
var testWebhookConfigs = map[int64]config.WebhookProject{
274+
42: {
275+
SDKKeys: []string{"myDatafile"},
276+
Secret: "I am secret",
277+
},
278+
}
279+
syncer := NewTestDFSyncer()
280+
281+
optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, syncer)
282+
err := optlyHandler.StartSyncer(context.Background())
283+
assert.NoError(t, err)
284+
assert.Equal(t, true, syncer.subscribeCalled)
285+
}

0 commit comments

Comments
 (0)