1
1
import atexit
2
2
from typing import Any
3
3
4
- from datadog .dogstatsd .base import DogStatsd
4
+ from datadog import initialize
5
+ from datadog .dogstatsd .base import statsd
5
6
6
7
from .base import MetricsBackend , Tags
7
8
8
9
__all__ = ["DogStatsdMetricsBackend" ]
9
10
11
+ # Set the maximum number of packets to queue for the sender.
12
+ # How may packets to queue before blocking or dropping the packet if the packet queue is already full.
13
+ # 0 means unlimited.
14
+ SENDER_QUEUE_SIZE = 0
15
+
16
+ # Set timeout for packet queue operations, in seconds
17
+ # How long the application thread is willing to wait for the queue clear up before dropping the metric packet.
18
+ # If set to None, wait forever.
19
+ # If set to zero drop the packet immediately if the queue is full.
20
+ SENDER_QUEUE_TIMEOUT = 0
21
+
10
22
11
23
class DogStatsdMetricsBackend (MetricsBackend ):
12
24
def __init__ (self , prefix : str | None = None , ** kwargs : Any ) -> None :
25
+ # TODO(dcramer): it'd be nice if the initialize call wasn't a global
13
26
self .tags = kwargs .pop ("tags" , None )
27
+ kwargs ["statsd_disable_buffering" ] = False
28
+
29
+ initialize (** kwargs )
30
+ statsd .disable_telemetry ()
14
31
15
- instance_kwargs : dict [str , Any ] = {
16
- "disable_telemetry" : True ,
17
- "disable_buffering" : False ,
18
- # When enabled, a background thread will be used to send metric payloads to the Agent.
19
- "disable_background_sender" : False ,
20
- }
21
- if socket_path := kwargs .get ("statsd_socket_path" ):
22
- instance_kwargs ["socket_path" ] = socket_path
23
- else :
24
- if host := kwargs .get ("statsd_host" ):
25
- instance_kwargs ["host" ] = host
26
- if port := kwargs .get ("statsd_port" ):
27
- instance_kwargs ["port" ] = int (port )
28
-
29
- self .statsd = DogStatsd (** instance_kwargs )
32
+ # When enabled, a background thread will be used to send metric payloads to the Agent.
33
+ statsd .enable_background_sender (
34
+ sender_queue_size = SENDER_QUEUE_SIZE , sender_queue_timeout = SENDER_QUEUE_TIMEOUT
35
+ )
36
+ # Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent.
37
+ atexit .register (statsd .wait_for_pending )
30
38
31
39
# Origin detection is enabled after 0.45 by default.
32
40
# Disable it since it silently fails.
33
41
# Ref: https://github.com/DataDog/datadogpy/issues/764
34
- self .statsd ._container_id = None
35
-
36
- # Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent.
37
- atexit .register (self .statsd .wait_for_pending )
38
-
42
+ statsd ._container_id = None
39
43
super ().__init__ (prefix = prefix )
40
44
41
45
def incr (
@@ -56,7 +60,7 @@ def incr(
56
60
tags ["instance" ] = instance
57
61
58
62
tags_list = [f"{ k } :{ v } " for k , v in tags .items ()]
59
- self . statsd .increment (self ._get_key (key ), amount , sample_rate = sample_rate , tags = tags_list )
63
+ statsd .increment (self ._get_key (key ), amount , sample_rate = sample_rate , tags = tags_list )
60
64
61
65
def timing (
62
66
self ,
@@ -66,7 +70,6 @@ def timing(
66
70
tags : Tags | None = None ,
67
71
sample_rate : float = 1 ,
68
72
stacklevel : int = 0 ,
69
- precise : bool = False ,
70
73
) -> None :
71
74
tags = dict (tags or ())
72
75
@@ -76,12 +79,7 @@ def timing(
76
79
tags ["instance" ] = instance
77
80
78
81
tags_list = [f"{ k } :{ v } " for k , v in tags .items ()]
79
- if not precise :
80
- self .statsd .timing (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
81
- else :
82
- self .statsd .distribution (
83
- self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list
84
- )
82
+ statsd .timing (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
85
83
86
84
def gauge (
87
85
self ,
@@ -101,7 +99,7 @@ def gauge(
101
99
tags ["instance" ] = instance
102
100
103
101
tags_list = [f"{ k } :{ v } " for k , v in tags .items ()]
104
- self . statsd .gauge (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
102
+ statsd .gauge (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
105
103
106
104
def distribution (
107
105
self ,
@@ -126,7 +124,7 @@ def distribution(
126
124
tags ["instance" ] = instance
127
125
128
126
tags_list = [f"{ k } :{ v } " for k , v in tags .items ()]
129
- self . statsd .distribution (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
127
+ statsd .distribution (self ._get_key (key ), value , sample_rate = sample_rate , tags = tags_list )
130
128
131
129
def event (
132
130
self ,
@@ -148,7 +146,7 @@ def event(
148
146
tags ["instance" ] = instance
149
147
150
148
tags_list = [f"{ k } :{ v } " for k , v in tags .items ()]
151
- self . statsd .event (
149
+ statsd .event (
152
150
title = title ,
153
151
message = message ,
154
152
alert_type = alert_type ,
0 commit comments