Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
b09df3b
Add origin detection
StephenWakely Jun 25, 2025
cf25b16
Add external env
StephenWakely Jun 26, 2025
7dee16a
Add tag cardinality option
StephenWakely Jun 27, 2025
c28b06a
Serialize fields
StephenWakely Jul 2, 2025
4df07b8
Update event serializer params
StephenWakely Jul 3, 2025
ff41013
Merge remote-tracking branch 'origin/master' into stephen/origin-dete…
StephenWakely Jul 4, 2025
5b48dd2
Add fields to service check
StephenWakely Jul 4, 2025
abb3b79
Use older fakefs version to test against Ruby v2
StephenWakely Jul 7, 2025
a50fafb
Ruby 2.1 compatible multiline strings
StephenWakely Jul 7, 2025
3e0ad4f
Only use old fakefs on old versions
StephenWakely Jul 7, 2025
deee154
Missed multiline
StephenWakely Jul 7, 2025
84f505a
Further fixes for windows
StephenWakely Jul 7, 2025
9c182b2
Fix content logic
StephenWakely Jul 7, 2025
6111742
Fudge allocation test
StephenWakely Jul 7, 2025
6282d13
Increase timing for macos
StephenWakely Jul 7, 2025
280b919
Try avoiding allocation
StephenWakely Jul 7, 2025
571458d
Trace the allocations
StephenWakely Jul 7, 2025
b7ed78a
Trace initialization
StephenWakely Jul 7, 2025
afb698b
Return block result
StephenWakely Jul 7, 2025
035cb88
Try warming up
StephenWakely Jul 8, 2025
4289183
Origin detection doesn't need to be a class
StephenWakely Jul 8, 2025
e7d35dd
Remove field serializer from event
StephenWakely Jul 8, 2025
922cd2e
Remove windows specific
StephenWakely Jul 8, 2025
7e1bb67
Return origin detection
StephenWakely Jul 8, 2025
dd1eb6a
Remove all linux specific alloction test
StephenWakely Jul 8, 2025
76486f8
Attempt to bound regex
StephenWakely Jul 8, 2025
166fd8b
Take substring of path.
StephenWakely Jul 8, 2025
cd89b8c
Replace regex with parser
StephenWakely Jul 8, 2025
556bfe1
Ruby <= 2.3 compliance
StephenWakely Jul 8, 2025
fcbd7f7
Remove trace allocations
StephenWakely Jul 8, 2025
29f4f08
Only turn origin detection off for linux
StephenWakely Jul 8, 2025
a4bd599
Make origin detection methods private
StephenWakely Jul 9, 2025
1c58afa
Remove unused instance vars
StephenWakely Jul 9, 2025
ecbbf3d
Add origin fields to telemetry metrics
StephenWakely Jul 10, 2025
373be59
Add delay serializer tests
StephenWakely Jul 10, 2025
cea7499
Fix field order
StephenWakely Jul 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ end

