3
3
4
4
use criterion:: { criterion_group, criterion_main, Criterion } ;
5
5
6
- use event_manager:: utilities:: subscribers:: {
7
- CounterInnerMutSubscriber , CounterSubscriber , CounterSubscriberWithData ,
8
- } ;
9
- use event_manager:: { EventManager , EventSubscriber , MutEventSubscriber , SubscriberOps } ;
6
+ use event_manager:: { BufferedEventManager , EventManager } ;
7
+ use std:: os:: fd:: AsFd ;
8
+ use std:: os:: fd:: FromRawFd ;
9
+ use std:: os:: fd:: OwnedFd ;
10
+ use std:: sync:: atomic:: AtomicU64 ;
11
+ use std:: sync:: atomic:: Ordering ;
10
12
use std:: sync:: { Arc , Mutex } ;
13
+ use vmm_sys_util:: epoll:: EventSet ;
11
14
12
15
// Test the performance of event manager when it manages a single subscriber type.
13
16
// The performance is assessed under stress, all added subscribers have active events.
14
17
fn run_basic_subscriber ( c : & mut Criterion ) {
15
- let no_of_subscribers = 200 ;
18
+ let no_of_subscribers = 200i32 ;
16
19
17
- let mut event_manager = EventManager :: < CounterSubscriber > :: new ( ) . unwrap ( ) ;
18
- for _ in 0 ..no_of_subscribers {
19
- let mut counter_subscriber = CounterSubscriber :: default ( ) ;
20
- counter_subscriber. trigger_event ( ) ;
21
- event_manager. add_subscriber ( counter_subscriber) ;
22
- }
20
+ let mut event_manager =
21
+ BufferedEventManager :: with_capacity ( false , no_of_subscribers as usize ) . unwrap ( ) ;
22
+
23
+ let subscribers = ( 0 ..no_of_subscribers) . map ( |_| {
24
+ // Create an eventfd that is initialized with 1 waiting event.
25
+ let event_fd = unsafe {
26
+ let raw_fd = libc:: eventfd ( 1 , 0 ) ;
27
+ assert_ne ! ( raw_fd, -1 ) ;
28
+ OwnedFd :: from_raw_fd ( raw_fd)
29
+ } ;
30
+
31
+ event_manager. add ( event_fd. as_fd ( ) , EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP , Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
32
+ match event_set {
33
+ EventSet :: IN => ( ) ,
34
+ EventSet :: ERROR => {
35
+ eprintln ! ( "Got error on the monitored event." ) ;
36
+ } ,
37
+ EventSet :: HANG_UP => {
38
+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
39
+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
40
+ } ,
41
+ _ => {
42
+ eprintln ! ( "Received spurious event from the event manager {event_set:#?}." ) ;
43
+ }
44
+ }
45
+ } ) ) . unwrap ( ) ;
46
+
47
+ event_fd
48
+ } ) . collect :: < Vec < _ > > ( ) ;
23
49
24
50
c. bench_function ( "process_basic" , |b| {
25
51
b. iter ( || {
26
- let ev_count = event_manager. run ( ) . unwrap ( ) ;
27
- assert_eq ! ( ev_count, no_of_subscribers)
52
+ assert_eq ! ( event_manager. wait( Some ( 0 ) ) , Ok ( no_of_subscribers) ) ;
28
53
} )
29
54
} ) ;
55
+
56
+ drop ( subscribers) ;
30
57
}
31
58
32
59
// Test the performance of event manager when the subscribers are wrapped in an Arc<Mutex>.
33
60
// The performance is assessed under stress, all added subscribers have active events.
34
61
fn run_arc_mutex_subscriber ( c : & mut Criterion ) {
35
- let no_of_subscribers = 200 ;
62
+ let no_of_subscribers = 200i32 ;
63
+
64
+ let mut event_manager =
65
+ BufferedEventManager :: with_capacity ( false , no_of_subscribers as usize ) . unwrap ( ) ;
66
+
67
+ let subscribers = ( 0 ..no_of_subscribers) . map ( |_| {
68
+ // Create an eventfd that is initialized with 1 waiting event.
69
+ let event_fd = unsafe {
70
+ let raw_fd = libc:: eventfd ( 1 , 0 ) ;
71
+ assert_ne ! ( raw_fd, -1 ) ;
72
+ OwnedFd :: from_raw_fd ( raw_fd)
73
+ } ;
74
+ let counter = Arc :: new ( Mutex :: new ( 0u64 ) ) ;
75
+ let counter_clone = counter. clone ( ) ;
76
+
77
+ event_manager. add ( event_fd. as_fd ( ) , EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP , Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
78
+ match event_set {
79
+ EventSet :: IN => {
80
+ * counter_clone. lock ( ) . unwrap ( ) += 1 ;
81
+ } ,
82
+ EventSet :: ERROR => {
83
+ eprintln ! ( "Got error on the monitored event." ) ;
84
+ } ,
85
+ EventSet :: HANG_UP => {
86
+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
87
+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
88
+ } ,
89
+ _ => {
90
+ eprintln ! ( "Received spurious event from the event manager {event_set:#?}." ) ;
91
+ }
92
+ }
93
+ } ) ) . unwrap ( ) ;
36
94
37
- let mut event_manager = EventManager :: < Arc < Mutex < CounterSubscriber > > > :: new ( ) . unwrap ( ) ;
38
- for _ in 0 ..no_of_subscribers {
39
- let counter_subscriber = Arc :: new ( Mutex :: new ( CounterSubscriber :: default ( ) ) ) ;
40
- counter_subscriber. lock ( ) . unwrap ( ) . trigger_event ( ) ;
41
- event_manager. add_subscriber ( counter_subscriber) ;
42
- }
95
+ ( event_fd, counter)
96
+ } ) . collect :: < Vec < _ > > ( ) ;
43
97
44
98
c. bench_function ( "process_with_arc_mutex" , |b| {
45
99
b. iter ( || {
46
- let ev_count = event_manager. run ( ) . unwrap ( ) ;
47
- assert_eq ! ( ev_count, no_of_subscribers)
100
+ assert_eq ! ( event_manager. wait( Some ( 0 ) ) , Ok ( no_of_subscribers) ) ;
48
101
} )
49
102
} ) ;
103
+
104
+ drop ( subscribers) ;
50
105
}
51
106
52
107
// Test the performance of event manager when the subscribers are wrapped in an Arc, and they
53
108
// leverage inner mutability to update their internal state.
54
109
// The performance is assessed under stress, all added subscribers have active events.
55
110
fn run_subscriber_with_inner_mut ( c : & mut Criterion ) {
56
- let no_of_subscribers = 200 ;
111
+ let no_of_subscribers = 200i32 ;
112
+
113
+ let mut event_manager =
114
+ BufferedEventManager :: with_capacity ( false , no_of_subscribers as usize ) . unwrap ( ) ;
115
+
116
+ let subscribers = ( 0 ..no_of_subscribers) . map ( |_| {
117
+ // Create an eventfd that is initialized with 1 waiting event.
118
+ let event_fd = unsafe {
119
+ let raw_fd = libc:: eventfd ( 1 , 0 ) ;
120
+ assert_ne ! ( raw_fd, -1 ) ;
121
+ OwnedFd :: from_raw_fd ( raw_fd)
122
+ } ;
123
+ let counter = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
124
+ let counter_clone = counter. clone ( ) ;
125
+
126
+ event_manager. add ( event_fd. as_fd ( ) , EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP , Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
127
+ match event_set {
128
+ EventSet :: IN => {
129
+ counter_clone. fetch_add ( 1 , Ordering :: SeqCst ) ;
130
+ } ,
131
+ EventSet :: ERROR => {
132
+ eprintln ! ( "Got error on the monitored event." ) ;
133
+ } ,
134
+ EventSet :: HANG_UP => {
135
+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
136
+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
137
+ } ,
138
+ _ => {
139
+ eprintln ! ( "Received spurious event from the event manager {event_set:#?}." ) ;
140
+ }
141
+ }
142
+ } ) ) . unwrap ( ) ;
57
143
58
- let mut event_manager = EventManager :: < Arc < dyn EventSubscriber + Send + Sync > > :: new ( ) . unwrap ( ) ;
59
- for _ in 0 ..no_of_subscribers {
60
- let counter_subscriber = CounterInnerMutSubscriber :: default ( ) ;
61
- counter_subscriber. trigger_event ( ) ;
62
- event_manager. add_subscriber ( Arc :: new ( counter_subscriber) ) ;
63
- }
144
+ ( event_fd, counter)
145
+ } ) . collect :: < Vec < _ > > ( ) ;
64
146
65
147
c. bench_function ( "process_with_inner_mut" , |b| {
66
148
b. iter ( || {
67
- let ev_count = event_manager. run ( ) . unwrap ( ) ;
68
- assert_eq ! ( ev_count, no_of_subscribers)
149
+ assert_eq ! ( event_manager. wait( Some ( 0 ) ) , Ok ( no_of_subscribers) ) ;
69
150
} )
70
151
} ) ;
152
+
153
+ drop ( subscribers) ;
71
154
}
72
155
73
156
// Test the performance of event manager when it manages subscribers of different types, that are
@@ -76,63 +159,151 @@ fn run_subscriber_with_inner_mut(c: &mut Criterion) {
76
159
// The performance is assessed under stress, all added subscribers have active events, and the
77
160
// CounterSubscriberWithData subscribers have multiple active events.
78
161
fn run_multiple_subscriber_types ( c : & mut Criterion ) {
79
- let no_of_subscribers = 100 ;
162
+ let no_of_subscribers = 100i32 ;
163
+
164
+ let total = no_of_subscribers + ( no_of_subscribers * i32:: try_from ( EVENTS ) . unwrap ( ) ) ;
165
+
166
+ let mut event_manager =
167
+ BufferedEventManager :: with_capacity ( false , usize:: try_from ( total) . unwrap ( ) ) . unwrap ( ) ;
80
168
81
- let mut event_manager = EventManager :: < Arc < Mutex < dyn MutEventSubscriber > > > :: new ( )
82
- . expect ( "Cannot create event manager." ) ;
169
+ let subscribers = ( 0 ..no_of_subscribers) . map ( |_| {
170
+ // Create an eventfd that is initialized with 1 waiting event.
171
+ let event_fd = unsafe {
172
+ let raw_fd = libc:: eventfd ( 1 , 0 ) ;
173
+ assert_ne ! ( raw_fd, -1 ) ;
174
+ OwnedFd :: from_raw_fd ( raw_fd)
175
+ } ;
176
+ let counter = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
177
+ let counter_clone = counter. clone ( ) ;
83
178
84
- for i in 0 ..no_of_subscribers {
85
- // The `CounterSubscriberWithData` expects to receive a number as a parameter that
86
- // represents the number it can use as its inner Events data.
87
- let mut data_subscriber = CounterSubscriberWithData :: new ( i * no_of_subscribers) ;
88
- data_subscriber. trigger_all_counters ( ) ;
89
- event_manager. add_subscriber ( Arc :: new ( Mutex :: new ( data_subscriber) ) ) ;
179
+ event_manager. add ( event_fd. as_fd ( ) , EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP , Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
180
+ match event_set {
181
+ EventSet :: IN => {
182
+ counter_clone. fetch_add ( 1 , Ordering :: SeqCst ) ;
183
+ } ,
184
+ EventSet :: ERROR => {
185
+ eprintln ! ( "Got error on the monitored event." ) ;
186
+ } ,
187
+ EventSet :: HANG_UP => {
188
+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
189
+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
190
+ } ,
191
+ _ => {
192
+ eprintln ! ( "Received spurious event from the event manager {event_set:#?}." ) ;
193
+ }
194
+ }
195
+ } ) ) . unwrap ( ) ;
90
196
91
- let mut counter_subscriber = CounterSubscriber :: default ( ) ;
92
- counter_subscriber. trigger_event ( ) ;
93
- event_manager. add_subscriber ( Arc :: new ( Mutex :: new ( counter_subscriber) ) ) ;
94
- }
197
+ ( event_fd, counter)
198
+ } ) . collect :: < Vec < _ > > ( ) ;
199
+
200
+ const EVENTS : usize = 3 ;
201
+
202
+ let subscribers_with_data = ( 0 ..no_of_subscribers)
203
+ . map ( |_| {
204
+ let data = Arc :: new ( [ AtomicU64 :: new ( 0 ) , AtomicU64 :: new ( 0 ) , AtomicU64 :: new ( 0 ) ] ) ;
205
+ assert_eq ! ( data. len( ) , EVENTS ) ;
206
+
207
+ // Create eventfd's that are initialized with 1 waiting event.
208
+ let inner_subscribers = ( 0 ..EVENTS )
209
+ . map ( |_| unsafe {
210
+ let raw_fd = libc:: eventfd ( 1 , 0 ) ;
211
+ assert_ne ! ( raw_fd, -1 ) ;
212
+ OwnedFd :: from_raw_fd ( raw_fd)
213
+ } )
214
+ . collect :: < Vec < _ > > ( ) ;
215
+
216
+ for i in 0 ..EVENTS {
217
+ let data_clone = data. clone ( ) ;
218
+
219
+ event_manager
220
+ . add (
221
+ inner_subscribers[ i] . as_fd ( ) ,
222
+ EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP ,
223
+ Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
224
+ match event_set {
225
+ EventSet :: IN => {
226
+ data_clone[ i] . fetch_add ( 1 , Ordering :: SeqCst ) ;
227
+ }
228
+ EventSet :: ERROR => {
229
+ eprintln ! ( "Got error on the monitored event." ) ;
230
+ }
231
+ EventSet :: HANG_UP => {
232
+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
233
+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
234
+ }
235
+ _ => { }
236
+ }
237
+ } ) ,
238
+ )
239
+ . unwrap ( ) ;
240
+ }
241
+
242
+ ( inner_subscribers, data)
243
+ } )
244
+ . collect :: < Vec < _ > > ( ) ;
95
245
96
246
c. bench_function ( "process_dynamic_dispatch" , |b| {
97
247
b. iter ( || {
98
- let _ = event_manager. run ( ) . unwrap ( ) ;
248
+ assert_eq ! ( event_manager. wait ( Some ( 0 ) ) , Ok ( total ) ) ;
99
249
} )
100
250
} ) ;
251
+
252
+ drop ( subscribers) ;
253
+ drop ( subscribers_with_data) ;
101
254
}
102
255
103
256
// Test the performance of event manager when it manages a single subscriber type.
104
257
// Just a few of the events are active in this test scenario.
105
258
fn run_with_few_active_events ( c : & mut Criterion ) {
106
- let no_of_subscribers = 200 ;
259
+ let no_of_subscribers = 200i32 ;
260
+ let active = 1 + no_of_subscribers / 23 ;
261
+
262
+ let mut event_manager =
263
+ BufferedEventManager :: with_capacity ( false , no_of_subscribers as usize ) . unwrap ( ) ;
107
264
108
- let mut event_manager = EventManager :: < CounterSubscriber > :: new ( ) . unwrap ( ) ;
265
+ let subscribers = ( 0 ..no_of_subscribers) . map ( |i| {
266
+ // Create an eventfd that is initialized with 1 waiting event.
267
+ let event_fd = unsafe {
268
+ let raw_fd = libc:: eventfd ( ( i % 23 == 0 ) as u8 as u32 , 0 ) ;
269
+ assert_ne ! ( raw_fd, -1 ) ;
270
+ OwnedFd :: from_raw_fd ( raw_fd)
271
+ } ;
109
272
110
- for i in 0 ..no_of_subscribers {
111
- let mut counter_subscriber = CounterSubscriber :: default ( ) ;
112
- // Let's activate the events for a few subscribers (i.e. only the ones that are
113
- // divisible by 23). 23 is a random number that I just happen to like.
114
- if i % 23 == 0 {
115
- counter_subscriber. trigger_event ( ) ;
116
- }
117
- event_manager. add_subscriber ( counter_subscriber) ;
118
- }
273
+ event_manager. add ( event_fd. as_fd ( ) , EventSet :: IN | EventSet :: ERROR | EventSet :: HANG_UP , Box :: new ( move |_: & mut EventManager , event_set : EventSet | {
274
+ match event_set {
275
+ EventSet :: IN => ( ) ,
276
+ EventSet :: ERROR => {
277
+ eprintln ! ( "Got error on the monitored event." ) ;
278
+ } ,
279
+ EventSet :: HANG_UP => {
280
+ // TODO Do this: https://github.com/rust-vmm/event-manager/blob/main/src/utilities/subscribers.rs#L116:L118
281
+ panic ! ( "Cannot continue execution. Associated fd was closed." ) ;
282
+ } ,
283
+ _ => {
284
+ eprintln ! ( "Received spurious event from the event manager {event_set:#?}." ) ;
285
+ }
286
+ }
287
+ } ) ) . unwrap ( ) ;
288
+
289
+ event_fd
290
+ } ) . collect :: < Vec < _ > > ( ) ;
119
291
120
292
c. bench_function ( "process_dispatch_few_events" , |b| {
121
293
b. iter ( || {
122
- let _ = event_manager. run ( ) . unwrap ( ) ;
294
+ assert_eq ! ( event_manager. wait ( Some ( 0 ) ) , Ok ( active ) ) ;
123
295
} )
124
296
} ) ;
297
+
298
+ drop ( subscribers) ;
125
299
}
126
300
127
- criterion_group ! {
301
+ criterion_group ! (
128
302
name = benches;
129
303
config = Criterion :: default ( )
130
304
. sample_size( 200 )
131
305
. measurement_time( std:: time:: Duration :: from_secs( 40 ) ) ;
132
306
targets = run_basic_subscriber, run_arc_mutex_subscriber, run_subscriber_with_inner_mut,
133
- run_multiple_subscriber_types, run_with_few_active_events
134
- }
135
-
136
- criterion_main ! {
137
- benches
138
- }
307
+ run_multiple_subscriber_types, run_with_few_active_events
308
+ ) ;
309
+ criterion_main ! ( benches) ;
0 commit comments