From 736dad69b4792f8d3d16e4aa0ba8a5143cb12ab3 Mon Sep 17 00:00:00 2001 From: Yingrong <22300958+VinozzZ@users.noreply.github.com> Date: Thu, 12 Feb 2026 10:17:57 -0500 Subject: [PATCH 1/2] fix: use cluster size --- sample/sample.go | 32 +++++++++++++-------- sample/sample_test.go | 67 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 12 deletions(-) diff --git a/sample/sample.go b/sample/sample.go index c20476ed4b..5416c7ec72 100644 --- a/sample/sample.go +++ b/sample/sample.go @@ -57,9 +57,11 @@ func (s *SamplerFactory) updatePeerCounts() { // Update goal throughput for all throughput-based dynsamplers for dynsamplerKey, dynsamplerInstance := range s.sharedDynsamplers { if hasThroughput, ok := dynsamplerInstance.(CanSetGoalThroughputPerSec); ok { - // Calculate new throughput based on cluster size - newThroughput := max(s.goalThroughputConfigs[dynsamplerKey]/s.peerCount, 1) - hasThroughput.SetGoalThroughputPerSec(newThroughput) + if cfg, ok := s.goalThroughputConfigs[dynsamplerKey]; ok { + // Calculate new throughput based on cluster size + newThroughput := max(cfg/s.peerCount, 1) + hasThroughput.SetGoalThroughputPerSec(newThroughput) + } } } } @@ -113,23 +115,29 @@ func (s *SamplerFactory) createSampler(c any, keyPrefix string) Sampler { dynsamplerKey := fmt.Sprintf("%s:totalthroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList) dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForTotalThroughputSampler) // Store goal throughput config under mutex protection - s.mutex.Lock() - s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec - s.mutex.Unlock() + if c.UseClusterSize { + s.mutex.Lock() + s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec + s.mutex.Unlock() + } sampler = &TotalThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics, dynsampler: dynsamplerInstance} case *config.EMAThroughputSamplerConfig: dynsamplerKey := fmt.Sprintf("%s:emathroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList) dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForEMAThroughputSampler) - s.mutex.Lock() - s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec - s.mutex.Unlock() + if c.UseClusterSize { + s.mutex.Lock() + s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec + s.mutex.Unlock() + } sampler = &EMAThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics, dynsampler: dynsamplerInstance} case *config.WindowedThroughputSamplerConfig: dynsamplerKey := fmt.Sprintf("%s:windowedthroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList) dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForWindowedThroughputSampler) - s.mutex.Lock() - s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec - s.mutex.Unlock() + if c.UseClusterSize { + s.mutex.Lock() + s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec + s.mutex.Unlock() + } sampler = &WindowedThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics, dynsampler: dynsamplerInstance} default: s.Logger.Error().Logf("unknown sampler type %T. Exiting.", c) diff --git a/sample/sample_test.go b/sample/sample_test.go index d8886cafe5..8a6a2b54a8 100644 --- a/sample/sample_test.go +++ b/sample/sample_test.go @@ -579,9 +579,76 @@ func TestClusterSizeUpdatesSamplers(t *testing.T) { return throughputSampler3.dynsampler.GoalThroughputPerSec == 100 }, 2*time.Second, 50*time.Millisecond, "New sampler with 200 throughput should have 100 with 2 peers") + // Test UseClusterSize=false: throughput should NOT be divided by peer count + cm3 := makeYAML( + "General/ConfigurationVersion", 2, + ) + rm3 := makeYAML( + "RulesVersion", 2, + "Samplers/__default__/DeterministicSampler/SampleRate", 1, + "Samplers/no-cluster-division/TotalThroughputSampler/GoalThroughputPerSec", 100, + "Samplers/no-cluster-division/TotalThroughputSampler/UseClusterSize", false, + "Samplers/no-cluster-division/TotalThroughputSampler/FieldList", []string{"service.name"}, + ) + cfg3, rules3 := createTempConfigs(t, cm3, rm3) + c3, err := getConfig([]string{"--no-validate", "--config", cfg3, "--rules_config", rules3}) + assert.NoError(t, err) + + // Create a new peer manager for this test case + testPeers2 := peer.NewMockPeers([]string{"peer1"}, "") + + factory3 := SamplerFactory{ + Config: c3, + Logger: &logger.NullLogger{}, + Metrics: &metrics.NullMetrics{}, + Peers: testPeers2, + } + err = factory3.Start() + require.NoError(t, err) + + sampler4 := factory3.GetSamplerImplementationForKey("no-cluster-division") + require.NotNil(t, sampler4) + + throughputSampler4, ok := sampler4.(*TotalThroughputSampler) + require.True(t, ok, "Expected TotalThroughputSampler") + + // With 1 peer and UseClusterSize=false, throughput should be 100 (full original value) + assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec) + + // Add a second peer + testPeers2.UpdatePeers([]string{"peer1", "peer2"}) + + // Wait a bit for potential updates (though none should happen) + time.Sleep(100 * time.Millisecond) + + // Throughput should STILL be 100 (not divided by 2) because UseClusterSize is false + assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec, + "Throughput should remain 100 with 2 peers when UseClusterSize=false") + + // Add a third peer + testPeers2.UpdatePeers([]string{"peer1", "peer2", "peer3"}) + + // Wait a bit for potential updates (though none should happen) + time.Sleep(100 * time.Millisecond) + + // Throughput should STILL be 100 (not divided by 3) because UseClusterSize is false + assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec, + "Throughput should remain 100 with 3 peers when UseClusterSize=false") + + // Remove a peer (back to 2) + testPeers2.UpdatePeers([]string{"peer1", "peer2"}) + + // Wait a bit for potential updates (though none should happen) + time.Sleep(100 * time.Millisecond) + + // Throughput should STILL be 100 because UseClusterSize is false + assert.Equal(t, 100, throughputSampler4.dynsampler.GoalThroughputPerSec, + "Throughput should remain 100 after removing peers when UseClusterSize=false") + // Cleanup dynsampler instances to prevent goroutine leaks throughputSampler.dynsampler.Stop() throughputSampler3.dynsampler.Stop() + throughputSampler4.dynsampler.Stop() } func BenchmarkGetSamplerImplementation(b *testing.B) { From c46e8fe63b50e75b3fb973079fcf87655940b285 Mon Sep 17 00:00:00 2001 From: Yingrong <22300958+VinozzZ@users.noreply.github.com> Date: Thu, 12 Feb 2026 14:43:17 -0500 Subject: [PATCH 2/2] add comments --- sample/sample.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sample/sample.go b/sample/sample.go index 5416c7ec72..b2552edbe6 100644 --- a/sample/sample.go +++ b/sample/sample.go @@ -94,7 +94,12 @@ func getSharedDynsampler[ST any, CT any]( return dynsamplerInstance } -// createSampler creates a sampler with shared dynsamplers based on the config type +// createSampler creates a sampler with shared dynsamplers based on the config type. +// A unique dynsampler is created based on a composite key that includes the keyPrefix +// (dataset/environment), sampler type, and configuration parameters (e.g., sample rate +// and field list). This ensures that samplers with identical configurations share the +// same underlying dynsampler instance, guaranteeing consistent sampling decisions across +// parallel collector workers within a single Refinery instance. func (s *SamplerFactory) createSampler(c any, keyPrefix string) Sampler { var sampler Sampler @@ -114,7 +119,7 @@ func (s *SamplerFactory) createSampler(c any, keyPrefix string) Sampler { case *config.TotalThroughputSamplerConfig: dynsamplerKey := fmt.Sprintf("%s:totalthroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList) dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForTotalThroughputSampler) - // Store goal throughput config under mutex protection + // only track goal throughput config if we need to recalculate it later based on cluster size if c.UseClusterSize { s.mutex.Lock() s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec @@ -124,6 +129,7 @@ func (s *SamplerFactory) createSampler(c any, keyPrefix string) Sampler { case *config.EMAThroughputSamplerConfig: dynsamplerKey := fmt.Sprintf("%s:emathroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList) dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForEMAThroughputSampler) + // only track goal throughput config if we need to recalculate it later based on cluster size if c.UseClusterSize { s.mutex.Lock() s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec @@ -133,6 +139,7 @@ func (s *SamplerFactory) createSampler(c any, keyPrefix string) Sampler { case *config.WindowedThroughputSamplerConfig: dynsamplerKey := fmt.Sprintf("%s:windowedthroughput:%d:%v", keyPrefix, c.GoalThroughputPerSec, c.FieldList) dynsamplerInstance := getSharedDynsampler(s, dynsamplerKey, c, createDynForWindowedThroughputSampler) + // only track goal throughput config if we need to recalculate it later based on cluster size if c.UseClusterSize { s.mutex.Lock() s.goalThroughputConfigs[dynsamplerKey] = c.GoalThroughputPerSec