32 changes: 20 additions & 12 deletions sample/sample.go
@@ -57,9 +57,11 @@ func (s *SamplerFactory) updatePeerCounts() {
 	// Update goal throughput for all throughput-based dynsamplers
 	for dynsamplerKey, dynsamplerInstance := range s.sharedDynsamplers {
 		if hasThroughput, ok := dynsamplerInstance.(CanSetGoalThroughputPerSec); ok {
-			// Calculate new throughput based on cluster size
-			newThroughput := max(s.goalThroughputConfigs[dynsamplerKey]/s.peerCount, 1)
-			hasThroughput.SetGoalThroughputPerSec(newThroughput)
+			if cfg, ok := s.goalThroughputConfigs[dynsamplerKey]; ok {
+				// Calculate new throughput based on cluster size
+				newThroughput := max(cfg/s.peerCount, 1)
+				hasThroughput.SetGoalThroughputPerSec(newThroughput)
+			}
 		}
 	}
 }
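For readers skimming the hunk above: the per-peer goal is computed with integer division and floored at 1, and a dynsampler whose key was never stored in goalThroughputConfigs is now skipped entirely. A standalone sketch of just that arithmetic (the perPeerGoal helper and the sample numbers are illustrative, not code from this PR; the built-in max requires Go 1.21+, which the diff itself already relies on):

package main

import "fmt"

// perPeerGoal mirrors max(cfg/s.peerCount, 1) from updatePeerCounts:
// integer division of the cluster-wide goal, never dropping below 1.
func perPeerGoal(clusterGoal, peerCount int) int {
	return max(clusterGoal/peerCount, 1)
}

func main() {
	fmt.Println(perPeerGoal(100, 2)) // 50
	fmt.Println(perPeerGoal(100, 3)) // 33 (integer division truncates)
	fmt.Println(perPeerGoal(2, 5))   // 1  (floored so the sampler keeps sampling)
}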
@@ -113,23 +115,29 @@ func (s *SamplerFactory) createSampler(c any, keyPrefix string) Sampler {
 		dynsamplerKey := fmt.Sprintf("%s:totalthroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList)
 		dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForTotalThroughputSampler)
 		// Store goal throughput config under mutex protection
-		s.mutex.Lock()
-		s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
-		s.mutex.Unlock()
+		if c.UseClusterSize {
+			s.mutex.Lock()
+			s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
+			s.mutex.Unlock()
+		}
 		sampler = &TotalThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics, dynsampler: dynsamplerInstance}
 	case *config.EMAThroughputSamplerConfig:
 		dynsamplerKey := fmt.Sprintf("%s:emathroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList)
 		dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForEMAThroughputSampler)
-		s.mutex.Lock()
-		s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
-		s.mutex.Unlock()
+		if c.UseClusterSize {
+			s.mutex.Lock()
+			s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
+			s.mutex.Unlock()
+		}
 		sampler = &EMAThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics, dynsampler: dynsamplerInstance}
 	case *config.WindowedThroughputSamplerConfig:
 		dynsamplerKey := fmt.Sprintf("%s:windowedthroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList)
 		dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForWindowedThroughputSampler)
-		s.mutex.Lock()
-		s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
-		s.mutex.Unlock()
+		if c.UseClusterSize {
+			s.mutex.Lock()
+			s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec
+			s.mutex.Unlock()
+		}
 		sampler = &WindowedThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics, dynsampler: dynsamplerInstance}
 	default:
 		s.Logger.Error().Logf("unknown sampler type %T. Exiting.", c)
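All three throughput cases now guard the map write with c.UseClusterSize, which is what makes the updatePeerCounts lookup above skip opted-out samplers. A rough, self-contained sketch of that interaction (the registry type and the names below are invented for illustration; in Refinery this state lives on SamplerFactory):

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu    sync.Mutex
	goals map[string]int // plays the role of goalThroughputConfigs
}

// register stores a goal only for samplers that opt in to cluster scaling.
func (r *registry) register(key string, goal int, useClusterSize bool) {
	if !useClusterSize {
		return // no entry means rescale will leave this sampler alone
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.goals[key] = goal
}

// rescale divides every registered goal by the peer count, floored at 1.
func (r *registry) rescale(peerCount int) map[string]int {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make(map[string]int, len(r.goals))
	for k, goal := range r.goals {
		out[k] = max(goal/peerCount, 1)
	}
	return out
}

func main() {
	r := &registry{goals: map[string]int{}}
	r.register("cluster-aware", 100, true)
	r.register("standalone", 100, false)
	fmt.Println(r.rescale(2)) // map[cluster-aware:50]; "standalone" is never rescaled
}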
67 changes: 67 additions & 0 deletions sample/sample_test.go
@@ -579,9 +579,76 @@ func TestClusterSizeUpdatesSamplers(t *testing.T) {
 		return throughputSampler3.dynsampler.GoalThroughputPerSec == 100
 	}, 2*time.Second, 50*time.Millisecond, "New sampler with 200 throughput should have 100 with 2 peers")
 
+	// Test UseClusterSize=false: throughput should NOT be divided by peer count
+	cm3 := makeYAML(
+		"General/ConfigurationVersion", 2,
+	)
+	rm3 := makeYAML(
+		"RulesVersion", 2,
+		"Samplers/__default__/DeterministicSampler/SampleRate", 1,
+		"Samplers/no-cluster-division/TotalThroughputSampler/GoalThroughputPerSec", 100,
+		"Samplers/no-cluster-division/TotalThroughputSampler/UseClusterSize", false,
+		"Samplers/no-cluster-division/TotalThroughputSampler/FieldList", []string{"service.name"},
+	)
+	cfg3, rules3 := createTempConfigs(t, cm3, rm3)
+	c3, err := getConfig([]string{"--no-validate", "--config", cfg3, "--rules_config", rules3})
+	assert.NoError(t, err)
+
+	// Create a new peer manager for this test case
+	testPeers2 := peer.NewMockPeers([]string{"peer1"}, "")
+
+	factory3 := SamplerFactory{
+		Config:  c3,
+		Logger:  &logger.NullLogger{},
+		Metrics: &metrics.NullMetrics{},
+		Peers:   testPeers2,
+	}
+	err = factory3.Start()
+	require.NoError(t, err)
+
+	sampler4 := factory3.GetSamplerImplementationForKey("no-cluster-division")
+	require.NotNil(t, sampler4)
+
+	throughputSampler4, ok := sampler4.(*TotalThroughputSampler)
+	require.True(t, ok, "Expected TotalThroughputSampler")
+
+	// With 1 peer and UseClusterSize=false, throughput should be 100 (full original value)
+	assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec)
+
+	// Add a second peer
+	testPeers2.UpdatePeers([]string{"peer1", "peer2"})
+
+	// Wait a bit for potential updates (though none should happen)
+	time.Sleep(100 * time.Millisecond)
+
+	// Throughput should STILL be 100 (not divided by 2) because UseClusterSize is false
+	assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec,
+		"Throughput should remain 100 with 2 peers when UseClusterSize=false")
+
+	// Add a third peer
+	testPeers2.UpdatePeers([]string{"peer1", "peer2", "peer3"})
+
+	// Wait a bit for potential updates (though none should happen)
+	time.Sleep(100 * time.Millisecond)
+
+	// Throughput should STILL be 100 (not divided by 3) because UseClusterSize is false
+	assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec,
+		"Throughput should remain 100 with 3 peers when UseClusterSize=false")
+
+	// Remove a peer (back to 2)
+	testPeers2.UpdatePeers([]string{"peer1", "peer2"})
+
+	// Wait a bit for potential updates (though none should happen)
+	time.Sleep(100 * time.Millisecond)
+
+	// Throughput should STILL be 100 because UseClusterSize is false
+	assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec,
+		"Throughput should remain 100 after removing peers when UseClusterSize=false")
+
 	// Cleanup dynsampler instances to prevent goroutine leaks
 	throughputSampler.dynsampler.Stop()
 	throughputSampler3.dynsampler.Stop()
+	throughputSampler4.dynsampler.Stop()
 }
 
 func BenchmarkGetSamplerImplementation(b *testing.B) {