Skip to content

Commit 9220c5e

Browse files
committed
AMLII-2166 - Add UDS Streams support to the DogStatsD client
Includes full support for the unix://, unixstream://, and unixgram:// socket_path prefixes used by DD_DOGSTATSD_URL, in preparation for supporting that feature. Auto-detects SOCK_DGRAM vs SOCK_STREAM for users who currently provide a raw socket path.
1 parent 362e187 commit 9220c5e

File tree

3 files changed

+172
-24
lines changed

3 files changed

+172
-24
lines changed

datadog/dogstatsd/base.py

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import os
1414
import socket
1515
import errno
16+
import struct
1617
import threading
1718
import time
1819
from threading import Lock, RLock
@@ -49,6 +50,11 @@
4950
DEFAULT_HOST = "localhost"
5051
DEFAULT_PORT = 8125
5152

53+
# Socket prefixes
54+
UNIX_ADDRESS_SCHEME = "unix://"
55+
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
56+
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"
57+
5258
# Buffering-related values (in seconds)
5359
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
5460
MIN_FLUSH_INTERVAL = 0.0001
@@ -489,6 +495,30 @@ def socket_path(self, path):
489495
self._transport = "uds"
490496
self._max_payload_size = self._max_buffer_len or UDS_OPTIMAL_PAYLOAD_LENGTH
491497

498+
@property
def socket(self):
    """Return the socket object currently stored for metrics (may be None)."""
    return self._socket

@socket.setter
def socket(self, new_socket):
    """
    Store ``new_socket`` and cache its kind next to it.

    For a truthy socket the kind is read from its SO_TYPE option; any
    falsy value clears the cached kind to None.
    """
    self._socket = new_socket
    self._socket_kind = (
        new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE) if new_socket else None
    )
509+
510+
@property
def telemetry_socket(self):
    """Return the socket object currently stored for telemetry (may be None)."""
    return self._telemetry_socket

@telemetry_socket.setter
def telemetry_socket(self, t_socket):
    """
    Store ``t_socket`` and cache its kind next to it.

    For a truthy socket the kind is read from its SO_TYPE option; any
    falsy value clears the cached kind to None.
    """
    self._telemetry_socket = t_socket
    if not t_socket:
        self._telemetry_socket_kind = None
        return
    self._telemetry_socket_kind = t_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
