Commit 73ecb8a

fix: apply UseClusterSize cfg (#1786)
## Which problem is this PR solving?

`UseClusterSize` should be honored when calculating total throughput.

Closes FINE-65

## Short description of the changes

- Check `UseClusterSize` when calculating throughput
- Add a test

Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com>
1 parent 332bdb2 commit 73ecb8a
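
For context on the fix: when `UseClusterSize` is enabled, each node divides the configured goal throughput by the current peer count; when it is disabled, every node should keep the full configured goal, which is the behavior this commit restores. A minimal standalone sketch of the intended arithmetic (illustrative function and names, not Refinery's actual API):

```go
package main

import "fmt"

// effectiveThroughput mirrors the intended UseClusterSize semantics as a
// simplified sketch; the real calculation lives in SamplerFactory.updatePeerCounts.
func effectiveThroughput(goal, peerCount int, useClusterSize bool) int {
	if !useClusterSize {
		// Without UseClusterSize, every node targets the full configured goal.
		return goal
	}
	// With UseClusterSize, the goal is split across peers, floored at 1
	// (matching max(cfg/s.peerCount, 1) in the diff below; Go 1.21+ builtin max).
	return max(goal/peerCount, 1)
}

func main() {
	fmt.Println(effectiveThroughput(200, 2, true))  // 100: goal split across 2 peers
	fmt.Println(effectiveThroughput(200, 2, false)) // 200: full goal on each node
	fmt.Println(effectiveThroughput(1, 4, true))    // 1: floored at a minimum of 1
}
```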

File tree

2 files changed (+96, -14 lines)


sample/sample.go

Lines changed: 29 additions & 14 deletions
```diff
@@ -57,9 +57,11 @@ func (s *SamplerFactory) updatePeerCounts() {
 	// Update goal throughput for all throughput-based dynsamplers
 	for dynsamplerKey, dynsamplerInstance := range s.sharedDynsamplers {
 		if hasThroughput, ok := dynsamplerInstance.(CanSetGoalThroughputPerSec); ok {
-			// Calculate new throughput based on cluster size
-			newThroughput := max(s.goalThroughputConfigs[dynsamplerKey]/s.peerCount, 1)
-			hasThroughput.SetGoalThroughputPerSec(newThroughput)
+			if cfg, ok := s.goalThroughputConfigs[dynsamplerKey]; ok {
+				// Calculate new throughput based on cluster size
+				newThroughput := max(cfg/s.peerCount, 1)
+				hasThroughput.SetGoalThroughputPerSec(newThroughput)
+			}
 		}
 	}
 }
```
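
One detail worth calling out: the comma-ok lookup is what makes the conditional storage later in this diff safe. An unconditional read of a Go map returns the zero value for missing keys, so an untracked sampler would have its goal clamped to `max(0/peerCount, 1) == 1`. A small sketch of that pitfall, using a throwaway map rather than the real `goalThroughputConfigs`:

```go
package main

import "fmt"

func main() {
	goals := map[string]int{} // no entry is stored when UseClusterSize is false
	peerCount := 2

	// Unconditional lookup: a missing key yields the zero value, so the
	// recalculated goal would collapse to max(0/peerCount, 1) == 1.
	fmt.Println(max(goals["untracked"]/peerCount, 1)) // 1 — would override the sampler

	// Comma-ok lookup: skip recalculation entirely when no goal was tracked.
	if cfg, ok := goals["untracked"]; ok {
		fmt.Println(max(cfg/peerCount, 1))
	} // prints nothing — untracked samplers keep their configured goal
}
```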
```diff
@@ -92,7 +94,12 @@ func getSharedDynsampler[ST any, CT any](
 	return dynsamplerInstance
 }
 
-// createSampler creates a sampler with shared dynsamplers based on the config type
+// createSampler creates a sampler with shared dynsamplers based on the config type.
+// A unique dynsampler is created based on a composite key that includes the keyPrefix
+// (dataset/environment), sampler type, and configuration parameters (e.g., sample rate
+// and field list). This ensures that samplers with identical configurations share the
+// same underlying dynsampler instance, guaranteeing consistent sampling decisions across
+// parallel collector workers within a single Refinery instance.
 func (s *SamplerFactory) createSampler(c any, keyPrefix string) Sampler {
 	var sampler Sampler
 
```
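The composite key described in the new doc comment is built with `fmt.Sprintf`, as seen in the case arms in the next hunk. A quick sketch of how identical configurations collide on the same key (the prefix and values here are made up for illustration):

```go
package main

import "fmt"

func main() {
	// Same dataset prefix, sampler type, goal, and field list -> same key,
	// so both samplers share one underlying dynsampler instance.
	keyA := fmt.Sprintf("%s:totalthroughput:%d:%v", "production", 100, []string{"service.name"})
	keyB := fmt.Sprintf("%s:totalthroughput:%d:%v", "production", 100, []string{"service.name"})
	fmt.Println(keyA == keyB) // true

	// Any differing parameter (here, the field list) produces a distinct key
	// and therefore a separate dynsampler.
	keyC := fmt.Sprintf("%s:totalthroughput:%d:%v", "production", 100, []string{"http.route"})
	fmt.Println(keyA == keyC) // false
}
```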
```diff
@@ -112,24 +119,32 @@ func (s *SamplerFactory) createSampler(c any, keyPrefix string) Sampler {
 	case *config.TotalThroughputSamplerConfig:
 		dynsamplerKey := fmt.Sprintf("%s:totalthroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList)
 		dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForTotalThroughputSampler)
-		// Store goal throughput config under mutex protection
-		s.mutex.Lock()
-		s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
-		s.mutex.Unlock()
+		// only track goal throughput config if we need to recalculate it later based on cluster size
+		if c.UseClusterSize {
+			s.mutex.Lock()
+			s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
+			s.mutex.Unlock()
+		}
 		sampler = &TotalThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics, dynsampler: dynsamplerInstance}
 	case *config.EMAThroughputSamplerConfig:
 		dynsamplerKey := fmt.Sprintf("%s:emathroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList)
 		dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForEMAThroughputSampler)
-		s.mutex.Lock()
-		s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
-		s.mutex.Unlock()
+		// only track goal throughput config if we need to recalculate it later based on cluster size
+		if c.UseClusterSize {
+			s.mutex.Lock()
+			s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
+			s.mutex.Unlock()
+		}
 		sampler = &EMAThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics, dynsampler: dynsamplerInstance}
 	case *config.WindowedThroughputSamplerConfig:
 		dynsamplerKey := fmt.Sprintf("%s:windowedthroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList)
 		dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForWindowedThroughputSampler)
-		s.mutex.Lock()
-		s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
-		s.mutex.Unlock()
+		// only track goal throughput config if we need to recalculate it later based on cluster size
+		if c.UseClusterSize {
+			s.mutex.Lock()
+			s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
+			s.mutex.Unlock()
+		}
 		sampler = &WindowedThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics, dynsampler: dynsamplerInstance}
 	default:
 		s.Logger.Error().Logf("unknown sampler type %T. Exiting.", c)
```
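
The guard-then-store pattern is identical across all three throughput sampler types: the goal is recorded only when it will need rescaling later, and the write stays under the mutex because `createSampler` can run from parallel collector workers (per the doc comment above) while Go maps are unsafe for concurrent writes. A self-contained sketch of the pattern, with invented names:

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a stand-in for SamplerFactory's mutex + goalThroughputConfigs pair.
type registry struct {
	mu    sync.Mutex
	goals map[string]int
}

// track mirrors the diff's guard: store the goal only when cluster scaling applies.
func (r *registry) track(key string, goal int, useClusterSize bool) {
	if !useClusterSize {
		return // nothing to recalculate later, so store nothing
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.goals[key] = goal
}

func main() {
	r := &registry{goals: map[string]int{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.track(fmt.Sprintf("dataset-%d:totalthroughput", i), 100, i%2 == 0)
		}(i)
	}
	wg.Wait()
	fmt.Println(len(r.goals)) // 2: only the UseClusterSize entries were recorded
}
```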

sample/sample_test.go

Lines changed: 67 additions & 0 deletions
```diff
@@ -579,9 +579,76 @@ func TestClusterSizeUpdatesSamplers(t *testing.T) {
 		return throughputSampler3.dynsampler.GoalThroughputPerSec == 100
 	}, 2*time.Second, 50*time.Millisecond, "New sampler with 200 throughput should have 100 with 2 peers")
 
+	// Test UseClusterSize=false: throughput should NOT be divided by peer count
+	cm3 := makeYAML(
+		"General/ConfigurationVersion", 2,
+	)
+	rm3 := makeYAML(
+		"RulesVersion", 2,
+		"Samplers/__default__/DeterministicSampler/SampleRate", 1,
+		"Samplers/no-cluster-division/TotalThroughputSampler/GoalThroughputPerSec", 100,
+		"Samplers/no-cluster-division/TotalThroughputSampler/UseClusterSize", false,
+		"Samplers/no-cluster-division/TotalThroughputSampler/FieldList", []string{"service.name"},
+	)
+	cfg3, rules3 := createTempConfigs(t, cm3, rm3)
+	c3, err := getConfig([]string{"--no-validate", "--config", cfg3, "--rules_config", rules3})
+	assert.NoError(t, err)
+
+	// Create a new peer manager for this test case
+	testPeers2 := peer.NewMockPeers([]string{"peer1"}, "")
+
+	factory3 := SamplerFactory{
+		Config:  c3,
+		Logger:  &logger.NullLogger{},
+		Metrics: &metrics.NullMetrics{},
+		Peers:   testPeers2,
+	}
+	err = factory3.Start()
+	require.NoError(t, err)
+
+	sampler4 := factory3.GetSamplerImplementationForKey("no-cluster-division")
+	require.NotNil(t, sampler4)
+
+	throughputSampler4, ok := sampler4.(*TotalThroughputSampler)
+	require.True(t, ok, "Expected TotalThroughputSampler")
+
+	// With 1 peer and UseClusterSize=false, throughput should be 100 (full original value)
+	assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec)
+
+	// Add a second peer
+	testPeers2.UpdatePeers([]string{"peer1", "peer2"})
+
+	// Wait a bit for potential updates (though none should happen)
+	time.Sleep(100 * time.Millisecond)
+
+	// Throughput should STILL be 100 (not divided by 2) because UseClusterSize is false
+	assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec,
+		"Throughput should remain 100 with 2 peers when UseClusterSize=false")
+
+	// Add a third peer
+	testPeers2.UpdatePeers([]string{"peer1", "peer2", "peer3"})
+
+	// Wait a bit for potential updates (though none should happen)
+	time.Sleep(100 * time.Millisecond)
+
+	// Throughput should STILL be 100 (not divided by 3) because UseClusterSize is false
+	assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec,
+		"Throughput should remain 100 with 3 peers when UseClusterSize=false")
+
+	// Remove a peer (back to 2)
+	testPeers2.UpdatePeers([]string{"peer1", "peer2"})
+
+	// Wait a bit for potential updates (though none should happen)
+	time.Sleep(100 * time.Millisecond)
+
+	// Throughput should STILL be 100 because UseClusterSize is false
+	assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec,
+		"Throughput should remain 100 after removing peers when UseClusterSize=false")
+
 	// Cleanup dynsampler instances to prevent goroutine leaks
 	throughputSampler.dynsampler.Stop()
 	throughputSampler3.dynsampler.Stop()
+	throughputSampler4.dynsampler.Stop()
 }
 
 func TestRulesBasedSamplerDoesNotShareDynsamplersBetweenEnvironments(t *testing.T) {
```
