Skip to content

Commit c292d77

Browse files
committed
Add Rate Limiter and Sampling Targets Poller Logic
1 parent a0b2477 commit c292d77

File tree

11 files changed

+746
-14
lines changed

11 files changed

+746
-14
lines changed

sampler/xray/lib/opentelemetry/sampler/xray/aws_xray_remote_sampler.rb

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
#
55
# SPDX-License-Identifier: Apache-2.0
66

7-
require 'net/http'
87
require 'json'
8+
require 'net/http'
99
require 'opentelemetry/sdk'
10-
require_relative 'sampling_rule'
10+
require_relative 'aws_xray_sampling_client'
1111
require_relative 'fallback_sampler'
12-
require_relative 'sampling_rule_applier'
1312
require_relative 'rule_cache'
14-
require_relative 'aws_xray_sampling_client'
13+
require_relative 'sampling_rule'
14+
require_relative 'sampling_rule_applier'
1515

1616
module OpenTelemetry
1717
module Sampler
@@ -68,7 +68,8 @@ def initialize(endpoint: '127.0.0.1:2000', polling_interval: DEFAULT_RULES_POLLI
6868
# Start the Sampling Rules poller
6969
start_sampling_rules_poller
7070

71-
# TODO: Start the Sampling Targets poller
71+
# Start the Sampling Targets poller
72+
start_sampling_targets_poller
7273
end
7374

7475
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
@@ -113,6 +114,15 @@ def start_sampling_rules_poller
113114
end
114115
end
115116

117+
# Spawns the background thread (stored in @target_poller) that drives
# sampling-target polling. Each iteration sleeps for the current target
# polling interval (seconds) plus the jitter (milliseconds), then calls
# retrieve_and_update_sampling_targets. The interval is re-read on every
# iteration, so a new value of @target_polling_interval takes effect on
# the next sleep.
def start_sampling_targets_poller
  @target_poller = Thread.new do
    loop do
      delay_millis = (@target_polling_interval * 1000) + @target_polling_jitter_millis
      sleep(delay_millis / 1000.0)
      retrieve_and_update_sampling_targets
    end
  end
end
125+
116126
def retrieve_and_update_sampling_rules
117127
sampling_rules_response = @sampling_client.fetch_sampling_rules
118128
if sampling_rules_response&.body && sampling_rules_response.body != ''
@@ -125,6 +135,19 @@ def retrieve_and_update_sampling_rules
125135
OpenTelemetry.handle_error(exception: e, message: 'Error occurred when retrieving or updating Sampling Rules')
126136
end
127137

138+
# Sends the current per-rule sampling statistics to the sampling client and
# applies the sampling targets found in the response.
#
# The request body carries the statistics documents produced by the rule
# cache for this client id. A missing or empty response body is logged at
# debug level and otherwise ignored.
#
# Any StandardError (for example a client failure or a JSON::ParserError on
# a malformed body) is reported through OpenTelemetry.handle_error instead
# of propagating: this method runs inside the target poller thread's loop,
# and an unhandled exception there would end the thread and stop target
# polling permanently.
def retrieve_and_update_sampling_targets
  request_body = {
    SamplingStatisticsDocuments: @rule_cache.create_sampling_statistics_documents(@client_id)
  }
  sampling_targets_response = @sampling_client.fetch_sampling_targets(request_body)
  if sampling_targets_response&.body && sampling_targets_response.body != ''
    response_body = JSON.parse(sampling_targets_response.body)
    update_sampling_targets(response_body)
  else
    OpenTelemetry.logger.debug('SamplingTargets Response is falsy')
  end
rescue StandardError => e
  OpenTelemetry.handle_error(exception: e, message: 'Error occurred when retrieving or updating Sampling Targets')
end
150+
128151
def update_sampling_rules(response_object)
129152
sampling_rules = []
130153
if response_object && response_object['SamplingRuleRecords']
@@ -140,6 +163,33 @@ def update_sampling_rules(response_object)
140163
end
141164
end
142165

166+
# Applies a parsed SamplingTargets response.
#
# The 'SamplingTargetDocuments' entries are indexed by their 'RuleName' and
# handed to the rule cache together with 'LastRuleModification'. The cache
# answers with a flag saying whether the rules need re-fetching and the next
# target polling interval, which is stored in @target_polling_interval.
# When a refresh is requested the current rule poller thread is killed and
# a new one is started.
#
# A response without 'SamplingTargetDocuments' is only logged. Any
# StandardError raised along the way is logged at debug level and swallowed.
def update_sampling_targets(response_object)
  documents = response_object && response_object['SamplingTargetDocuments']
  unless documents
    return OpenTelemetry.logger.debug('SamplingTargetDocuments from SamplingTargets request is not defined')
  end

  targets_by_rule_name = documents.each_with_object({}) do |document, index|
    index[document['RuleName']] = document
  end

  rules_outdated, @target_polling_interval = @rule_cache.update_targets(
    targets_by_rule_name,
    response_object['LastRuleModification']
  )
  return unless rules_outdated

  OpenTelemetry.logger.debug('Performing out-of-band sampling rule polling to fetch updated rules.')
  @rule_poller&.kill
  start_sampling_rules_poller
rescue StandardError => e
  OpenTelemetry.logger.debug("Error occurred when updating Sampling Targets: #{e}")
end
192+
143193
class << self
144194
def generate_client_id
145195
hex_chars = ('0'..'9').to_a + ('a'..'f').to_a

sampler/xray/lib/opentelemetry/sampler/xray/fallback_sampler.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,24 @@
44
#
55
# SPDX-License-Identifier: Apache-2.0
66

7+
require_relative 'rate_limiting_sampler'
8+
79
module OpenTelemetry
810
module Sampler
911
module XRay
1012
# FallbackSampler samples 1 req/sec and additional 5% of requests using TraceIdRatioBasedSampler.
1113
class FallbackSampler
1214
# Builds the two samplers this fallback combines: a rate-limiting sampler
# constructed with a quota of 1, and a TraceIdRatioBased sampler with a
# ratio of 0.05.
def initialize
  @rate_limiting_sampler = RateLimitingSampler.new(1)
  @fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(0.05)
end
1518

1619
# Asks the rate-limiting sampler first and returns its result unless that
# result is a DROP; otherwise defers to the fixed-rate sampler.
#
# Takes the standard sampler keyword arguments and forwards them unchanged
# to whichever delegate is consulted. Returns the delegate's sampling result.
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
  sampling_result = @rate_limiting_sampler.should_sample?(
    trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
  )

  # Result#recording? is the SDK's public "decision is not DROP" predicate;
  # use it rather than reaching into the result's @decision ivar.
  return sampling_result if sampling_result.recording?

  @fixed_rate_sampler.should_sample?(
    trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
  )
end
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
  module Sampler
    module XRay
      # RateLimiter keeps track of the current reservoir quota balance available (measured via available time).
      # If enough time has elapsed, the RateLimiter will allow quota balance to be consumed/taken (decrease available time).
      # A RateLimitingSampler uses this RateLimiter to determine if it should sample or not based on the quota balance available.
      #
      # The balance is the distance between "now" (the ceiling) and
      # @wallet_floor_millis; taking quota moves the floor forward.
      class RateLimiter
        # @param quota [Numeric] units of cost that may be taken per second
        # @param max_balance_in_seconds [Numeric] upper bound on how much
        #   elapsed time may accumulate as spendable balance
        def initialize(quota, max_balance_in_seconds = 1)
          @max_balance_millis = max_balance_in_seconds * 1000.0
          @quota = quota
          # A fresh limiter starts with an empty wallet: floor == now.
          @wallet_floor_millis = Time.now.to_f * 1000
          @lock = Mutex.new
        end

        # Attempts to spend +cost+ units of quota.
        #
        # @param cost [Numeric] units to spend (default 1)
        # @return [Boolean] true when the balance covered the cost and was
        #   debited, false otherwise (wallet left untouched)
        def take(cost = 1)
          # A zero quota can never pay for anything. A negative quota would
          # yield a negative cost and make every call succeed, so it is
          # rejected the same way.
          return false unless @quota.positive?

          # Quota is per second; convert the cost into milliseconds of balance.
          cost_in_millis = cost / (@quota / 1000.0)

          @lock.synchronize do
            wallet_ceiling_millis = Time.now.to_f * 1000
            elapsed_millis = wallet_ceiling_millis - @wallet_floor_millis
            current_balance_millis = [elapsed_millis, @max_balance_millis].min
            pending_remaining_balance_millis = current_balance_millis - cost_in_millis

            # Not enough balance: no changes to the wallet state.
            next false if pending_remaining_balance_millis.negative?

            @wallet_floor_millis = wallet_ceiling_millis - pending_remaining_balance_millis
            true
          end
        end
      end
    end
  end
end
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
require_relative 'rate_limiter'
8+
9+
module OpenTelemetry
  module Sampler
    module XRay
      # RateLimitingSampler is a Sampler backed by a RateLimiter reservoir:
      # a request is sampled when one unit of quota can be taken from the
      # reservoir and dropped otherwise.
      class RateLimitingSampler
        # @param quota the quota handed to the underlying RateLimiter
        def initialize(quota)
          @quota = quota
          @reservoir = RateLimiter.new(quota)
        end

        # Returns a sampling Result whose decision is RECORD_AND_SAMPLE when
        # the reservoir grants one unit and DROP when it does not. The
        # tracestate is taken from the span found in +parent_context+ and the
        # given attributes are passed through unchanged.
        def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
          parent_tracestate = OpenTelemetry::Trace.current_span(parent_context).context.tracestate

          decision =
            if @reservoir.take(1)
              OpenTelemetry::SDK::Trace::Samplers::Decision::RECORD_AND_SAMPLE
            else
              OpenTelemetry::SDK::Trace::Samplers::Decision::DROP
            end

          OpenTelemetry::SDK::Trace::Samplers::Result.new(
            decision: decision,
            tracestate: parent_tracestate,
            attributes: attributes
          )
        end

        # Human-readable description including the configured quota.
        def to_s
          "RateLimitingSampler{rate limiting sampling with sampling config of #{@quota} req/sec and 0% of additional requests}"
        end
      end
    end
  end
end

sampler/xray/lib/opentelemetry/sampler/xray/rule_cache.rb

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,52 @@ def update_rules(new_rule_appliers)
5353
end
5454
end
5555

56+
# Builds one statistics document per cached rule applier, in cache order.
#
# Runs under @cache_lock. Each applier's statistics are obtained through
# snapshot_statistics, and every document is stamped with the current time
# in whole seconds.
#
# @param client_id the value placed in each document's :ClientID field
# @return [Array<Hash>] documents with the keys :ClientID, :RuleName,
#   :Timestamp, :RequestCount, :BorrowCount and :SampledCount
def create_sampling_statistics_documents(client_id)
  @cache_lock.synchronize do
    @rule_appliers.map do |applier|
      snapshot = applier.snapshot_statistics
      timestamp = Time.now.to_i

      {
        ClientID: client_id,
        RuleName: applier.sampling_rule.rule_name,
        Timestamp: timestamp,
        RequestCount: snapshot.request_count,
        BorrowCount: snapshot.borrow_count,
        SampledCount: snapshot.sample_count
      }
    end
  end
end
79+
80+
# Applies freshly fetched sampling targets to the cached rule appliers.
#
# Under @cache_lock, every applier whose rule name has an entry in
# +target_documents+ is replaced by rule.with_target(target); appliers
# without an entry are left in place and a debug message is logged.
#
# @param target_documents [Hash] target documents keyed by rule name
# @param last_rule_modification [Numeric, nil] the response's
#   'LastRuleModification' value; it is multiplied by 1000 before being
#   compared with @last_updated_epoch_millis. May be nil when the response
#   did not carry the field.
# @return [Array(Boolean, Numeric)] whether the sampling rules should be
#   re-fetched, and the next target polling interval: the smallest
#   'Interval' among the applied targets, or
#   DEFAULT_TARGET_POLLING_INTERVAL_SECONDS when none supplied one
def update_targets(target_documents, last_rule_modification)
  min_polling_interval = nil
  next_polling_interval = DEFAULT_TARGET_POLLING_INTERVAL_SECONDS

  @cache_lock.synchronize do
    @rule_appliers.each_with_index do |rule, index|
      target = target_documents[rule.sampling_rule.rule_name]
      if target
        @rule_appliers[index] = rule.with_target(target)
        interval = target['Interval']
        min_polling_interval = interval if interval && (min_polling_interval.nil? || min_polling_interval > interval)
      else
        OpenTelemetry.logger.debug('Invalid sampling target: missing rule name')
      end
    end

    next_polling_interval = min_polling_interval if min_polling_interval

    # A response without LastRuleModification must not raise here: the
    # appliers above have already been replaced and the caller still needs
    # the new polling interval. Treat "unknown" as "no refresh needed".
    refresh_sampling_rules = !last_rule_modification.nil? &&
                             last_rule_modification * 1000 > @last_updated_epoch_millis
    [refresh_sampling_rules, next_polling_interval]
  end
end
101+
56102
private
57103

58104
def sort_rules_by_priority

sampler/xray/lib/opentelemetry/sampler/xray/sampling_rule_applier.rb

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
require 'date'
1010
require_relative 'sampling_rule'
1111
require_relative 'statistics'
12+
require_relative 'rate_limiting_sampler'
1213
require_relative 'utils'
1314

1415
module OpenTelemetry
@@ -26,10 +27,24 @@ def initialize(sampling_rule, statistics = OpenTelemetry::Sampler::XRay::Statist
2627
@sampling_rule = sampling_rule
2728
@fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(@sampling_rule.fixed_rate)
2829

29-
# TODO: Add Reservoir Sampler (Rate Limiting Sampler)
30+
@reservoir_sampler = if @sampling_rule.reservoir_size.positive?
31+
OpenTelemetry::Sampler::XRay::RateLimitingSampler.new(1)
32+
else
33+
OpenTelemetry::Sampler::XRay::RateLimitingSampler.new(0)
34+
end
3035

3136
@reservoir_expiry_time = MAX_DATE_TIME_SECONDS
3237
@statistics = statistics
38+
@statistics_lock = Mutex.new
39+
40+
@statistics.reset_statistics
41+
@borrowing_enabled = true
42+
43+
apply_target(target) if target
44+
end
45+
46+
# Returns a new applier of the same class, built from this applier's
# sampling rule and statistics object plus the given target. The receiver
# itself is not modified by this method.
def with_target(target)
  applier_class = self.class
  applier_class.new(@sampling_rule, @statistics, target)
end
3449

3550
def matches?(attributes, resource)
@@ -78,26 +93,63 @@ def matches?(attributes, resource)
7893
end
7994

8095
# Makes the sampling decision for one request and records it in the rule's
# statistics.
#
# While the reservoir has not expired, the reservoir sampler is consulted
# first. If it does not yield a recording result (or the reservoir has
# expired), the fixed-rate sampler decides. A non-dropped reservoir result
# counts as "borrowed" while borrowing is enabled.
#
# Under @statistics_lock the request count is always incremented, the
# sample count is incremented for any non-DROP result, and the borrow count
# is incremented for borrowed samples.
#
# Takes the standard sampler keyword arguments and returns the chosen
# delegate's sampling result.
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
  has_borrowed = false
  result = nil

  reservoir_expired = Time.now >= @reservoir_expiry_time
  unless reservoir_expired
    result = @reservoir_sampler.should_sample?(
      trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
    )
    # Result#recording? is the SDK's public "decision is not DROP" predicate;
    # use it rather than reaching into the result's @decision ivar.
    has_borrowed = @borrowing_enabled && result.recording?
  end

  # No reservoir result, or the reservoir dropped the request.
  unless result&.recording?
    result = @fixed_rate_sampler.should_sample?(
      trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
    )
  end

  @statistics_lock.synchronize do
    @statistics.sample_count += 1 if result.recording?
    @statistics.borrow_count += 1 if has_borrowed
    @statistics.request_count += 1
  end

  result
end
98126

127+
# Returns a copy of the statistics gathered so far and resets the live
# statistics object. Copy and reset both happen under @statistics_lock.
def snapshot_statistics
  @statistics_lock.synchronize do
    @statistics.dup.tap { @statistics.reset_statistics }
  end
end
134+
99135
private
100136

137+
# Reconfigures this applier from a sampling target document.
#
# Borrowing is switched off. A 'ReservoirQuota' replaces the reservoir
# sampler with one built for that quota. The reservoir expiry is set to
# Time.at('ReservoirQuotaTTL') when given and to the current time
# otherwise. A 'FixedRate' replaces the fixed-rate sampler.
def apply_target(target)
  @borrowing_enabled = false

  quota = target['ReservoirQuota']
  @reservoir_sampler = OpenTelemetry::Sampler::XRay::RateLimitingSampler.new(quota) if quota

  quota_ttl = target['ReservoirQuotaTTL']
  @reservoir_expiry_time = quota_ttl ? Time.at(quota_ttl) : Time.now

  fixed_rate = target['FixedRate']
  @fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(fixed_rate) if fixed_rate
end
152+
101153
def get_arn(resource, attributes)
102154
resource_hash = resource.attribute_enumerator.to_h
103155
arn = resource_hash[SEMCONV::Resource::AWS_ECS_CONTAINER_ARN] ||

0 commit comments

Comments
 (0)