521+
492522
def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
493523
"""
494524
Use a background thread to communicate with the dogstatsd server.
@@ -731,11 +761,37 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
731761

732762
@classmethod
def _get_uds_socket(cls, socket_path, timeout):
    """
    Open a connected Unix domain socket for ``socket_path``.

    A ``unixgram://`` prefix restricts the attempt to SOCK_DGRAM and a
    ``unixstream://`` prefix restricts it to SOCK_STREAM. A ``unix://``
    prefix or a bare path tries SOCK_DGRAM first and falls back to
    SOCK_STREAM when the attempt fails with EPROTOTYPE.

    :param socket_path: filesystem path of the socket, optionally prefixed
        with one of the schemes above
    :param timeout: value passed to ``settimeout`` on the new socket
    :return: the connected socket
    :raises: the original exception for any failure other than EPROTOTYPE;
        the last EPROTOTYPE error when every permitted kind was rejected
    """
    valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
    if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
        valid_socket_kinds = [socket.SOCK_DGRAM]
        socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
    elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
        valid_socket_kinds = [socket.SOCK_STREAM]
        socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
    elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
        socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]

    # py2 stores socket kinds differently than py3, determine the name independently from version
    kind_names = {socket.SOCK_STREAM: "stream", socket.SOCK_DGRAM: "datagram"}

    last_error = ValueError("Invalid socket path")
    for socket_kind in valid_socket_kinds:
        sk_name = kind_names[socket_kind]

        # Reset on every attempt: if socket.socket() itself raises, the
        # handler must not see an unbound name or the previous (already
        # closed) socket.
        sock = None
        try:
            sock = socket.socket(socket.AF_UNIX, socket_kind)
            sock.settimeout(timeout)
            cls._ensure_min_send_buffer_size(sock)
            sock.connect(socket_path)
            log.debug("Connected to socket %s with kind %s", socket_path, sk_name)
            return sock
        except Exception as e:
            if sock is not None:
                sock.close()
            log.debug("Failed to connect to %s with kind %s: %s", socket_path, sk_name, e)
            # Not every exception carries an errno attribute; read it
            # defensively so the original error is never masked by an
            # AttributeError.
            if getattr(e, "errno", None) == errno.EPROTOTYPE:
                # Wrong socket kind for this path: remember it, try the next kind.
                last_error = e
                continue
            # Bare raise keeps the original traceback intact.
            raise
    raise last_error
739795

740796
@classmethod
741797
def _get_udp_socket(cls, host, port, timeout):
@@ -1219,11 +1275,16 @@ def _xmit_packet(self, packet, is_telemetry):
12191275
try:
12201276
if is_telemetry and self._dedicated_telemetry_destination():
12211277
mysocket = self.telemetry_socket or self.get_socket(telemetry=True)
1278+
socket_kind = self._telemetry_socket_kind
12221279
else:
12231280
# If set, use socket directly
12241281
mysocket = self.socket or self.get_socket()
1282+
socket_kind = self._socket_kind
12251283

1226-
mysocket.send(packet.encode(self.encoding))
1284+
encoded_packet = packet.encode(self.encoding)
1285+
if socket_kind == socket.SOCK_STREAM:
1286+
mysocket.send(struct.pack('<I', len(encoded_packet)))
1287+
mysocket.send(encoded_packet)
12271288

12281289
if not is_telemetry and self._telemetry:
12291290
self.packets_sent += 1
@@ -1256,7 +1317,7 @@ def _xmit_packet(self, packet, is_telemetry):
12561317
)
12571318
self.close_socket()
12581319
except Exception as exc:
1259-
print("Unexpected error: %s", exc)
1320+
print("Unexpected error: ", exc)
12601321
log.error("Unexpected error: %s", str(exc))
12611322

12621323
if not is_telemetry and self._telemetry:

tests/integration/dogstatsd/test_statsd_sender.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
1+
from contextlib import closing
12
import itertools
3+
import os
4+
import shutil
25
import socket
6+
import tempfile
37
from threading import Thread
8+
import uuid
49

510
import pytest
611

712
from datadog.dogstatsd.base import DogStatsd
813

914
@pytest.mark.parametrize(
10-
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
11-
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
15+
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
16+
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
1217
)
13-
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
18+
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
1419
# Test basic sender operation with an assortment of options
15-
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
20+
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
1621
statsd = DogStatsd(
1722
telemetry_min_flush_interval=0,
1823
disable_background_sender=disable_background_sender,
@@ -101,3 +106,41 @@ def test_buffering_with_context():
101106
bar.settimeout(5)
102107
msg = bar.recv(8192)
103108
assert msg == b"first:1|c\n"
109+
110+
@pytest.fixture()
111+
def socket_dir():
112+
tempdir = tempfile.mkdtemp()
113+
yield tempdir
114+
shutil.rmtree(tempdir)
115+
116+
@pytest.mark.parametrize(
    "socket_prefix, socket_kind, success",
    [
        ("", socket.SOCK_DGRAM, True),
        ("", socket.SOCK_STREAM, True),
        ("unix://", socket.SOCK_DGRAM, True),
        ("unix://", socket.SOCK_STREAM, True),
        ("unixstream://", socket.SOCK_DGRAM, False),
        ("unixstream://", socket.SOCK_STREAM, True),
        ("unixgram://", socket.SOCK_DGRAM, True),
        ("unixgram://", socket.SOCK_STREAM, False),
    ],
)
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
    # Bind a listener of the given kind, then check that the client connects
    # (or fails with socket.error) depending on the scheme prefix it is given.
    socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")

    # Enter closing() before bind/listen so the listener is released even if
    # that setup fails.
    with closing(socket.socket(socket.AF_UNIX, socket_kind)) as listener_socket:
        listener_socket.bind(socket_path)

        if socket_kind == socket.SOCK_STREAM:
            listener_socket.listen(1)

        statsd = DogStatsd(socket_path=socket_prefix + socket_path)
        try:
            if success:
                assert statsd.get_socket() is not None
            else:
                with pytest.raises(socket.error):
                    statsd.get_socket()
        finally:
            # Release the client-side socket so a descriptor is not leaked
            # for every parametrized case.
            statsd.close_socket()

tests/unit/dogstatsd/test_statsd.py

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
# Standard libraries
1111
from collections import deque
1212
from contextlib import closing
13+
import struct
1314
from threading import Thread
1415
import errno
1516
import os
@@ -41,11 +42,12 @@ class FakeSocket(object):
4142

4243
FLUSH_GRACE_PERIOD = 0.2
4344

44-
def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, socket_kind=socket.SOCK_DGRAM):
    # Kind reported through getsockopt(); recv() also uses it to decide how
    # queued payloads are decoded.
    self._socket_kind = socket_kind

    self._flush_interval = flush_interval
    self._flush_wait = False

    # Everything passed to send() is queued here in arrival order.
    self.payloads = deque()
    self.timeout = ()  # unit tuple = settimeout was not called
5052

5153
def send(self, payload):
@@ -64,17 +66,29 @@ def recv(self, count=1, reset_wait=False, no_wait=False):
6466
time.sleep(self._flush_interval+self.FLUSH_GRACE_PERIOD)
6567
self._flush_wait = True
6668

67-
if count > len(self.payloads):
69+
payload_len = len(self.payloads)
70+
if self._socket_kind == socket.SOCK_STREAM:
71+
if payload_len % 2 != 0 or count > (payload_len / 2):
72+
return None
73+
elif count > len(self.payloads):
6874
return None
6975

7076
out = []
7177
for _ in range(count):
72-
out.append(self.payloads.popleft().decode('utf-8'))
78+
if self._socket_kind == socket.SOCK_DGRAM:
79+
out.append(self.payloads.popleft().decode('utf-8'))
80+
else:
81+
length = struct.unpack('<I', self.payloads.popleft())[0]
82+
pl = self.payloads.popleft()[:length].decode('utf-8')
83+
out.append(pl)
7384
return '\n'.join(out)
7485

7586
def close(self):
7687
pass
7788

89+
def getsockopt(self, *_options):
    # Whatever level/option is requested, answer with the kind this fake
    # socket was constructed with.
    configured_kind = self._socket_kind
    return configured_kind
91+
7892
def __repr__(self):
7993
return str(self.payloads)
8094

@@ -1061,19 +1075,31 @@ def test_batching(self):
10611075
telemetry=telemetry_metrics(metrics=2, bytes_sent=len(expected))
10621076
)
10631077

1064-
def test_flush(self):
1078+
def test_flush_dgram(self):
    # Run the shared flush scenario against a datagram-kind fake socket.
    self._test_flush(socket_kind=socket.SOCK_DGRAM)
1080+
1081+
def test_flush_stream(self):
    # Run the shared flush scenario against a stream-kind fake socket.
    self._test_flush(socket_kind=socket.SOCK_STREAM)
1083+
1084+
def _test_flush(self, socket_kind):
    # Shared scenario: with buffering enabled nothing reaches the socket
    # until flush() is called explicitly.
    sink = FakeSocket(socket_kind=socket_kind)
    client = DogStatsd(disable_buffering=False, telemetry_min_flush_interval=0)
    client.socket = sink

    client.increment('page.views')
    self.assertIsNone(sink.recv(no_wait=True))

    client.flush()
    self.assert_equal_telemetry('page.views:1|c\n', sink.recv(2))
10731093

1074-
def test_flush_interval(self):
1094+
def test_flush_interval_dgram(self):
    # Run the shared flush-interval scenario against a datagram-kind fake socket.
    self._test_flush_interval(socket_kind=socket.SOCK_DGRAM)
1096+
1097+
def test_flush_interval_stream(self):
    # Run the shared flush-interval scenario against a stream-kind fake socket.
    self._test_flush_interval(socket_kind=socket.SOCK_STREAM)
1099+
1100+
def _test_flush_interval(self, socket_kind):
10751101
dogstatsd = DogStatsd(disable_buffering=False, flush_interval=1, telemetry_min_flush_interval=0)
1076-
fake_socket = FakeSocket()
1102+
fake_socket = FakeSocket(socket_kind=socket_kind)
10771103
dogstatsd.socket = fake_socket
10781104

10791105
dogstatsd.increment('page.views')
@@ -1088,9 +1114,15 @@ def test_flush_interval(self):
10881114
fake_socket.recv(2, no_wait=True)
10891115
)
10901116

1091-
def test_aggregation_buffering_simultaneously(self):
1117+
def test_aggregation_buffering_simultaneously_dgram(self):
    # Run the shared aggregation+buffering scenario against a datagram-kind fake socket.
    self._test_aggregation_buffering_simultaneously(socket_kind=socket.SOCK_DGRAM)
1119+
1120+
def test_aggregation_buffering_simultaneously_stream(self):
    # Run the shared aggregation+buffering scenario against a stream-kind fake socket.
    self._test_aggregation_buffering_simultaneously(socket_kind=socket.SOCK_STREAM)
1122+
1123+
def _test_aggregation_buffering_simultaneously(self, socket_kind):
10921124
dogstatsd = DogStatsd(disable_buffering=False, disable_aggregation=False, telemetry_min_flush_interval=0)
1093-
fake_socket = FakeSocket()
1125+
fake_socket = FakeSocket(socket_kind=socket_kind)
10941126
dogstatsd.socket = fake_socket
10951127
for _ in range(10):
10961128
dogstatsd.increment('test.aggregation_and_buffering')
@@ -1099,9 +1131,15 @@ def test_aggregation_buffering_simultaneously(self):
10991131
dogstatsd.flush()
11001132
self.assert_equal_telemetry('test.aggregation_and_buffering:10|c\n', fake_socket.recv(2))
11011133

1102-
def test_aggregation_buffering_simultaneously_with_interval(self):
1134+
def test_aggregation_buffering_simultaneously_with_interval_dgram(self):
    # Run the shared aggregation+buffering-with-interval scenario against a
    # datagram-kind fake socket.
    self._test_aggregation_buffering_simultaneously_with_interval(socket_kind=socket.SOCK_DGRAM)
1136+
1137+
def test_aggregation_buffering_simultaneously_with_interval_stream(self):
    # Run the shared aggregation+buffering-with-interval scenario against a
    # stream-kind fake socket.
    self._test_aggregation_buffering_simultaneously_with_interval(socket_kind=socket.SOCK_STREAM)
1139+
1140+
def _test_aggregation_buffering_simultaneously_with_interval(self, socket_kind):
11031141
dogstatsd = DogStatsd(disable_buffering=False, disable_aggregation=False, flush_interval=1, telemetry_min_flush_interval=0)
1104-
fake_socket = FakeSocket()
1142+
fake_socket = FakeSocket(socket_kind=socket_kind)
11051143
dogstatsd.socket = fake_socket
11061144
for _ in range(10):
11071145
dogstatsd.increment('test.aggregation_and_buffering_with_interval')
@@ -1185,12 +1223,18 @@ def test_batching_sequential(self):
11851223
)
11861224
)
11871225

1188-
def test_batching_runtime_changes(self):
1226+
def test_batching_runtime_changes_dgram(self):
    # Run the shared runtime-batching-changes scenario against a datagram-kind fake socket.
    self._test_batching_runtime_changes(socket_kind=socket.SOCK_DGRAM)
1228+
1229+
def test_batching_runtime_changes_stream(self):
    # Run the shared runtime-batching-changes scenario against a stream-kind fake socket.
    self._test_batching_runtime_changes(socket_kind=socket.SOCK_STREAM)
1231+
1232+
def _test_batching_runtime_changes(self, socket_kind):
11891233
dogstatsd = DogStatsd(
11901234
disable_buffering=True,
11911235
telemetry_min_flush_interval=0
11921236
)
1193-
dogstatsd.socket = FakeSocket()
1237+
dogstatsd.socket = FakeSocket(socket_kind=socket_kind)
11941238

11951239
# Send some unbuffered metrics and verify we got it immediately
11961240
last_telemetry_size = self.send_and_assert(

0 commit comments

Comments
 (0)