@@ -85,6 +85,8 @@ struct conn {
85
85
86
86
struct state {
87
87
dispatch_semaphore_t sem ;
88
+ dispatch_queue_t vms_queue ;
89
+ dispatch_queue_t host_queue ;
88
90
struct conn * conns ; // TODO: avoid O(N) lookup
89
91
} _state ;
90
92
@@ -252,16 +254,14 @@ static interface_ref start(struct state *state, struct cli_options *cliopt) {
252
254
cliopt -> vmnet_nat66_prefix );
253
255
}
254
256
255
- dispatch_queue_t q = dispatch_queue_create (
256
- "io.github.lima-vm.socket_vmnet.start" , DISPATCH_QUEUE_SERIAL );
257
257
dispatch_semaphore_t sem = dispatch_semaphore_create (0 );
258
258
259
259
__block interface_ref iface ;
260
260
__block vmnet_return_t status ;
261
261
262
262
__block uint64_t max_bytes = 0 ;
263
263
iface = vmnet_start_interface (
264
- dict , q , ^(vmnet_return_t x_status , xpc_object_t x_param ) {
264
+ dict , state -> host_queue , ^(vmnet_return_t x_status , xpc_object_t x_param ) {
265
265
status = x_status ;
266
266
if (x_status == VMNET_SUCCESS ) {
267
267
print_vmnet_start_param (x_param );
@@ -271,17 +271,14 @@ static interface_ref start(struct state *state, struct cli_options *cliopt) {
271
271
dispatch_semaphore_signal (sem );
272
272
});
273
273
dispatch_semaphore_wait (sem , DISPATCH_TIME_FOREVER );
274
- dispatch_release (q );
275
274
xpc_release (dict );
276
275
if (status != VMNET_SUCCESS ) {
277
276
ERRORF ("vmnet_start_interface: [%d] %s" , status , vmnet_strerror (status ));
278
277
return NULL ;
279
278
}
280
279
281
- dispatch_queue_t event_q = dispatch_queue_create (
282
- "io.github.lima-vm.socket_vmnet.events" , DISPATCH_QUEUE_CONCURRENT );
283
280
vmnet_interface_set_event_callback (
284
- iface , VMNET_INTERFACE_PACKETS_AVAILABLE , event_q ,
281
+ iface , VMNET_INTERFACE_PACKETS_AVAILABLE , state -> host_queue ,
285
282
^(interface_event_t __attribute__ ((unused )) x_event_id ,
286
283
xpc_object_t x_event ) {
287
284
uint64_t estim_count = xpc_dictionary_get_uint64 (
@@ -298,21 +295,17 @@ static void signalhandler(int signal) {
298
295
siglongjmp (jmpbuf , 1 );
299
296
}
300
297
301
- static void stop (interface_ref iface ) {
298
+ static void stop (struct state * state , interface_ref iface ) {
302
299
if (iface == NULL ) {
303
300
return ;
304
301
}
305
- dispatch_queue_t q = dispatch_queue_create (
306
- "io.github.lima-vm.socket_vmnet.stop" , DISPATCH_QUEUE_SERIAL );
307
302
dispatch_semaphore_t sem = dispatch_semaphore_create (0 );
308
303
__block vmnet_return_t status ;
309
- vmnet_stop_interface (iface , q , ^(vmnet_return_t x_status ) {
304
+ vmnet_stop_interface (iface , state -> host_queue , ^(vmnet_return_t x_status ) {
310
305
status = x_status ;
311
306
dispatch_semaphore_signal (sem );
312
307
});
313
308
dispatch_semaphore_wait (sem , DISPATCH_TIME_FOREVER );
314
- dispatch_release (q );
315
- // TODO: release event_q ?
316
309
if (status != VMNET_SUCCESS ) {
317
310
ERRORF ("vmnet_stop_interface: [%d] %s" , status , vmnet_strerror (status ));
318
311
}
@@ -376,8 +369,6 @@ int main(int argc, char *argv[]) {
376
369
debug = getenv ("DEBUG" ) != NULL ;
377
370
int rc = 1 , listen_fd = -1 ;
378
371
__block interface_ref iface = NULL ;
379
- dispatch_queue_t q = dispatch_queue_create (
380
- "io.github.lima-vm.socket_vmnet.accept" , DISPATCH_QUEUE_CONCURRENT );
381
372
382
373
struct cli_options * cliopt = cli_options_parse (argc , argv );
383
374
assert (cliopt != NULL );
@@ -420,6 +411,15 @@ int main(int argc, char *argv[]) {
420
411
struct state state ;
421
412
memset (& state , 0 , sizeof (state ));
422
413
state .sem = dispatch_semaphore_create (1 );
414
+
415
+ // Queue for vm connections, allowing processing vms requests in parallel.
416
+ state .vms_queue = dispatch_queue_create (
417
+ "io.github.lima-vm.socket_vmnet.vms" , DISPATCH_QUEUE_CONCURRENT );
418
+
419
+ // Queue for processing vmnet events.
420
+ state .host_queue = dispatch_queue_create (
421
+ "io.github.lima-vm.socket_vmnet.host" , DISPATCH_QUEUE_SERIAL );
422
+
423
423
iface = start (& state , cliopt );
424
424
if (iface == NULL ) {
425
425
// Error already logged.
@@ -442,16 +442,15 @@ int main(int argc, char *argv[]) {
442
442
goto done ;
443
443
}
444
444
struct state * state_p = & state ;
445
- dispatch_async (q , ^{
445
+ dispatch_async (state . vms_queue , ^{
446
446
on_accept (state_p , accept_fd , iface );
447
447
});
448
448
}
449
449
rc = 0 ;
450
450
done :
451
451
DEBUGF ("shutting down with rc=%d" , rc );
452
- dispatch_release (q );
453
452
if (iface != NULL ) {
454
- stop (iface );
453
+ stop (& state , iface );
455
454
}
456
455
if (listen >= 0 ) {
457
456
close (listen_fd );
@@ -460,6 +459,8 @@ int main(int argc, char *argv[]) {
460
459
unlink (cliopt -> pidfile );
461
460
close (pid_fd );
462
461
}
462
+ dispatch_release (state .vms_queue );
463
+ dispatch_release (state .host_queue );
463
464
cli_options_destroy (cliopt );
464
465
return rc ;
465
466
}
0 commit comments