@@ -15,43 +15,58 @@ import (
1515 "github.com/splitio/go-split-commons/v6/provisional/strategy"
1616 "github.com/splitio/go-split-commons/v6/service/api"
1717 "github.com/splitio/go-split-commons/v6/storage"
18+ "github.com/splitio/go-split-commons/v6/storage/filter"
1819 "github.com/splitio/go-split-commons/v6/storage/inmemory"
1920 "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap"
2021 "github.com/splitio/go-split-commons/v6/synchronizer"
2122 "github.com/splitio/go-split-commons/v6/synchronizer/worker/impressionscount"
2223 "github.com/splitio/go-split-commons/v6/synchronizer/worker/segment"
2324 "github.com/splitio/go-split-commons/v6/synchronizer/worker/split"
2425 "github.com/splitio/go-split-commons/v6/tasks"
26+ "github.com/splitio/go-split-commons/v6/telemetry"
2527 "github.com/splitio/go-toolkit/v5/logging"
2628)
2729
30+ const (
31+ bfExpectedElemenets = 10000000
32+ bfFalsePositiveProbability = 0.01
33+ bfCleaningPeriod = 86400 // 24 hours
34+ uniqueKeysPeriodTaskInMemory = 900 // 15 min
35+ uniqueKeysPeriodTaskRedis = 300 // 5 min
36+ impressionsCountPeriodTaskInMemory = 1800 // 30 min
37+ impressionsCountPeriodTaskRedis = 300 // 5 min
38+ impressionsBulkSizeRedis = 100
39+ )
40+
2841func setupWorkers (
2942 logger logging.LoggerInterface ,
3043 api * api.SplitAPI ,
3144 str * storages ,
3245 hc application.MonitorProducerInterface ,
3346 cfg * sdkConf.Config ,
3447 flagSetsFilter flagsets.FlagSetFilter ,
48+ md dtos.Metadata ,
49+ impComponents impComponents ,
3550) * synchronizer.Workers {
3651 return & synchronizer.Workers {
37- SplitUpdater : split .NewSplitUpdater (str .splits , api .SplitFetcher , logger , str .telemetry , hc , flagSetsFilter ),
38- SegmentUpdater : segment .NewSegmentUpdater (str .splits , str .segments , api .SegmentFetcher , logger , str .telemetry , hc ),
39- ImpressionRecorder : workers .NewImpressionsWorker (logger , str .telemetry , api .ImpressionRecorder , str .impressions , & cfg .Impressions ),
40- EventRecorder : workers .NewEventsWorker (logger , str .telemetry , api .EventRecorder , str .events , & cfg .Events ),
52+ SplitUpdater : split .NewSplitUpdater (str .splits , api .SplitFetcher , logger , str .telemetry , hc , flagSetsFilter ),
53+ SegmentUpdater : segment .NewSegmentUpdater (str .splits , str .segments , api .SegmentFetcher , logger , str .telemetry , hc ),
54+ ImpressionRecorder : workers .NewImpressionsWorker (logger , str .telemetry , api .ImpressionRecorder , str .impressions , & cfg .Impressions ),
55+ EventRecorder : workers .NewEventsWorker (logger , str .telemetry , api .EventRecorder , str .events , & cfg .Events ),
56+ ImpressionsCountRecorder : impressionscount .NewRecorderSingle (impComponents .counter , api .ImpressionRecorder , md , logger , str .telemetry ),
57+ TelemetryRecorder : telemetry .NewTelemetrySynchronizer (str .telemetry , api .TelemetryRecorder , str .splits , str .segments , logger , md , str .telemetry ),
4158 }
4259}
4360
4461func setupTasks (
4562 cfg * sdkConf.Config ,
46- str * storages ,
4763 logger logging.LoggerInterface ,
4864 workers * synchronizer.Workers ,
4965 impComponents impComponents ,
50- md dtos.Metadata ,
51- api * api.SplitAPI ,
5266) * synchronizer.SplitTasks {
5367 impCfg := cfg .Impressions
5468 evCfg := cfg .Events
69+ dummyHC := & application.Dummy {}
5570 tg := & synchronizer.SplitTasks {
5671 SplitSyncTask : tasks .NewFetchSplitsTask (workers .SplitUpdater , int (cfg .Splits .SyncPeriod .Seconds ()), logger ),
5772 SegmentSyncTask : tasks .NewFetchSegmentsTask (
@@ -60,21 +75,18 @@ func setupTasks(
6075 cfg .Segments .WorkerCount ,
6176 cfg .Segments .QueueSize ,
6277 logger ,
78+ dummyHC ,
6379 ),
64- ImpressionSyncTask : tasks .NewRecordImpressionsTask (workers .ImpressionRecorder , int (impCfg .SyncPeriod .Seconds ()), logger , 5000 ),
65- EventSyncTask : tasks .NewRecordEventsTask (workers .EventRecorder , 5000 , int (evCfg .SyncPeriod .Seconds ()), logger ),
66- TelemetrySyncTask : & NoOpTask {},
67- UniqueKeysTask : & NoOpTask {},
68- CleanFilterTask : & NoOpTask {},
69- ImpsCountConsumerTask : & NoOpTask {},
70- }
71-
72- if impCfg .Mode == "optimized" {
73- tg .ImpressionsCountSyncTask = tasks .NewRecordImpressionsCountTask (
74- impressionscount .NewRecorderSingle (impComponents .counter , api .ImpressionRecorder , md , logger , str .telemetry ),
80+ ImpressionSyncTask : tasks .NewRecordImpressionsTask (workers .ImpressionRecorder , int (impCfg .SyncPeriod .Seconds ()), logger , 5000 ),
81+ EventSyncTask : tasks .NewRecordEventsTask (workers .EventRecorder , 5000 , int (evCfg .SyncPeriod .Seconds ()), logger ),
82+ TelemetrySyncTask : & NoOpTask {},
83+ UniqueKeysTask : tasks .NewRecordUniqueKeysTask (workers .TelemetryRecorder , * impComponents .tracker , uniqueKeysPeriodTaskInMemory , logger ),
84+ CleanFilterTask : tasks .NewCleanFilterTask (* impComponents .filter , logger , bfCleaningPeriod ),
85+ ImpsCountConsumerTask : tasks .NewRecordImpressionsCountTask (
86+ workers .ImpressionsCountRecorder ,
7587 logger ,
7688 int (impCfg .CountSyncPeriod .Seconds ()),
77- )
89+ ),
7890 }
7991
8092 return tg
@@ -83,6 +95,8 @@ func setupTasks(
8395type impComponents struct {
8496 manager provisional.ImpressionManager
8597 counter * strategy.ImpressionsCounter
98+ tracker * strategy.UniqueKeysTracker
99+ filter * storage.Filter
86100}
87101
88102func setupImpressionsComponents (c * sdkConf.Impressions , telemetry storage.TelemetryRuntimeProducer ) (impComponents , error ) {
@@ -92,20 +106,28 @@ func setupImpressionsComponents(c *sdkConf.Impressions, telemetry storage.Teleme
92106 return impComponents {}, fmt .Errorf ("error building impressions observer: %w" , err )
93107 }
94108
109+ counter := strategy .NewImpressionsCounter ()
110+ bf := filter .NewBloomFilter (bfExpectedElemenets , bfFalsePositiveProbability )
111+ tracker := strategy .NewUniqueKeysTracker (bf )
112+ none := strategy .NewNoneImpl (counter , tracker , false )
113+
95114 var s strategy.ProcessStrategyInterface
96- var counter * strategy.ImpressionsCounter
97115 switch c .Mode {
98116 case conf .ImpressionsModeDebug :
99117 s = strategy .NewDebugImpl (observer , false )
100118 case conf .ImpressionsModeNone :
119+ s = none
101120 default : // optimized
102- counter = strategy .NewImpressionsCounter ()
103121 s = strategy .NewOptimizedImpl (observer , counter , telemetry , false )
104122 }
105123
124+ impManager := provisional .NewImpressionManagerImp (none , s )
125+
106126 return impComponents {
107- manager : provisional . NewImpressionManager ( s ) ,
127+ manager : impManager ,
108128 counter : counter ,
129+ tracker : & tracker ,
130+ filter : & bf ,
109131 }, nil
110132}
111133
0 commit comments