Skip to content

Commit 063a8e6

Browse files
committed
documentation updates + safer close method for streams
1 parent f7038aa commit 063a8e6

File tree

6 files changed

+72
-54
lines changed

6 files changed

+72
-54
lines changed

NEWS.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
#### New Features
44

55
* `$context()` method added for creating new contexts from nano Objects using supported protocols (i.e. req, rep, sub, surveyor, respondent) - this replaces the `context()` function for nano Objects.
6+
* `subscribe()` and `unsubscribe()` now accept a topic of any atomic type (not just character), allowing pub/sub to be used with integer, double, logical, complex, or raw vectors.
67
* Added convenience auxiliary functions `is_nano()` and `is_aio()`.
7-
* `subscribe()` / `unsubscribe()` now accept a topic of any atomic type (not just character), allowing pub/sub to be used when sending integer, double, logical, complex, or raw vectors.
88

99
#### Updates
1010

1111
* Protocol-specific helpers `subscribe()`, `unsubscribe()`, and `survey_time()` gain nanoContext methods.
12+
* Closing a stream now strips all attributes on the object rendering it a nil external pointer - this is for safety, eliminating a potential crash if attempting to re-use a closed stream.
1213
* For receives, if an error occurs in unserialisation or data conversion (e.g. mode was incorrectly specified), the received raw vector is now available at both `$raw` and `$data` if `keep.raw = TRUE`.
1314
* Setting 'NANONEXT_TLS=1' now allows the downloaded NNG library to be built against a system mbedtls installation.
1415
* Setting 'NANONEXT_ARM' is no longer required on platforms such as Raspberry Pi - the package configure script should now detect platforms that require the libatomic linker flag to be set automatically.

R/messenger.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
#' Messenger
44
#'
5-
#' Multi-threaded console-based 2-way messaging system based on NNG scalability
6-
#' protocols.
5+
#' Multi-threaded console-based 2-way instant messaging system based on NNG
6+
#' scalability protocols.
77
#'
88
#' @param url a URL to connect to, specifying the transport and address as
99
#' a character string e.g. 'tcp://127.0.0.1:5555' (see \link{transports}).

