Skip to content

Commit d1b332c

Browse files
feat: Add observability metrics for event queue (#223)
* feat: Add observability metrics for event queue
1 parent 7f87380 commit d1b332c

File tree

11 files changed

+272
-193
lines changed

11 files changed

+272
-193
lines changed

pkg/client/factory.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/optimizely/go-sdk/pkg/config"
2626
"github.com/optimizely/go-sdk/pkg/decision"
2727
"github.com/optimizely/go-sdk/pkg/event"
28+
"github.com/optimizely/go-sdk/pkg/metrics"
2829
"github.com/optimizely/go-sdk/pkg/registry"
2930
"github.com/optimizely/go-sdk/pkg/utils"
3031
)
@@ -41,6 +42,7 @@ type OptimizelyFactory struct {
4142
eventProcessor event.Processor
4243
userProfileService decision.UserProfileService
4344
overrideStore decision.ExperimentOverrideStore
45+
metricsRegistry metrics.Registry
4446
}
4547

4648
// OptionFunc is used to provide custom client configuration to the OptimizelyFactory.
@@ -57,6 +59,13 @@ func (f OptimizelyFactory) Client(clientOptions ...OptionFunc) (*OptimizelyClien
5759
return nil, errors.New("unable to instantiate client: no project config manager, SDK key, or a Datafile provided")
5860
}
5961

62+
var metricsRegistry metrics.Registry
63+
if f.metricsRegistry != nil {
64+
metricsRegistry = f.metricsRegistry
65+
} else {
66+
metricsRegistry = metrics.NewNoopRegistry()
67+
}
68+
6069
var ctx context.Context
6170
if f.ctx != nil {
6271
ctx = f.ctx
@@ -85,6 +94,7 @@ func (f OptimizelyFactory) Client(clientOptions ...OptionFunc) (*OptimizelyClien
8594
if f.eventDispatcher != nil {
8695
eventProcessorOptions = append(eventProcessorOptions, event.WithEventDispatcher(f.eventDispatcher))
8796
}
97+
eventProcessorOptions = append(eventProcessorOptions, event.WithEventDispatcherMetrics(metricsRegistry))
8898
appClient.EventProcessor = event.NewBatchEventProcessor(eventProcessorOptions...)
8999
}
90100

@@ -180,6 +190,13 @@ func WithContext(ctx context.Context) OptionFunc {
180190
}
181191
}
182192

