diff --git a/sampler/xray/example/xray_sampling_on_rails_demonstration.ru b/sampler/xray/example/xray_sampling_on_rails_demonstration.ru new file mode 100644 index 000000000..5acb00de0 --- /dev/null +++ b/sampler/xray/example/xray_sampling_on_rails_demonstration.ru @@ -0,0 +1,102 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'bundler/inline' + +gemfile(true) do + source 'https://rubygems.org' + + gem 'concurrent-ruby', '1.3.4' + gem 'rails', '~> 7.0.4' + gem 'puma' + + gem 'opentelemetry-sdk' + gem 'opentelemetry-instrumentation-rails' + gem 'opentelemetry-sampler-xray', path: './../' # Use local version of the X-Ray Sampler + # gem 'opentelemetry-sampler-xray' # Use RubyGems version of the X-Ray Sampler +end + +require "action_controller/railtie" +require "action_mailer/railtie" +require "rails/test_unit/railtie" + +class App < Rails::Application + config.root = __dir__ + config.consider_all_requests_local = true + + routes.append do + root to: 'welcome#index' + get "/test" => 'welcome#test' + end +end + +class WelcomeController < ActionController::Base + def index + render inline: 'Successfully called "/" endpoint' + end + + def test + render inline: 'Successfully called "/test" endpoint' + end +end + +ENV['OTEL_TRACES_EXPORTER'] ||= 'console' +ENV['OTEL_SERVICE_NAME'] ||= 'xray-sampler-on-rails-service' + +OpenTelemetry::SDK.configure do |c| + c.use_all({ 'OpenTelemetry::Instrumentation::ActiveRecord' => { enabled: false } }) +end + +OpenTelemetry.tracer_provider.sampler = OpenTelemetry::Sampler::XRay::AWSXRayRemoteSampler.new(resource:OpenTelemetry::SDK::Resources::Resource.create({ + "service.name"=>"xray-sampler-on-rails-service" +})) + +App.initialize! + +run App + +#### Running and using the Sample App +# To run this example run the `rackup` command with this file +# Example: rackup xray_sampling_on_rails_demonstration.ru +# Navigate to http://localhost:9292/ +# Spans for any requests sampled by the X-Ray Sampler will appear in the console + +#### Required configuration in the OpenTelemetry Collector +# In order for sampling rules to be obtained from AWS X-Ray, the awsproxy extension +# must be configured in the OpenTelemetry Collector, which will use your AWS credentials. +# - https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/awsproxy#aws-proxy +# Without the awsproxy extension, the X-Ray Sampler will use a fallback sampler +# with a sampling strategy of "1 request/second, plus 5% of any additional requests" + +#### Testing out configurable X-Ray Sampling Rules against the "service.name" resource attribute. +# Create a new Sampling Rule with the following matching criteria in AWS CloudWatch Settings for X-Ray Traces. +# - https://console.aws.amazon.com/cloudwatch/home#xray:settings/sampling-rules + # Matching Criteria + # ServiceName = xray-sampler-on-rails-service + # ServiceType = * + # Host = * + # ResourceARN = * + # HTTPMethod = * + # URLPath = * +# For the above matching criteria, try out the following settings to sample or not sample requests +# - Limit to 0r/sec then 0 fixed rate +# - Limit to 1r/sec then 0 fixed rate (May take 30 seconds for this setting to apply) +# - Limit to 0r/sec then 100% fixed rate + +#### Testing out configurable X-Ray Sampling Rules against the "/test" endpoint in this sample app. +# Create a new Sampling Rule with the following matching criteria in AWS CloudWatch Settings for X-Ray Traces. +# - https://console.aws.amazon.com/cloudwatch/home#xray:settings/sampling-rules + # Matching Criteria + # ServiceName = * + # ServiceType = * + # Host = * + # ResourceARN = * + # HTTPMethod = * + # URLPath = /test +# For the above matching criteria, try out the following settings to sample or not sample requests +# - Limit to 0r/sec then 0 fixed rate +# - Limit to 1r/sec then 0 fixed rate (May take 30 seconds for this setting to apply) +# - Limit to 0r/sec then 100% fixed rate \ No newline at end of file diff --git a/sampler/xray/lib/opentelemetry/sampler/xray/aws_xray_remote_sampler.rb b/sampler/xray/lib/opentelemetry/sampler/xray/aws_xray_remote_sampler.rb index 41f051ca2..1e0f08972 100644 --- a/sampler/xray/lib/opentelemetry/sampler/xray/aws_xray_remote_sampler.rb +++ b/sampler/xray/lib/opentelemetry/sampler/xray/aws_xray_remote_sampler.rb @@ -4,14 +4,14 @@ # # SPDX-License-Identifier: Apache-2.0 -require 'net/http' require 'json' +require 'net/http' require 'opentelemetry/sdk' -require_relative 'sampling_rule' +require_relative 'aws_xray_sampling_client' require_relative 'fallback_sampler' -require_relative 'sampling_rule_applier' require_relative 'rule_cache' -require_relative 'aws_xray_sampling_client' +require_relative 'sampling_rule' +require_relative 'sampling_rule_applier' module OpenTelemetry module Sampler @@ -68,7 +68,8 @@ def initialize(endpoint: '127.0.0.1:2000', polling_interval: DEFAULT_RULES_POLLI # Start the Sampling Rules poller start_sampling_rules_poller - # TODO: Start the Sampling Targets poller + # Start the Sampling Targets poller + start_sampling_targets_poller end def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:) @@ -113,6 +114,15 @@ def start_sampling_rules_poller end end + def start_sampling_targets_poller + @target_poller = Thread.new do + loop do + sleep(((@target_polling_interval * 1000) + @target_polling_jitter_millis) / 1000.0) + retrieve_and_update_sampling_targets + end + end + end + def retrieve_and_update_sampling_rules sampling_rules_response = @sampling_client.fetch_sampling_rules if sampling_rules_response&.body && sampling_rules_response.body != '' @@ -125,6 +135,19 @@ def retrieve_and_update_sampling_rules OpenTelemetry.handle_error(exception: e, message: 'Error occurred when retrieving or updating Sampling Rules') end + def retrieve_and_update_sampling_targets + request_body = { + SamplingStatisticsDocuments: @rule_cache.create_sampling_statistics_documents(@client_id) + } + sampling_targets_response = @sampling_client.fetch_sampling_targets(request_body) + if sampling_targets_response&.body && sampling_targets_response.body != '' + response_body = JSON.parse(sampling_targets_response.body) + update_sampling_targets(response_body) + else + OpenTelemetry.logger.debug('SamplingTargets Response is falsy') + end + end + def update_sampling_rules(response_object) sampling_rules = [] if response_object && response_object['SamplingRuleRecords'] @@ -140,6 +163,33 @@ def update_sampling_rules(response_object) end end + def update_sampling_targets(response_object) + if response_object && response_object['SamplingTargetDocuments'] + target_documents = {} + + response_object['SamplingTargetDocuments'].each do |new_target| + target_documents[new_target['RuleName']] = new_target + end + + refresh_sampling_rules, next_polling_interval = @rule_cache.update_targets( + target_documents, + response_object['LastRuleModification'] + ) + + @target_polling_interval = next_polling_interval + + if refresh_sampling_rules + OpenTelemetry.logger.debug('Performing out-of-band sampling rule polling to fetch updated rules.') + @rule_poller&.kill + start_sampling_rules_poller + end + else + OpenTelemetry.logger.debug('SamplingTargetDocuments from SamplingTargets request is not defined') + end + rescue StandardError => e + OpenTelemetry.logger.debug("Error occurred when updating Sampling Targets: #{e}") + end + class << self def generate_client_id hex_chars = ('0'..'9').to_a + ('a'..'f').to_a diff --git a/sampler/xray/lib/opentelemetry/sampler/xray/fallback_sampler.rb b/sampler/xray/lib/opentelemetry/sampler/xray/fallback_sampler.rb index f25b08052..75174150a 100644 --- a/sampler/xray/lib/opentelemetry/sampler/xray/fallback_sampler.rb +++ b/sampler/xray/lib/opentelemetry/sampler/xray/fallback_sampler.rb @@ -4,6 +4,8 @@ # # SPDX-License-Identifier: Apache-2.0 +require_relative 'rate_limiting_sampler' + module OpenTelemetry module Sampler module XRay @@ -11,10 +13,15 @@ module XRay class FallbackSampler def initialize @fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(0.05) + @rate_limiting_sampler = RateLimitingSampler.new(1) end def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:) - # TODO: implement and use Rate Limiting Sampler + sampling_result = @rate_limiting_sampler.should_sample?( + trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes + ) + + return sampling_result if sampling_result.instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP @fixed_rate_sampler.should_sample?(trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes) end diff --git a/sampler/xray/lib/opentelemetry/sampler/xray/rate_limiter.rb b/sampler/xray/lib/opentelemetry/sampler/xray/rate_limiter.rb new file mode 100644 index 000000000..6baee1dc0 --- /dev/null +++ b/sampler/xray/lib/opentelemetry/sampler/xray/rate_limiter.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +# Copyright OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Sampler + module XRay + # RateLimiter keeps track of the current reservoir quota balance available (measured via available time) + # If enough time has elapsed, the RateLimiter will allow quota balance to be consumed/taken (decrease available time) + # A RateLimitingSampler uses this RateLimiter to determine if it should sample or not based on the quota balance available. + class RateLimiter + def initialize(quota, max_balance_in_seconds = 1) + @max_balance_millis = max_balance_in_seconds * 1000.0 + @quota = quota + @wallet_floor_millis = Time.now.to_f * 1000 + # current "balance" would be `ceiling - floor` + @lock = Mutex.new + end + + def take(cost = 1) + return false if @quota <= 0 + + quota_per_millis = @quota / 1000.0 + + # assume divide by zero not possible + cost_in_millis = cost / quota_per_millis + + @lock.synchronize do + wallet_ceiling_millis = Time.now.to_f * 1000 + current_balance_millis = wallet_ceiling_millis - @wallet_floor_millis + current_balance_millis = [current_balance_millis, @max_balance_millis].min + pending_remaining_balance_millis = current_balance_millis - cost_in_millis + + if pending_remaining_balance_millis >= 0 + @wallet_floor_millis = wallet_ceiling_millis - pending_remaining_balance_millis + return true + end + + # No changes to the wallet state + false + end + end + end + end + end +end diff --git a/sampler/xray/lib/opentelemetry/sampler/xray/rate_limiting_sampler.rb b/sampler/xray/lib/opentelemetry/sampler/xray/rate_limiting_sampler.rb new file mode 100644 index 000000000..d5dd093b4 --- /dev/null +++ b/sampler/xray/lib/opentelemetry/sampler/xray/rate_limiting_sampler.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +# Copyright OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require_relative 'rate_limiter' + +module OpenTelemetry + module Sampler + module XRay + # RateLimitingSampler is a Sampler that uses a RateLimiter to determine + # if it should sample or not based on the quota balance available. + class RateLimitingSampler + def initialize(quota) + @quota = quota + @reservoir = RateLimiter.new(quota) + end + + def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:) + tracestate = OpenTelemetry::Trace.current_span(parent_context).context.tracestate + if @reservoir.take(1) + OpenTelemetry::SDK::Trace::Samplers::Result.new( + decision: OpenTelemetry::SDK::Trace::Samplers::Decision::RECORD_AND_SAMPLE, + tracestate: tracestate, + attributes: attributes + ) + else + OpenTelemetry::SDK::Trace::Samplers::Result.new( + decision: OpenTelemetry::SDK::Trace::Samplers::Decision::DROP, + tracestate: tracestate, + attributes: attributes + ) + end + end + + def to_s + "RateLimitingSampler{rate limiting sampling with sampling config of #{@quota} req/sec and 0% of additional requests}" + end + end + end + end +end diff --git a/sampler/xray/lib/opentelemetry/sampler/xray/rule_cache.rb b/sampler/xray/lib/opentelemetry/sampler/xray/rule_cache.rb index 4583fe839..b1b13f5b4 100644 --- a/sampler/xray/lib/opentelemetry/sampler/xray/rule_cache.rb +++ b/sampler/xray/lib/opentelemetry/sampler/xray/rule_cache.rb @@ -53,6 +53,52 @@ def update_rules(new_rule_appliers) end end + def create_sampling_statistics_documents(client_id) + statistics_documents = [] + + @cache_lock.synchronize do + @rule_appliers.each do |rule| + statistics = rule.snapshot_statistics + now_in_seconds = Time.now.to_i + + sampling_statistics_doc = { + ClientID: client_id, + RuleName: rule.sampling_rule.rule_name, + Timestamp: now_in_seconds, + RequestCount: statistics.request_count, + BorrowCount: statistics.borrow_count, + SampledCount: statistics.sample_count + } + + statistics_documents << sampling_statistics_doc + end + end + + statistics_documents + end + + def update_targets(target_documents, last_rule_modification) + min_polling_interval = nil + next_polling_interval = DEFAULT_TARGET_POLLING_INTERVAL_SECONDS + + @cache_lock.synchronize do + @rule_appliers.each_with_index do |rule, index| + target = target_documents[rule.sampling_rule.rule_name] + if target + @rule_appliers[index] = rule.with_target(target) + min_polling_interval = target['Interval'] if target['Interval'] && (min_polling_interval.nil? || min_polling_interval > target['Interval']) + else + OpenTelemetry.logger.debug('Invalid sampling target: missing rule name') + end + end + + next_polling_interval = min_polling_interval if min_polling_interval + + refresh_sampling_rules = last_rule_modification * 1000 > @last_updated_epoch_millis + return [refresh_sampling_rules, next_polling_interval] + end + end + private def sort_rules_by_priority diff --git a/sampler/xray/lib/opentelemetry/sampler/xray/sampling_rule_applier.rb b/sampler/xray/lib/opentelemetry/sampler/xray/sampling_rule_applier.rb index 0863d738c..7ace9a6aa 100644 --- a/sampler/xray/lib/opentelemetry/sampler/xray/sampling_rule_applier.rb +++ b/sampler/xray/lib/opentelemetry/sampler/xray/sampling_rule_applier.rb @@ -9,6 +9,7 @@ require 'date' require_relative 'sampling_rule' require_relative 'statistics' +require_relative 'rate_limiting_sampler' require_relative 'utils' module OpenTelemetry @@ -26,10 +27,24 @@ def initialize(sampling_rule, statistics = OpenTelemetry::Sampler::XRay::Statist @sampling_rule = sampling_rule @fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(@sampling_rule.fixed_rate) - # TODO: Add Reservoir Sampler (Rate Limiting Sampler) + @reservoir_sampler = if @sampling_rule.reservoir_size.positive? + OpenTelemetry::Sampler::XRay::RateLimitingSampler.new(1) + else + OpenTelemetry::Sampler::XRay::RateLimitingSampler.new(0) + end @reservoir_expiry_time = MAX_DATE_TIME_SECONDS @statistics = statistics + @statistics_lock = Mutex.new + + @statistics.reset_statistics + @borrowing_enabled = true + + apply_target(target) if target + end + + def with_target(target) + self.class.new(@sampling_rule, @statistics, target) end def matches?(attributes, resource) @@ -78,14 +93,21 @@ def matches?(attributes, resource) end def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:) - # TODO: Record Sampling Statistics - + has_borrowed = false result = OpenTelemetry::SDK::Trace::Samplers::Result.new( decision: OpenTelemetry::SDK::Trace::Samplers::Decision::DROP, tracestate: OpenTelemetry::Trace::Tracestate::DEFAULT ) - # TODO: Apply Reservoir Sampling + now = Time.now + reservoir_expired = now >= @reservoir_expiry_time + + unless reservoir_expired + result = @reservoir_sampler.should_sample?( + trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes + ) + has_borrowed = @borrowing_enabled && result.instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + end if result.instance_variable_get(:@decision) == OpenTelemetry::SDK::Trace::Samplers::Decision::DROP result = @fixed_rate_sampler.should_sample?( @@ -93,11 +115,41 @@ def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes: ) end + @statistics_lock.synchronize do + @statistics.sample_count += result.instance_variable_get(:@decision) == OpenTelemetry::SDK::Trace::Samplers::Decision::DROP ? 0 : 1 + @statistics.borrow_count += has_borrowed ? 1 : 0 + @statistics.request_count += 1 + end + result end + def snapshot_statistics + @statistics_lock.synchronize do + statistics_copy = @statistics.dup + @statistics.reset_statistics + return statistics_copy + end + end + private + def apply_target(target) + @borrowing_enabled = false + + @reservoir_sampler = OpenTelemetry::Sampler::XRay::RateLimitingSampler.new(target['ReservoirQuota']) if target['ReservoirQuota'] + + @reservoir_expiry_time = if target['ReservoirQuotaTTL'] + Time.at(target['ReservoirQuotaTTL']) + else + Time.now + end + + return unless target['FixedRate'] + + @fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(target['FixedRate']) + end + def get_arn(resource, attributes) resource_hash = resource.attribute_enumerator.to_h arn = resource_hash[SEMCONV::Resource::AWS_ECS_CONTAINER_ARN] || diff --git a/sampler/xray/test/aws_xray_remote_sampler_test.rb b/sampler/xray/test/aws_xray_remote_sampler_test.rb index 3de4299cb..e745b27a5 100644 --- a/sampler/xray/test/aws_xray_remote_sampler_test.rb +++ b/sampler/xray/test/aws_xray_remote_sampler_test.rb @@ -91,7 +91,14 @@ assert_equal OpenTelemetry::SDK::Trace::Samplers::Decision::DROP, rs.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: attributes, links: []).instance_variable_get(:@decision) - # TODO: Run more tests after updating Sampling Targets + rs.instance_variable_get(:@root).instance_variable_get(:@root).send(:retrieve_and_update_sampling_targets) + + assert_equal OpenTelemetry::SDK::Trace::Samplers::Decision::RECORD_AND_SAMPLE, + rs.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: attributes, links: []).instance_variable_get(:@decision) + assert_equal OpenTelemetry::SDK::Trace::Samplers::Decision::RECORD_AND_SAMPLE, + rs.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: attributes, links: []).instance_variable_get(:@decision) + assert_equal OpenTelemetry::SDK::Trace::Samplers::Decision::RECORD_AND_SAMPLE, + rs.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: attributes, links: []).instance_variable_get(:@decision) end it 'generates valid client id' do @@ -119,5 +126,88 @@ def create_spans(sampled_array, thread_id, span_attributes, remote_sampler, numb sampled_array[thread_id] = sampled end - # TODO: Run tests for Reservoir Sampling + it 'test_multithreading_with_large_reservoir' do + stub_request(:post, "http://#{TEST_URL}/GetSamplingRules") + .to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_RULES)) + stub_request(:post, "http://#{TEST_URL}/SamplingTargets") + .to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_TARGETS)) + + rs = OpenTelemetry::Sampler::XRay::AWSXRayRemoteSampler.new( + resource: OpenTelemetry::SDK::Resources::Resource.create({ + 'service.name' => 'test-service-name', + 'cloud.platform' => 'test-cloud-platform' + }) + ) + + attributes = { 'abc' => '1234' } + assert_equal OpenTelemetry::SDK::Trace::Samplers::Decision::DROP, + rs.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: attributes, links: []).instance_variable_get(:@decision) + rs.instance_variable_get(:@root).instance_variable_get(:@root).send(:retrieve_and_update_sampling_targets) + + number_of_spans = 100 + thread_count = 100 + sampled_array = Array.new(thread_count, 0) + threads = [] + + thread_count.times do |idx| + threads << Thread.new do + create_spans(sampled_array, idx, attributes, rs, number_of_spans) + end + end + + threads.each(&:join) + sum_sampled = sampled_array.sum + + test_rule_applier = rs.instance_variable_get(:@root).instance_variable_get(:@root).instance_variable_get(:@rule_cache).instance_variable_get(:@rule_appliers)[0] + assert_equal 100_000, test_rule_applier.instance_variable_get(:@reservoir_sampler).instance_variable_get(:@quota) + assert_equal 10_000, sum_sampled + end + + it 'test_multithreading_with_some_reservoir' do + stub_request(:post, "http://#{TEST_URL}/GetSamplingRules") + .to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_RULES)) + stub_request(:post, "http://#{TEST_URL}/SamplingTargets") + .to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_TARGETS)) + + rs = OpenTelemetry::Sampler::XRay::AWSXRayRemoteSampler.new( + resource: OpenTelemetry::SDK::Resources::Resource.create({ + 'service.name' => 'test-service-name', + 'cloud.platform' => 'test-cloud-platform' + }) + ) + + attributes = { 'abc' => 'non-matching attribute value, use default rule' } + assert_equal OpenTelemetry::SDK::Trace::Samplers::Decision::RECORD_AND_SAMPLE, + rs.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: attributes, links: []).instance_variable_get(:@decision) + + rs.instance_variable_get(:@root).instance_variable_get(:@root).send(:retrieve_and_update_sampling_targets) + + # Freeze time 1.5 seconds later in the future, but there should only be 1 second worth + # of reservoir available, which amounts to 100 sampled spans in this test. + # Here we will freeze time and pretend all thread jobs start and end at the exact same time, + # given exactly 1 second of available reservoir (100 quota) only. + current_time = Time.now + Timecop.freeze(current_time + 1.5) + + number_of_spans = 100 + thread_count = 100 + sampled_array = Array.new(thread_count, 0) + threads = [] + + thread_count.times do |idx| + threads << Thread.new do + create_spans(sampled_array, idx, attributes, rs, number_of_spans) + end + end + + threads.each(&:join) + sum_sampled = sampled_array.sum + + test_rule_applier = rs.instance_variable_get(:@root).instance_variable_get(:@root).instance_variable_get(:@rule_cache).instance_variable_get(:@rule_appliers)[1] + assert_equal 100, test_rule_applier.instance_variable_get(:@reservoir_sampler).instance_variable_get(:@quota) + assert_equal 100, sum_sampled + + # Return to normal time. + Timecop.return + end end diff --git a/sampler/xray/test/fallback_sampler_test.rb b/sampler/xray/test/fallback_sampler_test.rb index 01f5224b7..a1282fb0d 100644 --- a/sampler/xray/test/fallback_sampler_test.rb +++ b/sampler/xray/test/fallback_sampler_test.rb @@ -7,7 +7,116 @@ require 'test_helper' describe OpenTelemetry::Sampler::XRay::FallbackSampler do - # TODO: Add tests for Fallback sampler when Rate Limiter is implemented + before do + # Freeze time at the current moment + @current_time = Time.now + end + + after do + # Return to normal time + Timecop.return + end + + it 'test_should_sample' do + Timecop.freeze(@current_time) + sampler = OpenTelemetry::Sampler::XRay::FallbackSampler.new + + sampler.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: {}, links: []) + + # 0 seconds passed, 0 quota available + sampled = 0 + 30.times do + if sampler.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + sampled += 1 + end + end + assert_equal 0, sampled + + # 0.4 seconds passed, 0.4 quota available + sampled = 0 + @current_time += 0.4 + Timecop.freeze(@current_time) + 30.times do + if sampler.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + sampled += 1 + end + end + assert_equal 0, sampled + + # Another 0.8 seconds passed, 1 quota available (1.2 quota capped at 1 quota), 1 quota consumed + sampled = 0 + @current_time += 0.8 + Timecop.freeze(@current_time) + 30.times do + if sampler.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + sampled += 1 + end + end + assert_equal 1, sampled + + # Another 1.9 seconds passed, 1 quota available (1.9 quota capped at 1 quota), 1 quota consumed + sampled = 0 + @current_time += 1.9 + Timecop.freeze(@current_time) + 30.times do + if sampler.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + sampled += 1 + end + end + assert_equal 1, sampled + + # Another 0.9 seconds passed, 0.9 quota available, 0 quota consumed + sampled = 0 + @current_time += 0.9 + Timecop.freeze(@current_time) + 30.times do + if sampler.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + sampled += 1 + end + end + assert_equal 0, sampled + + # Another 2.0 seconds passed, 1 quota available (2.0 quota capped at 1 quota), 1 quota consumed + sampled = 0 + @current_time += 2.0 + Timecop.freeze(@current_time) + 30.times do + if sampler.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + sampled += 1 + end + end + assert_equal 1, sampled + + # Another 2.4 seconds passed, 1 quota available (2.4 quota capped at 1 quota), 1 quota consumed + sampled = 0 + @current_time += 2.4 + Timecop.freeze(@current_time) + 30.times do + if sampler.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + sampled += 1 + end + end + assert_equal 1, sampled + + # Another 100 seconds passed, 1 quota available (100 quota capped at 1 quota), 1 quota consumed + sampled = 0 + @current_time += 100 + Timecop.freeze(@current_time) + 30.times do + if sampler.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + sampled += 1 + end + end + assert_equal 1, sampled + end it 'test_to_string' do assert_equal( diff --git a/sampler/xray/test/rate_limiter_test.rb b/sampler/xray/test/rate_limiter_test.rb new file mode 100644 index 000000000..51d3266fa --- /dev/null +++ b/sampler/xray/test/rate_limiter_test.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +# Copyright OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' +require 'timecop' + +describe OpenTelemetry::Sampler::XRay::RateLimiter do + before do + # Freeze time at the current moment + @current_time = Time.now + end + + after do + # Return to normal time + Timecop.return + end + + it 'test_take' do + Timecop.freeze(@current_time) + limiter = OpenTelemetry::Sampler::XRay::RateLimiter.new(30, 1) + + # First batch - no quota is available + spent = 0 + 100.times do + spent += 1 if limiter.take(1) + end + assert_equal 0, spent + + # Second batch - should get half the available quota after 0.5 seconds + @current_time += 0.5 + Timecop.freeze(@current_time) + spent = 0 + 100.times do + spent += 1 if limiter.take(1) + end + assert_equal 15, spent + + # Third batch - should get all the available quota after 1 second + @current_time += 1 + Timecop.freeze(@current_time) + spent = 0 + 100.times do + spent += 1 if limiter.take(1) + end + assert_equal 30, spent + end + + it 'test_take_with_zero_quota' do + limiter = OpenTelemetry::Sampler::XRay::RateLimiter.new(0, 1) + + # Zero quota should always return false + refute limiter.take(1) + end + + it 'test_take_with_negative_quota' do + limiter = OpenTelemetry::Sampler::XRay::RateLimiter.new(-5, 1) + + # Negative quota should always return false + refute limiter.take(1) + end +end diff --git a/sampler/xray/test/rate_limiting_sampler_test.rb b/sampler/xray/test/rate_limiting_sampler_test.rb new file mode 100644 index 000000000..b24bc862f --- /dev/null +++ b/sampler/xray/test/rate_limiting_sampler_test.rb @@ -0,0 +1,173 @@ +# frozen_string_literal: true + +# Copyright OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +describe OpenTelemetry::Sampler::XRay::RateLimitingSampler do + before do + # Freeze time at the current moment + @current_time = Time.now + Timecop.freeze(@current_time) + end + + after do + # Return to normal time + Timecop.return + end + + it 'test_should_sample' do + sampler = OpenTelemetry::Sampler::XRay::RateLimitingSampler.new(30) + + sampled = 0 + 100.times do + next unless sampler.should_sample?(parent_context: nil, + trace_id: '3759e988bd862e3fe1be46a994272793', + name: 'name', + kind: OpenTelemetry::Trace::SpanKind::SERVER, + attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + + sampled += 1 + end + assert_equal 0, sampled + + @current_time += 0.5 + Timecop.freeze(@current_time) # Move forward half a second + + sampled = 0 + 100.times do + next unless sampler.should_sample?(parent_context: nil, + trace_id: '3759e988bd862e3fe1be46a994272793', + name: 'name', + kind: OpenTelemetry::Trace::SpanKind::SERVER, + attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + + sampled += 1 + end + assert_equal 15, sampled + + @current_time += 1 + Timecop.freeze(@current_time) # Move forward 1 second + + sampled = 0 + 100.times do + next unless sampler.should_sample?(parent_context: nil, + trace_id: '3759e988bd862e3fe1be46a994272793', + name: 'name', + kind: OpenTelemetry::Trace::SpanKind::SERVER, + attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + + sampled += 1 + end + assert_equal 30, sampled + + @current_time += 2.5 + Timecop.freeze(@current_time) # Move forward 2.5 more seconds + + sampled = 0 + 100.times do + next unless sampler.should_sample?(parent_context: nil, + trace_id: '3759e988bd862e3fe1be46a994272793', + name: 'name', + kind: OpenTelemetry::Trace::SpanKind::SERVER, + attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + + sampled += 1 + end + assert_equal 30, sampled + + @current_time += 1000 + Timecop.freeze(@current_time) # Move forward 1000 seconds + + sampled = 0 + 100.times do + next unless sampler.should_sample?(parent_context: nil, + trace_id: '3759e988bd862e3fe1be46a994272793', + name: 'name', + kind: OpenTelemetry::Trace::SpanKind::SERVER, + attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + + sampled += 1 + end + assert_equal 30, sampled + end + + it 'test_should_sample_with_quota_of_one' do + sampler = OpenTelemetry::Sampler::XRay::RateLimitingSampler.new(1) + + sampled = 0 + 50.times do + next unless sampler.should_sample?(parent_context: nil, + trace_id: '3759e988bd862e3fe1be46a994272793', + name: 'name', + kind: OpenTelemetry::Trace::SpanKind::SERVER, + attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + + sampled += 1 + end + assert_equal 0, sampled + + @current_time += 0.5 + Timecop.freeze(@current_time) # Move forward half a second + + sampled = 0 + 50.times do + next unless sampler.should_sample?(parent_context: nil, + trace_id: '3759e988bd862e3fe1be46a994272793', + name: 'name', + kind: OpenTelemetry::Trace::SpanKind::SERVER, + attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + + sampled += 1 + end + assert_equal 0, sampled + + @current_time += 0.5 + Timecop.freeze(@current_time) # Move forward another half second + + sampled = 0 + 50.times do + next unless sampler.should_sample?(parent_context: nil, + trace_id: '3759e988bd862e3fe1be46a994272793', + name: 'name', + kind: OpenTelemetry::Trace::SpanKind::SERVER, + attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + + sampled += 1 + end + assert_equal 1, sampled + + @current_time += 1000 + Timecop.freeze(@current_time) # Move forward 1000 seconds + + sampled = 0 + 50.times do + next unless sampler.should_sample?(parent_context: nil, + trace_id: '3759e988bd862e3fe1be46a994272793', + name: 'name', + kind: OpenTelemetry::Trace::SpanKind::SERVER, + attributes: {}, + links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP + + sampled += 1 + end + assert_equal 1, sampled + end + + it 'test_to_string' do + assert_equal( + 'RateLimitingSampler{rate limiting sampling with sampling config of 123 req/sec and 0% of additional requests}', + OpenTelemetry::Sampler::XRay::RateLimitingSampler.new(123).to_s + ) + end +end diff --git a/sampler/xray/test/rule_cache_test.rb b/sampler/xray/test/rule_cache_test.rb index bad80e3fc..cc7ac1c1f 100644 --- a/sampler/xray/test/rule_cache_test.rb +++ b/sampler/xray/test/rule_cache_test.rb @@ -24,6 +24,11 @@ def create_rule(name, priority, reservoir_size, fixed_rate) OpenTelemetry::Sampler::XRay::SamplingRuleApplier.new(OpenTelemetry::Sampler::XRay::SamplingRule.new(test_sampling_rule)) end + after do + # Return to normal time + Timecop.return + end + it 'test_cache_updates_and_sorts_rules' do # Set up default rule in rule cache default_rule = create_rule('Default', 10_000, 1, 0.05) @@ -55,12 +60,13 @@ def create_rule(name, priority, reservoir_size, fixed_rate) end it 'test_rule_cache_expiration_logic' do - Timecop.freeze(Time.now) do + current_time = Time.now + Timecop.freeze(current_time) do default_rule = create_rule('Default', 10_000, 1, 0.05) cache = OpenTelemetry::Sampler::XRay::RuleCache.new(OpenTelemetry::SDK::Resources::Resource.create({})) cache.update_rules([default_rule]) - Timecop.travel(2 * 60 * 60) # Travel 2 hours into the future + Timecop.freeze(current_time + (2 * 60 * 60)) # Travel 2 hours into the future assert cache.expired? end end @@ -109,5 +115,86 @@ def create_rule(name, priority, reservoir_size, fixed_rate) assert_equal 'second_rule', rule_appliers[0].sampling_rule.rule_name end - # TODO: Add tests for updating Sampling Targets and getting statistics + it 'test_update_sampling_targets' do + rule1 = create_rule('default', 10_000, 1, 0.05) + rule2 = create_rule('test', 20, 10, 0.2) + cache = OpenTelemetry::Sampler::XRay::RuleCache.new(OpenTelemetry::SDK::Resources::Resource.create({})) + cache.update_rules([rule1, rule2]) + + time = Time.now.to_i + target1 = { + 'FixedRate' => 0.05, + 'Interval' => 15, + 'ReservoirQuota' => 1, + 'ReservoirQuotaTTL' => time + 10, + 'RuleName' => 'default' + } + target2 = { + 'FixedRate' => 0.15, + 'Interval' => 12, + 'ReservoirQuota' => 5, + 'ReservoirQuotaTTL' => time + 10, + 'RuleName' => 'test' + } + target3 = { + 'FixedRate' => 0.15, + 'Interval' => 3, + 'ReservoirQuota' => 5, + 'ReservoirQuotaTTL' => time + 10, + 'RuleName' => 'associated rule does not exist' + } + + target_map = { + 'default' => target1, + 'test' => target2, + 'associated rule does not exist' => target3 + } + + refresh_sampling_rules, next_polling_interval = cache.update_targets(target_map, time - 10) + refute refresh_sampling_rules + assert_equal target2['Interval'], next_polling_interval + + rule_appliers = cache.instance_variable_get(:@rule_appliers) + assert_equal 2, rule_appliers.length + + refresh_sampling_rules_after, = cache.update_targets(target_map, time + 1) + assert refresh_sampling_rules_after + end + + it 'test_get_all_statistics' do + current_time = Time.now + Timecop.freeze(current_time) do + rule1 = create_rule('test', 4, 2, 2.0) + rule2 = create_rule('default', 5, 5, 5.0) + + cache = OpenTelemetry::Sampler::XRay::RuleCache.new(OpenTelemetry::SDK::Resources::Resource.create) + cache.update_rules([rule1, rule2]) + + Timecop.freeze(current_time + 0.001) # Travel 1ms into the future + + client_id = '12345678901234567890abcd' + statistics = cache.create_sampling_statistics_documents(client_id) + + expected_statistics = [ + { + ClientID: client_id, + RuleName: 'test', + Timestamp: Time.now.to_i, + RequestCount: 0, + BorrowCount: 0, + SampledCount: 0 + }, + { + ClientID: client_id, + RuleName: 'default', + Timestamp: Time.now.to_i, + RequestCount: 0, + BorrowCount: 0, + SampledCount: 0 + } + ] + + assert_equal expected_statistics, statistics + end + end end