README.Rmd

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ knitr::opts_chunk$set(
2424

2525
R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is a socket library providing high-performance scalability protocols, implementing a cross-platform standard for messaging and communications. Serves as a concurrency framework for building distributed applications, utilising 'Aio' objects which automatically resolve upon completion of asynchronous operations.
2626

27-
Designed for performance and reliability, the NNG library is written in C and {nanonext} is a lightweight wrapper depending on no other packages. Provides the interface for code and processes to communicate with each other - receive data generated in Python, perform analysis in R, and send results to a C++ program – all on the same computer or on networks spanning the globe.
27+
Designed for performance and reliability, the NNG library is written in C and {nanonext} is a lightweight zero-dependency wrapper. Provides the interface for code and processes to communicate with each other - receive data generated in Python, perform analysis in R, and send results to a C++ program – all on the same computer or on networks spanning the globe.
2828

2929
Implemented scalability protocols:
3030

@@ -35,13 +35,19 @@ Implemented scalability protocols:
3535
- Request/Reply (I ask, you answer)
3636
- Survey (everyone votes)
3737

38-
Implemented transports:
38+
Supported transports:
3939

4040
- inproc (intra-process)
4141
- IPC (inter-process)
42-
- TCP/IP (IPv4 or IPv6)
42+
- TCP (IPv4 or IPv6)
4343
- WebSocket
4444

45+
Provided web tools:
46+
47+
- ncurl - (async) http(s) client
48+
- stream - secure websockets client (and generic low-level socket interface)
49+
- messenger - console-based instant messaging
50+
4551
### Table of Contents
4652

4753
1. [Installation](#installation)
@@ -350,7 +356,7 @@ sub |> recv(mode = "character", keep.raw = FALSE)
350356
351357
```
352358

353-
The subscribed topic can be any atomic type (not just character), allowing integer, double, logical, complex and raw vectors to be sent as well.
359+
The subscribed topic can be of any atomic type (not just character), allowing integer, double, logical, complex and raw vectors to be sent and received.
354360

355361
```{r pub2}
356362
@@ -445,17 +451,22 @@ In this respect, it may be used as a performant and lightweight method for makin
445451

446452
`stream()` exposes NNG's low-level byte stream interface for communicating with raw sockets. This may be used for connecting to arbitrary non-NNG endpoints.
447453

454+
The stream interface can be used to communicate with websocket servers. Where TLS is enabled in the NNG library, connecting to secure websockets is configured automatically. The argument `textframes = TRUE` can be specified where the websocket server uses text rather than binary frames.
455+
448456
```{r stream}
449457
450-
s <- stream(dial = "wss://demo.piesocket.com/v3/channel_1", textframes = TRUE)
458+
s <- stream(dial = "wss://stream.binance.com:9443/ws/btcusdt@kline_1m", textframes = TRUE)
451459
s
452-
s |> recv()
453460
454-
```
461+
s |> recv(keep.raw = FALSE)
455462
456-
The stream interface can be used to communicate with websocket servers. Where TLS is enabled in the NNG library, connecting to secure websockets is configured automatically. Here, the argument `textframes = TRUE` can be specified where the websocket server uses text rather than binary frames.
463+
s |> recv(keep.raw = FALSE)
464+
465+
close(s)
466+
467+
```
457468

458-
The same API for Sockets can equally be used on Streams: `send()` and `recv()`, as well as their asynchronous counterparts `send_aio()` and `recv_aio()`. This affords a great deal of flexibility in ingesting and processing streaming data.
469+
The same API for Sockets is available for use on Streams: `send()` and `recv()`, as well as their asynchronous counterparts `send_aio()` and `recv_aio()`. This affords a great deal of flexibility in ingesting and processing streaming data.
459470

460471
[&laquo; Back to ToC](#table-of-contents)
461472

@@ -485,8 +496,8 @@ If system installations of 'libnng' and 'libmbedtls' development headers are det
485496

486497
Otherwise, the environment variable `Sys.setenv(NANONEXT_TLS=1)` may be set prior to installation if:
487498

488-
- your system installations of 'libnng' (built with TLS support) and 'libmbedtls' are in different locations; or
489-
- you have a system installation of 'libmbedtls' but not 'libnng' and want nanonext to download and build a more recent version of 'libnng' than available in system repositories against this.
499+
- system installations of 'libnng' (built with TLS support) and 'libmbedtls' are in different locations; or
500+
- there is a system installation of 'libmbedtls' but not 'libnng' - in which case nanonext will download and build the latest release of 'libnng' against this.
490501

491502
### Links
492503

README.md

Lines changed: 43 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ utilising ‘Aio’ objects which automatically resolve upon completion of
2222
asynchronous operations.
2323

2424
Designed for performance and reliability, the NNG library is written in
25-
C and {nanonext} is a lightweight wrapper depending on no other
26-
packages. Provides the interface for code and processes to communicate
27-
with each other - receive data generated in Python, perform analysis in
28-
R, and send results to a C++ program – all on the same computer or on
29-
networks spanning the globe.
25+
C and {nanonext} is a lightweight zero-dependency wrapper. Provides the
26+
interface for code and processes to communicate with each other -
27+
receive data generated in Python, perform analysis in R, and send
28+
results to a C++ program – all on the same computer or on networks
29+
spanning the globe.
3030

3131
Implemented scalability protocols:
3232

@@ -37,13 +37,20 @@ Implemented scalability protocols:
3737
- Request/Reply (I ask, you answer)
3838
- Survey (everyone votes)
3939

40-
Implemented transports:
40+
Supported transports:
4141

4242
- inproc (intra-process)
4343
- IPC (inter-process)
44-
- TCP/IP (IPv4 or IPv6)
44+
- TCP (IPv4 or IPv6)
4545
- WebSocket
4646

47+
Provided web tools:
48+
49+
- ncurl - (async) http(s) client
50+
- stream - secure websockets client (and generic low-level socket
51+
interface)
52+
- messenger - console-based instant messaging
53+
4754
### Table of Contents
4855

4956
1. [Installation](#installation)
@@ -388,7 +395,7 @@ aio
388395
#> < recvAio >
389396
#> - $data for message data
390397
aio$data |> str()
391-
#> num [1:100000000] 0.173 -0.103 -0.522 -0.812 0.532 ...
398+
#> num [1:100000000] -1.074 0.245 0.744 -0.502 -1.666 ...
392399
```
393400

394401
As `call_aio()` is blocking and will wait for completion, an alternative
@@ -448,9 +455,9 @@ sub |> recv(mode = "character", keep.raw = FALSE)
448455
#> [1] "examples" "this example will still be received"
449456
```
450457

451-
The subscribed topic can be any atomic type (not just character),
452-
allowing integer, double, logical, complex and raw vectors to be sent as
453-
well.
458+
The subscribed topic can be of any atomic type (not just character),
459+
allowing integer, double, logical, complex and raw vectors to be sent
460+
and received.
454461

455462
``` r
456463

@@ -540,11 +547,11 @@ ncurl("http://httpbin.org/headers")
540547
#> [1] 7b 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 48 6f 73
541548
#> [26] 74 22 3a 20 22 68 74 74 70 62 69 6e 2e 6f 72 67 22 2c 20 0a 20 20 20 20 22
542549
#> [51] 58 2d 41 6d 7a 6e 2d 54 72 61 63 65 2d 49 64 22 3a 20 22 52 6f 6f 74 3d 31
543-
#> [76] 2d 36 32 37 34 35 31 35 30 2d 32 61 62 61 34 61 31 61 36 39 63 37 36 61 66
544-
#> [101] 62 32 38 30 61 30 65 66 62 22 0a 20 20 7d 0a 7d 0a
550+
#> [76] 2d 36 32 37 37 61 32 61 32 2d 34 39 36 62 38 32 38 61 34 38 63 35 65 61 66
551+
#> [101] 33 35 38 66 33 37 65 31 34 22 0a 20 20 7d 0a 7d 0a
545552
#>
546553
#> $data
547-
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62745150-2aba4a1a69c76afb280a0efb\"\n }\n}\n"
554+
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6277a2a2-496b828a48c5eaf358f37e14\"\n }\n}\n"
548555
```
549556

550557
For advanced use, supports additional HTTP methods such as POST or PUT.
@@ -560,7 +567,7 @@ res
560567
#> - $raw for raw message
561568

562569
call_aio(res)$data
563-
#> [1] "{\n \"args\": {}, \n \"data\": \"{\\\"key\\\": \\\"value\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Authorization\": \"Bearer APIKEY\", \n \"Content-Length\": \"16\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62745150-4a03a30d6fcd147d71c51fdf\"\n }, \n \"json\": {\n \"key\": \"value\"\n }, \n \"origin\": \"78.145.225.121\", \n \"url\": \"http://httpbin.org/post\"\n}\n"
570+
#> [1] "{\n \"args\": {}, \n \"data\": \"{\\\"key\\\": \\\"value\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Authorization\": \"Bearer APIKEY\", \n \"Content-Length\": \"16\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6277a2a2-6ae6efe31d378db911a099f5\"\n }, \n \"json\": {\n \"key\": \"value\"\n }, \n \"origin\": \"78.145.225.121\", \n \"url\": \"http://httpbin.org/post\"\n}\n"
564571
```
565572

566573
In this respect, it may be used as a performant and lightweight method
@@ -574,30 +581,31 @@ for making REST API requests.
574581
communicating with raw sockets. This may be used for connecting to
575582
arbitrary non-NNG endpoints.
576583

584+
The stream interface can be used to communicate with websocket servers.
585+
Where TLS is enabled in the NNG library, connecting to secure websockets
586+
is configured automatically. The argument `textframes = TRUE` can be
587+
specified where the websocket server uses text rather than binary
588+
frames.
589+
577590
``` r
578591

579-
s <- stream(dial = "wss://demo.piesocket.com/v3/channel_1", textframes = TRUE)
592+
s <- stream(dial = "wss://stream.binance.com:9443/ws/btcusdt@kline_1m", textframes = TRUE)
580593
s
581594
#> < nanoStream >
582595
#> - type: dialer
583-
#> - url: wss://demo.piesocket.com/v3/channel_1
596+
#> - url: wss://stream.binance.com:9443/ws/btcusdt@kline_1m
584597
#> - textframes: TRUE
585-
s |> recv()
586-
#> $raw
587-
#> [1] 7b 22 65 72 72 6f 72 22 3a 22 4d 69 73 73 69 6e 67 20 61 70 69 4b 65 79 22
588-
#> [26] 7d
589-
#>
590-
#> $data
591-
#> [1] "{\"error\":\"Missing apiKey\"}"
592-
```
593598

594-
The stream interface can be used to communicate with websocket servers.
595-
Where TLS is enabled in the NNG library, connecting to secure websockets
596-
is configured automatically. Here, the argument `textframes = TRUE` can
597-
be specified where the websocket server uses text rather than binary
598-
frames.
599+
s |> recv(keep.raw = FALSE)
600+
#> [1] "{\"e\":\"kline\",\"E\":1652007590579,\"s\":\"BTCUSDT\",\"k\":{\"t\":1652007540000,\"T\":1652007599999,\"s\":\"BTCUSDT\",\"i\":\"1m\",\"f\":1351258350,\"L\":1351258570,\"o\":\"34718.38000000\",\"c\":\"34726.74000000\",\"h\":\"34726.74000000\",\"l\":\"34717.06000000\",\"v\":\"3.78973000\",\"n\":221,\"x\":false,\"q\":\"131577.38153510\",\"V\":\"2.16208000\",\"Q\":\"75065.38564240\",\"B\":\"0\"}}"
601+
602+
s |> recv(keep.raw = FALSE)
603+
#> [1] "{\"e\":\"kline\",\"E\":1652007592588,\"s\":\"BTCUSDT\",\"k\":{\"t\":1652007540000,\"T\":1652007599999,\"s\":\"BTCUSDT\",\"i\":\"1m\",\"f\":1351258350,\"L\":1351258593,\"o\":\"34718.38000000\",\"c\":\"34729.58000000\",\"h\":\"34729.58000000\",\"l\":\"34717.06000000\",\"v\":\"4.01301000\",\"n\":244,\"x\":false,\"q\":\"139331.29327650\",\"V\":\"2.27211000\",\"Q\":\"78886.44058120\",\"B\":\"0\"}}"
604+
605+
close(s)
606+
```
599607

600-
The same API for Sockets can equally be used on Streams: `send()` and
608+
The same API for Sockets is available for use on Streams: `send()` and
601609
`recv()`, as well as their asynchronous counterparts `send_aio()` and
602610
`recv_aio()`. This affords a great deal of flexibility in ingesting and
603611
processing streaming data.
@@ -639,11 +647,11 @@ appropriately.
639647
Otherwise, the environment variable `Sys.setenv(NANONEXT_TLS=1)` may be
640648
set prior to installation if:
641649

642-
- your system installations of ‘libnng’ (built with TLS support) and
650+
- system installations of ‘libnng’ (built with TLS support) and
643651
‘libmbedtls’ are in different locations; or
644-
- you have a system installation of ‘libmbedtls’ but not ‘libnng’ and
645-
want nanonext to download and build a more recent version of
646-
‘libnng’ than available in system repositories against this.
652+
- there is a system installation of ‘libmbedtls’ but not ‘libnng’ - in
653+
which case nanonext will download and build the latest release of
654+
‘libnng’ against this.
647655

648656
### Links
649657

man/messenger.Rd

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/utils.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -451,9 +451,7 @@ SEXP rnng_stream_close(SEXP stream) {
451451
nng_stream *sp = (nng_stream *) R_ExternalPtrAddr(stream);
452452
nng_stream_free(sp);
453453
R_ClearExternalPtr(stream);
454-
Rf_setAttrib(stream, nano_DialerSymbol, R_NilValue);
455-
Rf_setAttrib(stream, nano_ListenerSymbol, R_NilValue);
456-
Rf_setAttrib(stream, nano_UrlSymbol, R_NilValue);
454+
SET_ATTRIB(stream, R_NilValue);
457455

458456
return Rf_ScalarInteger(0);
459457

0 commit comments

Comments
 (0)