Skip to content

Commit d6e7d69

Browse files
jdKyle-Verhoog
andauthored
fix(profiling/pprof): provide timestamp to export() (#1312)
Currently, the Exporters have to guess what the time range is for the provided events. This is not optimal in cases where we don't have any events — even if it _should_ not happen in practice. By forcing the caller of the exporter to provide the collection time range, we can generate data that are actually more precise. Co-authored-by: Kyle Verhoog <[email protected]>
1 parent a9d70e6 commit d6e7d69

File tree

8 files changed

+38
-32
lines changed

8 files changed

+38
-32
lines changed

ddtrace/profiling/exporter/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ class Exporter(object):
1010
"""Exporter base class."""
1111

1212
@staticmethod
13-
def export(events):
13+
def export(events, start_time_ns, end_time_ns):
1414
"""Export events.
1515
1616
:param events: List of events to export.
17+
:param start_time_ns: The start time of recording.
18+
:param end_time_ns: The end time of recording.
1719
"""
1820
raise NotImplementedError
1921

@@ -23,6 +25,6 @@ class NullExporter(Exporter):
2325
"""Exporter that does nothing."""
2426

2527
@staticmethod
26-
def export(events):
28+
def export(events, start_time_ns, end_time_ns):
2729
"""Discard events."""
2830
pass

ddtrace/profiling/exporter/file.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,17 @@ class PprofFileExporter(pprof.PprofExporter):
1212
prefix = attr.ib()
1313
_increment = attr.ib(default=1, init=False, repr=False)
1414

15-
def export(self, events):
15+
def export(self, events, start_time_ns, end_time_ns):
1616
"""Export events to pprof file.
1717
1818
The file name is based on the prefix passed to init. The process ID number and type of export is then added as a
1919
suffix.
2020
2121
:param events: The event dictionary from a `ddtrace.profiling.recorder.Recorder`.
22+
:param start_time_ns: The start time of recording.
23+
:param end_time_ns: The end time of recording.
2224
"""
23-
profile = super(PprofFileExporter, self).export(events)
25+
profile = super(PprofFileExporter, self).export(events, start_time_ns, end_time_ns)
2426
with gzip.open(self.prefix + (".%d.%d" % (os.getpid(), self._increment)), "wb") as f:
2527
f.write(profile.SerializeToString())
2628
self._increment += 1

ddtrace/profiling/exporter/http.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,12 @@ def _get_tags(service):
121121
tags[key] = value
122122
return tags
123123

124-
def export(self, events):
124+
def export(self, events, start_time_ns, end_time_ns):
125125
"""Export events to an HTTP endpoint.
126126
127127
:param events: The event dictionary from a `ddtrace.profiling.recorder.Recorder`.
128+
:param start_time_ns: The start time of recording.
129+
:param end_time_ns: The end time of recording.
128130
"""
129131
if not self.endpoint:
130132
raise InvalidEndpoint("Endpoint is empty")
@@ -144,16 +146,18 @@ def export(self, events):
144146
}
145147

