Skip to content

Commit 3e7f93d

Browse files
Add UDS SOCK_STREAM support to the DogStatsD client (#869)
* AMLII-2166 - Add UDS Streams support to the DogStatsD client Includes full support for the unix://, unixstream://, and unixgram:// socket_path prefixes utilized by DD_DOGSTATSD_URL in preparation to support that feature. Autodetects SOCK_DGRAM vs SOCK_STREAM for users currently providing a raw socket path. * Fix bug with multiple threaded usage of stream sockets by guaranteeing the "size" and "message" packets are always sent one after another. * Carlosroman/add uds stream support (#895) * Setting correct transport type max payload size when setting the socket. * updated broken tests (#896) --------- Co-authored-by: Carlos <[email protected]>
1 parent 010d523 commit 3e7f93d

File tree

4 files changed

+281
-43
lines changed

4 files changed

+281
-43
lines changed

datadog/dogstatsd/aggregator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ def flush_aggregated_sampled_metrics(self):
5555
return metrics
5656

5757
def get_context(self, name, tags):
58-
tags_str = ",".join(tags) if tags is not None else ""
59-
return "{}:{}".format(name, tags_str)
58+
tags_str = u",".join(tags) if tags is not None else ""
59+
return u"{}:{}".format(name, tags_str)
6060

6161
def count(self, name, value, tags, rate, timestamp=0):
6262
return self.add_metric(

datadog/dogstatsd/base.py

Lines changed: 95 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import os
1414
import socket
1515
import errno
16+
import struct
1617
import threading
1718
import time
1819
from threading import Lock, RLock
@@ -49,6 +50,11 @@
4950
DEFAULT_HOST = "localhost"
5051
DEFAULT_PORT = 8125
5152

53+
# Socket prefixes
54+
UNIX_ADDRESS_SCHEME = "unix://"
55+
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
56+
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"
57+
5258
# Buffering-related values (in seconds)
5359
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
5460
MIN_FLUSH_INTERVAL = 0.0001
@@ -488,12 +494,47 @@ def socket_path(self):
488494
def socket_path(self, path):
489495
with self._socket_lock:
490496
self._socket_path = path
491-
if path is None:
492-
self._transport = "udp"
493-
self._max_payload_size = self._max_buffer_len or UDP_OPTIMAL_PAYLOAD_LENGTH
494-
else:
495-
self._transport = "uds"
496-
self._max_payload_size = self._max_buffer_len or UDS_OPTIMAL_PAYLOAD_LENGTH
497+
498+
@property
499+
def socket(self):
500+
return self._socket
501+
502+
@socket.setter
503+
def socket(self, new_socket):
504+
self._socket = new_socket
505+
if new_socket:
506+
try:
507+
self._socket_kind = new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
508+
if new_socket.family == socket.AF_UNIX:
509+
if self._socket_kind == socket.SOCK_STREAM:
510+
self._transport = "uds-stream"
511+
else:
512+
self._transport = "uds"
513+
self._max_payload_size = self._max_buffer_len or UDS_OPTIMAL_PAYLOAD_LENGTH
514+
else:
515+
self._transport = "udp"
516+
self._max_payload_size = self._max_buffer_len or UDP_OPTIMAL_PAYLOAD_LENGTH
517+
return
518+
except AttributeError: # _socket can't have a type if it doesn't have sockopts
519+
log.info("Unexpected socket provided with no support for getsockopt")
520+
self._socket_kind = None
521+
# When the socket is None, we use the UDP optimal payload length
522+
self._max_payload_size = UDP_OPTIMAL_PAYLOAD_LENGTH
523+
524+
@property
525+
def telemetry_socket(self):
526+
return self._telemetry_socket
527+
528+
@telemetry_socket.setter
529+
def telemetry_socket(self, t_socket):
530+
self._telemetry_socket = t_socket
531+
if t_socket:
532+
try:
533+
self._telemetry_socket_kind = t_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
534+
return
535+
except AttributeError: # _telemetry_socket can't have a kind if it doesn't have sockopts
536+
log.info("Unexpected telemetry socket provided with no support for getsockopt")
537+
self._telemetry_socket_kind = None
497538

498539
def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
499540
"""
@@ -738,11 +779,37 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
738779

739780
@classmethod
740781
def _get_uds_socket(cls, socket_path, timeout):
741-
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
742-
sock.settimeout(timeout)
743-
cls._ensure_min_send_buffer_size(sock)
744-
sock.connect(socket_path)
745-
return sock
782+
valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
783+
if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
784+
valid_socket_kinds = [socket.SOCK_DGRAM]
785+
socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
786+
elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
787+
valid_socket_kinds = [socket.SOCK_STREAM]
788+
socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
789+
elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
790+
socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]
791+
792+
last_error = ValueError("Invalid socket path")
793+
for socket_kind in valid_socket_kinds:
794+
# py2 stores socket kinds differently than py3, determine the name independently from version
795+
sk_name = {socket.SOCK_STREAM: "stream", socket.SOCK_DGRAM: "datagram"}[socket_kind]
796+
797+
try:
798+
sock = socket.socket(socket.AF_UNIX, socket_kind)
799+
sock.settimeout(timeout)
800+
cls._ensure_min_send_buffer_size(sock)
801+
sock.connect(socket_path)
802+
log.debug("Connected to socket %s with kind %s", socket_path, sk_name)
803+
return sock
804+
except Exception as e:
805+
if sock is not None:
806+
sock.close()
807+
log.debug("Failed to connect to %s with kind %s: %s", socket_path, sk_name, e)
808+
if e.errno == errno.EPROTOTYPE:
809+
last_error = e
810+
continue
811+
raise e
812+
raise last_error
746813

747814
@classmethod
748815
def _get_udp_socket(cls, host, port, timeout):
@@ -1243,14 +1310,23 @@ def _xmit_packet_with_telemetry(self, packet):
12431310
self.packets_dropped_writer += 1
12441311

12451312
def _xmit_packet(self, packet, is_telemetry):
1313+
socket_kind = None
12461314
try:
12471315
if is_telemetry and self._dedicated_telemetry_destination():
12481316
mysocket = self.telemetry_socket or self.get_socket(telemetry=True)
1317+
socket_kind = self._telemetry_socket_kind
12491318
else:
12501319
# If set, use socket directly
12511320
mysocket = self.socket or self.get_socket()
1321+
socket_kind = self._socket_kind
12521322

1253-
mysocket.send(packet.encode(self.encoding))
1323+
encoded_packet = packet.encode(self.encoding)
1324+
if socket_kind == socket.SOCK_STREAM:
1325+
with self._socket_lock:
1326+
mysocket.sendall(struct.pack('<I', len(encoded_packet)))
1327+
mysocket.sendall(encoded_packet)
1328+
else:
1329+
mysocket.send(encoded_packet)
12541330

12551331
if not is_telemetry and self._telemetry:
12561332
self.packets_sent += 1
@@ -1283,13 +1359,19 @@ def _xmit_packet(self, packet, is_telemetry):
12831359
)
12841360
self.close_socket()
12851361
except Exception as exc:
1286-
print("Unexpected error: %s", exc)
1362+
print("Unexpected error: ", exc)
12871363
log.error("Unexpected error: %s", str(exc))
12881364

12891365
if not is_telemetry and self._telemetry:
12901366
self.bytes_dropped_writer += len(packet)
12911367
self.packets_dropped_writer += 1
12921368

1369+
# if in stream mode we need to shut down the socket; we can't recover from a
1370+
# partial send
1371+
if socket_kind == socket.SOCK_STREAM:
1372+
log.debug("Confirming socket closure after error streaming")
1373+
self.close_socket()
1374+
12931375
return False
12941376

12951377
def _send_to_buffer(self, packet):

tests/integration/dogstatsd/test_statsd_sender.py

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
1+
from contextlib import closing
12
import itertools
3+
import os
4+
import shutil
25
import socket
6+
import struct
7+
import tempfile
38
from threading import Thread
9+
import uuid
410

511
import pytest
612

713
from datadog.dogstatsd.base import DogStatsd
814

915
@pytest.mark.parametrize(
10-
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
11-
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
16+
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
17+
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
1218
)
13-
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
19+
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
1420
# Test basic sender operation with an assortment of options
15-
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
21+
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
1622
statsd = DogStatsd(
1723
telemetry_min_flush_interval=0,
1824
disable_background_sender=disable_background_sender,
@@ -24,7 +30,11 @@ def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pend
2430
statsd._reset_telemetry()
2531

2632
def reader_thread():
27-
msg = bar.recv(8192)
33+
if socket_kind == socket.SOCK_DGRAM:
34+
msg = bar.recv(8192)
35+
else:
36+
size = struct.unpack("<I", bar.recv(4))[0]
37+
msg = bar.recv(size)
2838
assert msg == b"test.metric:1|c\n"
2939

3040
t = Thread(target=reader_thread, name="test_sender_mode/reader_thread")
@@ -49,6 +59,25 @@ def test_set_socket_timeout():
4959
statsd.close_socket()
5060
assert statsd.get_socket().gettimeout() == 1
5161

62+
def test_stream_cleanup():
63+
foo, _ = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0)
64+
65+
foo.settimeout(0)
66+
statsd = DogStatsd(disable_buffering=True)
67+
statsd.socket = foo
68+
statsd.increment("test", 1)
69+
statsd.increment("test", 1)
70+
statsd.increment("test", 1)
71+
assert statsd.socket is not None
72+
73+
foo.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1) # different os's have different mins, e.g. this sets the buffer size to 2304 on certain linux variants
74+
75+
with pytest.raises(socket.error):
76+
foo.sendall(os.urandom(5000)) # pre-emptively clog the buffer
77+
78+
statsd.increment("test", 1)
79+
80+
assert statsd.socket is None
5281

5382
@pytest.mark.parametrize(
5483
"disable_background_sender, disable_buffering",
@@ -101,3 +130,41 @@ def test_buffering_with_context():
101130
bar.settimeout(5)
102131
msg = bar.recv(8192)
103132
assert msg == b"first:1|c\n"
133+
134+
@pytest.fixture()
135+
def socket_dir():
136+
tempdir = tempfile.mkdtemp()
137+
yield tempdir
138+
shutil.rmtree(tempdir)
139+
140+
@pytest.mark.parametrize(
141+
"socket_prefix, socket_kind, success",
142+
[
143+
("", socket.SOCK_DGRAM, True),
144+
("", socket.SOCK_STREAM, True),
145+
("unix://", socket.SOCK_DGRAM, True),
146+
("unix://", socket.SOCK_STREAM, True),
147+
("unixstream://", socket.SOCK_DGRAM, False),
148+
("unixstream://", socket.SOCK_STREAM, True),
149+
("unixgram://", socket.SOCK_DGRAM, True),
150+
("unixgram://", socket.SOCK_STREAM, False)
151+
]
152+
)
153+
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
154+
socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")
155+
listener_socket = socket.socket(socket.AF_UNIX, socket_kind)
156+
listener_socket.bind(socket_path)
157+
158+
if socket_kind == socket.SOCK_STREAM:
159+
listener_socket.listen(1)
160+
161+
with closing(listener_socket):
162+
statsd = DogStatsd(
163+
socket_path = socket_prefix + socket_path
164+
)
165+
166+
if success:
167+
assert statsd.get_socket() is not None
168+
else:
169+
with pytest.raises(socket.error):
170+
statsd.get_socket()

0 commit comments

Comments
 (0)