Skip to content

Commit 511bc66

Browse files
committed
Add Rules Caching and Rules Matching Logic
1 parent 73a4d41 commit 511bc66

File tree

8 files changed

+569
-6
lines changed

8 files changed

+569
-6
lines changed

sampler/xray/lib/opentelemetry/sampler/xray/aws_xray_remote_sampler.rb

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
require 'json'
99
require 'opentelemetry/sdk'
1010
require_relative 'sampling_rule'
11+
require_relative 'fallback_sampler'
1112
require_relative 'sampling_rule_applier'
13+
require_relative 'rule_cache'
1214
require_relative 'aws_xray_sampling_client'
1315

1416
module OpenTelemetry
@@ -57,7 +59,9 @@ def initialize(endpoint: '127.0.0.1:2000', polling_interval: DEFAULT_RULES_POLLI
5759
@target_polling_jitter_millis = (rand / 10) * 1000
5860

5961
@aws_proxy_endpoint = endpoint || DEFAULT_AWS_PROXY_ENDPOINT
62+
@fallback_sampler = OpenTelemetry::Sampler::XRay::FallbackSampler.new
6063
@client_id = self.class.generate_client_id
64+
@rule_cache = OpenTelemetry::Sampler::XRay::RuleCache.new(resource)
6165

6266
@sampling_client = OpenTelemetry::Sampler::XRay::AWSXRaySamplingClient.new(@aws_proxy_endpoint)
6367

@@ -68,10 +72,25 @@ def initialize(endpoint: '127.0.0.1:2000', polling_interval: DEFAULT_RULES_POLLI
6872
end
6973

7074
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
71-
OpenTelemetry::SDK::Trace::Samplers::Result.new(
72-
decision: OpenTelemetry::SDK::Trace::Samplers::Decision::DROP,
73-
tracestate: tracestate,
74-
attributes: attributes
75+
if @rule_cache.expired?
76+
OpenTelemetry.logger.debug('Rule cache is expired, so using fallback sampling strategy')
77+
return @fallback_sampler.should_sample?(
78+
trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
79+
)
80+
end
81+
82+
matched_rule = @rule_cache.get_matched_rule(attributes)
83+
if matched_rule
84+
return matched_rule.should_sample?(
85+
trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
86+
)
87+
end
88+
89+
OpenTelemetry.logger.debug(
90+
'Using fallback sampler as no rule match was found. This is likely due to a bug, since default rule should always match'
91+
)
92+
@fallback_sampler.should_sample?(
93+
trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
7594
)
7695
end
7796

@@ -113,7 +132,7 @@ def update_sampling_rules(response_object)
113132
sampling_rules << SamplingRuleApplier.new(sampling_rule)
114133
end
115134
end
116-
# TODO: Add Sampling Rules to a Rule Cache
135+
@rule_cache.update_rules(sampling_rules)
117136
else
118137
OpenTelemetry.logger.error('SamplingRuleRecords from GetSamplingRules request is not defined')
119138
end
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module Sampler
9+
module XRay
10+
# FallbackSampler is the sampler used when no X-Ray sampling rule can be
# applied. The intended policy is 1 req/sec plus 5% of additional requests;
# only the 5% trace-id-ratio part is wired up so far (see the TODO below).
class FallbackSampler
  def initialize
    ratio_sampler_class = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased
    @fixed_rate_sampler = ratio_sampler_class.new(0.05)
  end

  # Delegates the sampling decision to the fixed-rate sampler, forwarding
  # every keyword argument unchanged.
  #
  # @return the result produced by the fixed-rate sampler
  def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
    # TODO: implement and use Rate Limiting Sampler

    sampling_arguments = {
      trace_id: trace_id,
      parent_context: parent_context,
      links: links,
      name: name,
      kind: kind,
      attributes: attributes
    }
    @fixed_rate_sampler.should_sample?(**sampling_arguments)
  end

  # @return [String] human-readable description of this sampler
  def description
    'FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests}'
  end
end
26+
end
27+
end
28+
end
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module Sampler
9+
module XRay
10+
# RuleCache stores all the Sampling Rule Appliers, each corresponding
# to the user's Sampling Rules that were retrieved from AWS X-Ray.
# Appliers are kept ordered by rule priority, then by rule name.
class RuleCache
  # The cache expires 1 hour after the last refresh time.
  RULE_CACHE_TTL_MILLIS = 60 * 60 * 1000

  # @param sampler_resource resource passed to every applier's +matches?+
  #   call when looking up a rule
  def initialize(sampler_resource)
    @rule_appliers = []
    @sampler_resource = sampler_resource
    @last_updated_epoch_millis = current_epoch_millis
    @cache_lock = Mutex.new
  end

  # @return [Boolean] true when more than RULE_CACHE_TTL_MILLIS has passed
  #   since the cache was created or last updated
  def expired?
    current_epoch_millis > @last_updated_epoch_millis + RULE_CACHE_TTL_MILLIS
  end

  # Returns the first applier (in priority order) whose rule matches the
  # given span attributes. A rule named 'Default' is always accepted.
  #
  # @param attributes span attributes handed to each applier's +matches?+
  # @return the matching applier, or nil when the cache holds no match
  def get_matched_rule(attributes)
    @rule_appliers.find do |rule|
      rule.matches?(attributes, @sampler_resource) || rule.sampling_rule.rule_name == 'Default'
    end
  end

  # Replaces the cached appliers with +new_rule_appliers+ and refreshes the
  # last-updated timestamp. When an incoming rule has the same name as a
  # cached one and +equals?+ it, the cached applier instance is kept instead
  # of the incoming one.
  #
  # The caller's array is left untouched: a new array is built, sorted, and
  # only then assigned to the cache, so +get_matched_rule+ (which does not
  # take the lock) never iterates a list that is still being sorted.
  #
  # @param new_rule_appliers [Array] appliers built from the latest rules
  def update_rules(new_rule_appliers)
    @cache_lock.synchronize do
      cached_by_name = @rule_appliers.to_h { |applier| [applier.sampling_rule.rule_name, applier] }

      merged_appliers = new_rule_appliers.map do |incoming|
        cached = cached_by_name[incoming.sampling_rule.rule_name]
        cached && incoming.sampling_rule.equals?(cached.sampling_rule) ? cached : incoming
      end

      @rule_appliers = sorted_by_priority(merged_appliers)
      @last_updated_epoch_millis = current_epoch_millis
    end
  end

  private

  # Current wall-clock time in epoch milliseconds, at one-second resolution.
  def current_epoch_millis
    Time.now.to_i * 1000
  end

  # Returns a new array ordered by ascending priority, ties broken by rule name.
  def sorted_by_priority(rule_appliers)
    rule_appliers.sort_by do |applier|
      [applier.sampling_rule.priority, applier.sampling_rule.rule_name]
    end
  end
end
68+
end
69+
end
70+
end

sampler/xray/lib/opentelemetry/sampler/xray/sampling_rule_applier.rb

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require 'opentelemetry/sdk'
88
require 'opentelemetry-semantic_conventions'
99
require 'date'
10+
require_relative 'sampling_rule'
1011
require_relative 'statistics'
1112
require_relative 'utils'
1213

@@ -22,13 +23,88 @@ class SamplingRuleApplier
2223

2324
# Builds an applier for a single sampling rule.
#
# @param sampling_rule the rule this applier evaluates; its +fixed_rate+
#   configures the trace-id-ratio sampler
# @param statistics statistics holder stored on the applier
# @param target accepted for interface compatibility; not read here
def initialize(sampling_rule, statistics = OpenTelemetry::Sampler::XRay::Statistics.new, target = nil)
  @sampling_rule = sampling_rule

  ratio_sampler_class = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased
  @fixed_rate_sampler = ratio_sampler_class.new(@sampling_rule.fixed_rate)

  # TODO: Add Reservoir Sampler (Rate Limiting Sampler)

  @reservoir_expiry_time = MAX_DATE_TIME_SECONDS
  @statistics = statistics
end
33+
34+
# Decides whether this applier's sampling rule applies to a span.
#
# The rule's attribute map, host, HTTP method, service name, URL path,
# service type and resource ARN are each compared against values taken from
# the span attributes and the resource; all comparisons must succeed.
#
# @param attributes [Hash, nil] span attributes; nil is treated as "none"
# @param resource [#attribute_enumerator, nil] resource describing the
#   service; nil leaves service name, service type and ARN unset
# @return [Boolean] result of the combined Utils match checks
def matches?(attributes, resource)
  span_attributes = attributes || {}
  http_target = span_attributes[OpenTelemetry::SemanticConventions::Trace::HTTP_TARGET]
  http_url = span_attributes[OpenTelemetry::SemanticConventions::Trace::HTTP_URL]
  http_method = span_attributes[OpenTelemetry::SemanticConventions::Trace::HTTP_METHOD]
  http_host = span_attributes[OpenTelemetry::SemanticConventions::Trace::HTTP_HOST]

  service_name = nil
  service_type = nil
  resource_arn = nil

  # Only touch the resource once we know it is present; previously the
  # attribute hash was built before this guard and a nil resource raised.
  if resource
    resource_hash = resource.attribute_enumerator.to_h
    service_name = resource_hash[OpenTelemetry::SemanticConventions::Resource::SERVICE_NAME] || ''
    cloud_platform = resource_hash[OpenTelemetry::SemanticConventions::Resource::CLOUD_PLATFORM]
    service_type = OpenTelemetry::Sampler::XRay::Utils::CLOUD_PLATFORM_MAPPING[cloud_platform] if cloud_platform.is_a?(String)
    # Pass the nil-safe hash so the ARN lookup can index it.
    resource_arn = get_arn(resource, span_attributes)
  end

  # Without an explicit target, derive the path from the URL. A missing,
  # empty or unparseable path falls back to '/'.
  if http_target.nil?
    if http_url.is_a?(String)
      http_target =
        begin
          url_path = URI(http_url).path
          # Opaque URIs (e.g. "mailto:...") have a nil path.
          url_path.nil? || url_path.empty? ? '/' : url_path
        rescue URI::InvalidURIError
          '/'
        end
    elsif http_url.nil?
      http_target = '/'
    end
  end

  OpenTelemetry::Sampler::XRay::Utils.attribute_match(attributes, @sampling_rule.attributes) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.host, http_host) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.http_method, http_method) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.service_name, service_name) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.url_path, http_target) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.service_type, service_type) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.resource_arn, resource_arn)
