Skip to content

Commit cb3d97c

Browse files
committed
improved allocation logic + some more tidy ups
1 parent f1a1924 commit cb3d97c

File tree

5 files changed

+37
-49
lines changed

5 files changed

+37
-49
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: nanonext
22
Type: Package
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 0.12.0.9019
4+
Version: 0.12.0.9020
55
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
66
a socket library implementing 'Scalability Protocols', a reliable,
77
high-performance standard for common communications patterns including

NEWS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# nanonext 0.12.0.9019 (development)
1+
# nanonext 0.12.0.9020 (development)
22

33
*Please note this version contains breaking behavioural changes - see updates below.*
44

src/aio.c

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -666,16 +666,11 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) {
666666
iov.iov_len = buf.cur - nst->textframes;
667667
iov.iov_buf = saio->data;
668668

669-
if ((xc = nng_aio_alloc(&saio->aio, isaio_complete, saio))) {
670-
R_Free(saio->data);
671-
goto exitlevel1;
672-
}
669+
if ((xc = nng_aio_alloc(&saio->aio, isaio_complete, saio)))
670+
goto exitlevel2;
673671

674-
if ((xc = nng_aio_set_iov(saio->aio, 1u, &iov))) {
675-
nng_aio_free(saio->aio);
676-
R_Free(saio->data);
677-
goto exitlevel1;
678-
}
672+
if ((xc = nng_aio_set_iov(saio->aio, 1u, &iov)))
673+
goto exitlevel3;
679674

680675
nng_aio_set_timeout(saio->aio, dur);
681676
nng_stream_send(sp, saio->aio);
@@ -703,6 +698,10 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) {
703698
UNPROTECT(3);
704699
return env;
705700

701+
exitlevel3:
702+
nng_aio_free(saio->aio);
703+
exitlevel2:
704+
R_Free(saio->data);
706705
exitlevel1:
707706
R_Free(saio);
708707
NANO_FREE(buf);
@@ -1225,6 +1224,7 @@ SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode,
12251224
nng_ctx *ctx = (nng_ctx *) R_ExternalPtrAddr(con);
12261225
SEXP aio, env, fun;
12271226
nano_buf buf;
1227+
nano_aio *saio, *raio;
12281228
nng_msg *msg;
12291229
int xc;
12301230

@@ -1237,7 +1237,7 @@ SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode,
12371237
nano_serialize_next(&buf, data); break;
12381238
}
12391239

1240-
nano_aio *saio = R_Calloc(1, nano_aio);
1240+
saio = R_Calloc(1, nano_aio);
12411241
#ifdef NANONEXT_LEGACY_NNG
12421242
saio->data = ctx;
12431243
#endif
@@ -1255,16 +1255,13 @@ SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode,
12551255
nng_aio_set_msg(saio->aio, msg);
12561256
nng_ctx_send(*ctx, saio->aio);
12571257

1258-
nano_aio *raio = R_Calloc(1, nano_aio);
1258+
raio = R_Calloc(1, nano_aio);
12591259
raio->type = RECVAIO;
12601260
raio->mode = mod;
12611261
raio->next = saio;
12621262

1263-
if ((xc = nng_aio_alloc(&raio->aio, signal ? request_complete_signal : raio_complete, raio))) {
1264-
R_Free(raio);
1265-
nng_aio_free(saio->aio);
1266-
goto exitlevel1;
1267-
}
1263+
if ((xc = nng_aio_alloc(&raio->aio, signal ? request_complete_signal : raio_complete, raio)))
1264+
goto exitlevel2;
12681265

12691266
nng_aio_set_timeout(raio->aio, dur);
12701267
nng_ctx_recv(*ctx, raio->aio);
@@ -1287,6 +1284,9 @@ SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode,
12871284
UNPROTECT(3);
12881285
return env;
12891286

1287+
exitlevel2:
1288+
R_Free(raio);
1289+
nng_aio_free(saio->aio);
12901290
exitlevel1:
12911291
R_Free(saio);
12921292
NANO_FREE(buf);

