Skip to content

Commit f62c061

Browse files
committed
deferred evaluation pipe handles symbols
1 parent c4a8e9b commit f62c061

File tree

4 files changed

+143
-36
lines changed

4 files changed

+143
-36
lines changed

R/utils.R

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ ncurl <- function(http, ...) {
219219

220220
#' Deferred Evaluation Pipe
221221
#'
222-
#' Pipe a possibly unresolved value forward into a function. [experimental]
222+
#' Pipe a possibly unresolved value forward into a function.
223223
#'
224224
#' @param x a value that is possibly an 'unresolvedValue'.
225225
#' @param f a function that accepts 'x' as its first argument.
@@ -235,6 +235,14 @@ ncurl <- function(http, ...) {
235235
#' under development. Please note that the final implementation is likely to
236236
#' differ from the current version.
237237
#'
238+
#' @section Usage:
239+
#'
240+
#' \code{x \%>>\% f} is equivalent to \code{f(x)}
241+
#'
242+
#' \code{x \%>>\% f()} is equivalent to \code{f(x)}
243+
#'
244+
#' \code{x \%>>\% f(y)} is equivalent to \code{f(x, y)}
245+
#'
238246
#' @examples
239247
#' if (interactive()) {
240248
#' # Only run examples in interactive R sessions
@@ -267,9 +275,13 @@ ncurl <- function(http, ...) {
267275
} else {
268276
x <- substitute(x)
269277
y <- substitute(f)
270-
f <- y[[1L]]
271-
y[[1L]] <- NULL
272-
eval(as.call(c(f, x, y)), envir = parent.frame(2L), enclos = baseenv())
278+
if (is.symbol(y)) {
279+
eval(as.call(c(y, x)), envir = parent.frame(2L), enclos = baseenv())
280+
} else {
281+
f <- y[[1L]]
282+
y[[1L]] <- NULL
283+
eval(as.call(c(f, x, y)), envir = parent.frame(2L), enclos = baseenv())
284+
}
273285
}
274286
}
275287

README.Rmd

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,13 @@ Implemented transports:
4747
2. [Interfaces](#interfaces)
4848
3. [Cross-language Exchange](#cross-language-exchange)
4949
4. [Async and Concurrency](#async-and-concurrency)
50-
5. [RPC and Distributed Computing](#rpc-and-distributed-computing)
51-
6. [Publisher / Subscriber Model](#publisher-subscriber-model)
52-
7. [Surveyor / Repondent Model](#surveyor-respondent-model)
53-
8. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
54-
9. [Building from source](#building-from-source)
55-
10. [Links](#links)
50+
5. [Deferred Evaluation Pipe](#deferred-evaluation-pipe)
51+
6. [RPC and Distributed Computing](#rpc-and-distributed-computing)
52+
7. [Publisher / Subscriber Model](#publisher-subscriber-model)
53+
8. [Surveyor / Repondent Model](#surveyor-respondent-model)
54+
9. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
55+
10. [Building from source](#building-from-source)
56+
11. [Links](#links)
5657

5758
### Installation
5859

@@ -177,7 +178,7 @@ n$recv(mode = "double")
177178
{nanonext} implements true async send and receive, leveraging NNG as a massively-scaleable concurrency framework.
178179

179180
```{r async}
180-
library(nanonext)
181+
181182
s1 <- socket("pair", listen = "inproc://nano")
182183
s2 <- socket("pair", dial = "inproc://nano")
183184
@@ -239,7 +240,7 @@ msg$data
239240

240241
The values may also be called explicitly using `call_aio()`. This will wait for completion of the Aio (blocking).
241242

242-
```{r async6}
243+
```{r async7}
243244
244245
# will wait for completion then return the resolved Aio
245246
call_aio(msg)
@@ -254,6 +255,44 @@ close(s2)
254255

255256
[&laquo; Back to ToC](#table-of-contents)
256257

258+
### Deferred Evaluation Pipe
259+
260+
{nanonext} implements a deferred evaluation pipe `%>>%` for working with potentially unresolved values.
261+
262+
Simply pipe the value forward into a function or series of functions and it either evaluates or returns an 'unresolvedExpr', where the result may be accessed at `$data`. This will also return an 'unresolvedExpr' recursively by design whilst unresolved. However `$data` resolves to the evaluated expression when the original value does.
263+
264+
It is possible to use `unresolved()` around the `$data` field to test for resolution, as in the example below.
265+
266+
The pipe operator semantics are similar to R's base pipe `|>`:
267+
268+
`x %>>% f` is equivalent to `f(x)` <br />
269+
`x %>>% f()` is equivalent to `f(x)` <br />
270+
`x %>>% f(y)` is equivalent to `f(x, y)`
271+
272+
```{r async6}
273+
274+
s1 <- socket("pair", listen = "inproc://cecicestunepipe")
275+
s2 <- socket("pair", dial = "inproc://cecicestunepipe")
276+
277+
# request an aysnc receive with no messages waiting
278+
msg <- recv_aio(s2)
279+
280+
res <- msg$data %>>% c(2, 3) %>>% as.character()
281+
res
282+
unresolved(res$data)
283+
284+
# sending a message causes both 'msg' and 'res' to resolve
285+
s <- send_aio(s1, 1)
286+
unresolved(res$data)
287+
res$data
288+
289+
close(s1)
290+
close(s2)
291+
292+
```
293+
294+
[&laquo; Back to ToC](#table-of-contents)
295+
257296
### RPC and Distributed Computing
258297

259298
{nanonext} implements remote procedure calls (RPC) using NNG's req/rep protocol to provide a basis for distributed computing.

README.md

Lines changed: 69 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,13 @@ Implemented transports:
4848
2. [Interfaces](#interfaces)
4949
3. [Cross-language Exchange](#cross-language-exchange)
5050
4. [Async and Concurrency](#async-and-concurrency)
51-
5. [RPC and Distributed Computing](#rpc-and-distributed-computing)
52-
6. [Publisher / Subscriber Model](#publisher-subscriber-model)
53-
7. [Surveyor / Repondent Model](#surveyor-respondent-model)
54-
8. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
55-
9. [Building from source](#building-from-source)
56-
10. [Links](#links)
51+
5. [Deferred Evaluation Pipe](#deferred-evaluation-pipe)
52+
6. [RPC and Distributed Computing](#rpc-and-distributed-computing)
53+
7. [Publisher / Subscriber Model](#publisher-subscriber-model)
54+
8. [Surveyor / Repondent Model](#surveyor-respondent-model)
55+
9. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
56+
10. [Building from source](#building-from-source)
57+
11. [Links](#links)
5758

5859
### Installation
5960

@@ -225,7 +226,6 @@ n$recv(mode = "double")
225226
massively-scaleable concurrency framework.
226227

227228
``` r
228-
library(nanonext)
229229
s1 <- socket("pair", listen = "inproc://nano")
230230
s2 <- socket("pair", dial = "inproc://nano")
231231
```
@@ -321,6 +321,52 @@ close(s2)
321321

322322
[« Back to ToC](#table-of-contents)
323323

324+
### Deferred Evaluation Pipe
325+
326+
{nanonext} implements a deferred evaluation pipe `%>>%` for working with
327+
potentially unresolved values.
328+
329+
Simply pipe the value forward into a function or series of functions and
330+
it either evaluates or returns an ‘unresolvedExpr’, where the result may
331+
be accessed at `$data`. This will also return an ‘unresolvedExpr’
332+
recursively by design whilst unresolved. However `$data` resolves to the
333+
evaluated expression when the original value does.
334+
335+
It is possible to use `unresolved()` around the `$data` field to test
336+
for resolution, as in the example below.
337+
338+
The pipe operator semantics are similar to R’s base pipe `|>`:
339+
340+
`x %>>% f` is equivalent to `f(x)` <br /> `x %>>% f()` is equivalent to
341+
`f(x)` <br /> `x %>>% f(y)` is equivalent to `f(x, y)`
342+
343+
``` r
344+
s1 <- socket("pair", listen = "inproc://cecicestunepipe")
345+
s2 <- socket("pair", dial = "inproc://cecicestunepipe")
346+
347+
# request an aysnc receive with no messages waiting
348+
msg <- recv_aio(s2)
349+
350+
res <- msg$data %>>% c(2, 3) %>>% as.character()
351+
res
352+
#> < unresolvedExpr >
353+
#> - $data for evaluated expression
354+
unresolved(res$data)
355+
#> [1] TRUE
356+
357+
# sending a message causes both 'msg' and 'res' to resolve
358+
s <- send_aio(s1, 1)
359+
unresolved(res$data)
360+
#> [1] FALSE
361+
res$data
362+
#> [1] "1" "2" "3"
363+
364+
close(s1)
365+
close(s2)
366+
```
367+
368+
[« Back to ToC](#table-of-contents)
369+
324370
### RPC and Distributed Computing
325371

326372
{nanonext} implements remote procedure calls (RPC) using NNG’s req/rep
@@ -370,7 +416,7 @@ aio
370416
#> < recvAio >
371417
#> - $data for message data
372418
aio$data |> str()
373-
#> num [1:100000000] 1.231 -2.553 1.338 0.882 0.158 ...
419+
#> num [1:100000000] 0.112 -0.331 -0.798 -1.467 0.692 ...
374420
```
375421

376422
As `call_aio()` is blocking and will wait for completion, an alternative
@@ -405,37 +451,37 @@ an environment variable `NANONEXT_LOG`.
405451

406452
``` r
407453
logging(level = "info")
408-
#> 2022-03-05 20:14:17 [ log level ] set to: info
454+
#> 2022-03-06 22:41:13 [ log level ] set to: info
409455

410456
pub <- socket("pub", listen = "inproc://nanobroadcast")
411-
#> 2022-03-05 20:14:17 [ sock open ] id: 9 | protocol: pub
412-
#> 2022-03-05 20:14:17 [ list start ] sock: 9 | url: inproc://nanobroadcast
457+
#> 2022-03-06 22:41:13 [ sock open ] id: 11 | protocol: pub
458+
#> 2022-03-06 22:41:13 [ list start ] sock: 11 | url: inproc://nanobroadcast
413459
sub <- socket("sub", dial = "inproc://nanobroadcast")
414-
#> 2022-03-05 20:14:17 [ sock open ] id: 10 | protocol: sub
415-
#> 2022-03-05 20:14:17 [ dial start ] sock: 10 | url: inproc://nanobroadcast
460+
#> 2022-03-06 22:41:13 [ sock open ] id: 12 | protocol: sub
461+
#> 2022-03-06 22:41:13 [ dial start ] sock: 12 | url: inproc://nanobroadcast
416462

417463
sub |> subscribe(topic = "examples")
418-
#> 2022-03-05 20:14:17 [ subscribe ] sock: 10 | topic: examples
464+
#> 2022-03-06 22:41:13 [ subscribe ] sock: 12 | topic: examples
419465
pub |> send(c("examples", "this is an example"), mode = "raw", echo = FALSE)
420466
sub |> recv(mode = "character", keep.raw = FALSE)
421467
#> [1] "examples" "this is an example"
422468

423469
pub |> send(c("other", "this other topic will not be received"), mode = "raw", echo = FALSE)
424470
sub |> recv(mode = "character", keep.raw = FALSE)
425-
#> 2022-03-05 20:14:17 [ 8 ] Try again
471+
#> 2022-03-06 22:41:13 [ 8 ] Try again
426472

427473
# specify NULL to subscribe to ALL topics
428474
sub |> subscribe(topic = NULL)
429-
#> 2022-03-05 20:14:17 [ subscribe ] sock: 10 | topic: ALL
475+
#> 2022-03-06 22:41:13 [ subscribe ] sock: 12 | topic: ALL
430476
pub |> send(c("newTopic", "this is a new topic"), mode = "raw", echo = FALSE)
431477
sub |> recv("character", keep.raw = FALSE)
432478
#> [1] "newTopic" "this is a new topic"
433479

434480
sub |> unsubscribe(topic = NULL)
435-
#> 2022-03-05 20:14:17 [ unsubscribe ] sock: 10 | topic: ALL
481+
#> 2022-03-06 22:41:13 [ unsubscribe ] sock: 12 | topic: ALL
436482
pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw", echo = FALSE)
437483
sub |> recv("character", keep.raw = FALSE)
438-
#> 2022-03-05 20:14:17 [ 8 ] Try again
484+
#> 2022-03-06 22:41:13 [ 8 ] Try again
439485

440486
# however the topics explicitly subscribed to are still received
441487
pub |> send(c("examples", "this example will still be received"), mode = "raw", echo = FALSE)
@@ -444,7 +490,7 @@ sub |> recv(mode = "character", keep.raw = FALSE)
444490

445491
# set logging level back to the default of errors only
446492
logging(level = "error")
447-
#> 2022-03-05 20:14:17 [ log level ] set to: error
493+
#> 2022-03-06 22:41:13 [ log level ] set to: error
448494

449495
close(pub)
450496
close(sub)
@@ -495,7 +541,7 @@ aio2$data
495541
# after the survey expires, the second resolves into a timeout error
496542
Sys.sleep(0.5)
497543
aio2$data
498-
#> 2022-03-05 20:14:18 [ 5 ] Timed out
544+
#> 2022-03-06 22:41:13 [ 5 ] Timed out
499545
#> 'errorValue' int 5
500546

501547
close(sur)
@@ -521,11 +567,11 @@ ncurl("http://httpbin.org/headers")
521567
#> [1] 7b 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 48 6f 73
522568
#> [26] 74 22 3a 20 22 68 74 74 70 62 69 6e 2e 6f 72 67 22 2c 20 0a 20 20 20 20 22
523569
#> [51] 58 2d 41 6d 7a 6e 2d 54 72 61 63 65 2d 49 64 22 3a 20 22 52 6f 6f 74 3d 31
524-
#> [76] 2d 36 32 32 33 63 34 39 61 2d 33 32 62 31 31 63 66 37 36 61 31 61 62 33 61
525-
#> [101] 63 36 62 36 30 30 61 36 33 22 0a 20 20 7d 0a 7d 0a
570+
#> [76] 2d 36 32 32 35 33 38 38 61 2d 37 65 30 39 65 64 33 35 33 63 38 30 37 61 30
571+
#> [101] 34 37 37 37 65 36 65 32 62 22 0a 20 20 7d 0a 7d 0a
526572
#>
527573
#> $data
528-
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6223c49a-32b11cf76a1ab3ac6b600a63\"\n }\n}\n"
574+
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6225388a-7e09ed353c807a04777e6e2b\"\n }\n}\n"
529575
```
530576

531577
For advanced use, supports additional HTTP methods such as POST or PUT.

man/grapes-greater-than-greater-than-grapes.Rd

Lines changed: 11 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)