@@ -10,93 +10,23 @@ use super::super::Sample;
10
10
use super :: super :: StackTraceId ;
11
11
use crate :: collections:: identifiable:: Id ;
12
12
use crate :: internal:: Timestamp ;
13
- use crate :: profiles:: SizeRestrictedBuffer ;
13
+ use crate :: profiles:: { DefaultObservationCodec as DefaultCodec , ObservationCodec } ;
14
14
use byteorder:: { NativeEndian , ReadBytesExt } ;
15
- use std:: io:: { self , Cursor , Read , Write } ;
16
-
17
- // todo: document
18
- #[ derive( Clone , Copy , Debug , Default ) ]
19
- pub enum EncodingType {
20
- #[ default]
21
- None ,
22
- Zstd ,
23
- }
15
+ use std:: io:: { self , Write } ;
24
16
25
- enum ObservationEncoder {
26
- Noop ( SizeRestrictedBuffer ) ,
27
- Zstd ( zstd:: Encoder < ' static , SizeRestrictedBuffer > ) ,
28
- }
17
+ pub type TimestampedObservations = TimestampedObservationsImpl < DefaultCodec > ;
29
18
30
- pub struct TimestampedObservations {
31
- compressed_timestamped_data : ObservationEncoder ,
19
+ pub struct TimestampedObservationsImpl < C : ObservationCodec > {
20
+ compressed_timestamped_data : C :: Encoder ,
32
21
sample_types_len : usize ,
33
22
}
34
23
35
- enum ObservationDecoder {
36
- Noop ( Cursor < SizeRestrictedBuffer > ) ,
37
- Zstd ( zstd:: Decoder < ' static , Cursor < SizeRestrictedBuffer > > ) ,
38
- }
39
-
40
- pub struct TimestampedObservationsIter {
41
- decoder : ObservationDecoder ,
24
+ pub struct TimestampedObservationsIterImpl < C : ObservationCodec > {
25
+ decoder : C :: Decoder ,
42
26
sample_types_len : usize ,
43
27
}
44
28
45
- impl ObservationEncoder {
46
- pub fn try_new (
47
- encoding_type : EncodingType ,
48
- size_hint : usize ,
49
- max_capacity : usize ,
50
- ) -> io:: Result < Self > {
51
- let output_buffer = SizeRestrictedBuffer :: try_new ( size_hint, max_capacity) ?;
52
- match encoding_type {
53
- EncodingType :: None => Ok ( ObservationEncoder :: Noop ( output_buffer) ) ,
54
- EncodingType :: Zstd => Ok ( ObservationEncoder :: Zstd ( zstd:: Encoder :: new (
55
- output_buffer,
56
- 1 ,
57
- ) ?) ) ,
58
- }
59
- }
60
-
61
- pub fn try_into_decoder ( self ) -> io:: Result < ObservationDecoder > {
62
- match self {
63
- ObservationEncoder :: Noop ( buffer) => Ok ( ObservationDecoder :: Noop ( Cursor :: new ( buffer) ) ) ,
64
- ObservationEncoder :: Zstd ( encoder) => match encoder. try_finish ( ) {
65
- Ok ( buffer) => Ok ( ObservationDecoder :: Zstd ( zstd:: Decoder :: with_buffer (
66
- Cursor :: new ( buffer) ,
67
- ) ?) ) ,
68
- Err ( ( _encoder, error) ) => Err ( error) ,
69
- } ,
70
- }
71
- }
72
- }
73
-
74
- impl Write for ObservationEncoder {
75
- fn write ( & mut self , buf : & [ u8 ] ) -> io:: Result < usize > {
76
- match self {
77
- ObservationEncoder :: Noop ( encoder) => encoder. write ( buf) ,
78
- ObservationEncoder :: Zstd ( encoder) => encoder. write ( buf) ,
79
- }
80
- }
81
-
82
- fn flush ( & mut self ) -> io:: Result < ( ) > {
83
- match self {
84
- ObservationEncoder :: Noop ( encoder) => encoder. flush ( ) ,
85
- ObservationEncoder :: Zstd ( encoder) => encoder. flush ( ) ,
86
- }
87
- }
88
- }
89
-
90
- impl Read for ObservationDecoder {
91
- fn read ( & mut self , buf : & mut [ u8 ] ) -> io:: Result < usize > {
92
- match self {
93
- ObservationDecoder :: Noop ( decoder) => decoder. read ( buf) ,
94
- ObservationDecoder :: Zstd ( decoder) => decoder. read ( buf) ,
95
- }
96
- }
97
- }
98
-
99
- impl TimestampedObservations {
29
+ impl < C : ObservationCodec > TimestampedObservationsImpl < C > {
100
30
// As documented in the internal Datadog doc "Ruby timeline memory fragmentation impact
101
31
// investigation", allowing the timeline storage vec to slowly expand creates A LOT of
102
32
// memory fragmentation for apps that employ multiple threads.
@@ -108,26 +38,16 @@ impl TimestampedObservations {
108
38
// the profile as a whole would defintely exceed this.
109
39
const MAX_CAPACITY : usize = i32:: MAX as usize ;
110
40
111
- pub fn try_new ( encoding_type : EncodingType , sample_types_len : usize ) -> io:: Result < Self > {
41
+ pub fn try_new ( sample_types_len : usize ) -> io:: Result < Self > {
112
42
Ok ( Self {
113
- compressed_timestamped_data : ObservationEncoder :: try_new (
114
- encoding_type,
43
+ compressed_timestamped_data : C :: new_encoder (
115
44
Self :: DEFAULT_BUFFER_SIZE ,
116
45
Self :: MAX_CAPACITY ,
117
46
) ?,
118
47
sample_types_len,
119
48
} )
120
49
}
121
50
122
- pub const fn with_no_backing_store ( ) -> Self {
123
- Self {
124
- compressed_timestamped_data : ObservationEncoder :: Noop (
125
- SizeRestrictedBuffer :: zero_capacity ( ) ,
126
- ) ,
127
- sample_types_len : 0 ,
128
- }
129
- }
130
-
131
51
pub fn add ( & mut self , sample : Sample , ts : Timestamp , values : & [ i64 ] ) -> anyhow:: Result < ( ) > {
132
52
// We explicitly turn the data into a stream of bytes, feeding it to the compressor.
133
53
// @ivoanjo: I played with introducing a structure to serialize it all-at-once, but it seems
@@ -153,19 +73,17 @@ impl TimestampedObservations {
153
73
Ok ( ( ) )
154
74
}
155
75
156
- pub fn into_iter ( self ) -> TimestampedObservationsIter {
76
+ pub fn into_iter ( self ) -> TimestampedObservationsIterImpl < C > {
157
77
#[ allow( clippy:: expect_used) ]
158
- TimestampedObservationsIter {
159
- decoder : self
160
- . compressed_timestamped_data
161
- . try_into_decoder ( )
78
+ TimestampedObservationsIterImpl {
79
+ decoder : C :: encoder_into_decoder ( self . compressed_timestamped_data )
162
80
. expect ( "failed to initialize timestamped observation decoder" ) ,
163
81
sample_types_len : self . sample_types_len ,
164
82
}
165
83
}
166
84
}
167
85
168
- impl Iterator for TimestampedObservationsIter {
86
+ impl < C : ObservationCodec > Iterator for TimestampedObservationsIterImpl < C > {
169
87
type Item = ( Sample , Timestamp , Vec < i64 > ) ;
170
88
171
89
fn next ( & mut self ) -> Option < Self :: Item > {
0 commit comments