Skip to content

Commit 2774097

Browse files
committed
simplify streams further
1 parent 43b4819 commit 2774097

File tree

4 files changed

+74
-84
lines changed

4 files changed

+74
-84
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: nanonext
22
Type: Package
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 0.11.0.9011
4+
Version: 0.11.0.9012
55
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
66
a socket library implementing 'Scalability Protocols', a reliable,
77
high-performance standard for common communications patterns including

NEWS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# nanonext 0.11.0.9011 (development)
1+
# nanonext 0.11.0.9012 (development)
22

33
#### New Features
44

src/utils.c

Lines changed: 65 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,20 @@
2323

2424
// internals -------------------------------------------------------------------
2525

26-
typedef struct nano_stream_listener_s {
27-
nng_stream *stream;
28-
nng_stream_listener *list;
29-
nng_tls_config *tls;
30-
} nano_stream_listener;
26+
typedef enum nano_stream_typ {
27+
DIALER,
28+
LISTENER
29+
} nano_stream_typ;
3130

32-
typedef struct nano_stream_dialer_s {
31+
typedef struct nano_stream_s {
3332
nng_stream *stream;
34-
nng_stream_dialer *dial;
33+
nano_stream_typ type;
34+
union {
35+
nng_stream_dialer *dial;
36+
nng_stream_listener *list;
37+
} endpoint;
3538
nng_tls_config *tls;
36-
} nano_stream_dialer;
39+
} nano_stream;
3740

