2 changes: 1 addition & 1 deletion .harness/ffgolangserversdk.yaml
@@ -176,7 +176,7 @@ pipeline:
dockerfile: ff-sdk-testgrid/go/Dockerfile
context: ff-sdk-testgrid/go
buildArgs:
SDK_VERSION: v0.1.24
SDK_VERSION: v0.1.25
BUILD_MODE: local
resources:
limits:
69 changes: 49 additions & 20 deletions analyticsservice/analytics.go
@@ -26,7 +26,7 @@ const (
variationValueAttribute string = "featureValue"
targetAttribute string = "target"
sdkVersionAttribute string = "SDK_VERSION"
SdkVersion string = "0.1.24"
SdkVersion string = "0.1.25"
sdkTypeAttribute string = "SDK_TYPE"
sdkType string = "server"
sdkLanguageAttribute string = "SDK_LANGUAGE"
@@ -46,6 +46,12 @@ type SafeAnalyticsCache[K comparable, V any] interface {
iterate(func(K, V))
}

// SafeSeenTargetsCache extends SafeAnalyticsCache and adds behavior specific to seen targets
type SafeSeenTargetsCache[K comparable, V any] interface {
SafeAnalyticsCache[K, V]
isLimitExceeded() bool
}

type analyticsEvent struct {
target *evaluation.Target
featureConfig *rest.FeatureConfig
@@ -55,33 +61,35 @@ type analyticsEvent struct {

// AnalyticsService provides a way to cache and send analytics to the server
type AnalyticsService struct {
analyticsChan chan analyticsEvent
evaluationAnalytics SafeAnalyticsCache[string, analyticsEvent]
targetAnalytics SafeAnalyticsCache[string, evaluation.Target]
seenTargets SafeAnalyticsCache[string, bool]
logEvaluationLimitReached atomic.Bool
logTargetLimitReached atomic.Bool
timeout time.Duration
logger logger.Logger
metricsClient metricsclient.ClientWithResponsesInterface
environmentID string
analyticsChan chan analyticsEvent
evaluationAnalytics SafeAnalyticsCache[string, analyticsEvent]
targetAnalytics SafeAnalyticsCache[string, evaluation.Target]
seenTargets SafeSeenTargetsCache[string, bool]
logEvaluationLimitReached atomic.Bool
logTargetLimitReached atomic.Bool
timeout time.Duration
logger logger.Logger
metricsClient metricsclient.ClientWithResponsesInterface
environmentID string
seenTargetsClearingInterval time.Duration
}

// NewAnalyticsService creates and starts an analytics service to send data to the client
func NewAnalyticsService(timeout time.Duration, logger logger.Logger) *AnalyticsService {
func NewAnalyticsService(timeout time.Duration, logger logger.Logger, seenTargetsMaxSize int, seenTargetsClearingSchedule time.Duration) *AnalyticsService {
serviceTimeout := timeout
if timeout < 60*time.Second {
serviceTimeout = 60 * time.Second
} else if timeout > 1*time.Hour {
serviceTimeout = 1 * time.Hour
}
as := AnalyticsService{
analyticsChan: make(chan analyticsEvent),
evaluationAnalytics: newSafeEvaluationAnalytics(),
targetAnalytics: newSafeTargetAnalytics(),
seenTargets: newSafeSeenTargets(),
timeout: serviceTimeout,
logger: logger,
analyticsChan: make(chan analyticsEvent),
evaluationAnalytics: newSafeEvaluationAnalytics(),
targetAnalytics: newSafeTargetAnalytics(),
seenTargets: newSafeSeenTargets(seenTargetsMaxSize),
timeout: serviceTimeout,
logger: logger,
seenTargetsClearingInterval: seenTargetsClearingSchedule,
}
go as.listener()

@@ -94,6 +102,7 @@ func (as *AnalyticsService) Start(ctx context.Context, client metricsclient.Clie
as.metricsClient = client
as.environmentID = environmentID
go as.startTimer(ctx)
go as.startSeenTargetsClearingSchedule(ctx, as.seenTargetsClearingInterval)
}

func (as *AnalyticsService) startTimer(ctx context.Context) {
@@ -103,6 +112,7 @@ func (as *AnalyticsService) startTimer(ctx context.Context) {
timeStamp := time.Now().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
as.sendDataAndResetCache(ctx, timeStamp)
case <-ctx.Done():
close(as.analyticsChan)
as.logger.Infof("%s Metrics stopped", sdk_codes.MetricsStopped)
return
}
@@ -149,9 +159,12 @@ func (as *AnalyticsService) listener() {
}

// Check if target has been seen
_, seen := as.seenTargets.get(ad.target.Identifier)
if _, seen := as.seenTargets.get(ad.target.Identifier); seen {
continue
}

if seen {
// Check if seen targets limit has been hit
if as.seenTargets.isLimitExceeded() {
continue
}

@@ -314,6 +327,22 @@ func (as *AnalyticsService) processTargetMetrics(targetAnalytics SafeAnalyticsCa
return targetData
}

func (as *AnalyticsService) startSeenTargetsClearingSchedule(ctx context.Context, clearingInterval time.Duration) {
ticker := time.NewTicker(clearingInterval)

for {
select {
case <-ticker.C:
as.logger.Debugf("Clearing seen targets")
as.seenTargets.clear()

case <-ctx.Done():
ticker.Stop()
return
}
}
}

func getEvaluationAnalyticKey(event analyticsEvent) string {
return fmt.Sprintf("%s-%s-%s-%s", event.featureConfig.Feature, event.variation.Identifier, event.variation.Value, globalTarget)
}
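Taken together, the changes to analytics.go give the service a bounded seen-targets cache and a background goroutine that empties it on a fixed schedule, so a full cache only pauses target reporting until the next clearing tick. Below is a minimal, self-contained sketch of that pattern; the type and function names are illustrative stand-ins, not the SDK's internals.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// boundedSeenSet is an illustrative stand-in for the SDK's seen-targets cache.
type boundedSeenSet struct {
	mu      sync.Mutex
	data    map[string]bool
	maxSize int
	full    bool
}

func newBoundedSeenSet(maxSize int) *boundedSeenSet {
	return &boundedSeenSet{data: make(map[string]bool), maxSize: maxSize}
}

// mark records an identifier but refuses new entries once the cap is reached.
func (b *boundedSeenSet) mark(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.data) >= b.maxSize {
		b.full = true
		return
	}
	b.data[id] = true
}

// clear empties the set so it can fill up again.
func (b *boundedSeenSet) clear() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.data = make(map[string]bool)
	b.full = false
}

func (b *boundedSeenSet) limitReached() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.full
}

func (b *boundedSeenSet) size() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.data)
}

func main() {
	seen := newBoundedSeenSet(2)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Mirrors startSeenTargetsClearingSchedule: clear on every tick, stop the
	// ticker when the context is cancelled.
	go func() {
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				seen.clear()
			case <-ctx.Done():
				return
			}
		}
	}()

	for _, id := range []string{"t1", "t2", "t3"} {
		seen.mark(id) // "t3" is dropped: the cap of 2 is already reached
	}
	fmt.Println("size before tick:", seen.size())      // 2
	fmt.Println("limit reached:", seen.limitReached()) // true

	time.Sleep(80 * time.Millisecond)                      // allow at least one clearing tick
	fmt.Println("size after clearing tick:", seen.size()) // expected 0
}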
2 changes: 1 addition & 1 deletion analyticsservice/analytics_test.go
@@ -120,7 +120,7 @@ func TestListenerHandlesEventsCorrectly(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
service := NewAnalyticsService(1*time.Minute, noOpLogger)
service := NewAnalyticsService(1*time.Minute, noOpLogger, 10, time.Hour)
defer close(service.analyticsChan)

// Start the listener in a goroutine
89 changes: 86 additions & 3 deletions analyticsservice/safe_maps_test.go
@@ -1,6 +1,7 @@
package analyticsservice

import (
"fmt"
"reflect"
"sync"
"testing"
@@ -81,13 +82,95 @@ func TestSafeTargetAnalytics(t *testing.T) {
}

func TestSafeSeenTargets(t *testing.T) {
s := newSafeSeenTargets()
// Initialize with a small maxSize for testing
maxSize := 3
s := newSafeSeenTargets(maxSize).(SafeSeenTargetsCache[string, bool])

testData := map[string]bool{
"target1": true,
"target2": true,
"target3": true,
}

testMapOperations[string, bool](t, s, testData)
// Insert items and ensure limit is not exceeded
for key, value := range testData {
s.set(key, value)
}

if s.isLimitExceeded() {
t.Errorf("Limit should not have been exceeded yet")
}

// Add one more item to exceed the limit
s.set("target4", true)

// Ensure limitExceeded is true after exceeding the limit
if !s.isLimitExceeded() {
t.Errorf("Limit should be exceeded after adding target4")
}

// Ensure that new items are not added once the limit is exceeded
s.set("target5", true)
if _, exists := s.get("target5"); exists {
t.Errorf("target5 should not have been added as the limit was exceeded")
}

// Clear the map and ensure limit is reset
s.clear()

if s.isLimitExceeded() {
t.Errorf("Limit should have been reset after clearing the map")
}

// Add items again after clearing
s.set("target6", true)
if _, exists := s.get("target6"); !exists {
t.Errorf("target6 should have been added after clearing the map")
}

// Concurrency test
t.Run("ConcurrencyTest", func(t *testing.T) {
var wg sync.WaitGroup
concurrencyLevel := 100

// Re-initialize the map for concurrency testing
s = newSafeSeenTargets(100).(SafeSeenTargetsCache[string, bool])

// Concurrently set keys
for i := 0; i < concurrencyLevel; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := "target" + fmt.Sprint(i)
s.set(key, true)
}(i)
}

// Concurrently get keys
for i := 0; i < concurrencyLevel; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := "target" + fmt.Sprint(i)
s.get(key)
}(i)
}

// Concurrently clear the map
for i := 0; i < concurrencyLevel/2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s.clear()
}()
}

wg.Wait()

// Ensure the map is cleared after the concurrency operations
if s.size() > 0 {
t.Errorf("Map size should be 0 after clearing, got %d", s.size())
}
})

}
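Since TestSafeSeenTargets now hammers set, get, and clear from a hundred goroutines at once, this package is a good candidate for the race detector, which will catch any access to the underlying map or limit flag that slips past the lock:

go test -race ./analyticsservice/...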
21 changes: 18 additions & 3 deletions analyticsservice/safe_seen_targets_map.go
@@ -2,22 +2,32 @@ package analyticsservice

import (
"sync"
"sync/atomic"
)

type safeSeenTargets struct {
sync.RWMutex
data map[string]bool
data map[string]bool
maxSize int
limitExceeded atomic.Bool
}

func newSafeSeenTargets() SafeAnalyticsCache[string, bool] {
func newSafeSeenTargets(maxSize int) SafeSeenTargetsCache[string, bool] {
return &safeSeenTargets{
data: make(map[string]bool),
data: make(map[string]bool),
maxSize: maxSize,
}
}

func (s *safeSeenTargets) set(key string, seen bool) {
s.Lock()
defer s.Unlock()

if len(s.data) >= s.maxSize {
s.limitExceeded.Store(true)
return
}

s.data[key] = seen
}

@@ -44,6 +54,7 @@ func (s *safeSeenTargets) clear() {
s.Lock()
defer s.Unlock()
s.data = make(map[string]bool)
s.limitExceeded.Store(false)
}

func (s *safeSeenTargets) iterate(f func(string, bool)) {
@@ -53,3 +64,7 @@
f(key, value)
}
}

func (s *safeSeenTargets) isLimitExceeded() bool {
return s.limitExceeded.Load()
}
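One note on the design here: limitExceeded is an atomic.Bool read by isLimitExceeded() without taking the embedded RWMutex, so the listener's per-event limit check does not contend with writers holding the lock. A purely lock-based accessor would also work but would block briefly behind every writer; a hypothetical, roughly equivalent version (not part of the SDK) would look like:

// Hypothetical lock-based alternative to the atomic flag; checks the same
// capacity condition under the read lock instead of loading an atomic.
func (s *safeSeenTargets) isLimitExceededLocked() bool {
	s.RLock()
	defer s.RUnlock()
	return len(s.data) >= s.maxSize
}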
2 changes: 1 addition & 1 deletion client/client.go
@@ -79,7 +79,7 @@ func NewCfClient(sdkKey string, options ...ConfigOption) (*CfClient, error) {
opt(config)
}

analyticsService := analyticsservice.NewAnalyticsService(time.Minute, config.Logger)
analyticsService := analyticsservice.NewAnalyticsService(time.Minute, config.Logger, config.seenTargetsMaxSize, config.seenTargetsClearInterval)

client := &CfClient{
sdkKey: sdkKey,
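NewCfClient now threads two new settings, config.seenTargetsMaxSize and config.seenTargetsClearInterval, into the analytics service. The ConfigOption setters that populate those fields are not part of this diff, so the following is only a sketch of how that wiring typically looks with the functional-options pattern already used in NewCfClient; every name and default value below is illustrative rather than the SDK's exported API.

package main

import (
	"fmt"
	"time"
)

// Illustrative config and option types; the real SDK defines its own.
type config struct {
	seenTargetsMaxSize       int
	seenTargetsClearInterval time.Duration
}

type configOption func(*config)

// withSeenTargetsMaxSize and withSeenTargetsClearInterval are hypothetical
// option constructors, shown only to illustrate the pattern.
func withSeenTargetsMaxSize(n int) configOption {
	return func(c *config) { c.seenTargetsMaxSize = n }
}

func withSeenTargetsClearInterval(d time.Duration) configOption {
	return func(c *config) { c.seenTargetsClearInterval = d }
}

func main() {
	// Arbitrary defaults, then apply caller-supplied options — the same shape
	// as the opt(config) loop near the top of NewCfClient.
	cfg := &config{seenTargetsMaxSize: 500, seenTargetsClearInterval: 24 * time.Hour}
	opts := []configOption{
		withSeenTargetsMaxSize(1000),
		withSeenTargetsClearInterval(12 * time.Hour),
	}
	for _, opt := range opts {
		opt(cfg)
	}
	fmt.Printf("%+v\n", cfg)
}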