src/core.c

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ static void nano_write_bytes(R_outpstream_t stream, void *src, int len) {
6262
Rf_error("serialization exceeds max length of raw vector");
6363
}
6464
do {
65-
buf->len = buf->len * (double) (buf->len > 268435456 ? 1.2 : 2);
65+
buf->len += buf->len > NANONEXT_SERIAL_THR ? NANONEXT_SERIAL_THR : buf->len;
6666
} while (buf->len < req);
6767
buf->buf = R_Realloc(buf->buf, buf->len, unsigned char);
6868
}
@@ -75,26 +75,12 @@ static void nano_write_bytes(R_outpstream_t stream, void *src, int len) {
7575
static void nano_skip_bytes(R_outpstream_t stream, void *src, int len) {
7676

7777
nano_buf *buf = (nano_buf *) stream->data;
78-
if (buf->len <= NANONEXT_SERIAL_HEADERS) {
78+
if (buf->len < NANONEXT_INIT_BUFSIZE) {
7979
buf->len = --buf->len ? buf->len : NANONEXT_INIT_BUFSIZE;
80-
return;
81-
}
82-
83-
size_t req = buf->cur + (size_t) len;
84-
if (req > buf->len) {
85-
if (req > R_XLEN_T_MAX) {
86-
if (buf->len) R_Free(buf->buf);
87-
Rf_error("serialization exceeds max length of raw vector");
88-
}
89-
do {
90-
buf->len = buf->len * (double) (buf->len > 268435456 ? 1.2 : 2);
91-
} while (buf->len < req);
92-
buf->buf = R_Realloc(buf->buf, buf->len, unsigned char);
80+
} else {
81+
nano_write_bytes(stream, src, len);
9382
}
9483

95-
memcpy(buf->buf + buf->cur, src, len);
96-
buf->cur += len;
97-
9884
}
9985

10086
static void nano_read_bytes(R_inpstream_t stream, void *dst, int len) {
@@ -271,7 +257,7 @@ void nano_serialize_next(nano_buf *buf, const SEXP object) {
271257
void nano_serialize_xdr(nano_buf *buf, const SEXP object, const int skip) {
272258

273259
NANO_ALLOC(buf, NANONEXT_INIT_BUFSIZE);
274-
buf->len = skip ? NANONEXT_SERIAL_HEADERS : buf->len;
260+
if (skip) buf->len = NANONEXT_SERIAL_HEADERS;
275261

276262
struct R_outpstream_st output_stream;
277263

@@ -1074,7 +1060,7 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
10741060

10751061
xc = nng_recv(*sock, &buf, &sz, NNG_FLAG_ALLOC + (flags < 0 || *NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK);
10761062
if (xc)
1077-
return mk_error(xc);
1063+
goto exitlevel1;
10781064

10791065
res = nano_decode(buf, sz, mod);
10801066
nng_free(buf, sz);
@@ -1083,13 +1069,13 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
10831069

10841070
nng_aio *aiop;
10851071
if ((xc = nng_aio_alloc(&aiop, NULL, NULL)))
1086-
return mk_error(xc);
1072+
goto exitlevel1;
10871073
nng_aio_set_timeout(aiop, flags);
10881074
nng_recv_aio(*sock, aiop);
10891075
nng_aio_wait(aiop);
10901076
if ((xc = nng_aio_result(aiop))) {
10911077
nng_aio_free(aiop);
1092-
return mk_error(xc);
1078+
goto exitlevel1;
10931079
}
10941080
nng_msg *msgp = nng_aio_get_msg(aiop);
10951081
nng_aio_free(aiop);
@@ -1110,14 +1096,14 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
11101096
nng_aio *aiop;
11111097

11121098
if ((xc = nng_aio_alloc(&aiop, NULL, NULL)))
1113-
return mk_error(xc);
1099+
goto exitlevel1;
11141100
nng_aio_set_timeout(aiop, flags < 0 ? 0 : flags > 0 ? flags : (*NANO_INTEGER(block) == 1) * NNG_DURATION_DEFAULT);
11151101
nng_ctx_recv(*ctxp, aiop);
11161102

11171103
nng_aio_wait(aiop);
11181104
if ((xc = nng_aio_result(aiop))) {
11191105
nng_aio_free(aiop);
1120-
return mk_error(xc);
1106+
goto exitlevel1;
11211107
}
11221108

11231109
msgp = nng_aio_get_msg(aiop);
@@ -1133,7 +1119,7 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
11331119

11341120
xc = nng_ctx_recvmsg(*ctxp, &msgp, (flags < 0 || *NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK);
11351121
if (xc)
1136-
return mk_error(xc);
1122+
goto exitlevel1;
11371123

11381124
buf = nng_msg_body(msgp);
11391125
sz = nng_msg_len(msgp);
@@ -1145,14 +1131,14 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
11451131
nng_aio *aiop;
11461132

11471133
if ((xc = nng_aio_alloc(&aiop, NULL, NULL)))
1148-
return mk_error(xc);
1134+
goto exitlevel1;
11491135
nng_aio_set_timeout(aiop, flags);
11501136
nng_ctx_recv(*ctxp, aiop);
11511137

11521138
nng_aio_wait(aiop);
11531139
if ((xc = nng_aio_result(aiop))) {
11541140
nng_aio_free(aiop);
1155-
return mk_error(xc);
1141+
goto exitlevel1;
11561142
}
11571143

11581144
msgp = nng_aio_get_msg(aiop);
@@ -1179,11 +1165,11 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
11791165
iov.iov_buf = buf;
11801166

11811167
if ((xc = nng_aio_alloc(&aiop, NULL, NULL)))
1182-
goto exitlevel1;
1168+
goto exitlevel2;
11831169

11841170
if ((xc = nng_aio_set_iov(aiop, 1u, &iov))) {
11851171
nng_aio_free(aiop);
1186-
goto exitlevel1;
1172+
goto exitlevel2;
11871173
}
11881174

11891175
nng_aio_set_timeout(aiop, flags ? flags : (*NANO_INTEGER(block) != 0) * NNG_DURATION_DEFAULT);
@@ -1192,7 +1178,7 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
11921178
nng_aio_wait(aiop);
11931179
if ((xc = nng_aio_result(aiop))) {
11941180
nng_aio_free(aiop);
1195-
goto exitlevel1;
1181+
goto exitlevel2;
11961182
}
11971183

11981184
sz = nng_aio_count(aiop);
@@ -1206,8 +1192,9 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
12061192

12071193
return res;
12081194

1209-
exitlevel1:
1195+
exitlevel2:
12101196
R_Free(buf);
1197+
exitlevel1:
12111198
return mk_error(xc);
12121199

12131200
}

src/nanonext.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ typedef struct nano_cv_s {
158158
#define NANONEXT_INIT_BUFSIZE 8192
159159
#define NANONEXT_SERIAL_VER 3
160160
#define NANONEXT_SERIAL_HEADERS 6
161+
#define NANONEXT_SERIAL_THR 134217728
161162
#define NANONEXT_LD_STRLEN 21
162163
#define NANO_ALLOC(x, sz) \
163164
(x)->buf = R_Calloc(sz, unsigned char); \

0 commit comments

Comments
 (0)