Skip to content

Commit f84aa32

Browse files
committed
cleaner aios, save on frees
1 parent 1a0da5d commit f84aa32

File tree

8 files changed

+158
-74
lines changed

8 files changed

+158
-74
lines changed

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export(.mirai_scm)
2929
export(call_aio)
3030
export(context)
3131
export(dial)
32+
export(is_error_value)
3233
export(is_nul_byte)
3334
export(listen)
3435
export(logging)

NEWS.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
#### New Features
44

5-
* Aio values `$result`, `$raw` or `$data` now resolve without requiring `call_aio()`. Access the values directly and an NA 'unresolved value' will be returned if the Aio operation is yet to complete.
6-
* Integer error values generated by all receive functions are now classed 'errorValue' to be immediately distinguishable from possible message values.
5+
* Aio values `$result`, `$raw` and `$data` now resolve without requiring `call_aio()`. Access the values directly and an NA 'unresolved value' will be returned if the Aio operation is yet to complete.
76
* `unresolved()` added as an auxiliary function to query whether an Aio is unresolved, for use in control flow statements.
7+
* Integer error values generated by receive functions are now classed 'errorValue'. `is_error_value()` helper function included.
88
* `is_nul_byte()` added as a helper function for request/reply setups.
99
* `survey_time()` added as a convenience function for surveyor/respondent patterns.
1010
* `logging()` function to specify a global package logging level - 'error' or 'info'. Automatically polls the environment variable 'NANONEXT_LOG' on package load and then each time `logging(level = "check")` is called, allowing this to be set externally.

