Skip to content

Commit a20fd4a

Browse files
authored
Refactor to use eval_mirai_with_cancel() (#387)
1 parent 21552e1 commit a20fd4a

File tree

2 files changed

+12
-10
lines changed

2 files changed

+12
-10
lines changed

R/daemon.R

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,18 +141,15 @@ daemon <- function(
141141
xc <- 1L
142142
break
143143
}
144-
cancel <- recv_aio(sock, mode = 8L, cv = substitute())
145-
data <- eval_mirai(m)
146-
stop_aio(cancel)
147144
(task >= maxtasks || maxtime && mclock() >= maxtime) && {
148145
.mark()
149-
send(sock, data, mode = 1L, block = TRUE)
146+
send(sock, eval_mirai_with_cancel(m, sock), mode = 1L, block = TRUE)
150147
aio <- recv_aio(sock, mode = 8L, cv = cv)
151148
xc <- 2L + (task >= maxtasks)
152149
wait(cv)
153150
break
154151
}
155-
send(sock, data, mode = 1L, block = TRUE)
152+
send(sock, eval_mirai_with_cancel(m, sock), mode = 1L, block = TRUE)
156153
if (cleanup) do_cleanup()
157154
task <- task + 1L
158155
}
@@ -168,8 +165,7 @@ daemon <- function(
168165
xc <- 1L
169166
break
170167
}
171-
data <- eval_mirai(m)
172-
send(ctx, data, mode = 1L, block = TRUE)
168+
send(ctx, eval_mirai(m), mode = 1L, block = TRUE)
173169
if (cleanup) do_cleanup()
174170
(task >= maxtasks || maxtime && mclock() >= maxtime) && {
175171
xc <- 2L + (task >= maxtasks)
@@ -200,8 +196,8 @@ daemon <- function(
200196
pipe_notify(sock, cv, remove = TRUE, flag = flag_value())
201197
dial(sock, url = url, autostart = NA, fail = 2L)
202198
`[[<-`(., "sock", sock)
203-
data <- eval_mirai(recv(sock, mode = 1L, block = TRUE))
204-
send(sock, data, mode = 1L, block = TRUE) || until(cv, .limit_short)
199+
m <- recv(sock, mode = 1L, block = TRUE)
200+
send(sock, eval_mirai(m), mode = 1L, block = TRUE) || until(cv, .limit_short)
205201
}
206202

207203
# internals --------------------------------------------------------------------
@@ -225,6 +221,12 @@ eval_mirai <- function(._mirai_.) {
225221
)
226222
}
227223

224+
eval_mirai_with_cancel <- function(._mirai_., sock) {
225+
cancel <- recv_aio(sock, mode = 8L, cv = substitute())
226+
on.exit(stop_aio(cancel))
227+
eval_mirai(._mirai_.)
228+
}
229+
228230
dial_sync_socket <- function(sock, url, autostart = NA, tls = NULL) {
229231
cv <- cv()
230232
pipe_notify(sock, cv, add = TRUE)

tests/tests.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ connection && Sys.getenv("NOT_CRAN") == "true" && {
279279
test_zero(daemons(0))
280280
Sys.sleep(0.5)
281281
test_zero(daemons(n = 1L, url = local_url(), dispatcher = TRUE))
282-
task <- mirai(TRUE)
282+
task <- mirai(substitute())
283283
url <- nextget("url")
284284
test_equal(3L, daemon(url = url, maxtasks = 1L, cleanup = 0L, dispatcher = TRUE))
285285
test_zero(daemons(n = 0L))

0 commit comments

Comments
 (0)