Skip to content

Commit 33841f1

Browse files
(feat): first pass at using sdk key with processor for notification events. … (#140)
* first pass at using sdk key with processor for notification events. this should be refactored to use the sdk key in the project config. * add setting sdk key to client factory * fix lint errors * lint * remove wrong package that was auto added * cleanup code a little * added some more testing for dispatch * refactor notification to hide log event until registered callback * rename callback parameter
1 parent ba6072b commit 33841f1

File tree

7 files changed

+163
-7
lines changed

7 files changed

+163
-7
lines changed

pkg/client/factory.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ func (f OptimizelyFactory) Client(clientOptions ...OptionFunc) (*OptimizelyClien
4646
executionCtx: executionCtx,
4747
DecisionService: decision.NewCompositeService(f.SDKKey),
4848
EventProcessor: event.NewEventProcessor(event.BatchSize(event.DefaultBatchSize),
49-
event.QueueSize(event.DefaultEventQueueSize), event.FlushInterval(event.DefaultEventFlushInterval)),
49+
event.QueueSize(event.DefaultEventQueueSize), event.FlushInterval(event.DefaultEventFlushInterval),
50+
event.SDKKey(f.SDKKey)),
5051
}
5152

5253
for _, opt := range clientOptions {

pkg/event/dispatcher.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ func (ed *QueueEventDispatcher) flushEvents() {
128128
}
129129
} else {
130130
dispatcherLogger.Error("Error dispatching ", err)
131+
// we failed. Sleep some seconds and try again.
132+
time.Sleep(sleepTime)
133+
// increase retryCount. We exit if we have retried x times.
134+
// we will retry again next event that is added.
135+
retryCount++
131136
}
132137
}
133138
}

pkg/event/dispatcher_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,59 @@ func TestQueueEventDispatcher_DispatchEvent(t *testing.T) {
5858
assert.Equal(t, 0, qd.eventQueue.Size())
5959

6060
}
61+
62+
func TestQueueEventDispatcher_InvalidEvent(t *testing.T) {
63+
ctx := context.TODO()
64+
q := NewQueueEventDispatcher(ctx)
65+
66+
67+
config := TestConfig{}
68+
69+
if qed, ok := q.(*QueueEventDispatcher); ok {
70+
qed.Dispatcher = &MockDispatcher{Events: NewInMemoryQueue(100)}
71+
qed.eventQueue.Add(config)
72+
}
73+
74+
qd, _ := q.(*QueueEventDispatcher)
75+
76+
assert.Equal(t, 1, qd.eventQueue.Size())
77+
78+
// give the queue a chance to run
79+
qd.flushEvents()
80+
81+
// check the queue. bad event type should be removed. but, not sent.
82+
assert.Equal(t, 0, qd.eventQueue.Size())
83+
84+
}
85+
86+
func TestQueueEventDispatcher_FailDispath(t *testing.T) {
87+
ctx := context.TODO()
88+
q := NewQueueEventDispatcher(ctx)
89+
90+
if qed, ok := q.(*QueueEventDispatcher); ok {
91+
qed.Dispatcher = &MockDispatcher{ShouldFail: true, Events: NewInMemoryQueue(100)}
92+
}
93+
94+
eventTags := map[string]interface{}{"revenue": 55.0, "value": 25.1}
95+
config := TestConfig{}
96+
97+
conversionUserEvent := CreateConversionUserEvent(config, entities.Event{ExperimentIds: []string{"15402980349"}, ID: "15368860886", Key: "sample_conversion"}, userContext, eventTags)
98+
99+
batch := createBatchEvent(conversionUserEvent, createVisitorFromUserEvent(conversionUserEvent))
100+
101+
logEvent := createLogEvent(batch)
102+
103+
q.DispatchEvent(logEvent)
104+
105+
qd, _ := q.(*QueueEventDispatcher)
106+
107+
assert.Equal(t, 1, qd.eventQueue.Size())
108+
109+
// give the queue a chance to run
110+
qd.flushEvents()
111+
112+
// check the queue. bad event type should be removed. but, not sent.
113+
assert.Equal(t, 1, qd.eventQueue.Size())
114+
115+
}
116+

