Skip to content

Commit b0d22ea

Browse files
committed
optimise stream textframes handling
1 parent 632f000 commit b0d22ea

File tree

8 files changed

+287
-291
lines changed

8 files changed

+287
-291
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: nanonext
22
Type: Package
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 0.11.0.9012
4+
Version: 0.11.0.9013
55
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
66
a socket library implementing 'Scalability Protocols', a reliable,
77
high-performance standard for common communications patterns including

NEWS.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# nanonext 0.11.0.9012 (development)
1+
# nanonext 0.11.0.9013 (development)
22

33
#### New Features
44

@@ -7,10 +7,10 @@
77
#### Updates
88

99
* More compact print methods for 'recvAio', 'sendAio', 'ncurlAio', 'ncurlSession' and 'tlsConfig' objects.
10+
* `random()` now explicitly limits argument 'n' to values between 0 and 1024.
1011
* `next_config()` now returns a pairlist (of the registered serialization functions) rather than a list (for efficiency).
1112
* Using mode 'next', serialization functions with incorrect signatures are now simply ignored rather than raise errors.
12-
* `random()` now explicitly limits argument 'n' to values between 0 and 1024.
13-
* 'nanoStream' objects simplified internally, and gain the attributes 'mode' and 'state'.
13+
* 'nanoStream' objects simplified internally with updated attributes 'mode' and 'state'.
1414
* Deprecated function `.until()` is removed.
1515
* Eliminates potential memory leaks along certain error paths.
1616
* Fixes bug which prevented much higher TLS performance when using the bundled 'libnng' source.

