Skip to content

Commit fe6da01

Browse files
Add origin detection fields (#310)
* Add origin detection * Add external env * Add tag cardinality option * Serialize fields * Update event serializer params * Add fields to service check * Use older fakefs version to test against Ruby v2 * Ruby 2.1 compatible multiline strings * Only use old fakefs on old versions * Missed multiline * Further fixes for windows * Fix content logic * Fudge allocation test * Increase timing for macos * Try avoiding allocation * Trace the allocations * Trace initialization * Return block result * Try warming up * Origin detection doesn't need to be a class * Remove field serializer from event * Remove windows specific * Return origin detection * Remove all linux specific alloction test * Attempt to bound regex * Take substring of path. * Replace regex with parser * Ruby <= 2.3 compliance * Remove trace allocations * Only turn origin detection off for linux * Make origin detection methods private * Remove unused instance vars * Add origin fields to telemetry metrics * Add delay serializer tests * Fix field order
1 parent bfa1399 commit fe6da01

23 files changed

+1099
-45
lines changed

Gemfile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,9 @@ end
2727

2828
group :test do
  gem 'mocha'

  # Older Rubies are tested against the older fakefs release line.
  # Compare real version objects: a plain String comparison is lexicographic
  # and would misorder versions such as '10.0' and '3.0'.
  if Gem::Version.new(RUBY_VERSION) < Gem::Version.new('3.0')
    gem 'fakefs', '~> 0.13.3'
  else
    gem 'fakefs', '~> 3.0'
  end
end

lib/datadog/statsd.rb

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
require_relative 'statsd/udp_connection'
77
require_relative 'statsd/uds_connection'
88
require_relative 'statsd/connection_cfg'
9+
require_relative 'statsd/origin_detection'
910
require_relative 'statsd/message_buffer'
1011
require_relative 'statsd/serialization'
1112
require_relative 'statsd/sender'
@@ -82,6 +83,9 @@ def tags
8283
# @option [Float] default sample rate if not overridden
8384
# @option [Boolean] single_thread flushes the metrics on the main thread instead of in a companion thread
8485
# @option [Boolean] delay_serialization delays stat serialization
86+
# @option [Boolean] origin_detection whether origin detection is enabled
87+
# @option [String] container_id the container ID field, used for origin detection
88+
# @option [String] cardinality the default tag cardinality to use
8589
def initialize(
8690
host = nil,
8791
port = nil,
@@ -104,15 +108,32 @@ def initialize(
104108
delay_serialization: false,
105109

106110
telemetry_enable: true,
107-
telemetry_flush_interval: DEFAULT_TELEMETRY_FLUSH_INTERVAL
111+
telemetry_flush_interval: DEFAULT_TELEMETRY_FLUSH_INTERVAL,
112+
113+
origin_detection: true,
114+
container_id: nil,
115+
cardinality: nil
108116
)
109117
unless tags.nil? || tags.is_a?(Array) || tags.is_a?(Hash)
110118
raise ArgumentError, 'tags must be an array of string tags or a Hash'
111119
end
112120

113121
@namespace = namespace
114122
@prefix = @namespace ? "#{@namespace}.".freeze : nil
115-
@serializer = Serialization::Serializer.new(prefix: @prefix, global_tags: tags)
123+
124+
origin_detection_enabled = origin_detection_enabled?(origin_detection)
125+
container_id = get_container_id(container_id, origin_detection_enabled)
126+
127+
external_data = sanitize(ENV['DD_EXTERNAL_ENV'])
128+
129+
@serializer = Serialization::Serializer.new(prefix: @prefix,
130+
container_id: container_id,
131+
external_data: external_data,
132+
global_tags: tags,
133+
)
134+
135+
@cardinality = cardinality || ENV['DD_CARDINALITY'] || ENV['DATADOG_CARDINALITY']
136+
116137
@sample_rate = sample_rate
117138
@delay_serialization = delay_serialization
118139

@@ -136,6 +157,10 @@ def initialize(
136157
sender_queue_size: sender_queue_size,
137158

138159
telemetry_flush_interval: telemetry_enable ? telemetry_flush_interval : nil,
160+
container_id: container_id,
161+
external_data: external_data,
162+
cardinality: @cardinality,
163+
139164
serializer: serializer
140165
)
141166
end
@@ -159,6 +184,7 @@ def self.open(*args, **kwargs)
159184
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
160185
# @option opts [Array<String>] :tags An array of tags
161186
# @option opts [Numeric] :by increment value, default 1
187+
# @option opts [String] :cardinality The tag cardinality to use
162188
# @see #count
163189
def increment(stat, opts = EMPTY_OPTIONS)
164190
opts = { sample_rate: opts } if opts.is_a?(Numeric)
@@ -174,6 +200,7 @@ def increment(stat, opts = EMPTY_OPTIONS)
174200
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
175201
# @option opts [Array<String>] :tags An array of tags
176202
# @option opts [Numeric] :by decrement value, default 1
203+
# @option opts [String] :cardinality The tag cardinality to use
177204
# @see #count
178205
def decrement(stat, opts = EMPTY_OPTIONS)
179206
opts = { sample_rate: opts } if opts.is_a?(Numeric)
@@ -189,6 +216,7 @@ def decrement(stat, opts = EMPTY_OPTIONS)
189216
# @option opts [Numeric] :sample_rate sample rate, 1 for always
190217
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
191218
# @option opts [Array<String>] :tags An array of tags
219+
# @option opts [String] :cardinality The tag cardinality to use
192220
def count(stat, count, opts = EMPTY_OPTIONS)
193221
opts = { sample_rate: opts } if opts.is_a?(Numeric)
194222
send_stats(stat, count, COUNTER_TYPE, opts)
@@ -206,6 +234,7 @@ def count(stat, count, opts = EMPTY_OPTIONS)
206234
# @option opts [Numeric] :sample_rate sample rate, 1 for always
207235
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
208236
# @option opts [Array<String>] :tags An array of tags
237+
# @option opts [String] :cardinality The tag cardinality to use
209238
# @example Report the current user count:
210239
# $statsd.gauge('user.count', User.count)
211240
def gauge(stat, value, opts = EMPTY_OPTIONS)
@@ -221,6 +250,7 @@ def gauge(stat, value, opts = EMPTY_OPTIONS)
221250
# @option opts [Numeric] :sample_rate sample rate, 1 for always
222251
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
223252
# @option opts [Array<String>] :tags An array of tags
253+
# @option opts [String] :cardinality The tag cardinality to use
224254
# @example Report the current user count:
225255
# $statsd.histogram('user.count', User.count)
226256
def histogram(stat, value, opts = EMPTY_OPTIONS)
@@ -235,6 +265,7 @@ def histogram(stat, value, opts = EMPTY_OPTIONS)
235265
# @option opts [Numeric] :sample_rate sample rate, 1 for always
236266
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
237267
# @option opts [Array<String>] :tags An array of tags
268+
# @option opts [String] :cardinality The tag cardinality to use
238269
# @example Report the current user count:
239270
# $statsd.distribution('user.count', User.count)
240271
def distribution(stat, value, opts = EMPTY_OPTIONS)
@@ -251,6 +282,7 @@ def distribution(stat, value, opts = EMPTY_OPTIONS)
251282
# @param [Hash] opts the options to create the metric with
252283
# @option opts [Numeric] :sample_rate sample rate, 1 for always
253284
# @option opts [Array<String>] :tags An array of tags
285+
# @option opts [String] :cardinality The tag cardinality to use
254286
# @example Report the time (in ms) taken to activate an account
255287
# $statsd.distribution_time('account.activate') { @account.activate! }
256288
def distribution_time(stat, opts = EMPTY_OPTIONS)
@@ -272,6 +304,7 @@ def distribution_time(stat, opts = EMPTY_OPTIONS)
272304
# @option opts [Numeric] :sample_rate sample rate, 1 for always
273305
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
274306
# @option opts [Array<String>] :tags An array of tags
307+
# @option opts [String] :cardinality The tag cardinality to use
275308
def timing(stat, ms, opts = EMPTY_OPTIONS)
276309
opts = { sample_rate: opts } if opts.is_a?(Numeric)
277310
send_stats(stat, ms, TIMING_TYPE, opts)
@@ -287,6 +320,7 @@ def timing(stat, ms, opts = EMPTY_OPTIONS)
287320
# @option opts [Numeric] :sample_rate sample rate, 1 for always
288321
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
289322
# @option opts [Array<String>] :tags An array of tags
323+
# @option opts [String] :cardinality The tag cardinality to use
290324
# @yield The operation to be timed
291325
# @see #timing
292326
# @example Report the time (in ms) taken to activate an account
@@ -307,6 +341,7 @@ def time(stat, opts = EMPTY_OPTIONS)
307341
# @option opts [Numeric] :sample_rate sample rate, 1 for always
308342
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
309343
# @option opts [Array<String>] :tags An array of tags
344+
# @option opts [String] :cardinality The tag cardinality to use
310345
# @example Record a unique visitor by id:
311346
# $statsd.set('visitors.uniques', User.id)
312347
def set(stat, value, opts = EMPTY_OPTIONS)
@@ -348,6 +383,7 @@ def service_check(name, status, opts = EMPTY_OPTIONS)
348383
# @option opts [String, nil] :alert_type ('info') Can be "error", "warning", "info" or "success".
349384
# @option opts [Boolean, false] :truncate_if_too_long (false) Truncate the event if it is too long
350385
# @option opts [Array<String>] :tags tags to be added to every metric
386+
# @option opts [String] :cardinality The tag cardinality to use
351387
# @example Report an awful event:
352388
# $statsd.event('Something terrible happened', 'The end is near if we do nothing', :alert_type=>'warning', :tags=>['end_of_times','urgent'])
353389
def event(title, text, opts = EMPTY_OPTIONS)
@@ -427,17 +463,43 @@ def send_stats(stat, delta, type, opts = EMPTY_OPTIONS)
427463
telemetry.sent(metrics: 1) if telemetry
428464

429465
sample_rate = opts[:sample_rate] || @sample_rate || 1
466+
cardinality = opts[:cardinality] || @cardinality
430467

431468
if sample_rate == 1 || opts[:pre_sampled] || rand <= sample_rate
432469
full_stat =
433470
if @delay_serialization
434-
[stat, delta, type, opts[:tags], sample_rate]
471+
[stat, delta, type, opts[:tags], sample_rate, cardinality]
435472
else
436-
serializer.to_stat(stat, delta, type, tags: opts[:tags], sample_rate: sample_rate)
473+
serializer.to_stat(stat, delta, type, tags: opts[:tags], sample_rate: sample_rate, cardinality: cardinality)
437474
end
438475

439476
forwarder.send_message(full_stat)
440477
end
441478
end
479+
480+
# Decides whether origin detection should be active.
#
# An explicit `false` argument always disables it. Otherwise the
# DD_ORIGIN_DETECTION_ENABLED environment variable is consulted: the
# case-insensitive values "0", "f" and "false" disable it, any other value
# (or an unset variable) leaves it enabled.
#
# @param origin_detection [Boolean, nil] the value passed to the constructor
# @return [Boolean] true when origin detection is enabled
def origin_detection_enabled?(origin_detection)
  # nil means "not specified" and must fall through to the environment lookup,
  # so only a literal false short-circuits here.
  return false if origin_detection == false

  env_value = ENV['DD_ORIGIN_DETECTION_ENABLED']
  return true if env_value.nil?

  !%w[0 f false].include?(env_value.downcase)
end
497+
498+
# Sanitize the DD_EXTERNAL_ENV input to ensure it doesn't contain invalid characters
# that may break the protocol.
# Removes any non-printable characters and every `|` character.
#
# @param external_data [String, nil] the raw value to clean up
# @return [String, nil] a cleaned copy of the value, or nil when given nil
def sanitize(external_data)
  # The pattern must match a bare pipe; matching only the sequence "`|"
  # would let ordinary pipes through.
  external_data.gsub(/[^[:print:]]|\|/, '') unless external_data.nil?
end
442504
end
443505
end

lib/datadog/statsd/forwarder.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,19 @@ def initialize(
2222
single_thread: false,
2323

2424
logger: nil,
25+
container_id: nil,
26+
external_data: nil,
27+
cardinality: nil,
2528

2629
serializer:
2730
)
2831
@transport_type = connection_cfg.transport_type
2932

3033
@telemetry = if telemetry_flush_interval
3134
Telemetry.new(telemetry_flush_interval,
35+
container_id,
36+
external_data,
37+
cardinality,
3238
global_tags: global_tags,
3339
transport_type: @transport_type
3440
)

lib/datadog/statsd/message_buffer.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ def add(message)
2828
# Serializes the message if it hasn't been already. Part of the
2929
# delay_serialization feature.
3030
if message.is_a?(Array)
31-
stat, delta, type, tags, sample_rate = message
32-
message = @serializer.to_stat(stat, delta, type, tags: tags, sample_rate: sample_rate)
31+
stat, delta, type, tags, sample_rate, cardinality = message
32+
message = @serializer.to_stat(stat, delta, type, tags: tags, sample_rate: sample_rate, cardinality: cardinality)
3333
end
3434

3535
message_size = message.bytesize
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# frozen_string_literal: true
2+
module Datadog
  # Origin detection helpers: best-effort discovery of an identifier for the
  # container the process runs in, by inspecting files under /proc and
  # /sys/fs/cgroup. Every lookup returns nil when the information is not
  # available.
  class Statsd
    # Cgroup v1 controller whose entry is looked up in the cgroup file.
    CGROUPV1BASECONTROLLER = "memory"
    # Inode number compared against the inode of /proc/self/ns/cgroup; a match
    # is treated as running in the host cgroup namespace.
    HOSTCGROUPNAMESPACEINODE = 0xEFFFFFFB

    private

    # @return [Boolean] true when /proc/self/ns/cgroup can be stat'ed and its
    #   inode equals HOSTCGROUPNAMESPACEINODE
    def host_cgroup_namespace?
      File.stat("/proc/self/ns/cgroup").ino == HOSTCGROUPNAMESPACEINODE
    rescue SystemCallError
      # Missing or unreadable file: cannot tell, so report false.
      false
    end

    # Parses the content of a cgroup file (lines of `id:controller:path`).
    #
    # @param lines [String] the whole file content
    # @return [Hash{String => String}] controller name => path, restricted to
    #   the CGROUPV1BASECONTROLLER entry and the entry with an empty controller
    def parse_cgroup_node_path(lines)
      res = {}
      lines.split("\n").each do |line|
        tokens = line.split(':')
        # Only well-formed three-field lines are considered.
        next unless tokens.length == 3

        controller = tokens[1]
        next unless controller == CGROUPV1BASECONTROLLER || controller == ''

        res[controller] = tokens[2]
      end

      res
    end

    # Resolves the cgroup node directory of the process and returns its inode.
    #
    # @param cgroup_mount_path [String] directory the cgroup filesystem is mounted on
    # @param proc_self_cgroup_path [String] path of the cgroup file to parse
    # @return [String, nil] "in-<inode>" for the first controller whose node
    #   directory can be stat'ed, nil otherwise
    def get_cgroup_inode(cgroup_mount_path, proc_self_cgroup_path)
      content = begin
        File.read(proc_self_cgroup_path)
      rescue SystemCallError, IOError
        nil
      end
      return nil unless content

      controllers = parse_cgroup_node_path(content)

      # Try the named controller first, then the entry with an empty controller.
      [CGROUPV1BASECONTROLLER, ''].each do |controller|
        node_path = controllers[controller]
        next unless node_path

        segments = [
          cgroup_mount_path.chomp('/'),
          controller.strip,
          node_path.sub(%r{\A/}, '')
        ]
        # Empty segments (e.g. the empty controller name) are dropped so the
        # joined path has no doubled separators.
        inode = inode_for_path(segments.reject(&:empty?).join("/"))
        return inode unless inode.nil?
      end

      nil
    end

    # @param path [String] filesystem path to stat
    # @return [String, nil] "in-<inode>" or nil when the path cannot be stat'ed
    def inode_for_path(path)
      "in-#{File.stat(path).ino}"
    rescue SystemCallError
      nil
    end

    # Scans cgroup lines for a trailing container identifier.
    #
    # @param handle [#each_line] an IO (or String) yielding cgroup lines
    # @return [String, nil] the first identifier found at the end of a cgroup
    #   path (optionally followed by ".scope"), or nil
    def parse_container_id(handle)
      exp_line = /^\d+:[^:]*:(.+)$/
      uuid = /[0-9a-f]{8}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{12}/
      container = /[0-9a-f]{64}/
      task = /[0-9a-f]{32}-\d+/
      exp_container_id = /(#{uuid}|#{container}|#{task})(?:\.scope)?$/

      handle.each_line do |line|
        match = line.match(exp_line)
        next unless match && match[1]

        id_match = match[1].match(exp_container_id)
        return id_match[1] if id_match && id_match[1]
      end

      nil
    end

    # Opens a cgroup file and extracts the container id from it.
    #
    # @param fpath [String] path of the file to read
    # @return [String, nil] the container id, or nil when the file cannot be
    #   opened or read, or holds no id
    def read_container_id(fpath)
      # Block form guarantees the handle is closed even if parsing raises.
      File.open(fpath, 'r') { |handle| parse_container_id(handle) }
    rescue SystemCallError, IOError
      nil
    end

    # Extracts the final container info from a line in mount info
    #
    # @param line [String] a mount path; only paths whose last component is
    #   "hostname" are considered
    # @return [String, nil] the last id-shaped path component, or nil when
    #   there is none or when the component preceding it is "sandboxes"
    def extract_container_info(line)
      parts = line.strip.split("/")
      return nil unless parts.last == "hostname"

      # Expected structure: [..., <group>, <container_id>, ..., "hostname"]
      container_id = nil
      group = nil

      parts.each_with_index do |part, idx|
        next unless container_id_segment?(part)

        # Match the container id and include the section prior to it.
        # The last matching component wins.
        group = parts[idx - 1] if idx >= 1
        container_id = part
      end

      return container_id unless group == "sandboxes"
    end

    # @param part [String] a single path component
    # @return [Boolean] true when the component has one of the recognised
    #   container id shapes
    def container_id_segment?(part)
      # Length checks are cheap pre-filters before running the regexes.
      return true if part.length == 64 && part =~ /\A[0-9a-f]{64}\z/
      return true if part.length > 32 && part =~ /\A[0-9a-f]{32}-\d+\z/
      return true if part =~ /\A[0-9a-f]{8}(-[0-9a-f]{4}){4}\z/

      false
    end

    # Parse /proc/self/mountinfo to extract the container id.
    # Often container runtimes embed the container id in the mount paths.
    # We parse the mount with a final `hostname` component, which is part of
    # the containers `etc/hostname` bind mount.
    #
    # @param handle [#each_line] an IO (or String) yielding mountinfo lines
    # @return [String, nil] the first container id found, or nil
    def parse_mount_info(handle)
      handle.each_line do |line|
        fields = line.split(" ")

        # The 4th and 5th whitespace-separated fields are inspected. Blank or
        # truncated lines do not have them, so nil entries are skipped.
        [fields[3], fields[4]].each do |mount_path|
          next if mount_path.nil?

          container_id = extract_container_info(mount_path)
          return container_id unless container_id.nil?
        end
      end

      nil
    end

    # Opens a mountinfo file and extracts the container id from it.
    #
    # @param path [String] path of the file to read
    # @return [String, nil] the container id, or nil when the file cannot be
    #   opened or read, or holds no id
    def read_mount_info(path)
      # Block form guarantees the handle is closed even if parsing raises.
      File.open(path, 'r') { |handle| parse_mount_info(handle) }
    rescue SystemCallError, IOError
      nil
    end

    # Determines the container id to attach to outgoing payloads.
    #
    # Lookup order: the user-provided id, /proc/self/cgroup,
    # /proc/self/mountinfo, and finally the inode of the cgroup node (skipped
    # when running in the host cgroup namespace).
    #
    # @param user_provided_id [String, nil] an explicit id, returned as-is when given
    # @param cgroup_fallback [Boolean] whether to inspect the system when no
    #   explicit id is given
    # @return [String, nil] the detected id, or nil
    def get_container_id(user_provided_id, cgroup_fallback)
      return user_provided_id unless user_provided_id.nil?
      return nil unless cgroup_fallback

      container_id = read_container_id("/proc/self/cgroup")
      return container_id unless container_id.nil?

      container_id = read_mount_info("/proc/self/mountinfo")
      return container_id unless container_id.nil?

      return nil if host_cgroup_namespace?

      get_cgroup_inode("/sys/fs/cgroup", "/proc/self/cgroup")
    end
  end
end

0 commit comments

Comments
 (0)