Skip to content

Commit bd46825

Browse files
committed
implement meanwhile (concurrency) pipe
1 parent 3c08760 commit bd46825

File tree

8 files changed

+278
-46
lines changed

8 files changed

+278
-46
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: nanonext
22
Type: Package
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 0.2.0.9006
4+
Version: 0.2.0.9007
55
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
66
a socket library providing high-performance scalability protocols,
77
implementing a cross-platform standard for messaging and communications.

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ S3method(setopt,nanoSocket)
2828
S3method(start,nanoDialer)
2929
S3method(start,nanoListener)
3030
export("%>>%")
31+
export("%~%")
3132
export(.mirai_scm)
3233
export(call_aio)
3334
export(context)

NEWS.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
# nanonext 0.2.0.9006 (development)
1+
# nanonext 0.2.0.9007 (development)
22

33
#### New Features
44

55
* Aio values `$result`, `$data` and `$raw` now resolve automatically without requiring `call_aio()`. Access the values directly and an 'unresolved' logical NA will be returned if the Aio operation is yet to complete.
66
* `unresolved()` added as an auxiliary function to query whether an Aio is unresolved, for use in control flow statements.
7-
* Implements the Deferred Evaluation Pipe `%>>%` for working with potentially unresolved values.
7+
* Implements the Deferred Evaluation Pipe `%>>%` and Meanwhile / Concurrency Pipe `%~%` for working with potentially unresolved values.
88
* Integer error values generated by receive functions are now classed 'errorValue'. `is_error_value()` helper function included.
99
* `is_nul_byte()` added as a helper function for request/reply setups.
1010
* `survey_time()` added as a convenience function for surveyor/respondent patterns.

R/pipe.R

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
#' @param f a function that accepts 'x' as its first argument.
99
#'
1010
#' @return The evaluated result, or if x is an 'unresolvedValue', an
11-
#' 'unresolvedExpr'.
11+
#' 'unresolvedExpr'. To provide certainty of return value, use together with
12+
#' a meanwhile (concurrency) pipe \code{\link{\%~\%}}.
1213
#'
1314
#' @details An 'unresolvedExpr' encapsulates the eventual evaluation result.
1415
#' Query its \code{$data} element for resolution. Once resolved, the object
@@ -21,10 +22,6 @@
2122
#' \code{\link{unresolved}} may be used on an 'unresolvedExpr' or its
2223
#' \code{$data} element to test for resolution.
2324
#'
24-
#' This function is marked [experimental], which means it is currently
25-
#' under development. Please note that the final implementation is likely to
26-
#' differ from the current version.
27-
#'
2825
#' @section Usage:
2926
#'
3027
#' Usage is similar to R's native \code{|>} pipe.
@@ -51,6 +48,16 @@
5148
#' res <- send_aio(s1, 1)
5249
#' b$data
5350
#'
51+
#' # use with a meanwhile (concurrency) pipe
52+
#' n <- 0L
53+
#' rec <- recv_aio(s2)
54+
#' a <- rec$data %>>% identical(data.frame()) %~% {
55+
#' if (n == 100) send_aio(s1, data.frame())
56+
#' cat("unresolved", n, "\n")
57+
#' n <- n + 1
58+
#' }
59+
#' a
60+
#'
5461
#' close(s1)
5562
#' close(s2)
5663
#' }
@@ -81,6 +88,71 @@
8188
}
8289
}
8390

