Skip to content

Commit ac14089

Browse files
authored
feat: switch hash to faster (#9)
Did some profiling of the profiler via Apple Instruments; our hashing takes more than 10% of the sampling time for large stack traces, so I switched us to collecting the parts in order! Here's @jmgrosen's explanation: > Sorry if I'm drawing this out longer than is necessary! But I'm worried about this global atomic ending up a bottleneck at high domain counts. > > After looking into the runtime events system a bit more, I don't think we even need a counter--each part just needs its bytes and two booleans `is_start` and `is_end`. Each domain gets its own ring buffer, so we don't have to worry about parts of different points being interleaved. The only things we need to do are > 1. reassemble in-order parts of the same point > 2. detect when a new point starts > 3. detect when there has been a ring buffer overflow > > Assume for now there is only one domain/ring buffer. Then to implement `event_of_perf_event` we, starting at an `is_start` part, collect bytes in our buffer until we hit an `is_end` part and return the unmarshalled form. This handles (1) and (2). To handle (3), we can use the `lost_events` callback from the runtime events system to put `event_of_perf_event` in a "throw parts away until the next `is_start` part" state. (The documentation is a little unclear, but based on the implementation, `lost_events` will _always_ be called when the ring buffer has overflowed.) > > Then for handling multiple ring buffers, we just store a different event buffer per ring buffer, keyed on the `int` ring buffer ID that each of the runtime events callbacks receives. ## Test plan Compare runs of the example program with Apple Instruments and notice the hash function no longer shows up as major CPU usage. Additionally, to test the logic of the new parts, I modified the example program to generate more events than necessary in order to overwrite the ring buffer, leading to lost events, and ensured we did not segfault or anything.
2 parents 663b3dd + d7e138c commit ac14089

File tree

6 files changed

+97
-91
lines changed

6 files changed

+97
-91
lines changed

dune-project

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
(package
1818
(name pyro-caml-instruments)
1919
(synopsis "Pyroscope + OCaml = Pyro Caml")
20-
(depends ocaml logs digestif ppx_deriving)
20+
(depends ocaml logs ppx_deriving)
2121
(tags ("profiling")))
2222

2323
(package

lib/Event.ml

Lines changed: 75 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,18 @@
1616
(*****************************************************************************)
1717
(* Event *)
1818
(*****************************************************************************)
19-
(* md5 feels fast and safe enough *)
20-
module Hash = Digest.MD5
2119

22-
(* an event is either a point or partial. We have a notion of a partial event
23-
since runtime events only support payloads of max 1024 bytes, and any larger
24-
will raise an exception. Since some callstacks can be LARGE!! we break up the
20+
(* runtime events only support payloads of max 1024 bytes, and any larger will
21+
raise an exception. Since some callstacks can be LARGE!! we break up the
2522
callstack into a multipart message that is then reassembled by the
26-
profiler *)
27-
type t =
28-
| Point of (float * Stack_trace.raw_stack_trace)
29-
| Partial of { id : Hash.t; bytes : Bytes.t; part : int; part_count : int }
23+
profiler. *)
24+
type t = { bytes : Bytes.t; part : int; part_count : int }
3025

26+
(* The actual underlying data we're transmitting, a stack trace with a
27+
timestamp *)
28+
type point = float * Stack_trace.raw_stack_trace
3129
type marshaled = bytes * int
3230

33-
let make_partial id part_count part bytes =
34-
Partial { id; bytes; part; part_count }
35-
3631
let split_bytes bytes size =
3732
let rec aux offset parts =
3833
if offset >= Bytes.length bytes then List.rev parts
@@ -44,27 +39,23 @@ let split_bytes bytes size =
4439
aux 0 []
4540

4641
let marshal e =
47-
let marshaled_event = Marshal.to_bytes e [] in
48-
let len = Bytes.length marshaled_event in
49-
(marshaled_event, len)
42+
let marshaled_obj = Marshal.to_bytes e [] in
43+
let len = Bytes.length marshaled_obj in
44+
(marshaled_obj, len)
5045

51-
(* 800 is chosen since when we break the event up into a partial event, we need
46+
(* 900 is chosen since when we break the event up into a partial event, we need
5247
to save some room for the other parts of the Partial data structure besides
5348
the bytes*)
5449
(* TODO be more clever about max size *)
55-
let marshal_event ?(max_size = 800) e =
56-
let marshaled_event, len = marshal e in
57-
(* Max size of runtime event type payload *)
50+
let marshal_point ?(max_size = 900) (p : point) =
51+
let marshaled_point, _len = marshal p in
52+
(* Max size of runtime event type payload is 1024, but we want to stay
53+
slightly under that so we can store metadata about which part this is *)
5854
(* https://ocaml.org/manual/5.3/api/Runtime_events.Type.html *)
59-
if len <= 1024 then [ (marshaled_event, len) ]
60-
else
61-
(* if it's bigger split it up! *)
62-
let id = Hash.bytes marshaled_event in
63-
let parts = split_bytes marshaled_event max_size in
64-
let mk_part part part_bytes =
65-
make_partial id (List.length parts) part part_bytes
66-
in
67-
parts |> List.mapi (fun i part_bytes -> marshal (mk_part i part_bytes))
55+
let parts = split_bytes marshaled_point max_size in
56+
let part_count = List.length parts in
57+
let mk_part part part_bytes = { bytes = part_bytes; part; part_count } in
58+
parts |> List.mapi (fun i part_bytes -> marshal (mk_part i part_bytes))
6859

6960
(*****************************************************************************)
7061
(* Perf event *)
@@ -83,49 +74,70 @@ let perf_event_type =
8374
let perf_event =
8475
Runtime_events.User.register "Perf_event" Perf_event_tag perf_event_type
8576

86-
let emit_event e =
87-
let marshaled_events = marshal_event e in
77+
let emit_point (p : point) =
78+
let marshaled_events = marshal_point p in
8879
List.iter
8980
(fun marshaled -> Runtime_events.User.write perf_event marshaled)
9081
marshaled_events
9182
[@@inline always]
9283

93-
(* buffer for storing partial events so we can then rebuild them *)
94-
type event_buffer = (Hash.t, (int * Bytes.t) list) Hashtbl.t
84+
(* buffer for storing partial points so we can then rebuild them *)
85+
(* of type (ring_id, point parts) *)
86+
type point_buffer = (int, (int * Bytes.t) list) Hashtbl.t
87+
88+
(** [event_of_perf_event ring_buffer_index buffer event] collects marshaled
89+
events, and re-assembles them into points. Since the points are split into
90+
parts, we return [None] if there was not enough parts to reconstruct a
91+
point, or [Some point] if there were.
9592
96-
(* event_of_perf event will keep track of partial events and spit them out as
97-
normal events if it finds all the parts *)
98-
(* TODO this would probably make more sense if it spit out Point option or
99-
similar? *)
100-
let event_of_perf_event buffer (marshaled, _) : t =
93+
The runtime events file we read in has a unique ring buffer for each domain
94+
([ring_buffer_index]). Events are written to this buffer in order. This means
95+
we can assume that the parts will come be read in order (e.g. ring 1 part 1,
96+
ring 1 part 2 ...). We collect these parts to form the final point. If we
97+
receive an out of order part, the last part in a point, or lose runtime events
98+
in a ring buffer, we reset our point buffer *)
99+
let process_perf_event ring_buffer_index buffer (marshaled, _) : point option =
101100
let event = Marshal.from_bytes marshaled 0 in
101+
let ring_parts =
102+
match Hashtbl.find_opt buffer ring_buffer_index with
103+
| Some parts -> parts
104+
| None -> []
105+
in
102106
match event with
103-
| Partial { id; bytes; part_count; part } ->
104-
let parts =
105-
match Hashtbl.find_opt buffer id with
106-
| Some parts -> (part, bytes) :: parts
107-
| None -> [ (part, bytes) ]
108-
in
109-
let parts =
110-
List.sort_uniq (fun (id1, _) (id2, _) -> Int.compare id1 id2) parts
111-
in
112-
if List.length parts = part_count then (
107+
(* Don't store in buffer if we can immediately unmarshal *)
108+
| { bytes; part_count; _ } when part_count = 1 ->
109+
(* Also clear out the buffer just in case *)
110+
Hashtbl.remove buffer ring_buffer_index;
111+
Some (Marshal.from_bytes bytes 0)
112+
(* If we don't have any parts, and receive something besides the start part,
113+
just wait for the next start part*)
114+
| { part; _ } when List.length ring_parts = 0 && part != 0 -> None
115+
(* If we already have some parts, or this is the start part, begin collecting parts *)
116+
| { bytes; part_count; _ } ->
117+
let parts = bytes :: ring_parts in
118+
let parts_len = List.length parts in
119+
(* If we have enough then unmarshal! *)
120+
(* TODO: We probably can just make the array all at once since we know the
121+
size in theory? *)
122+
if parts_len = part_count then (
113123
let full_bytes =
114-
parts |> List.map snd
115-
|> List.fold_left
116-
(fun acc bytes ->
117-
let new_acc =
118-
Bytes.create (Bytes.length acc + Bytes.length bytes)
119-
in
120-
Bytes.blit acc 0 new_acc 0 (Bytes.length acc);
121-
Bytes.blit bytes 0 new_acc (Bytes.length acc)
122-
(Bytes.length bytes);
123-
new_acc)
124-
(Bytes.create 0)
124+
List.fold_right
125+
(fun acc bytes ->
126+
let new_acc =
127+
Bytes.create (Bytes.length acc + Bytes.length bytes)
128+
in
129+
Bytes.blit acc 0 new_acc 0 (Bytes.length acc);
130+
Bytes.blit bytes 0 new_acc (Bytes.length acc) (Bytes.length bytes);
131+
new_acc)
132+
parts (Bytes.create 0)
125133
in
126-
Hashtbl.remove buffer id;
127-
Marshal.from_bytes full_bytes 0)
134+
Hashtbl.remove buffer ring_buffer_index;
135+
Some (Marshal.from_bytes full_bytes 0))
136+
else if parts_len < part_count then (
137+
(* Weird state, clear buffer *)
138+
Hashtbl.remove buffer ring_buffer_index;
139+
None)
128140
else (
129-
Hashtbl.replace buffer id parts;
130-
event)
131-
| _ -> event
141+
(* If not then update the buffer *)
142+
Hashtbl.replace buffer ring_buffer_index parts;
143+
None)

lib/Pyro_caml_instruments.ml

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@
1515

1616
open Event
1717

18-
let src = Logs.Src.create "pyro_caml" ~doc:"Pyro Caml"
19-
20-
module Log = (val Logs.src_log src)
21-
2218
(*****************************************************************************)
2319
(* Instrument side code *)
2420
(*****************************************************************************)
@@ -35,8 +31,8 @@ let emit_point_event raw_backtrace =
3531
that doesn't play well when linked into a rust program. Monotomic time
3632
would be nice so if the user/system changes the time of day we aren't
3733
screwed up, but for now we can assume that probably won't happen much*)
38-
let event = Point (Unix.gettimeofday (), raw_stack_trace) in
39-
emit_event event
34+
let point = (Unix.gettimeofday (), raw_stack_trace) in
35+
emit_point point
4036
[@@inline always]
4137

4238
let tracker : (unit, unit) Gc.Memprof.tracker =
@@ -77,28 +73,33 @@ let maybe_with_memprof_sampler ?sampling_rate f =
7773
(* Profiler code *)
7874
(*****************************************************************************)
7975
let create_cursor path pid = Runtime_events.create_cursor (Some (path, pid))
80-
let empty_callbacks = Runtime_events.Callbacks.create ()
8176

8277
(* Minimize work we do in process event since the instrumented program can write
8378
events quickly and so we need to keep pace while polling if we can *)
84-
let process_event now interval sample_points = function
85-
| Point (time, raw_st) ->
79+
let process_point now interval sample_points = function
80+
| Some (time, raw_st) ->
8681
if now -. time < interval then
8782
sample_points := (time, raw_st) :: !sample_points
88-
| Partial _ -> ()
83+
| None -> ()
8984

90-
let read_poll ?(max_events = None) ?(callbacks = empty_callbacks) cursor
91-
interval =
92-
let event_buffer = Hashtbl.create 1000 in
85+
let read_poll ?(max_events = None) cursor interval =
86+
let point_buffer = Hashtbl.create 1000 in
9387
let now = Unix.gettimeofday () in
9488
let sample_points = ref [] in
89+
let callbacks =
90+
Runtime_events.Callbacks.create
91+
~lost_events:(fun (ring_buffer_index : int) (_num_lost : int) ->
92+
(* If we've lost events clear that ring buffer's event buffer *)
93+
Hashtbl.remove point_buffer ring_buffer_index)
94+
()
95+
in
9596
let callbacks =
9697
Runtime_events.Callbacks.add_user_event perf_event_type
97-
(fun (_ring_buffer_index : int) (_ts : Runtime_events.Timestamp.t)
98-
_event_t (e : marshaled) ->
98+
(fun (ring_buffer_index : int) (_ts : Runtime_events.Timestamp.t) _event_t
99+
(e : marshaled) ->
99100
e
100-
|> event_of_perf_event event_buffer
101-
|> process_event now interval sample_points)
101+
|> process_perf_event ring_buffer_index point_buffer
102+
|> process_point now interval sample_points)
102103
callbacks
103104
in
104105
(* TODO? Multithread this? *)

lib/Pyro_caml_instruments.mli

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,10 @@ val create_cursor : string -> int -> Runtime_events.cursor
2222
from the given [path] and [pid]. *)
2323

2424
val read_poll :
25-
?max_events:int option ->
26-
?callbacks:Runtime_events.Callbacks.t ->
27-
Runtime_events.cursor ->
28-
float ->
29-
Stack_trace.t list
25+
?max_events:int option -> Runtime_events.cursor -> float -> Stack_trace.t list
3026
(** [read_poll cursor sample_interval] will read the profiling runtime events
3127
from the given cursor, and will give attempt to give a single
3228
{!Stack_trace.t} per every unique thread id, within [sample_interval] of the
3329
start time of this function call. If a sample is not within this interval,
3430
it will not be considered. Additionally it also ensures the samples chosen
35-
are those closest to the start of the call of this function. Existing
36-
callbacks can also be passed via [?callbacks], and those will be included
37-
when reading the cursor *)
31+
are those closest to the start of the call of this function. *)

lib/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(library
22
(name pyro_caml_instruments)
33
(public_name pyro-caml-instruments)
4-
(libraries runtime_events unix logs digestif)
4+
(libraries runtime_events unix logs)
55
(preprocess
66
(pps ppx_deriving.eq))
77
(instrumentation.backend

pyro-caml-instruments.opam

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ depends: [
1111
"dune" {>= "3.20"}
1212
"ocaml"
1313
"logs"
14-
"digestif"
1514
"ppx_deriving"
1615
"odoc" {with-doc}
1716
]

0 commit comments

Comments
 (0)