end
2678

2779
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
28-
OpenTelemetry::SDK::Trace::Samplers::Result.new(
80+
# TODO: Record Sampling Statistics
81+
82+
result = OpenTelemetry::SDK::Trace::Samplers::Result.new(
2983
decision: OpenTelemetry::SDK::Trace::Samplers::Decision::DROP,
3084
tracestate: OpenTelemetry::Trace::Tracestate::DEFAULT
3185
)
86+
87+
# TODO: Apply Reservoir Sampling
88+
89+
if result.instance_variable_get(:@decision) == OpenTelemetry::SDK::Trace::Samplers::Decision::DROP
90+
result = @fixed_rate_sampler.should_sample?(
91+
trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
92+
)
93+
end
94+
95+
result
96+
end
97+
98+
private
99+
100+
# Looks up the ARN used for resource-ARN rule matching.
#
# Lookup order: ECS container ARN, ECS cluster ARN, EKS cluster ARN (all
# from the resource), then the Lambda invoked ARN from the span attributes,
# then the resource's FaaS id.
#
# @param resource [#attribute_enumerator] resource whose attributes are read
# @param attributes [Hash, nil] span attributes; nil skips the Lambda lookup
# @return the first value found in the order above, or nil
def get_arn(resource, attributes)
  resource_hash = resource.attribute_enumerator.to_h
  arn = resource_hash[OpenTelemetry::SemanticConventions::Resource::AWS_ECS_CONTAINER_ARN] ||
        resource_hash[OpenTelemetry::SemanticConventions::Resource::AWS_ECS_CLUSTER_ARN] ||
        resource_hash[OpenTelemetry::SemanticConventions::Resource::AWS_EKS_CLUSTER_ARN]
  return arn unless arn.nil?

  # matches? accepts nil attributes, so guard before indexing.
  invoked_arn = attributes.nil? ? nil : attributes[OpenTelemetry::SemanticConventions::Trace::AWS_LAMBDA_INVOKED_ARN]
  invoked_arn || resource_hash[OpenTelemetry::SemanticConventions::Resource::FAAS_ID]
end
33109
end
34110
end

sampler/xray/test/aws_xray_remote_sampler_test.rb

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
assert !sampler.instance_variable_get(:@rule_poller).nil?
2323
assert_equal(sampler.instance_variable_get(:@rule_polling_interval_millis), 300 * 1000)
2424
assert !sampler.instance_variable_get(:@sampling_client).nil?
25+
assert !sampler.instance_variable_get(:@rule_cache).nil?
2526
assert_match(/[a-f0-9]{24}/, sampler.instance_variable_get(:@client_id))
2627
end
2728

@@ -40,6 +41,8 @@
4041
assert !sampler.instance_variable_get(:@rule_poller).nil?
4142
assert_equal(sampler.instance_variable_get(:@rule_polling_interval_millis), 300 * 1000)
4243
assert !sampler.instance_variable_get(:@sampling_client).nil?
44+
assert !sampler.instance_variable_get(:@rule_cache).nil?
45+
assert_equal(sampler.instance_variable_get(:@rule_cache).instance_variable_get(:@sampler_resource), resource)
4346
assert_match(/[a-f0-9]{24}/, sampler.instance_variable_get(:@client_id))
4447
end
4548

@@ -62,10 +65,35 @@
6265
assert !sampler.instance_variable_get(:@rule_poller).nil?
6366
assert_equal(sampler.instance_variable_get(:@rule_polling_interval_millis), 120 * 1000)
6467
assert !sampler.instance_variable_get(:@sampling_client).nil?
68+
assert !sampler.instance_variable_get(:@rule_cache).nil?
69+
assert_equal(sampler.instance_variable_get(:@rule_cache).instance_variable_get(:@sampler_resource), resource)
6570
assert_equal(sampler.instance_variable_get(:@aws_proxy_endpoint), 'abc.com')
6671
assert_match(/[a-f0-9]{24}/, sampler.instance_variable_get(:@client_id))
6772
end
6873

74+
it 'updates sampling rules and targets with pollers and should sample' do
75+
stub_request(:post, "#{TEST_URL}/GetSamplingRules")
76+
.to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_RULES))
77+
stub_request(:post, "#{TEST_URL}/SamplingTargets")
78+
.to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_TARGETS))
79+
80+
resource = OpenTelemetry::SDK::Resources::Resource.create(
81+
OpenTelemetry::SemanticConventions::Resource::SERVICE_NAME => 'test-service-name',
82+
OpenTelemetry::SemanticConventions::Resource::CLOUD_PLATFORM => 'test-cloud-platform'
83+
)
84+
rs = OpenTelemetry::Sampler::XRay::AWSXRayRemoteSampler.new(resource: resource)
85+
86+
attributes = { 'abc' => '1234' }
87+
88+
sleep(1.0)
89+
test_rule_applier = rs.instance_variable_get(:@root).instance_variable_get(:@root).instance_variable_get(:@rule_cache).instance_variable_get(:@rule_appliers)[0]
90+
assert_equal 'test', test_rule_applier.instance_variable_get(:@sampling_rule).instance_variable_get(:@rule_name)
91+
assert_equal OpenTelemetry::SDK::Trace::Samplers::Decision::DROP,
92+
rs.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: attributes, links: []).instance_variable_get(:@decision)
93+
94+
# TODO: Run more tests after updating Sampling Targets
95+
end
96+
6997
it 'generates valid client id' do
7098
client_id = OpenTelemetry::Sampler::XRay::InternalAWSXRayRemoteSampler.generate_client_id
7199
assert_match(/[0-9a-z]{24}/, client_id)
@@ -81,4 +109,15 @@
81109
expected_string = 'InternalAWSXRayRemoteSampler{aws_proxy_endpoint=127.0.0.1:2000, rule_polling_interval_millis=300000}'
82110
assert_equal(sampler.description, expected_string)
83111
end
112+
113+
# Test helper: asks +remote_sampler+ for +number_of_spans+ sampling decisions
# (always with the same trace id, name and kind) and stores the number of
# decisions that were not DROP in +sampled_array[thread_id]+.
def create_spans(sampled_array, thread_id, span_attributes, remote_sampler, number_of_spans)
  sampled_array[thread_id] = number_of_spans.times.count do
    sampling_result = remote_sampler.should_sample?(
      parent_context: nil,
      trace_id: '3759e988bd862e3fe1be46a994272793',
      name: 'name',
      kind: OpenTelemetry::Trace::SpanKind::SERVER,
      attributes: span_attributes,
      links: []
    )
    sampling_result.instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP
  end
end
121+
122+
# TODO: Run tests for Reservoir Sampling
84123
end
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
require 'test_helper'
8+
9+
describe OpenTelemetry::Sampler::XRay::FallbackSampler do
10+
# TODO: Add tests for Fallback sampler when Rate Limiter is implemented
11+
12+
it 'test_to_string' do
13+
assert_equal(
14+
'FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests}',
15+
OpenTelemetry::Sampler::XRay::FallbackSampler.new.description
16+
)
17+
end
18+
end

0 commit comments

Comments
 (0)