91+
#' Meanwhile (Concurrency) Pipe
92+
#'
93+
#' Pipe an expression to execute in a loop whilst waiting for a potentially
94+
#' 'unresolvedExpr' to resolve.
95+
#'
96+
#' @param x the output of a deferred evaluation pipe \code{\link{\%>>\%}} i.e.
97+
#' possibly an 'unresolvedExpr'.
98+
#' @param expr an expression to evaluate, repeated as in a while loop, whilst
99+
#' 'x' remains unresolved.
100+
#'
101+
#' @return The evaluated result of 'x'.
102+
#'
103+
#' @details Intended to be used at the end of an expression involving the
104+
#' deferred evaluation pipe \code{\link{\%>>\%}} to:
105+
#'
106+
#' (i) provide certainty of the return value, which will always be the
107+
#' evaluated result rather than a 'resolvedExpr'.
108+
#'
109+
#' (ii) make it convenient to write concurrent code that runs whilst the
110+
#' expression is resolving.
111+
#'
112+
#' @section Usage:
113+
#'
114+
#' \code{x \%~\% expr}
115+
#'
116+
#' is equivalent to the following expression, finally returning \code{x}:
117+
#'
118+
#' \code{if (unresolved(x)) while (unresolved(x <- x$data)) {expr}; x}
119+
#'
120+
#' @examples
121+
#' if (interactive()) {
122+
#' # Only run examples in interactive R sessions
123+
#'
124+
#' s1 <- socket("pair", listen = "tcp://127.0.0.1:6546")
125+
#' s2 <- socket("pair", dial = "tcp://127.0.0.1:6546")
126+
#'
127+
#' n <- 0L
128+
#' rec <- recv_aio(s2)
129+
#' a <- rec$data %>>% identical(data.frame()) %~% {
130+
#' if (n == 100) send_aio(s1, data.frame())
131+
#' cat("unresolved", n, "\n")
132+
#' n <- n + 1
133+
#' }
134+
#' a
135+
#'
136+
#' msg <- recv_aio(s2)
137+
#' # execute the following send from another R process for a better demonstration:
138+
#' r <- send_aio(s1, data.frame())
139+
#' b <- msg$data %>>% identical(data.frame()) %~% call_aio(msg)
140+
#' b
141+
#'
142+
#' close(s1)
143+
#' close(s2)
144+
#' }
145+
#'
146+
#' @export
147+
#'
148+
`%~%` <- function(x, expr) {
149+
call <- substitute(expr)
150+
if (unresolved(x)) while (unresolved(x <- x$data)) {
151+
eval(call, envir = parent.frame(2L), enclos = baseenv())
152+
}
153+
x
154+
}
155+
84156
#' @export
85157
#'
86158
print.unresolvedExpr <- function(x, ...) {

README.Rmd

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,13 @@ Implemented transports:
4848
3. [Cross-language Exchange](#cross-language-exchange)
4949
4. [Async and Concurrency](#async-and-concurrency)
5050
5. [Deferred Evaluation Pipe](#deferred-evaluation-pipe)
51-
6. [RPC and Distributed Computing](#rpc-and-distributed-computing)
52-
7. [Publisher / Subscriber Model](#publisher-subscriber-model)
53-
8. [Surveyor / Repondent Model](#surveyor-respondent-model)
54-
9. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
55-
10. [Building from source](#building-from-source)
56-
11. [Links](#links)
51+
6. [Meanwhile (Concurrency) Pipe](#meanwhile-concurrency-pipe)
52+
7. [RPC and Distributed Computing](#rpc-and-distributed-computing)
53+
8. [Publisher / Subscriber Model](#publisher-subscriber-model)
54+
9. [Surveyor / Repondent Model](#surveyor-respondent-model)
55+
10. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
56+
11. [Building from source](#building-from-source)
57+
12. [Links](#links)
5758

5859
### Installation
5960

@@ -271,10 +272,10 @@ The pipe operator semantics are similar to R's base pipe `|>`:
271272
`x %>>% f()` is equivalent to `f(x)` <br />
272273
`x %>>% f(y)` is equivalent to `f(x, y)`
273274

274-
```{r async6}
275+
```{r pipe1}
275276
276-
s1 <- socket("pair", listen = "inproc://cecicestunepipe")
277-
s2 <- socket("pair", dial = "inproc://cecicestunepipe")
277+
s1 <- socket("pair", listen = "inproc://dep")
278+
s2 <- socket("pair", dial = "inproc://dep")
278279
279280
# request an aysnc receive with no messages waiting
280281
msg <- recv_aio(s2)
@@ -296,6 +297,43 @@ close(s2)
296297

297298
[&laquo; Back to ToC](#table-of-contents)
298299

300+
### Meanwhile (Concurrency) Pipe
301+
302+
The deferred evaluation pipe sequence may be terminated with a meanwhile (or concurrency) pipe `%~%` which:
303+
304+
(i) provides certainty of the return value, which will always be the evaluated result rather than a 'resolvedExpr'
305+
306+
(ii) makes it convenient to write concurrent code which runs whilst the expression is resolving
307+
308+
`x %~% expr`
309+
310+
is equivalent to the following expression, finally returning x:
311+
312+
`if (unresolved(x)) while (unresolved(x <- x$data)) {expr}; x`
313+
314+
Use it in the following way:
315+
316+
```{r pipe2}
317+
318+
s1 <- socket("pair", listen = "inproc://meanwhile")
319+
s2 <- socket("pair", dial = "inproc://meanwhile")
320+
321+
n <- 1L
322+
rec <- recv_aio(s2)
323+
a <- rec$data %>>% identical(data.frame()) %~% {
324+
if (n == 5) send_aio(s1, data.frame())
325+
cat("unresolved", n, "\n")
326+
n <- n + 1
327+
}
328+
a
329+
330+
close(s1)
331+
close(s2)
332+
333+
```
334+
335+
[&laquo; Back to ToC](#table-of-contents)
336+
299337
### RPC and Distributed Computing
300338

301339
{nanonext} implements remote procedure calls (RPC) using NNG's req/rep protocol to provide a basis for distributed computing.

README.md

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,13 @@ Implemented transports:
4949
3. [Cross-language Exchange](#cross-language-exchange)
5050
4. [Async and Concurrency](#async-and-concurrency)
5151
5. [Deferred Evaluation Pipe](#deferred-evaluation-pipe)
52-
6. [RPC and Distributed Computing](#rpc-and-distributed-computing)
53-
7. [Publisher / Subscriber Model](#publisher-subscriber-model)
54-
8. [Surveyor / Repondent Model](#surveyor-respondent-model)
55-
9. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
56-
10. [Building from source](#building-from-source)
57-
11. [Links](#links)
52+
6. [Meanwhile (Concurrency) Pipe](#meanwhile-concurrency-pipe)
53+
7. [RPC and Distributed Computing](#rpc-and-distributed-computing)
54+
8. [Publisher / Subscriber Model](#publisher-subscriber-model)
55+
9. [Surveyor / Repondent Model](#surveyor-respondent-model)
56+
10. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
57+
11. [Building from source](#building-from-source)
58+
12. [Links](#links)
5859

5960
### Installation
6061

@@ -344,8 +345,8 @@ The pipe operator semantics are similar to R’s base pipe `|>`:
344345
`f(x)` <br /> `x %>>% f(y)` is equivalent to `f(x, y)`
345346

346347
``` r
347-
s1 <- socket("pair", listen = "inproc://cecicestunepipe")
348-
s2 <- socket("pair", dial = "inproc://cecicestunepipe")
348+
s1 <- socket("pair", listen = "inproc://dep")
349+
s2 <- socket("pair", dial = "inproc://dep")
349350

350351
# request an aysnc receive with no messages waiting
351352
msg <- recv_aio(s2)
@@ -372,6 +373,50 @@ close(s2)
372373

373374
[« Back to ToC](#table-of-contents)
374375

376+
### Meanwhile (Concurrency) Pipe
377+
378+
The deferred evaluation pipe sequence may be terminated with a meanwhile
379+
(or concurrency) pipe `%~%` which:
380+
381+
1) provides certainty of the return value, which will always be the
382+
evaluated result rather than a ‘resolvedExpr’
383+
384+
2) makes it convenient to write concurrent code which runs whilst the
385+
expression is resolving
386+
387+
`x %~% expr`
388+
389+
is equivalent to the following expression, finally returning x:
390+
391+
`if (unresolved(x)) while (unresolved(x <- x$data)) {expr}; x`
392+
393+
Use it in the following way:
394+
395+
``` r
396+
s1 <- socket("pair", listen = "inproc://meanwhile")
397+
s2 <- socket("pair", dial = "inproc://meanwhile")
398+
399+
n <- 1L
400+
rec <- recv_aio(s2)
401+
a <- rec$data %>>% identical(data.frame()) %~% {
402+
if (n == 5) send_aio(s1, data.frame())
403+
cat("unresolved", n, "\n")
404+
n <- n + 1
405+
}
406+
#> unresolved 1
407+
#> unresolved 2
408+
#> unresolved 3
409+
#> unresolved 4
410+
#> unresolved 5
411+
a
412+
#> [1] TRUE
413+
414+
close(s1)
415+
close(s2)
416+
```
417+
418+
[« Back to ToC](#table-of-contents)
419+
375420
### RPC and Distributed Computing
376421

377422
{nanonext} implements remote procedure calls (RPC) using NNG’s req/rep
@@ -421,7 +466,7 @@ aio
421466
#> < recvAio >
422467
#> - $data for message data
423468
aio$data |> str()
424-
#> num [1:100000000] -0.82 0.147 1.761 0.425 0.182 ...
469+
#> num [1:100000000] 0.925 -0.323 1.47 -1.237 -0.555 ...
425470
```
426471

427472
As `call_aio()` is blocking and will wait for completion, an alternative
@@ -456,37 +501,37 @@ an environment variable `NANONEXT_LOG`.
456501

457502
``` r
458503
logging(level = "info")
459-
#> 2022-03-07 22:11:21 [ log level ] set to: info
504+
#> 2022-03-09 09:28:33 [ log level ] set to: info
460505

461506
pub <- socket("pub", listen = "inproc://nanobroadcast")
462-
#> 2022-03-07 22:11:21 [ sock open ] id: 11 | protocol: pub
463-
#> 2022-03-07 22:11:21 [ list start ] sock: 11 | url: inproc://nanobroadcast
507+
#> 2022-03-09 09:28:33 [ sock open ] id: 13 | protocol: pub
508+
#> 2022-03-09 09:28:33 [ list start ] sock: 13 | url: inproc://nanobroadcast
464509
sub <- socket("sub", dial = "inproc://nanobroadcast")
465-
#> 2022-03-07 22:11:21 [ sock open ] id: 12 | protocol: sub
466-
#> 2022-03-07 22:11:21 [ dial start ] sock: 12 | url: inproc://nanobroadcast
510+
#> 2022-03-09 09:28:33 [ sock open ] id: 14 | protocol: sub
511+
#> 2022-03-09 09:28:33 [ dial start ] sock: 14 | url: inproc://nanobroadcast
467512

468513
sub |> subscribe(topic = "examples")
469-
#> 2022-03-07 22:11:21 [ subscribe ] sock: 12 | topic: examples
514+
#> 2022-03-09 09:28:33 [ subscribe ] sock: 14 | topic: examples
470515
pub |> send(c("examples", "this is an example"), mode = "raw", echo = FALSE)
471516
sub |> recv(mode = "character", keep.raw = FALSE)
472517
#> [1] "examples" "this is an example"
473518

474519
pub |> send(c("other", "this other topic will not be received"), mode = "raw", echo = FALSE)
475520
sub |> recv(mode = "character", keep.raw = FALSE)
476-
#> 2022-03-07 22:11:21 [ 8 ] Try again
521+
#> 2022-03-09 09:28:33 [ 8 ] Try again
477522

478523
# specify NULL to subscribe to ALL topics
479524
sub |> subscribe(topic = NULL)
480-
#> 2022-03-07 22:11:21 [ subscribe ] sock: 12 | topic: ALL
525+
#> 2022-03-09 09:28:33 [ subscribe ] sock: 14 | topic: ALL
481526
pub |> send(c("newTopic", "this is a new topic"), mode = "raw", echo = FALSE)
482527
sub |> recv("character", keep.raw = FALSE)
483528
#> [1] "newTopic" "this is a new topic"
484529

485530
sub |> unsubscribe(topic = NULL)
486-
#> 2022-03-07 22:11:21 [ unsubscribe ] sock: 12 | topic: ALL
531+
#> 2022-03-09 09:28:33 [ unsubscribe ] sock: 14 | topic: ALL
487532
pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw", echo = FALSE)
488533
sub |> recv("character", keep.raw = FALSE)
489-
#> 2022-03-07 22:11:21 [ 8 ] Try again
534+
#> 2022-03-09 09:28:33 [ 8 ] Try again
490535

491536
# however the topics explicitly subscribed to are still received
492537
pub |> send(c("examples", "this example will still be received"), mode = "raw", echo = FALSE)
@@ -495,7 +540,7 @@ sub |> recv(mode = "character", keep.raw = FALSE)
495540

496541
# set logging level back to the default of errors only
497542
logging(level = "error")
498-
#> 2022-03-07 22:11:21 [ log level ] set to: error
543+
#> 2022-03-09 09:28:33 [ log level ] set to: error
499544

500545
close(pub)
501546
close(sub)
@@ -546,7 +591,7 @@ aio2$data
546591
# after the survey expires, the second resolves into a timeout error
547592
Sys.sleep(0.5)
548593
aio2$data
549-
#> 2022-03-07 22:11:22 [ 5 ] Timed out
594+
#> 2022-03-09 09:28:34 [ 5 ] Timed out
550595
#> 'errorValue' int 5
551596

552597
close(sur)
@@ -572,11 +617,11 @@ ncurl("http://httpbin.org/headers")
572617
#> [1] 7b 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 48 6f 73
573618
#> [26] 74 22 3a 20 22 68 74 74 70 62 69 6e 2e 6f 72 67 22 2c 20 0a 20 20 20 20 22
574619
#> [51] 58 2d 41 6d 7a 6e 2d 54 72 61 63 65 2d 49 64 22 3a 20 22 52 6f 6f 74 3d 31
575-
#> [76] 2d 36 32 32 36 38 33 30 61 2d 33 61 62 32 30 31 39 64 35 34 64 33 66 63 66
576-
#> [101] 30 32 37 30 39 66 62 62 32 22 0a 20 20 7d 0a 7d 0a
620+
#> [76] 2d 36 32 32 38 37 33 34 32 2d 31 62 36 66 35 31 39 39 30 62 65 33 61 64 38
621+
#> [101] 36 36 66 61 32 37 34 61 30 22 0a 20 20 7d 0a 7d 0a
577622
#>
578623
#> $data
579-
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6226830a-3ab2019d54d3fcf02709fbb2\"\n }\n}\n"
624+
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62287342-1b6f51990be3ad866fa274a0\"\n }\n}\n"
580625
```
581626

582627
For advanced use, supports additional HTTP methods such as POST or PUT.

0 commit comments

Comments
 (0)