group :test do
gem 'mocha'
gem 'fakefs', '~> 3.0'
end
66 changes: 63 additions & 3 deletions lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require_relative 'statsd/udp_connection'
require_relative 'statsd/uds_connection'
require_relative 'statsd/connection_cfg'
require_relative 'statsd/origin_detection'
require_relative 'statsd/message_buffer'
require_relative 'statsd/serialization'
require_relative 'statsd/sender'
Expand Down Expand Up @@ -82,6 +83,9 @@ def tags
# @option [Float] default sample rate if not overridden
# @option [Boolean] single_thread flushes the metrics on the main thread instead of in a companion thread
# @option [Boolean] delay_serialization delays stat serialization
# @option [Boolean] origin_detection is origin detection enabled
# @option [String] container_id the container ID field, used for origin detection
# @option [String] cardinality the default tag cardinality to use
def initialize(
host = nil,
port = nil,
Expand All @@ -104,15 +108,34 @@ def initialize(
delay_serialization: false,

telemetry_enable: true,
telemetry_flush_interval: DEFAULT_TELEMETRY_FLUSH_INTERVAL
telemetry_flush_interval: DEFAULT_TELEMETRY_FLUSH_INTERVAL,

origin_detection: true,
container_id: nil,
cardinality: nil
)
unless tags.nil? || tags.is_a?(Array) || tags.is_a?(Hash)
raise ArgumentError, 'tags must be an array of string tags or a Hash'
end

@namespace = namespace
@prefix = @namespace ? "#{@namespace}.".freeze : nil
@serializer = Serialization::Serializer.new(prefix: @prefix, global_tags: tags)

origin_detection_enabled = origin_detection_enabled?(origin_detection)
container_id = OriginDetection
.new()
.get_container_id(container_id, origin_detection_enabled)

external_data = sanitize(ENV['DD_EXTERNAL_ENV'])

@serializer = Serialization::Serializer.new(prefix: @prefix,
container_id: container_id,
external_data: external_data,
global_tags: tags,
)

@cardinality = cardinality || ENV['DD_CARDINALITY'] || ENV['DATADOG_CARDINALITY']

@sample_rate = sample_rate
@delay_serialization = delay_serialization

Expand Down Expand Up @@ -159,6 +182,7 @@ def self.open(*args, **kwargs)
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [Numeric] :by increment value, default 1
# @option opts [String] :cardinality The tag cardinality to use
# @see #count
def increment(stat, opts = EMPTY_OPTIONS)
opts = { sample_rate: opts } if opts.is_a?(Numeric)
Expand All @@ -174,6 +198,7 @@ def increment(stat, opts = EMPTY_OPTIONS)
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [Numeric] :by decrement value, default 1
# @option opts [String] :cardinality The tag cardinality to use
# @see #count
def decrement(stat, opts = EMPTY_OPTIONS)
opts = { sample_rate: opts } if opts.is_a?(Numeric)
Expand All @@ -189,6 +214,7 @@ def decrement(stat, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
def count(stat, count, opts = EMPTY_OPTIONS)
opts = { sample_rate: opts } if opts.is_a?(Numeric)
send_stats(stat, count, COUNTER_TYPE, opts)
Expand All @@ -206,6 +232,7 @@ def count(stat, count, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @example Report the current user count:
# $statsd.gauge('user.count', User.count)
def gauge(stat, value, opts = EMPTY_OPTIONS)
Expand All @@ -221,6 +248,7 @@ def gauge(stat, value, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @example Report the current user count:
# $statsd.histogram('user.count', User.count)
def histogram(stat, value, opts = EMPTY_OPTIONS)
Expand All @@ -235,6 +263,7 @@ def histogram(stat, value, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @example Report the current user count:
# $statsd.distribution('user.count', User.count)
def distribution(stat, value, opts = EMPTY_OPTIONS)
Expand All @@ -251,6 +280,7 @@ def distribution(stat, value, opts = EMPTY_OPTIONS)
# @param [Hash] opts the options to create the metric with
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @example Report the time (in ms) taken to activate an account
# $statsd.distribution_time('account.activate') { @account.activate! }
def distribution_time(stat, opts = EMPTY_OPTIONS)
Expand All @@ -272,6 +302,7 @@ def distribution_time(stat, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
def timing(stat, ms, opts = EMPTY_OPTIONS)
opts = { sample_rate: opts } if opts.is_a?(Numeric)
send_stats(stat, ms, TIMING_TYPE, opts)
Expand All @@ -287,6 +318,7 @@ def timing(stat, ms, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @yield The operation to be timed
# @see #timing
# @example Report the time (in ms) taken to activate an account
Expand All @@ -307,6 +339,7 @@ def time(stat, opts = EMPTY_OPTIONS)
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Boolean] :pre_sampled If true, the client assumes the caller has already sampled metrics at :sample_rate, and doesn't perform sampling.
# @option opts [Array<String>] :tags An array of tags
# @option opts [String] :cardinality The tag cardinality to use
# @example Record a unique visitory by id:
# $statsd.set('visitors.uniques', User.id)
def set(stat, value, opts = EMPTY_OPTIONS)
Expand Down Expand Up @@ -348,6 +381,7 @@ def service_check(name, status, opts = EMPTY_OPTIONS)
# @option opts [String, nil] :alert_type ('info') Can be "error", "warning", "info" or "success".
# @option opts [Boolean, false] :truncate_if_too_long (false) Truncate the event if it is too long
# @option opts [Array<String>] :tags tags to be added to every metric
# @option opts [String] :cardinality The tag cardinality to use
# @example Report an awful event:
# $statsd.event('Something terrible happened', 'The end is near if we do nothing', :alert_type=>'warning', :tags=>['end_of_times','urgent'])
def event(title, text, opts = EMPTY_OPTIONS)
Expand Down Expand Up @@ -427,17 +461,43 @@ def send_stats(stat, delta, type, opts = EMPTY_OPTIONS)
telemetry.sent(metrics: 1) if telemetry

sample_rate = opts[:sample_rate] || @sample_rate || 1
cardinality = opts[:cardinality] || @cardinality

if sample_rate == 1 || opts[:pre_sampled] || rand <= sample_rate
full_stat =
if @delay_serialization
[stat, delta, type, opts[:tags], sample_rate]
else
serializer.to_stat(stat, delta, type, tags: opts[:tags], sample_rate: sample_rate)
serializer.to_stat(stat, delta, type, tags: opts[:tags], sample_rate: sample_rate, cardinality: cardinality)
end

forwarder.send_message(full_stat)
end
end

def origin_detection_enabled?(origin_detection)
if !origin_detection.nil? && !origin_detection
return false
end

if ENV['DD_ORIGIN_DETECTION_ENABLED']
return ![
'0',
'f',
'false'
].include?(
ENV['DD_ORIGIN_DETECTION_ENABLED'].downcase
)
end

return true
end

# Sanitize the DD_EXTERNAL_ENV input to ensure it doesn't contain invalid characters
# that may break the protocol.
# Removing any non-printable characters and `|`.
def sanitize(external_data)
external_data.gsub(/[^[:print:]]|`\|/, '') unless external_data.nil?
end
end
end
138 changes: 138 additions & 0 deletions lib/datadog/statsd/origin_detection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
module Datadog
class Statsd
class OriginDetection
CGROUPV1BASECONTROLLER = "memory"
HOSTCGROUPNAMESPACEINODE = 0xEFFFFFFB

def get_filepaths
{
cgroup_path: "/proc/self/cgroup",
self_mount_info_path: "/proc/self/mountinfo",
default_cgroup_mount_path: "/sys/fs/cgroup"
}
end

def is_host_cgroup_namespace?
stat = File.stat("/proc/self/ns/cgroup") rescue nil
return false unless stat
stat.ino == HOSTCGROUPNAMESPACEINODE
end

def parse_cgroup_node_path(lines)
res = {}
lines.split("\n").each do |line|
tokens = line.split(':')
next unless tokens.length == 3

controller = tokens[1]
path = tokens[2]

if controller == CGROUPV1BASECONTROLLER || controller == ''
res[controller] = path
end
end

res
end

def get_cgroup_inode(cgroup_mount_path, proc_self_cgroup_path)
content = File.read(proc_self_cgroup_path)
controllers = parse_cgroup_node_path(content)

[CGROUPV1BASECONTROLLER, ''].each do |controller|
next unless controllers[controller]

segments = [
cgroup_mount_path.chomp('/'),
controller.strip,
controllers[controller].sub(/^\//, '')
]
path = segments.reject(&:empty?).join("/")
inode = inode_for_path(path)
return inode unless inode.nil?
end

nil
end

private

def inode_for_path(path)
stat = File.stat(path) rescue nil
return nil unless stat
"in-#{stat.ino}"
end

def parse_container_id(handle)
exp_line = /^\d+:[^:]*:(.+)$/
uuid = /[0-9a-f]{8}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{12}/
container = /[0-9a-f]{64}/
task = /[0-9a-f]{32}-\d+/
exp_container_id = /(#{uuid}|#{container}|#{task})(?:\.scope)?$/

handle.each_line do |line|
match = line.match(exp_line)
next unless match && match[1]
id_match = match[1].match(exp_container_id)

return id_match[1] if id_match && id_match[1]
end

nil
end

public

def read_container_id(fpath)
handle = File.open(fpath, 'r') rescue nil
return nil unless handle

id = parse_container_id(handle)
handle.close
id
end

def parse_mount_info(handle)
container_regexp = '([0-9a-f]{64})|([0-9a-f]{32}-\d+)|([0-9a-f]{8}(-[0-9a-f]{4}){4}$)'
cid_mount_info_regexp = %r{.*/([^\s/]+)/(?:#{container_regexp})/[\S]*hostname}

handle.each_line do |line|
matches = line.scan(cid_mount_info_regexp)
next if matches.empty?

match = matches.last
containerd_sandbox_prefix = "sandboxes"
if match && match[0] != containerd_sandbox_prefix
return match[1]
end
end

nil
end

def read_mount_info(path)
handle = File.open(path, 'r') rescue nil
return nil unless handle

info = parse_mount_info(handle)
handle.close
info
end

def get_container_id(user_provided_id, cgroup_fallback)
return user_provided_id unless user_provided_id.nil?
return nil unless cgroup_fallback

container_id = read_container_id("/proc/self/cgroup")
return container_id unless container_id.nil?

container_id = read_mount_info("/proc/self/mountinfo")
return container_id unless container_id.nil?

return nil if is_host_cgroup_namespace?

get_cgroup_inode("/sys/fs/cgroup", "/proc/self/cgroup")
end
end
end
end
1 change: 1 addition & 0 deletions lib/datadog/statsd/serialization.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Serialization
require_relative 'serialization/tag_serializer'
require_relative 'serialization/service_check_serializer'
require_relative 'serialization/event_serializer'
require_relative 'serialization/field_serializer'
require_relative 'serialization/stat_serializer'

require_relative 'serialization/serializer'
8 changes: 7 additions & 1 deletion lib/datadog/statsd/serialization/event_serializer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ class EventSerializer
alert_type: 't:',
}.freeze

def initialize(global_tags: [])
def initialize(container_id, external_data, global_tags: [])
@tag_serializer = TagSerializer.new(global_tags)
@field_serializer = FieldSerializer.new(container_id, external_data)
end

def format(title, text, options = EMPTY_OPTIONS)
Expand Down Expand Up @@ -47,6 +48,10 @@ def format(title, text, options = EMPTY_OPTIONS)
event << tags
end

if fields = field_serializer.format(options[:cardinality])
event << fields
end

if event.bytesize > MAX_EVENT_SIZE
if options[:truncate_if_too_long]
event.slice!(MAX_EVENT_SIZE..event.length)
Expand All @@ -59,6 +64,7 @@ def format(title, text, options = EMPTY_OPTIONS)

protected
attr_reader :tag_serializer
attr_reader :field_serializer

def escape(text)
text.delete('|').tap do |t|
Expand Down
Loading
Loading