R/nano.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,8 @@ print.nanoListener <- function(x, ...) {
321321
#'
322322
print.nanoStream <- function(x, ...) {
323323

324-
cat(sprintf("< nanoStream >\n - mode: %s\n - state: %s\n - url: %s\n - textframes: %s\n",
325-
attr(x, "mode"), attr(x, "state"), attr(x, "url"), attr(x, "textframes")), file = stdout())
324+
cat(sprintf("< nanoStream >\n - mode: %s\n - state: %s\n - url: %s\n",
325+
attr(x, "mode"), attr(x, "state"), attr(x, "url")), file = stdout())
326326
invisible(x)
327327

328328
}

src/aio.c

Lines changed: 6 additions & 246 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616

1717
// nanonext - C level - Async Functions ----------------------------------------
1818

19-
#define NANONEXT_SIGNALS
19+
#define NANONEXT_HTTP
2020
#define NANONEXT_SUPPLEMENTALS
21+
#define NANONEXT_SIGNALS
2122
#include "nanonext.h"
2223

2324
// internals -------------------------------------------------------------------
@@ -265,28 +266,6 @@ static void iraio_complete_signal(void *arg) {
265266

266267
// finalisers ------------------------------------------------------------------
267268

268-
void dialer_finalizer(SEXP xptr) {
269-
270-
if (R_ExternalPtrAddr(xptr) == NULL) return;
271-
nano_dialer *xp = (nano_dialer *) R_ExternalPtrAddr(xptr);
272-
nng_dialer_close(xp->dial);
273-
if (xp->tls != NULL)
274-
nng_tls_config_free(xp->tls);
275-
R_Free(xp);
276-
277-
}
278-
279-
void listener_finalizer(SEXP xptr) {
280-
281-
if (R_ExternalPtrAddr(xptr) == NULL) return;
282-
nano_listener *xp = (nano_listener *) R_ExternalPtrAddr(xptr);
283-
nng_listener_close(xp->list);
284-
if (xp->tls != NULL)
285-
nng_tls_config_free(xp->tls);
286-
R_Free(xp);
287-
288-
}
289-
290269
static void saio_finalizer(SEXP xptr) {
291270

292271
if (R_ExternalPtrAddr(xptr) == NULL) return;
@@ -396,224 +375,6 @@ static void session_finalizer(SEXP xptr) {
396375

397376
}
398377

399-
// dialers and listeners -------------------------------------------------------
400-
401-
SEXP rnng_dial(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
402-
403-
if (R_ExternalPtrTag(socket) != nano_SocketSymbol)
404-
Rf_error("'socket' is not a valid Socket");
405-
406-
const uint8_t sec = tls != R_NilValue;
407-
408-
if (sec && R_ExternalPtrTag(tls) != nano_TlsSymbol)
409-
Rf_error("'tls' is not a valid TLS Configuration");
410-
411-
nng_socket *sock = (nng_socket *) R_ExternalPtrAddr(socket);
412-
const int start = *NANO_INTEGER(autostart);
413-
const char *ur = CHAR(STRING_ELT(url, 0));
414-
nano_dialer *dp = R_Calloc(1, nano_dialer);
415-
SEXP dialer, klass, attr, newattr;
416-
nng_url *up;
417-
int xc;
418-
419-
if (sec) {
420-
if ((xc = nng_dialer_create(&dp->dial, *sock, ur)))
421-
goto exitlevel1;
422-
dp->tls = (nng_tls_config *) R_ExternalPtrAddr(tls);
423-
nng_tls_config_hold(dp->tls);
424-
if ((xc = nng_url_parse(&up, ur)))
425-
goto exitlevel2;
426-
if ((xc = nng_tls_config_server_name(dp->tls, up->u_hostname)) ||
427-
(xc = nng_dialer_set_ptr(dp->dial, NNG_OPT_TLS_CONFIG, dp->tls)))
428-
goto exitlevel3;
429-
nng_url_free(up);
430-
}
431-
432-
switch (start) {
433-
case 0:
434-
xc = sec ? 0 : nng_dialer_create(&dp->dial, *sock, ur);
435-
break;
436-
case 1:
437-
xc = sec ? nng_dialer_start(dp->dial, NNG_FLAG_NONBLOCK) : nng_dial(*sock, ur, &dp->dial, NNG_FLAG_NONBLOCK);
438-
break;
439-
default:
440-
xc = sec ? nng_dialer_start(dp->dial, 0) : nng_dial(*sock, ur, &dp->dial, 0);
441-
}
442-
if (xc)
443-
goto exitlevel1;
444-
445-
PROTECT(dialer = R_MakeExternalPtr(dp, nano_DialerSymbol, R_NilValue));
446-
R_RegisterCFinalizerEx(dialer, dialer_finalizer, TRUE);
447-
448-
klass = Rf_allocVector(STRSXP, 2);
449-
Rf_classgets(dialer, klass);
450-
SET_STRING_ELT(klass, 0, Rf_mkChar("nanoDialer"));
451-
SET_STRING_ELT(klass, 1, Rf_mkChar("nano"));
452-
Rf_setAttrib(dialer, nano_IdSymbol, Rf_ScalarInteger(nng_dialer_id(dp->dial)));
453-
Rf_setAttrib(dialer, nano_UrlSymbol, url);
454-
Rf_setAttrib(dialer, nano_StateSymbol, Rf_mkString(start ? "started" : "not started"));
455-
Rf_setAttrib(dialer, nano_SocketSymbol, Rf_ScalarInteger(nng_socket_id(*sock)));
456-
457-
attr = Rf_getAttrib(socket, nano_DialerSymbol);
458-
if (attr == R_NilValue) {
459-
PROTECT(newattr = Rf_allocVector(VECSXP, 1));
460-
SET_VECTOR_ELT(newattr, 0, dialer);
461-
} else {
462-
R_xlen_t xlen = Rf_xlength(attr);
463-
PROTECT(newattr = Rf_allocVector(VECSXP, xlen + 1));
464-
for (R_xlen_t i = 0; i < xlen; i++)
465-
SET_VECTOR_ELT(newattr, i, VECTOR_ELT(attr, i));
466-
SET_VECTOR_ELT(newattr, xlen, dialer);
467-
}
468-
Rf_setAttrib(socket, nano_DialerSymbol, newattr);
469-
470-
UNPROTECT(2);
471-
return nano_success;
472-
473-
exitlevel3:
474-
nng_url_free(up);
475-
exitlevel2:
476-
nng_tls_config_free(dp->tls);
477-
exitlevel1:
478-
R_Free(dp);
479-
if (*NANO_INTEGER(error)) ERROR_OUT(xc);
480-
ERROR_RET(xc);
481-
482-
}
483-
484-
SEXP rnng_listen(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
485-
486-
if (R_ExternalPtrTag(socket) != nano_SocketSymbol)
487-
Rf_error("'socket' is not a valid Socket");
488-
489-
const uint8_t sec = tls != R_NilValue;
490-
491-
if (sec && R_ExternalPtrTag(tls) != nano_TlsSymbol)
492-
Rf_error("'tls' is not a valid TLS Configuration");
493-
494-
nng_socket *sock = (nng_socket *) R_ExternalPtrAddr(socket);
495-
const int start = *NANO_INTEGER(autostart);
496-
const char *ur = CHAR(STRING_ELT(url, 0));
497-
nano_listener *lp = R_Calloc(1, nano_listener);
498-
SEXP listener, klass, attr, newattr;
499-
nng_url *up;
500-
int xc;
501-
502-
if (sec) {
503-
if ((xc = nng_listener_create(&lp->list, *sock, ur)))
504-
goto exitlevel1;
505-
lp->tls = (nng_tls_config *) R_ExternalPtrAddr(tls);
506-
nng_tls_config_hold(lp->tls);
507-
if ((xc = nng_url_parse(&up, ur)))
508-
goto exitlevel2;
509-
if ((xc = nng_tls_config_server_name(lp->tls, up->u_hostname)) ||
510-
(xc = nng_listener_set_ptr(lp->list, NNG_OPT_TLS_CONFIG, lp->tls)))
511-
goto exitlevel3;
512-
nng_url_free(up);
513-
}
514-
515-
if (start) {
516-
xc = sec ? nng_listener_start(lp->list, 0) : nng_listen(*sock, ur, &lp->list, 0);
517-
} else {
518-
xc = sec ? 0 : nng_listener_create(&lp->list, *sock, ur);
519-
}
520-
if (xc)
521-
goto exitlevel1;
522-
523-
PROTECT(listener = R_MakeExternalPtr(lp, nano_ListenerSymbol, R_NilValue));
524-
R_RegisterCFinalizerEx(listener, listener_finalizer, TRUE);
525-
526-
klass = Rf_allocVector(STRSXP, 2);
527-
Rf_classgets(listener, klass);
528-
SET_STRING_ELT(klass, 0, Rf_mkChar("nanoListener"));
529-
SET_STRING_ELT(klass, 1, Rf_mkChar("nano"));
530-
Rf_setAttrib(listener, nano_IdSymbol, Rf_ScalarInteger(nng_listener_id(lp->list)));
531-
Rf_setAttrib(listener, nano_UrlSymbol, url);
532-
Rf_setAttrib(listener, nano_StateSymbol, Rf_mkString(start ? "started" : "not started"));
533-
Rf_setAttrib(listener, nano_SocketSymbol, Rf_ScalarInteger(nng_socket_id(*sock)));
534-
535-
attr = Rf_getAttrib(socket, nano_ListenerSymbol);
536-
if (attr == R_NilValue) {
537-
PROTECT(newattr = Rf_allocVector(VECSXP, 1));
538-
SET_VECTOR_ELT(newattr, 0, listener);
539-
} else {
540-
R_xlen_t xlen = Rf_xlength(attr);
541-
PROTECT(newattr = Rf_allocVector(VECSXP, xlen + 1));
542-
for (R_xlen_t i = 0; i < xlen; i++)
543-
SET_VECTOR_ELT(newattr, i, VECTOR_ELT(attr, i));
544-
SET_VECTOR_ELT(newattr, xlen, listener);
545-
}
546-
Rf_setAttrib(socket, nano_ListenerSymbol, newattr);
547-
548-
UNPROTECT(2);
549-
return nano_success;
550-
551-
exitlevel3:
552-
nng_url_free(up);
553-
exitlevel2:
554-
nng_tls_config_free(lp->tls);
555-
exitlevel1:
556-
R_Free(lp);
557-
if (*NANO_INTEGER(error)) ERROR_OUT(xc);
558-
ERROR_RET(xc);
559-
560-
}
561-
562-
SEXP rnng_dialer_start(SEXP dialer, SEXP async) {
563-
564-
if (R_ExternalPtrTag(dialer) != nano_DialerSymbol)
565-
Rf_error("'dialer' is not a valid Dialer");
566-
nng_dialer *dial = (nng_dialer *) R_ExternalPtrAddr(dialer);
567-
const int flags = (*NANO_INTEGER(async) == 1) * NNG_FLAG_NONBLOCK;
568-
const int xc = nng_dialer_start(*dial, flags);
569-
if (xc)
570-
ERROR_RET(xc);
571-
572-
Rf_setAttrib(dialer, nano_StateSymbol, Rf_mkString("started"));
573-
return nano_success;
574-
575-
}
576-
577-
SEXP rnng_listener_start(SEXP listener) {
578-
579-
if (R_ExternalPtrTag(listener) != nano_ListenerSymbol)
580-
Rf_error("'listener' is not a valid Listener");
581-
nng_listener *list = (nng_listener *) R_ExternalPtrAddr(listener);
582-
const int xc = nng_listener_start(*list, 0);
583-
if (xc)
584-
ERROR_RET(xc);
585-
586-
Rf_setAttrib(listener, nano_StateSymbol, Rf_mkString("started"));
587-
return nano_success;
588-
589-
}
590-
591-
SEXP rnng_dialer_close(SEXP dialer) {
592-
593-
if (R_ExternalPtrTag(dialer) != nano_DialerSymbol)
594-
Rf_error("'dialer' is not a valid Dialer");
595-
nng_dialer *dial = (nng_dialer *) R_ExternalPtrAddr(dialer);
596-
const int xc = nng_dialer_close(*dial);
597-
if (xc)
598-
ERROR_RET(xc);
599-
Rf_setAttrib(dialer, nano_StateSymbol, Rf_mkString("closed"));
600-
return nano_success;
601-
602-
}
603-
604-
SEXP rnng_listener_close(SEXP listener) {
605-
606-
if (R_ExternalPtrTag(listener) != nano_ListenerSymbol)
607-
Rf_error("'listener' is not a valid Listener");
608-
nng_listener *list = (nng_listener *) R_ExternalPtrAddr(listener);
609-
const int xc = nng_listener_close(*list);
610-
if (xc)
611-
ERROR_RET(xc);
612-
Rf_setAttrib(listener, nano_StateSymbol, Rf_mkString("closed"));
613-
return nano_success;
614-
615-
}
616-
617378
// core aio --------------------------------------------------------------------
618379

619380
SEXP rnng_aio_result(SEXP env) {
@@ -911,18 +672,17 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) {
911672

912673
} else if (ptrtag == nano_StreamSymbol) {
913674

914-
const int frames = *NANO_INTEGER(Rf_getAttrib(con, nano_TextframesSymbol));
915-
916675
nano_encode(&buf, data);
917676

918-
nng_stream **sp = (nng_stream **) R_ExternalPtrAddr(con);
677+
nano_stream *nst = (nano_stream *) R_ExternalPtrAddr(con);
678+
nng_stream *sp = nst->stream;
919679
nng_iov iov;
920680

921681
saio = R_Calloc(1, nano_aio);
922682
saio->type = IOV_SENDAIO;
923683
saio->data = R_Calloc(buf.cur, unsigned char);
924684
memcpy(saio->data, buf.buf, buf.cur);
925-
iov.iov_len = buf.cur - (frames == 1);
685+
iov.iov_len = buf.cur - nst->textframes;
926686
iov.iov_buf = saio->data;
927687

928688
if ((xc = nng_aio_alloc(&saio->aio, isaio_complete, saio))) {
@@ -937,7 +697,7 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) {
937697
}
938698

939699
nng_aio_set_timeout(saio->aio, dur);
940-
nng_stream_send(*sp, saio->aio);
700+
nng_stream_send(sp, saio->aio);
941701

942702
PROTECT(aio = R_MakeExternalPtr(saio, nano_AioSymbol, R_NilValue));
943703
R_RegisterCFinalizerEx(aio, iaio_finalizer, TRUE);

0 commit comments

Comments
 (0)