R/utils.R

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,22 @@ nng_error <- function(error) {
6464
#'
6565
#' @export
6666
#'
67-
is_nul_byte <- function(x) {
67+
is_nul_byte <- function(x) identical(x, as.raw(0L))
6868

69-
identical(x, as.raw(0L))
70-
71-
}
69+
#' Is Error Value
70+
#'
71+
#' Is the object an error value generated by NNG.
72+
#'
73+
#' @param x an object.
74+
#'
75+
#' @return Logical value TRUE if 'x' is of class 'errorValue', FALSE otherwise.
76+
#'
77+
#' @examples
78+
#' is_error_value(1L)
79+
#'
80+
#' @export
81+
#'
82+
is_error_value <- function(x) inherits(x, "errorValue")
7283

7384
#' @export
7485
#'

README.Rmd

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ n$recv(mode = "double")
176176

177177
{nanonext} implements true async send and receive, leveraging NNG as a massively-scalable concurrency framework.
178178

179-
`send_aio()` and `recv_aio()` functions return immediately but perform their operations async. Their results can be called using `call_aio()` when required.
179+
`send_aio()` and `recv_aio()` functions return immediately but perform their operations async.
180180

181181
```{r async}
182182
library(nanonext)
@@ -185,41 +185,67 @@ s2 <- socket("pair", dial = "inproc://nano")
185185
186186
```
187187

188-
For a 'sendAio' object, calling the result causes it to be stored in the AIO as `$result`. An exit code of 0 denotes a successful send.
188+
For a 'sendAio' object, the result is stored at `$result`. An exit code of 0 denotes a successful send.
189+
190+
- send is successful as long as the message has been accepted by the socket for sending
191+
- the message may be buffered within the system
192+
- for acknowledgement of receipt, an RPC setup is required (see next section)
189193

190194
```{r async2}
191195
192196
res <- send_aio(s1, data.frame(a = 1, b = 2))
193-
call_aio(res)
194-
res
195197
res$result
196198
197199
```
198200

199-
For a 'recvAio' object, calling the message causes it to be stored in the AIO as `$raw` (if kept) and `$data`.
201+
For a 'recvAio' object, the message is stored at `$data`, and the raw message at `$raw` (if kept).
200202

201203
```{r async3}
202204
203205
msg <- recv_aio(s2)
204-
call_aio(msg)
205-
msg
206206
msg$data
207+
msg$raw
207208
208209
```
209210

210-
The values can also be accessed directly from the call as per the example below:
211+
If the async operation is yet to complete, a logical NA 'unresolved value' will be returned. In the below example an async receive is requested, but no mesages are waiting (yet to be sent).
211212

212213
```{r async4}
213214
215+
msg <- recv_aio(s2)
216+
msg$data
217+
218+
```
219+
220+
For use in control flow statements, `unresolved` can be used. Note that calling this function queries for resolution itself and may cause a previously unresolved Aio to resolve.
221+
222+
```{r async5}
223+
224+
# unresolved() already queries for resolution so no need for it again within the while clause
225+
226+
while (unresolved(msg)) {
227+
# do stuff here before checking resolution again
228+
send_aio(s1, "resolved")
229+
}
230+
231+
msg$data
232+
```
233+
234+
The values may also be called explicitly using `call_aio()`. This will wait for completion of the Aio (blocking).
235+
236+
```{r async6}
237+
238+
# will wait for completion then return the resolved Aio
239+
call_aio(msg)
240+
241+
# to access the resolved value directly (waiting if required)
214242
call_aio(msg)$data
215243
216244
close(s1)
217245
close(s2)
218246
219247
```
220248

221-
As an example of possible applications, the {mirai} package <https://shikokuchuo.net/mirai/> (available on CRAN) uses {nanonext} as the back-end to provide asynchronous execution of arbitrary R code.
222-
223249
[&laquo; Back to ToC](#table-of-contents)
224250

225251
### RPC and Distributed Computing
@@ -272,6 +298,8 @@ In this example the calculation is returned, but other operations may reside ent
272298

273299
In such a case, using `call_aio()` confirms that the operation has completed (or it will wait for completion) and calls the return value of the function, which may typically be NULL or an exit code.
274300

301+
The {mirai} package <https://shikokuchuo.net/mirai/> (available on CRAN) uses {nanonext} as the back-end to provide asynchronous execution of arbitrary R code using the RPC model.
302+
275303
[&laquo; Back to ToC](#table-of-contents)
276304

277305
### Publisher Subscriber Model

README.md

Lines changed: 74 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -224,59 +224,90 @@ n$recv(mode = "double")
224224
massively-scalable concurrency framework.
225225

226226
`send_aio()` and `recv_aio()` functions return immediately but perform
227-
their operations async. Their results can be called using `call_aio()`
228-
when required.
227+
their operations async.
229228

230229
``` r
231230
library(nanonext)
232231
s1 <- socket("pair", listen = "inproc://nano")
233232
s2 <- socket("pair", dial = "inproc://nano")
234233
```
235234

236-
For a ‘sendAio’ object, calling the result causes it to be stored in the
237-
AIO as `$result`. An exit code of 0 denotes a successful send.
235+
For a ‘sendAio’ object, the result is stored at `$result`. An exit code
236+
of 0 denotes a successful send.
237+
238+
- send is successful as long as the message has been accepted by the
239+
socket for sending
240+
- the message may be buffered within the system
241+
- for acknowledgement of receipt, an RPC setup is required (see next
242+
section)
238243

239244
``` r
240245
res <- send_aio(s1, data.frame(a = 1, b = 2))
241-
call_aio(res)
242-
res
243-
#> < sendAio >
244-
#> - $result for send result
245246
res$result
246247
#> [1] 0
247248
```
248249

249-
For a ‘recvAio’ object, calling the message causes it to be stored in
250-
the AIO as `$raw` (if kept) and `$data`.
250+
For a ‘recvAio’ object, the message is stored at `$data`, and the raw
251+
message at `$raw` (if kept).
251252

252253
``` r
253254
msg <- recv_aio(s2)
254-
call_aio(msg)
255-
msg
256-
#> < recvAio >
257-
#> - $data for message data
258-
#> - $raw for raw message
259255
msg$data
260256
#> a b
261257
#> 1 1 2
258+
msg$raw
259+
#> [1] 58 0a 00 00 00 03 00 04 01 02 00 03 05 00 00 00 00 05 55 54 46 2d 38 00 00
260+
#> [26] 03 13 00 00 00 02 00 00 00 0e 00 00 00 01 3f f0 00 00 00 00 00 00 00 00 00
261+
#> [51] 0e 00 00 00 01 40 00 00 00 00 00 00 00 00 00 04 02 00 00 00 01 00 04 00 09
262+
#> [76] 00 00 00 05 6e 61 6d 65 73 00 00 00 10 00 00 00 02 00 04 00 09 00 00 00 01
263+
#> [101] 61 00 04 00 09 00 00 00 01 62 00 00 04 02 00 00 00 01 00 04 00 09 00 00 00
264+
#> [126] 05 63 6c 61 73 73 00 00 00 10 00 00 00 01 00 04 00 09 00 00 00 0a 64 61 74
265+
#> [151] 61 2e 66 72 61 6d 65 00 00 04 02 00 00 00 01 00 04 00 09 00 00 00 09 72 6f
266+
#> [176] 77 2e 6e 61 6d 65 73 00 00 00 0d 00 00 00 02 80 00 00 00 ff ff ff ff 00 00
267+
#> [201] 00 fe
268+
```
269+
270+
If the async operation is yet to complete, a logical NA ‘unresolved
271+
value’ will be returned. In the below example an async receive is
272+
requested, but no mesages are waiting (yet to be sent).
273+
274+
``` r
275+
msg <- recv_aio(s2)
276+
msg$data
277+
#> < unresolved value >
262278
```
263279

264-
The values can also be accessed directly from the call as per the
265-
example below:
280+
For use in control flow statements, `unresolved` can be used. Note that
281+
calling this function queries for resolution itself and may cause a
282+
previously unresolved Aio to resolve.
266283

267284
``` r
285+
# unresolved() already queries for resolution so no need for it again within the while clause
286+
287+
while (unresolved(msg)) {
288+
# do stuff here before checking resolution again
289+
send_aio(s1, "resolved")
290+
}
291+
292+
msg$data
293+
#> [1] "resolved"
294+
```
295+
296+
The values may also be called explicitly using `call_aio()`. This will
297+
wait for completion of the Aio (blocking).
298+
299+
``` r
300+
# will wait for completion then return the resolved Aio
301+
call_aio(msg)
302+
303+
# to access the resolved value directly (waiting if required)
268304
call_aio(msg)$data
269-
#> a b
270-
#> 1 1 2
305+
#> [1] "resolved"
271306

272307
close(s1)
273308
close(s2)
274309
```
275310

276-
As an example of possible applications, the {mirai} package
277-
<https://shikokuchuo.net/mirai/> (available on CRAN) uses {nanonext} as
278-
the back-end to provide asynchronous execution of arbitrary R code.
279-
280311
[« Back to ToC](#table-of-contents)
281312

282313
### RPC and Distributed Computing
@@ -328,7 +359,7 @@ aio
328359
#> < recvAio >
329360
#> - $data for message data
330361
str(aio$data)
331-
#> num [1:100000000] -1.211 -0.218 0.105 -0.608 0.256 ...
362+
#> num [1:100000000] -0.5969 0.269 0.0237 -1.1824 1.806 ...
332363
```
333364

334365
In this example the calculation is returned, but other operations may
@@ -338,6 +369,10 @@ In such a case, using `call_aio()` confirms that the operation has
338369
completed (or it will wait for completion) and calls the return value of
339370
the function, which may typically be NULL or an exit code.
340371

372+
The {mirai} package <https://shikokuchuo.net/mirai/> (available on CRAN)
373+
uses {nanonext} as the back-end to provide asynchronous execution of
374+
arbitrary R code using the RPC model.
375+
341376
[« Back to ToC](#table-of-contents)
342377

343378
### Publisher Subscriber Model
@@ -356,38 +391,38 @@ an environment variable `NANONEXT_LOG`.
356391
``` r
357392
# set logging level to include information events ------------------------------
358393
logging(level = "info")
359-
#> 2022-03-03 23:42:57 [ log level ] set to: info
394+
#> 2022-03-04 09:57:23 [ log level ] set to: info
360395

361396
pub <- socket("pub", listen = "inproc://nanobroadcast")
362-
#> 2022-03-03 23:42:57 [ sock open ] id: 9 | protocol: pub
363-
#> 2022-03-03 23:42:57 [ list start ] sock: 9 | url: inproc://nanobroadcast
397+
#> 2022-03-04 09:57:23 [ sock open ] id: 9 | protocol: pub
398+
#> 2022-03-04 09:57:23 [ list start ] sock: 9 | url: inproc://nanobroadcast
364399
sub <- socket("sub", dial = "inproc://nanobroadcast")
365-
#> 2022-03-03 23:42:57 [ sock open ] id: 10 | protocol: sub
366-
#> 2022-03-03 23:42:57 [ dial start ] sock: 10 | url: inproc://nanobroadcast
400+
#> 2022-03-04 09:57:23 [ sock open ] id: 10 | protocol: sub
401+
#> 2022-03-04 09:57:23 [ dial start ] sock: 10 | url: inproc://nanobroadcast
367402

368403
# subscribing to a specific topic 'examples' -----------------------------------
369404
sub |> subscribe(topic = "examples")
370-
#> 2022-03-03 23:42:57 [ subscribe ] sock: 10 | topic: examples
405+
#> 2022-03-04 09:57:23 [ subscribe ] sock: 10 | topic: examples
371406
pub |> send(c("examples", "this is an example"), mode = "raw", echo = FALSE)
372407
sub |> recv(mode = "character", keep.raw = FALSE)
373408
#> [1] "examples" "this is an example"
374409

375410
pub |> send(c("other", "this other topic will not be received"), mode = "raw", echo = FALSE)
376411
sub |> recv(mode = "character", keep.raw = FALSE)
377-
#> 2022-03-03 23:42:57 [ 8 ] Try again
412+
#> 2022-03-04 09:57:23 [ 8 ] Try again
378413

379414
# specify NULL to subscribe to ALL topics --------------------------------------
380415
sub |> subscribe(topic = NULL)
381-
#> 2022-03-03 23:42:57 [ subscribe ] sock: 10 | topic: ALL
416+
#> 2022-03-04 09:57:23 [ subscribe ] sock: 10 | topic: ALL
382417
pub |> send(c("newTopic", "this is a new topic"), mode = "raw", echo = FALSE)
383418
sub |> recv("character", keep.raw = FALSE)
384419
#> [1] "newTopic" "this is a new topic"
385420

386421
sub |> unsubscribe(topic = NULL)
387-
#> 2022-03-03 23:42:57 [ unsubscribe ] sock: 10 | topic: ALL
422+
#> 2022-03-04 09:57:23 [ unsubscribe ] sock: 10 | topic: ALL
388423
pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw", echo = FALSE)
389424
sub |> recv("character", keep.raw = FALSE)
390-
#> 2022-03-03 23:42:57 [ 8 ] Try again
425+
#> 2022-03-04 09:57:23 [ 8 ] Try again
391426

392427
# however the topics explicitly subscribed to are still received ---------------
393428
pub |> send(c("examples", "this example will still be received"), mode = "raw", echo = FALSE)
@@ -396,7 +431,7 @@ sub |> recv(mode = "character", keep.raw = FALSE)
396431

397432
# set logging level back to the default of errors only -------------------------
398433
logging(level = "error")
399-
#> 2022-03-03 23:42:57 [ log level ] set to: error
434+
#> 2022-03-04 09:57:23 [ log level ] set to: error
400435

401436
close(pub)
402437
close(sub)
@@ -447,7 +482,7 @@ aio2$data
447482
# after the survey expires, the second resolves into a timeout error -----------
448483
Sys.sleep(0.5)
449484
aio2$data
450-
#> 2022-03-03 23:42:57 [ 5 ] Timed out
485+
#> 2022-03-04 09:57:24 [ 5 ] Timed out
451486
#> 'errorValue' int 5
452487

453488
close(sur)
@@ -468,11 +503,11 @@ ncurl("http://httpbin.org/headers")
468503
#> [1] 7b 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 48 6f 73
469504
#> [26] 74 22 3a 20 22 68 74 74 70 62 69 6e 2e 6f 72 67 22 2c 20 0a 20 20 20 20 22
470505
#> [51] 58 2d 41 6d 7a 6e 2d 54 72 61 63 65 2d 49 64 22 3a 20 22 52 6f 6f 74 3d 31
471-
#> [76] 2d 36 32 32 31 35 32 38 31 2d 37 65 32 32 39 65 31 62 36 65 30 31 62 63 38
472-
#> [101] 33 36 31 37 61 61 64 31 64 22 0a 20 20 7d 0a 7d 0a
506+
#> [76] 2d 36 32 32 31 65 32 38 34 2d 32 30 63 63 63 36 35 30 35 31 37 35 36 37 37
507+
#> [101] 61 30 37 30 31 63 64 36 30 22 0a 20 20 7d 0a 7d 0a
473508
#>
474509
#> $data
475-
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62215281-7e229e1b6e01bc83617aad1d\"\n }\n}\n"
510+
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6221e284-20ccc6505175677a0701cd60\"\n }\n}\n"
476511
```
477512

478513
For advanced use, supports additional HTTP methods such as POST or PUT.

man/is_error_value.Rd

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)