@@ -53,6 +53,24 @@ static void isaio_complete(void *arg) {
5353
5454}
5555
56+ static void raio_complete (void * arg ) {
57+
58+ nano_aio * raio = (nano_aio * ) arg ;
59+ int res = nng_aio_result (raio -> aio );
60+ if (res == 0 ) {
61+ nng_msg * msg = nng_aio_get_msg (raio -> aio );
62+ raio -> data = msg ;
63+ nng_pipe p = nng_msg_get_pipe (msg );
64+ res = - (int ) p .id ;
65+ }
66+
67+ raio -> result = res ;
68+
69+ if (raio -> cb != NULL )
70+ later2 (raio_invoke_cb , raio -> cb );
71+
72+ }
73+
5674static void iraio_complete (void * arg ) {
5775
5876 nano_aio * iaio = (nano_aio * ) arg ;
@@ -500,7 +518,14 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
500518SEXP rnng_recv_aio (SEXP con , SEXP mode , SEXP timeout , SEXP cvar , SEXP bytes , SEXP clo ) {
501519
502520 const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration ) nano_integer (timeout );
503- const int signal = NANO_TAG (cvar ) == nano_CvSymbol ;
521+ int signal , interrupt ;
522+ if (cvar == R_NilValue ) {
523+ signal = 0 ;
524+ interrupt = 0 ;
525+ } else {
526+ signal = NANO_TAG (cvar ) == nano_CvSymbol ;
527+ interrupt = 1 - signal ;
528+ }
504529 nano_cv * ncv = signal ? (nano_cv * ) NANO_PTR (cvar ) : NULL ;
505530 nano_aio * raio ;
506531 SEXP aio , env , fun ;
@@ -516,7 +541,7 @@ SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP cvar, SEXP bytes, SEX
516541 raio -> mode = mod ;
517542 raio -> cb = NULL ;
518543
519- if ((xc = nng_aio_alloc (& raio -> aio , signal ? raio_complete_signal : raio_complete , raio )))
544+ if ((xc = nng_aio_alloc (& raio -> aio , signal ? raio_complete_signal : interrupt ? raio_complete_interrupt : raio_complete , raio )))
520545 goto exitlevel1 ;
521546
522547 nng_aio_set_timeout (raio -> aio , dur );
0 commit comments