1
1
import atexit
2
2
from typing import Any
3
3
4
- from datadog import initialize
5
- from datadog .dogstatsd .base import statsd
4
+ from datadog .dogstatsd .base import DogStatsd
6
5
7
6
from .base import MetricsBackend , Tags
8
7
9
8
__all__ = ["DogStatsdMetricsBackend" ]
10
9
11
- # Set the maximum number of packets to queue for the sender.
12
- # How may packets to queue before blocking or dropping the packet if the packet queue is already full.
13
- # 0 means unlimited.
14
- SENDER_QUEUE_SIZE = 0
15
-
16
- # Set timeout for packet queue operations, in seconds
17
- # How long the application thread is willing to wait for the queue clear up before dropping the metric packet.
18
- # If set to None, wait forever.
19
- # If set to zero drop the packet immediately if the queue is full.
20
- SENDER_QUEUE_TIMEOUT = 0
21
-
22
10
23
11
class DogStatsdMetricsBackend (MetricsBackend ):
24
12
def __init__ (self , prefix : str | None = None , ** kwargs : Any ) -> None :
25
- # TODO(dcramer): it'd be nice if the initialize call wasn't a global
26
13
self .tags = kwargs .pop ("tags" , None )
27
- kwargs ["statsd_disable_buffering" ] = False
28
-
29
- initialize (** kwargs )
30
- statsd .disable_telemetry ()
31
14
32
- # When enabled, a background thread will be used to send metric payloads to the Agent.
33
- statsd .enable_background_sender (
34
- sender_queue_size = SENDER_QUEUE_SIZE , sender_queue_timeout = SENDER_QUEUE_TIMEOUT
35
- )
36
- # Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent.
37
- atexit .register (statsd .wait_for_pending )
15
+ instance_kwargs : dict [str , Any ] = {
16
+ "disable_telemetry" : True ,
17
+ "disable_buffering" : False ,
18
+ # When enabled, a background thread will be used to send metric payloads to the Agent.
19
+ "disable_background_sender" : False ,
20
+ }
21
+ if socket_path := kwargs .get ("statsd_socket_path" ):
22
+ instance_kwargs ["socket_path" ] = socket_path
23
+ else :
24
+ if host := kwargs .get ("statsd_host" ):
25
+ instance_kwargs ["host" ] = host
26
+ if port := kwargs .get ("statsd_port" ):
27
+ instance_kwargs ["port" ] = int (port )
28
+
29
+ self .statsd = DogStatsd (** instance_kwargs )
38
30
39
31
# Origin detection is enabled after 0.45 by default.
40
32
# Disable it since it silently fails.
41
33
# Ref: https://github.com/DataDog/datadogpy/issues/764
42
- statsd ._container_id = None
34
+ self .statsd ._container_id = None
35
+
36
+ # Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent.
37
+ atexit .register (self .statsd .wait_for_pending )
38
+
43
39
super ().__init__ (prefix = prefix )
44
40
45
41
def incr (
@@ -60,7 +56,7 @@ def incr(
60
56
tags ["instance" ] = instance
61
57
62
58
tags_list = [f"{ k } :{ v } " for k , v in tags .items ()]
63
- statsd .increment (self ._get_key (key ), amount , sample_rate = sample_rate , tags = tags_list )
59
+ self . statsd .increment (self ._get_key (key ), amount , sample_rate = sample_rate , tags = tags_list )
64
60
65
61
def timing (
66
62
self ,
@@ -70,6 +66,7 @@ def timing(
70
66
tags : Tags | None = None ,
71
67
sample_rate : float = 1 ,
72
68
stacklevel : int = 0 ,
69
+ precise : bool = False ,
73
70
) -> None :
74
71
tags = dict (tags or ())
75
72
@@ -79,7 +76,12 @@ def timing(
79
76
tags ["instance" ] = instance
80
77
81
78
tags_list = [f"{ k } :{ v } " for k , v in tags .items ()]
82
- statsd .timing (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
79
+ if not precise :
80
+ self .statsd .timing (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
81
+ else :
82
+ self .statsd .distribution (
83
+ self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list
84
+ )
83
85
84
86
def gauge (
85
87
self ,
@@ -99,7 +101,7 @@ def gauge(
99
101
tags ["instance" ] = instance
100
102
101
103
tags_list = [f"{ k } :{ v } " for k , v in tags .items ()]
102
- statsd .gauge (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
104
+ self . statsd .gauge (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
103
105
104
106
def distribution (
105
107
self ,
@@ -124,7 +126,7 @@ def distribution(
124
126
tags ["instance" ] = instance
125
127
126
128
tags_list = [f"{ k } :{ v } " for k , v in tags .items ()]
127
- statsd .distribution (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
129
+ self . statsd .distribution (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
128
130
129
131
def event (
130
132
self ,
@@ -146,7 +148,7 @@ def event(
146
148
tags ["instance" ] = instance
147
149
148
150
tags_list = [f"{ k } :{ v } " for k , v in tags .items ()]
149
- statsd .event (
151
+ self . statsd .event (
150
152
title = title ,
151
153
message = message ,
152
154
alert_type = alert_type ,
0 commit comments