pkg/event/processor.go

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ package event
1919

2020
import (
2121
"errors"
22+
"fmt"
23+
"github.com/optimizely/go-sdk/pkg/logging"
24+
"github.com/optimizely/go-sdk/pkg/notification"
25+
"github.com/optimizely/go-sdk/pkg/registry"
26+
"github.com/optimizely/go-sdk/pkg/utils"
2227
"sync"
2328
"time"
24-
25-
"github.com/optimizely/go-sdk/pkg/utils"
26-
27-
"github.com/optimizely/go-sdk/pkg/logging"
2829
)
2930

3031
// Processor processes events
@@ -34,6 +35,7 @@ type Processor interface {
3435

3536
// QueueingEventProcessor is used out of the box by the SDK
3637
type QueueingEventProcessor struct {
38+
sdkKey string
3739
MaxQueueSize int // max size of the queue before flush
3840
FlushInterval time.Duration // in milliseconds
3941
BatchSize int
@@ -92,6 +94,14 @@ func PDispatcher(d Dispatcher) QPConfigOption {
9294
}
9395
}
9496

97+
// SDKKey sets the SDKKey used to register for notifications. This should be removed when the project
98+
// config supports sdk key.
99+
func SDKKey(sdkKey string) QPConfigOption {
100+
return func(qp *QueueingEventProcessor) {
101+
qp.sdkKey = sdkKey
102+
}
103+
}
104+
95105
// NewEventProcessor returns a new instance of QueueingEventProcessor with queueSize and flushInterval
96106
func NewEventProcessor(options ...QPConfigOption) *QueueingEventProcessor {
97107
p := &QueueingEventProcessor{}
@@ -242,7 +252,15 @@ func (p *QueueingEventProcessor) FlushEvents() {
242252
}
243253
if batchEventCount > 0 {
244254
// TODO: figure out what to do with the error
245-
if success, _ := p.EventDispatcher.DispatchEvent(createLogEvent(batchEvent)); success {
255+
logEvent := createLogEvent(batchEvent)
256+
notificationCenter := registry.GetNotificationCenter(p.sdkKey)
257+
258+
err := notificationCenter.Send(notification.LogEvent, logEvent)
259+
260+
if err != nil {
261+
pLogger.Error("Send Log Event notification failed.", err)
262+
}
263+
if success, _ := p.EventDispatcher.DispatchEvent(logEvent); success {
246264
pLogger.Debug("Dispatched event successfully")
247265
p.Remove(batchEventCount)
248266
batchEventCount = 0
@@ -255,3 +273,33 @@ func (p *QueueingEventProcessor) FlushEvents() {
255273
}
256274
p.Mux.Unlock()
257275
}
276+
277+
// OnEventDispatch registers a handler for LogEvent notifications
278+
func (p *QueueingEventProcessor) OnEventDispatch(callback func(logEvent LogEvent)) (int, error) {
279+
notificationCenter := registry.GetNotificationCenter(p.sdkKey)
280+
281+
handler := func(payload interface{}) {
282+
if ev, ok := payload.(LogEvent); ok {
283+
callback(ev)
284+
} else {
285+
pLogger.Warning(fmt.Sprintf("Unable to convert notification payload %v into LogEventNotification", payload))
286+
}
287+
}
288+
id, err := notificationCenter.AddHandler(notification.LogEvent, handler)
289+
if err != nil {
290+
pLogger.Error("Problem with adding notification handler.", err)
291+
return 0, err
292+
}
293+
return id, nil
294+
}
295+
296+
// RemoveOnEventDispatch removes handler for LogEvent notification with given id
297+
func (p *QueueingEventProcessor) RemoveOnEventDispatch(id int) error {
298+
notificationCenter := registry.GetNotificationCenter(p.sdkKey)
299+
300+
if err := notificationCenter.RemoveHandler(id, notification.LogEvent); err != nil {
301+
pLogger.Warning("Problem with removing notification handler.")
302+
return err
303+
}
304+
return nil
305+
}

pkg/event/processor_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,40 @@ func (f *MockDispatcher) DispatchEvent(event LogEvent) (bool, error) {
7777
return true, nil
7878
}
7979

80+
func TestDefaultEventProcessor_LogEventNotification(t *testing.T) {
81+
exeCtx := utils.NewCancelableExecutionCtx()
82+
processor := NewEventProcessor(FlushInterval(100), QueueSize(100),
83+
PQ(NewInMemoryQueue(100)), PDispatcher(&MockDispatcher{Events: NewInMemoryQueue(100)}),
84+
SDKKey("fakeSDKKey"))
85+
86+
var logEvent LogEvent
87+
88+
id, _ := processor.OnEventDispatch(func(eventNotification LogEvent) {
89+
logEvent = eventNotification
90+
})
91+
processor.Start(exeCtx)
92+
93+
impression := BuildTestImpressionEvent()
94+
conversion := BuildTestConversionEvent()
95+
96+
processor.ProcessEvent(impression)
97+
processor.ProcessEvent(impression)
98+
processor.ProcessEvent(conversion)
99+
processor.ProcessEvent(conversion)
100+
101+
assert.Equal(t, 4, processor.EventsCount())
102+
103+
exeCtx.TerminateAndWait()
104+
105+
assert.NotNil(t, logEvent)
106+
assert.Equal(t, 4, len(logEvent.Event.Visitors))
107+
108+
err := processor.RemoveOnEventDispatch(id)
109+
110+
assert.Nil(t, err)
111+
112+
}
113+
80114
func TestDefaultEventProcessor_ProcessBatch(t *testing.T) {
81115
exeCtx := utils.NewCancelableExecutionCtx()
82116
processor := NewEventProcessor(FlushInterval(100), QueueSize(100),

pkg/notification/center.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@ type DefaultCenter struct {
3535
func NewNotificationCenter() *DefaultCenter {
3636
decisionNotificationManager := NewAtomicManager()
3737
projectConfigUpdateNotificationManager := NewAtomicManager()
38+
processLogEventNotificationManager := NewAtomicManager()
3839
managerMap := make(map[Type]Manager)
3940
managerMap[Decision] = decisionNotificationManager
4041
managerMap[ProjectConfigUpdate] = projectConfigUpdateNotificationManager
42+
managerMap[LogEvent] = processLogEventNotificationManager
4143
return &DefaultCenter{
4244
managerMap: managerMap,
4345
}

pkg/notification/entities.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
// Package notification //
1818
package notification
1919

20-
import "github.com/optimizely/go-sdk/pkg/entities"
20+
import (
21+
"github.com/optimizely/go-sdk/pkg/entities"
22+
)
2123

2224
// Type is the type of notification
2325
type Type string
@@ -35,6 +37,8 @@ const (
3537
ABTest DecisionNotificationType = "ab-test"
3638
// Feature is used when the decision is returned as part of evaluating a feature
3739
Feature DecisionNotificationType = "feature"
40+
// LogEvent notification type
41+
LogEvent Type = "log_event_notification"
3842
)
3943

4044
// DecisionNotification is a notification triggered when a decision is made for either a feature or an experiment
@@ -49,3 +53,9 @@ type ProjectConfigUpdateNotification struct {
4953
Type Type
5054
Revision string
5155
}
56+
57+
// LogEventNotification is the notification triggered before log event is dispatched.
58+
type LogEventNotification struct {
59+
Type Type
60+
LogEvent interface{}
61+
}

0 commit comments

Comments
 (0)