193+
// WithMetricsRegistry allows user to pass in their own implementation of a metrics collector
194+
func WithMetricsRegistry(metricsRegistry metrics.Registry) OptionFunc {
195+
return func(f *OptimizelyFactory) {
196+
f.metricsRegistry = metricsRegistry
197+
}
198+
}
199+
183200
// StaticClient returns a client initialized with a static project config.
184201
func (f OptimizelyFactory) StaticClient() (*OptimizelyClient, error) {
185202
var configManager config.ProjectConfigManager

pkg/client/factory_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/optimizely/go-sdk/pkg/config/datafileprojectconfig"
2929
"github.com/optimizely/go-sdk/pkg/decision"
3030
"github.com/optimizely/go-sdk/pkg/event"
31+
"github.com/optimizely/go-sdk/pkg/metrics"
3132
"github.com/optimizely/go-sdk/pkg/utils"
3233

3334
"github.com/stretchr/testify/assert"
@@ -53,10 +54,6 @@ func (f *MockDispatcher) DispatchEvent(event event.LogEvent) (bool, error) {
5354
return true, nil
5455
}
5556

56-
func (f *MockDispatcher) GetMetrics() event.Metrics {
57-
return nil
58-
}
59-
6057
func TestFactoryClientReturnsDefaultClient(t *testing.T) {
6158
factory := OptimizelyFactory{}
6259

@@ -175,3 +172,16 @@ func TestClientWithEventDispatcher(t *testing.T) {
175172
dispatcher := optimizelyClient.EventProcessor.(*event.BatchEventProcessor).EventDispatcher
176173
assert.Equal(t, dispatcher, mockEventDispatcher)
177174
}
175+
176+
func TestClientMetrics(t *testing.T) {
177+
factory := OptimizelyFactory{SDKKey: "1212"}
178+
179+
metricsRegistry := metrics.NewNoopRegistry()
180+
181+
mockEventDispatcher := new(MockDispatcher)
182+
optimizelyClient, err := factory.Client(WithEventDispatcher(mockEventDispatcher), WithMetricsRegistry(metricsRegistry))
183+
assert.NoError(t, err)
184+
185+
eventProcessor := optimizelyClient.EventProcessor.(*event.BatchEventProcessor)
186+
assert.NotNil(t, eventProcessor)
187+
}

pkg/event/dispatcher.go

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"time"
2525

2626
"github.com/optimizely/go-sdk/pkg/logging"
27+
"github.com/optimizely/go-sdk/pkg/metrics"
2728
"github.com/optimizely/go-sdk/pkg/utils"
2829
)
2930

@@ -36,19 +37,13 @@ var dispatcherLogger = logging.GetLogger("EventDispatcher")
3637
// Dispatcher dispatches events
3738
type Dispatcher interface {
3839
DispatchEvent(event LogEvent) (bool, error)
39-
GetMetrics() Metrics
4040
}
4141

4242
// HTTPEventDispatcher is the HTTP implementation of the Dispatcher interface
4343
type HTTPEventDispatcher struct {
4444
requester *utils.HTTPRequester
4545
}
4646

47-
// GetMetrics is the metric accessor
48-
func (ed *HTTPEventDispatcher) GetMetrics() Metrics {
49-
return nil
50-
}
51-
5247
// DispatchEvent dispatches event with callback
5348
func (ed *HTTPEventDispatcher) DispatchEvent(event LogEvent) (bool, error) {
5449

@@ -77,7 +72,11 @@ type QueueEventDispatcher struct {
7772
eventFlushLock sync.Mutex
7873
Dispatcher Dispatcher
7974

80-
metrics Metrics
75+
// metrics
76+
queueSize metrics.Gauge
77+
sucessFlush metrics.Counter
78+
failFlushCounter metrics.Counter
79+
retryFlushCounter metrics.Counter
8180
}
8281

8382
// DispatchEvent queues event with callback and calls flush in a go routine.
@@ -89,17 +88,6 @@ func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) {
8988
return true, nil
9089
}
9190

92-
// GetMetrics is the metric accessor
93-
func (ed *QueueEventDispatcher) GetMetrics() Metrics {
94-
ed.eventFlushLock.Lock()
95-
96-
defer func() {
97-
ed.eventFlushLock.Unlock()
98-
}()
99-
return ed.metrics
100-
101-
}
102-
10391
// flush the events
10492
func (ed *QueueEventDispatcher) flushEvents() {
10593

@@ -110,12 +98,11 @@ func (ed *QueueEventDispatcher) flushEvents() {
11098
}()
11199

112100
retryCount := 0
113-
114-
ed.metrics.SetQueueSize(ed.eventQueue.Size())
101+
ed.queueSize.Set(float64(ed.eventQueue.Size()))
115102
for ed.eventQueue.Size() > 0 {
116103
if retryCount > maxRetries {
117104
dispatcherLogger.Error(fmt.Sprintf("event failed to send %d times. It will retry on next event sent", maxRetries), nil)
118-
ed.metrics.IncrFailFlushCount()
105+
ed.failFlushCounter.Add(1)
119106
break
120107
}
121108

@@ -129,7 +116,7 @@ func (ed *QueueEventDispatcher) flushEvents() {
129116
// remove it
130117
dispatcherLogger.Error("invalid type passed to event Dispatcher", nil)
131118
ed.eventQueue.Remove(1)
132-
ed.metrics.IncrFailFlushCount()
119+
ed.failFlushCounter.Add(1)
133120
continue
134121
}
135122

@@ -140,16 +127,15 @@ func (ed *QueueEventDispatcher) flushEvents() {
140127
dispatcherLogger.Debug(fmt.Sprintf("Dispatched log event %+v", event))
141128
ed.eventQueue.Remove(1)
142129
retryCount = 0
143-
ed.metrics.IncrSuccessFlushCount()
130+
ed.sucessFlush.Add(1)
144131
} else {
145132
dispatcherLogger.Warning("dispatch event failed")
146133
// we failed. Sleep some seconds and try again.
147134
time.Sleep(sleepTime)
148135
// increase retryCount. We exit if we have retried x times.
149136
// we will retry again next event that is added.
150137
retryCount++
151-
ed.metrics.IncrRetryFlushCount()
152-
138+
ed.retryFlushCounter.Add(1)
153139
}
154140
} else {
155141
dispatcherLogger.Error("Error dispatching ", err)
@@ -158,17 +144,29 @@ func (ed *QueueEventDispatcher) flushEvents() {
158144
// increase retryCount. We exit if we have retried x times.
159145
// we will retry again next event that is added.
160146
retryCount++
161-
ed.metrics.IncrRetryFlushCount()
147+
ed.retryFlushCounter.Add(1)
162148
}
163149
}
164-
ed.metrics.SetQueueSize(ed.eventQueue.Size())
150+
ed.queueSize.Set(float64(ed.eventQueue.Size()))
165151
}
166152

167153
// NewQueueEventDispatcher creates a Dispatcher that queues in memory and then sends via go routine.
168-
func NewQueueEventDispatcher() *QueueEventDispatcher {
154+
func NewQueueEventDispatcher(metricsRegistry metrics.Registry) *QueueEventDispatcher {
155+
156+
var dispatcherMetricsRegistry metrics.Registry
157+
if metricsRegistry != nil {
158+
dispatcherMetricsRegistry = metricsRegistry
159+
} else {
160+
dispatcherMetricsRegistry = metrics.NewNoopRegistry() // protective code to set
161+
}
162+
169163
return &QueueEventDispatcher{
170164
eventQueue: NewInMemoryQueue(defaultQueueSize),
171165
Dispatcher: &HTTPEventDispatcher{requester: utils.NewHTTPRequester()},
172-
metrics: &DefaultMetrics{},
166+
167+
queueSize: dispatcherMetricsRegistry.GetGauge(metrics.DispatcherQueueSize),
168+
retryFlushCounter: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherRetryFlush),
169+
failFlushCounter: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherFailedFlush),
170+
sucessFlush: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherSuccessFlush),
173171
}
174172
}

pkg/event/dispatcher_metrics_test.go

Lines changed: 0 additions & 95 deletions
This file was deleted.

0 commit comments

Comments
 (0)