146148
exceptions = []
147-
profile = super(PprofHTTPExporter, self).export(events)
149+
profile = super(PprofHTTPExporter, self).export(events, start_time_ns, end_time_ns)
148150
s = six.BytesIO()
149151
with gzip.GzipFile(fileobj=s, mode="wb") as gz:
150152
gz.write(profile.SerializeToString())
151153
fields = {
152154
"runtime-id": RUNTIME_ID,
153155
"recording-start": (
154-
datetime.datetime.utcfromtimestamp(profile.time_nanos // 10e8).isoformat() + "Z"
156+
datetime.datetime.utcfromtimestamp(start_time_ns).replace(microsecond=0).isoformat() + "Z"
157+
).encode(),
158+
"recording-end": (
159+
datetime.datetime.utcfromtimestamp(end_time_ns).replace(microsecond=0).isoformat() + "Z"
155160
).encode(),
156-
"recording-end": (datetime.datetime.utcnow().replace(microsecond=0).isoformat() + "Z").encode(),
157161
"runtime": PYTHON_IMPLEMENTATION,
158162
"format": b"pprof",
159163
"type": b"cpu+alloc+exceptions",

ddtrace/profiling/exporter/pprof.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -263,16 +263,16 @@ def max_none(a, b):
263263
return a
264264
return max(a, b)
265265

266-
def export(self, events):
266+
def export(self, events, start_time_ns, end_time_ns):
267267
"""Convert events to pprof format.
268268
269269
:param events: The event dictionary from a `ddtrace.profiling.recorder.Recorder`.
270-
:return: A dict where key is the type of profiling and value is the profile objects in protobuf format.
270+
:param start_time_ns: The start time of recording.
271+
:param end_time_ns: The end time of recording.
272+
:return: A protobuf Profile object.
271273
"""
272274
program_name = self._get_program_name()
273275

274-
start_time_ns = None
275-
stop_time_ns = None
276276
sum_period = 0
277277
nb_event = 0
278278

@@ -282,8 +282,6 @@ def export(self, events):
282282
stack_events = []
283283
for event in events.get(stack.StackSampleEvent, []):
284284
stack_events.append(event)
285-
start_time_ns = self.min_none(event.timestamp, start_time_ns)
286-
stop_time_ns = self.max_none(event.timestamp, stop_time_ns)
287285
sum_period += event.sampling_period
288286
nb_event += 1
289287

@@ -354,8 +352,6 @@ def export(self, events):
354352
traces.extend(event.snapshot.traces._traces)
355353
# Assume they are all the same
356354
traceback_limit = event.snapshot.traceback_limit
357-
start_time_ns = self.min_none(event.timestamp, start_time_ns)
358-
stop_time_ns = self.max_none(event.timestamp, stop_time_ns)
359355
# Ignore period for memory events are it's not a time-based sampling
360356

361357
if nb_events:
@@ -369,10 +365,7 @@ def export(self, events):
369365
else:
370366
period = None
371367

372-
if stop_time_ns is not None and start_time_ns is not None:
373-
duration_ns = stop_time_ns - start_time_ns
374-
else:
375-
duration_ns = None
368+
duration_ns = end_time_ns - start_time_ns
376369

377370
return converter._build_profile(
378371
start_time_ns=start_time_ns,

ddtrace/profiling/scheduler.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# -*- encoding: utf-8 -*-
22
import logging
33

4+
from ddtrace import compat
45
from ddtrace.profiling import _attr
56
from ddtrace.profiling import _periodic
67
from ddtrace.profiling import _traceback
@@ -18,6 +19,7 @@ class Scheduler(object):
1819
exporters = attr.ib()
1920
interval = attr.ib(factory=_attr.from_env("DD_PROFILING_UPLOAD_INTERVAL", 60, float))
2021
_periodic = attr.ib(init=False, default=None)
22+
_last_export = attr.ib(init=False, default=None)
2123

2224
def __enter__(self):
2325
self.start()
@@ -29,6 +31,7 @@ def start(self):
2931
self.interval, self.flush, name="%s:%s" % (__name__, self.__class__.__name__)
3032
)
3133
LOG.debug("Starting scheduler")
34+
self._last_export = compat.time_ns()
3235
self._periodic.start()
3336
LOG.debug("Scheduler started")
3437

@@ -53,10 +56,12 @@ def flush(self):
5356
"""Flush events from recorder to exporters."""
5457
LOG.debug("Flushing events")
5558
events = self.recorder.reset()
59+
start = self._last_export
60+
self._last_export = compat.time_ns()
5661
total_events = sum(len(v) for v in events.values())
5762
for exp in self.exporters:
5863
try:
59-
exp.export(events)
64+
exp.export(events, start, self._last_export)
6065
except exporter.ExportError as e:
6166
LOG.error("Unable to export %d events: %s", total_events, _traceback.format_exception(e))
6267
except Exception:

tests/profiling/exporter/test_file.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@
99
def test_export(tmp_path):
1010
filename = str(tmp_path / "pprof")
1111
exp = file.PprofFileExporter(filename)
12-
exp.export(test_pprof.TEST_EVENTS)
12+
exp.export(test_pprof.TEST_EVENTS, 0, 1)
1313
test_main.check_pprof_file(filename + "." + str(os.getpid()) + ".1")

tests/profiling/exporter/test_http.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def do_POST(self):
6363
items[part.get_param("name", header="content-disposition")].append(part.get_payload(decode=True))
6464
for key, check in {
6565
"recording-start": lambda x: x[0] == b"1970-01-01T00:00:00Z",
66-
"recording-end": lambda x: x[0].startswith(b"20") and x[0].endswith(b"Z"),
66+
"recording-end": lambda x: b"1970-01-01T00:00:01Z",
6767
"runtime": lambda x: x[0] == platform.python_implementation().encode(),
6868
"format": lambda x: x[0] == b"pprof",
6969
"type": lambda x: x[0] == b"cpu+alloc+exceptions",
@@ -139,7 +139,7 @@ def test_wrong_api_key(endpoint_test_server):
139139
# This is mostly testing our test server, not the exporter
140140
exp = http.PprofHTTPExporter(_ENDPOINT, "this is not the right API key")
141141
with pytest.raises(http.UploadFailed) as t:
142-
exp.export(test_pprof.TEST_EVENTS)
142+
exp.export(test_pprof.TEST_EVENTS, 0, 1)
143143
e = t.exceptions[0]
144144
assert isinstance(e, http.RequestFailed)
145145
assert e.response.status == 400
@@ -148,19 +148,19 @@ def test_wrong_api_key(endpoint_test_server):
148148

149149
def test_export(endpoint_test_server):
150150
exp = http.PprofHTTPExporter(_ENDPOINT, _API_KEY)
151-
exp.export(test_pprof.TEST_EVENTS)
151+
exp.export(test_pprof.TEST_EVENTS, 0, 1)
152152

153153

154154
def test_export_no_endpoint(endpoint_test_server):
155155
exp = http.PprofHTTPExporter(endpoint="")
156156
with pytest.raises(http.InvalidEndpoint):
157-
exp.export(test_pprof.TEST_EVENTS)
157+
exp.export(test_pprof.TEST_EVENTS, 0, 1)
158158

159159

160160
def test_export_server_down():
161161
exp = http.PprofHTTPExporter("http://localhost:2", _API_KEY)
162162
with pytest.raises(http.UploadFailed) as t:
163-
exp.export(test_pprof.TEST_EVENTS)
163+
exp.export(test_pprof.TEST_EVENTS, 0, 1)
164164
e = t.exceptions[0]
165165
assert isinstance(e, (IOError, OSError))
166166
assert e.errno in (61, 99)
@@ -169,15 +169,15 @@ def test_export_server_down():
169169
def test_export_timeout(endpoint_test_timeout_server):
170170
exp = http.PprofHTTPExporter(_TIMEOUT_ENDPOINT, _API_KEY, timeout=1)
171171
with pytest.raises(http.UploadFailed) as t:
172-
exp.export(test_pprof.TEST_EVENTS)
172+
exp.export(test_pprof.TEST_EVENTS, 0, 1)
173173
e = t.value.exceptions[0]
174174
assert isinstance(e, socket.timeout)
175175

176176

177177
def test_export_reset(endpoint_test_reset_server):
178178
exp = http.PprofHTTPExporter(_RESET_ENDPOINT, _API_KEY, timeout=1)
179179
with pytest.raises(http.UploadFailed) as t:
180-
exp.export(test_pprof.TEST_EVENTS)
180+
exp.export(test_pprof.TEST_EVENTS, 0, 1)
181181
e = t.value.exceptions[0]
182182
if six.PY3:
183183
assert isinstance(e, ConnectionResetError)

tests/profiling/exporter/test_pprof.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ def test_ppprof_exporter():
395395
exp = pprof.PprofExporter()
396396
exp._get_program_name = mock.Mock()
397397
exp._get_program_name.return_value = "bonjour"
398-
exports = exp.export(TEST_EVENTS)
398+
exports = exp.export(TEST_EVENTS, 1, 7)
399399
if tracemalloc:
400400
if stack.FEATURES["stack-exceptions"]:
401401
filename = "test-pprof-exporter_tracemalloc+stack-exceptions.txt"
@@ -410,7 +410,7 @@ def test_ppprof_exporter():
410410

411411
def test_pprof_exporter_empty():
412412
exp = pprof.PprofExporter()
413-
export = exp.export({})
413+
export = exp.export({}, 0, 1)
414414
assert len(export.sample) == 0
415415

416416

@@ -575,5 +575,5 @@ def test_ppprof_memory_exporter():
575575
unit: 8
576576
}
577577
""" == str(
578-
exp.export(events)
578+
exp.export(events, 1, 2)
579579
)

0 commit comments

Comments
 (0)