Skip to content

Commit 05adb45

Browse files
cancel/interrupt: send 'withdraw' to message board
1 parent ce404bd commit 05adb45

File tree

2 files changed

+31
-6
lines changed

2 files changed

+31
-6
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Package: future.p2p
22
Title: A Peer-to-Peer Compute Cluster via Futureverse
3-
Version: 0.3.0-9005
3+
Version: 0.3.0-9006
44
Description: Implementation of the 'Future' API <doi:10.32614/RJ-2021-048> that resolves futures on a peer-to-peer ('P2P') compute environment. By using this future backend, you and your friends can share your spare compute resources with each other.
55
Imports:
66
future (>= 1.67.0),

R/pico_p2p.R

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,30 @@ pico_p2p_have_future <- function(p, future, duration = getOption("future.p2p.dur
134134
pico_send_message_dataframe(p, m)
135135
}
136136

137+
138+
pico_p2p_withdraw_future <- function(p, future_id = future_id, from = p$user, ...) {
139+
debug <- isTRUE(getOption("future.p2p.debug"))
140+
if (debug) {
141+
mdebug_push("pico_p2p_widthdraw_future() ...")
142+
mdebugf("Future: %s", future_id)
143+
on.exit({
144+
mdebug_pop()
145+
})
146+
}
147+
148+
stopifnot(length(from) == 1L, is.character(from), nzchar(from))
149+
150+
m <- data.frame(
151+
when = now_str(),
152+
type = "withdraw",
153+
from = from,
154+
future = future_id
155+
)
156+
157+
pico_send_message_dataframe(p, m)
158+
}
159+
160+
137161
pico_p2p_take_on_future <- function(p, to, future, duration = 60, from = p$user, ...) {
138162
debug <- isTRUE(getOption("future.p2p.debug"))
139163
if (debug) {
@@ -142,7 +166,7 @@ pico_p2p_take_on_future <- function(p, to, future, duration = 60, from = p$user,
142166
mdebugf("Duration: %g seconds", duration)
143167
on.exit({
144168
mdebug_pop()
145-
})
169+
})
146170
}
147171

148172
stopifnot(length(future) == 1L, is.character(future), nzchar(future))
@@ -310,6 +334,7 @@ pico_p2p_dispatch_future <- function(future) {
310334
pico_p2p_have_future <- import_future.p2p("pico_p2p_have_future")
311335
pico_p2p_wait_for <- import_future.p2p("pico_p2p_wait_for")
312336
pico_p2p_send_future <- import_future.p2p("pico_p2p_send_future")
337+
pico_p2p_withdraw_future <- import_future.p2p("pico_p2p_withdraw_future")
313338
pico_p2p_receive_result <- import_future.p2p("pico_p2p_receive_result")
314339

315340
update_parent <- function(msg, ...) {
@@ -354,15 +379,15 @@ pico_p2p_dispatch_future <- function(future) {
354379

355380
## Check for interrupts
356381
if ("interrupt" %in% listen_parent()) {
357-
## TODO: Withdraw future
382+
m0 <- pico_p2p_withdraw_future(pico, future_id = future_id)
358383
return(list(type = "event", value = "interrupted"))
359384
}
360385

361386
m2 <- pico_p2p_wait_for(pico, type = "offer", futures = m1[["future"]], expires = m1[["expires"]])
362387

363388
## Check for interrupts
364389
if ("interrupt" %in% listen_parent()) {
365-
## TODO: Withdraw future
390+
m0 <- pico_p2p_withdraw_future(pico, future_id = future_id)
366391
return(list(type = "event", value = "interrupted"))
367392
}
368393

@@ -380,7 +405,7 @@ pico_p2p_dispatch_future <- function(future) {
380405

381406
## Check for interrupts
382407
if ("interrupt" %in% listen_parent()) {
383-
## TODO: Withdraw future
408+
m0 <- pico_p2p_withdraw_future(pico, future_id = future_id)
384409
return(list(type = "event", value = "interrupted"))
385410
}
386411

@@ -390,7 +415,7 @@ pico_p2p_dispatch_future <- function(future) {
390415
tryCatch({
391416
file <- pico_p2p_receive_result(pico, via = via, path = path)
392417
}, interrupt = function(int) {
393-
## TODO: Withdraw future
418+
m0 <- pico_p2p_withdraw_future(pico, future_id = future_id)
394419
return(list(type = "event", value = "interrupted"))
395420
})
396421

0 commit comments

Comments
 (0)