3841
SEXP mk_error_ncurl(const int xc) {
3942

@@ -52,28 +55,19 @@ SEXP mk_error_ncurl(const int xc) {
5255

5356
// finalizers ------------------------------------------------------------------
5457

55-
static void stream_listener_finalizer(SEXP xptr) {
56-
57-
if (R_ExternalPtrAddr(xptr) == NULL) return;
58-
nano_stream_listener *xp = (nano_stream_listener *) R_ExternalPtrAddr(xptr);
59-
nng_stream_close(xp->stream);
60-
nng_stream_listener_close(xp->list);
61-
nng_stream_free(xp->stream);
62-
nng_stream_listener_free(xp->list);
63-
if (xp->tls != NULL)
64-
nng_tls_config_free(xp->tls);
65-
R_Free(xp);
66-
67-
}
68-
69-
static void stream_dialer_finalizer(SEXP xptr) {
58+
static void stream_finalizer(SEXP xptr) {
7059

7160
if (R_ExternalPtrAddr(xptr) == NULL) return;
72-
nano_stream_dialer *xp = (nano_stream_dialer *) R_ExternalPtrAddr(xptr);
61+
nano_stream *xp = (nano_stream *) R_ExternalPtrAddr(xptr);
7362
nng_stream_close(xp->stream);
74-
nng_stream_dialer_close(xp->dial);
7563
nng_stream_free(xp->stream);
76-
nng_stream_dialer_free(xp->dial);
64+
if (xp->type == DIALER) {
65+
nng_stream_dialer_close(xp->endpoint.dial);
66+
nng_stream_dialer_free(xp->endpoint.dial);
67+
} else {
68+
nng_stream_listener_close(xp->endpoint.list);
69+
nng_stream_listener_free(xp->endpoint.list);
70+
}
7771
if (xp->tls != NULL)
7872
nng_tls_config_free(xp->tls);
7973
R_Free(xp);
@@ -374,8 +368,9 @@ SEXP rnng_stream_dial(SEXP url, SEXP textframes, SEXP tls) {
374368
const char *add = CHAR(STRING_ELT(url, 0));
375369
if (tls != R_NilValue && R_ExternalPtrTag(tls) != nano_TlsSymbol)
376370
Rf_error("'tls' is not a valid TLS Configuration");
377-
nano_stream_dialer *nsd = R_Calloc(1, nano_stream_dialer);
378-
nsd->tls = NULL;
371+
nano_stream *nst = R_Calloc(1, nano_stream);
372+
nst->type = DIALER;
373+
nst->tls = NULL;
379374
nng_url *up;
380375
nng_aio *aiop;
381376
int xc, frames = 0;
@@ -384,35 +379,35 @@ SEXP rnng_stream_dial(SEXP url, SEXP textframes, SEXP tls) {
384379
if ((xc = nng_url_parse(&up, add)))
385380
goto exitlevel1;
386381

387-
xc = nng_stream_dialer_alloc_url(&nsd->dial, up);
382+
xc = nng_stream_dialer_alloc_url(&nst->endpoint.dial, up);
388383
if (xc)
389384
goto exitlevel2;
390385

391386
if (!strcmp(up->u_scheme, "ws") || !strcmp(up->u_scheme, "wss")) {
392387
frames = *NANO_INTEGER(textframes);
393388
if (frames &&
394-
((xc = nng_stream_dialer_set_bool(nsd->dial, "ws:recv-text", 1)) ||
395-
(xc = nng_stream_dialer_set_bool(nsd->dial, "ws:send-text", 1))))
389+
((xc = nng_stream_dialer_set_bool(nst->endpoint.dial, "ws:recv-text", 1)) ||
390+
(xc = nng_stream_dialer_set_bool(nst->endpoint.dial, "ws:send-text", 1))))
396391
goto exitlevel3;
397392
}
398393

399394
if (!strcmp(up->u_scheme, "wss")) {
400395

401396
if (tls == R_NilValue) {
402-
if ((xc = nng_tls_config_alloc(&nsd->tls, NNG_TLS_MODE_CLIENT)))
397+
if ((xc = nng_tls_config_alloc(&nst->tls, NNG_TLS_MODE_CLIENT)))
403398
goto exitlevel3;
404399

405-
if ((xc = nng_tls_config_server_name(nsd->tls, up->u_hostname)) ||
406-
(xc = nng_tls_config_auth_mode(nsd->tls, NNG_TLS_AUTH_MODE_NONE)) ||
407-
(xc = nng_stream_dialer_set_ptr(nsd->dial, NNG_OPT_TLS_CONFIG, nsd->tls)))
400+
if ((xc = nng_tls_config_server_name(nst->tls, up->u_hostname)) ||
401+
(xc = nng_tls_config_auth_mode(nst->tls, NNG_TLS_AUTH_MODE_NONE)) ||
402+
(xc = nng_stream_dialer_set_ptr(nst->endpoint.dial, NNG_OPT_TLS_CONFIG, nst->tls)))
408403
goto exitlevel4;
409404
} else {
410405

411-
nsd->tls = (nng_tls_config *) R_ExternalPtrAddr(tls);
412-
nng_tls_config_hold(nsd->tls);
406+
nst->tls = (nng_tls_config *) R_ExternalPtrAddr(tls);
407+
nng_tls_config_hold(nst->tls);
413408

414-
if ((xc = nng_tls_config_server_name(nsd->tls, up->u_hostname)) ||
415-
(xc = nng_stream_dialer_set_ptr(nsd->dial, NNG_OPT_TLS_CONFIG, nsd->tls)))
409+
if ((xc = nng_tls_config_server_name(nst->tls, up->u_hostname)) ||
410+
(xc = nng_stream_dialer_set_ptr(nst->endpoint.dial, NNG_OPT_TLS_CONFIG, nst->tls)))
416411
goto exitlevel4;
417412
}
418413

@@ -421,18 +416,18 @@ SEXP rnng_stream_dial(SEXP url, SEXP textframes, SEXP tls) {
421416
if ((xc = nng_aio_alloc(&aiop, NULL, NULL)))
422417
goto exitlevel4;
423418

424-
nng_stream_dialer_dial(nsd->dial, aiop);
419+
nng_stream_dialer_dial(nst->endpoint.dial, aiop);
425420
nng_aio_wait(aiop);
426421
if ((xc = nng_aio_result(aiop)))
427422
goto exitlevel5;
428423

429-
nsd->stream = nng_aio_get_output(aiop, 0);
424+
nst->stream = nng_aio_get_output(aiop, 0);
430425

431426
nng_aio_free(aiop);
432427
nng_url_free(up);
433428

434-
PROTECT(sd = R_MakeExternalPtr(nsd, nano_StreamSymbol, R_NilValue));
435-
R_RegisterCFinalizerEx(sd, stream_dialer_finalizer, TRUE);
429+
PROTECT(sd = R_MakeExternalPtr(nst, nano_StreamSymbol, R_NilValue));
430+
R_RegisterCFinalizerEx(sd, stream_finalizer, TRUE);
436431
Rf_setAttrib(sd, R_ModeSymbol, Rf_mkString("dialer"));
437432
Rf_setAttrib(sd, nano_StateSymbol, Rf_mkString("opened"));
438433
Rf_setAttrib(sd, nano_UrlSymbol, url);
@@ -449,14 +444,14 @@ SEXP rnng_stream_dial(SEXP url, SEXP textframes, SEXP tls) {
449444
exitlevel5:
450445
nng_aio_free(aiop);
451446
exitlevel4:
452-
if (nsd->tls != NULL)
453-
nng_tls_config_free(nsd->tls);
447+
if (nst->tls != NULL)
448+
nng_tls_config_free(nst->tls);
454449
exitlevel3:
455-
nng_stream_dialer_free(nsd->dial);
450+
nng_stream_dialer_free(nst->endpoint.dial);
456451
exitlevel2:
457452
nng_url_free(up);
458453
exitlevel1:
459-
R_Free(nsd);
454+
R_Free(nst);
460455
ERROR_OUT(xc);
461456

462457
}
@@ -466,8 +461,9 @@ SEXP rnng_stream_listen(SEXP url, SEXP textframes, SEXP tls) {
466461
const char *add = CHAR(STRING_ELT(url, 0));
467462
if (tls != R_NilValue && R_ExternalPtrTag(tls) != nano_TlsSymbol)
468463
Rf_error("'tls' is not a valid TLS Configuration");
469-
nano_stream_listener *nsl = R_Calloc(1, nano_stream_listener);
470-
nsl->tls = NULL;
464+
nano_stream *nst = R_Calloc(1, nano_stream);
465+
nst->type = LISTENER;
466+
nst->tls = NULL;
471467
nng_url *up;
472468
nng_aio *aiop;
473469
int xc, frames = 0;
@@ -476,57 +472,57 @@ SEXP rnng_stream_listen(SEXP url, SEXP textframes, SEXP tls) {
476472
if ((xc = nng_url_parse(&up, add)))
477473
goto exitlevel1;
478474

479-
xc = nng_stream_listener_alloc_url(&nsl->list, up);
475+
xc = nng_stream_listener_alloc_url(&nst->endpoint.list, up);
480476
if (xc)
481477
goto exitlevel2;
482478

483479
if (!strcmp(up->u_scheme, "ws") || !strcmp(up->u_scheme, "wss")) {
484480
frames = *NANO_INTEGER(textframes);
485481
if (frames &&
486-
((xc = nng_stream_listener_set_bool(nsl->list, "ws:recv-text", 1)) ||
487-
(xc = nng_stream_listener_set_bool(nsl->list, "ws:send-text", 1))))
482+
((xc = nng_stream_listener_set_bool(nst->endpoint.list, "ws:recv-text", 1)) ||
483+
(xc = nng_stream_listener_set_bool(nst->endpoint.list, "ws:send-text", 1))))
488484
goto exitlevel3;
489485
}
490486

491487
if (!strcmp(up->u_scheme, "wss")) {
492488

493489
if (tls == R_NilValue) {
494-
if ((xc = nng_tls_config_alloc(&nsl->tls, NNG_TLS_MODE_SERVER)))
490+
if ((xc = nng_tls_config_alloc(&nst->tls, NNG_TLS_MODE_SERVER)))
495491
goto exitlevel3;
496492

497-
if ((xc = nng_tls_config_auth_mode(nsl->tls, NNG_TLS_AUTH_MODE_NONE)) ||
498-
(xc = nng_stream_listener_set_ptr(nsl->list, NNG_OPT_TLS_CONFIG, nsl->tls)))
493+
if ((xc = nng_tls_config_auth_mode(nst->tls, NNG_TLS_AUTH_MODE_NONE)) ||
494+
(xc = nng_stream_listener_set_ptr(nst->endpoint.list, NNG_OPT_TLS_CONFIG, nst->tls)))
499495
goto exitlevel4;
500496
} else {
501497

502-
nsl->tls = (nng_tls_config *) R_ExternalPtrAddr(tls);
503-
nng_tls_config_hold(nsl->tls);
498+
nst->tls = (nng_tls_config *) R_ExternalPtrAddr(tls);
499+
nng_tls_config_hold(nst->tls);
504500

505-
if ((xc = nng_tls_config_server_name(nsl->tls, up->u_hostname)) ||
506-
(xc = nng_stream_listener_set_ptr(nsl->list, NNG_OPT_TLS_CONFIG, nsl->tls)))
501+
if ((xc = nng_tls_config_server_name(nst->tls, up->u_hostname)) ||
502+
(xc = nng_stream_listener_set_ptr(nst->endpoint.list, NNG_OPT_TLS_CONFIG, nst->tls)))
507503
goto exitlevel4;
508504
}
509505

510506
}
511507

512-
if ((xc = nng_stream_listener_listen(nsl->list)))
508+
if ((xc = nng_stream_listener_listen(nst->endpoint.list)))
513509
goto exitlevel4;
514510

515511
if ((xc = nng_aio_alloc(&aiop, NULL, NULL)))
516512
goto exitlevel4;
517513

518-
nng_stream_listener_accept(nsl->list, aiop);
514+
nng_stream_listener_accept(nst->endpoint.list, aiop);
519515
nng_aio_wait(aiop);
520516
if ((xc = nng_aio_result(aiop)))
521517
goto exitlevel5;
522518

523-
nsl->stream = nng_aio_get_output(aiop, 0);
519+
nst->stream = nng_aio_get_output(aiop, 0);
524520

525521
nng_aio_free(aiop);
526522
nng_url_free(up);
527523

528-
PROTECT(sl = R_MakeExternalPtr(nsl, nano_StreamSymbol, R_NilValue));
529-
R_RegisterCFinalizerEx(sl, stream_listener_finalizer, TRUE);
524+
PROTECT(sl = R_MakeExternalPtr(nst, nano_StreamSymbol, R_NilValue));
525+
R_RegisterCFinalizerEx(sl, stream_finalizer, TRUE);
530526
Rf_setAttrib(sl, R_ModeSymbol, Rf_mkString("listener"));
531527
Rf_setAttrib(sl, nano_StateSymbol, Rf_mkString("opened"));
532528
Rf_setAttrib(sl, nano_UrlSymbol, url);
@@ -543,14 +539,14 @@ SEXP rnng_stream_listen(SEXP url, SEXP textframes, SEXP tls) {
543539
exitlevel5:
544540
nng_aio_free(aiop);
545541
exitlevel4:
546-
if (nsl->tls != NULL)
547-
nng_tls_config_free(nsl->tls);
542+
if (nst->tls != NULL)
543+
nng_tls_config_free(nst->tls);
548544
exitlevel3:
549-
nng_stream_listener_free(nsl->list);
545+
nng_stream_listener_free(nst->endpoint.list);
550546
exitlevel2:
551547
nng_url_free(up);
552548
exitlevel1:
553-
R_Free(nsl);
549+
R_Free(nst);
554550
ERROR_OUT(xc);
555551

556552
}
@@ -560,13 +556,7 @@ SEXP rnng_stream_close(SEXP stream) {
560556
if (R_ExternalPtrTag(stream) != nano_StreamSymbol)
561557
Rf_error("'stream' is not a valid or active Stream");
562558

563-
SEXP mode = Rf_getAttrib(stream, R_ModeSymbol);
564-
const char *mod = CHAR(STRING_ELT(mode, 0));
565-
if (!strncmp(mod, "dialer", 6)) {
566-
stream_dialer_finalizer(stream);
567-
} else {
568-
stream_listener_finalizer(stream);
569-
}
559+
stream_finalizer(stream);
570560
R_SetExternalPtrTag(stream, R_NilValue);
571561
R_ClearExternalPtr(stream);
572562
Rf_setAttrib(stream, nano_StateSymbol, Rf_mkString("closed"));

vignettes/nanonext.Rmd

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ call_aio(aio)
206206
aio
207207
#> < recvAio | $data >
208208
aio$data |> str()
209-
#> num [1:100000000] -1.6 0.478 -0.703 0.836 -1.161 ...
209+
#> num [1:100000000] -0.9504 -0.9926 -0.015 0.0319 0.9306 ...
210210
```
211211

212212
As `call_aio()` is blocking and will wait for completion, an alternative is to query `aio$data` directly. This will return an 'unresolved' logical NA value if the calculation is yet to complete.
@@ -325,7 +325,7 @@ Additionally, the convenience function `write_cert()` can automatically generate
325325
cert <- write_cert(cn = "127.0.0.1")
326326
str(cert)
327327
#> List of 2
328-
#> $ server: chr [1:2] "-----BEGIN CERTIFICATE-----\nMIIFOTCCAyGgAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQDDAkxMjcu\nMC4wLjExETAPBgNV"| __truncated__ "-----BEGIN RSA PRIVATE KEY-----\nMIIJKAIBAAKCAgEAw9gD40ZaDXjjlEIcif8TSx9o5S8L0ScLUX+WGSxTqhK3EUqq\nO7xx54/lf80z"| __truncated__
328+
#> $ server: chr [1:2] "-----BEGIN CERTIFICATE-----\nMIIFOTCCAyGgAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQDDAkxMjcu\nMC4wLjExETAPBgNV"| __truncated__ "-----BEGIN RSA PRIVATE KEY-----\nMIIJKgIBAAKCAgEArOkChHl0yUMW+4UT44pGJSvCp4ejQ4lyCNMzOLzs8ota2qH0\npliEjesxv3Y8"| __truncated__
329329
#> $ client: chr [1:2] "-----BEGIN CERTIFICATE-----\nMIIFOTCCAyGgAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQDDAkxMjcu\nMC4wLjExETAPBgNV"| __truncated__ ""
330330

331331
ser <- tls_config(server = cert$server)
@@ -483,7 +483,7 @@ ncurl("https://postman-echo.com/get")
483483
#> NULL
484484
#>
485485
#> $data
486-
#> [1] "{\n \"args\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-659dc265-615facef09d5ba5559a96d9e\"\n },\n \"url\": \"https://postman-echo.com/get\"\n}"
486+
#> [1] "{\n \"args\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-659e628a-051f02da266430d33ca79bad\"\n },\n \"url\": \"https://postman-echo.com/get\"\n}"
487487
```
488488

489489
For advanced use, supports additional HTTP methods such as POST or PUT.
@@ -500,10 +500,10 @@ res
500500

501501
call_aio(res)$headers
502502
#> $date
503-
#> [1] "Tue, 09 Jan 2024 22:02:14 GMT"
503+
#> [1] "Wed, 10 Jan 2024 09:25:31 GMT"
504504

505505
res$data
506-
#> [1] "{\n \"args\": {},\n \"data\": {\n \"key\": \"value\"\n },\n \"files\": {},\n \"form\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-659dc266-54cf05c0780d8a2458a972d8\",\n \"content-length\": \"16\",\n \"content-type\": \"application/json\",\n \"authorization\": \"Bearer APIKEY\"\n },\n \"json\": {\n \"key\": \"value\"\n },\n \"url\": \"https://postman-echo.com/post\"\n}"
506+
#> [1] "{\n \"args\": {},\n \"data\": {\n \"key\": \"value\"\n },\n \"files\": {},\n \"form\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-659e628a-7ab5f14023fce74710bbee51\",\n \"content-length\": \"16\",\n \"content-type\": \"application/json\",\n \"authorization\": \"Bearer APIKEY\"\n },\n \"json\": {\n \"key\": \"value\"\n },\n \"url\": \"https://postman-echo.com/post\"\n}"
507507
```
508508

509509
In this respect, it may be used as a performant and lightweight method for making REST API requests.
@@ -527,7 +527,7 @@ transact(sess)
527527
#>
528528
#> $headers
529529
#> $headers$Date
530-
#> [1] "Tue, 09 Jan 2024 22:02:14 GMT"
530+
#> [1] "Wed, 10 Jan 2024 09:25:31 GMT"
531531
#>
532532
#> $headers$`Content-Type`
533533
#> [1] "application/json; charset=utf-8"
@@ -538,7 +538,7 @@ transact(sess)
538538
#> [41] 72 77 61 72 64 65 64 2d 70 72 6f 74 6f 22 3a 20 22 68 74 74 70 73 22 2c 0a 20 20 20 20 22 78 2d 66 6f 72 77 61 72 64 65
539539
#> [81] 64 2d 70 6f 72 74 22 3a 20 22 34 34 33 22 2c 0a 20 20 20 20 22 68 6f 73 74 22 3a 20 22 70 6f 73 74 6d 61 6e 2d 65 63 68
540540
#> [121] 6f 2e 63 6f 6d 22 2c 0a 20 20 20 20 22 78 2d 61 6d 7a 6e 2d 74 72 61 63 65 2d 69 64 22 3a 20 22 52 6f 6f 74 3d 31 2d 36
541-
#> [161] 35 39 64 63 32 36 36 2d 33 63 35 39 33 63 64 37 32 32 61 61 38 65 65 30 32 63 33 37 35 37 36 38 22 2c 0a 20 20 20 20 22
541+
#> [161] 35 39 65 36 32 38 62 2d 37 66 38 32 36 39 63 33 30 64 63 35 36 34 37 61 34 61 31 39 35 36 32 65 22 2c 0a 20 20 20 20 22
542542
#> [201] 63 6f 6e 74 65 6e 74 2d 74 79 70 65 22 3a 20 22 61 70 70 6c 69 63 61 74 69 6f 6e 2f 6a 73 6f 6e 22 2c 0a 20 20 20 20 22
543543
#> [241] 61 75 74 68 6f 72 69 7a 61 74 69 6f 6e 22 3a 20 22 42 65 61 72 65 72 20 41 50 49 4b 45 59 22 0a 20 20 7d 2c 0a 20 20 22
544544
#> [281] 75 72 6c 22 3a 20 22 68 74 74 70 73 3a 2f 2f 70 6f 73 74 6d 61 6e 2d 65 63 68 6f 2e 63 6f 6d 2f 67 65 74 22 0a 7d

0 commit comments

Comments
 (0)