Skip to content

Commit b79e80a

Browse files
committed
Add select future combinator
1 parent 9ac204f commit b79e80a

File tree

3 files changed

+50
-8
lines changed

3 files changed

+50
-8
lines changed

examples/workflow.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515

1616

1717
from restate import Workflow, WorkflowContext, WorkflowSharedContext
18-
from restate.exceptions import TerminalError
18+
from restate import select
19+
from restate import TerminalError
20+
21+
from datetime import timedelta
22+
23+
TIMEOUT = timedelta(seconds=10)
1924

2025
payment = Workflow("payment")
2126

@@ -38,13 +43,17 @@ def payment_gateway():
3843
ctx.set("status", "waiting for the payment provider to approve")
3944

4045
# Wait for the payment to be verified
41-
result = await ctx.promise("verify.payment").value()
42-
if result == "approved":
43-
ctx.set("status", "payment approved")
44-
return { "success" : True }
4546

46-
ctx.set("status", "payment declined")
47-
raise TerminalError(message="Payment declined", status_code=401)
47+
match await select(result=ctx.promise("verify.payment"), timeout=ctx.sleep(TIMEOUT)):
48+
case ['result', "approved"]:
49+
ctx.set("status", "payment approved")
50+
return { "success" : True }
51+
case ['result', "declined"]:
52+
ctx.set("status", "payment declined")
53+
raise TerminalError(message="Payment declined", status_code=401)
54+
case ['timeout', _]:
55+
ctx.set("status", "payment verification timed out")
56+
raise TerminalError(message="Payment verification timed out", status_code=410)
4857

4958
@payment.handler()
5059
async def payment_verified(ctx: WorkflowSharedContext, result: str):

python/restate/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
# pylint: disable=line-too-long
2323
from .context import DurablePromise, RestateDurableFuture, RestateDurableCallFuture, RestateDurableSleepFuture, SendHandle
2424
from .exceptions import TerminalError
25-
from .asyncio import as_completed, gather, wait_completed
25+
from .asyncio import as_completed, gather, wait_completed, select
2626

2727
from .endpoint import app
2828

@@ -56,4 +56,5 @@ def test_harness(app, follow_logs = False, restate_image = ""): # type: ignore
5656
"gather",
5757
"as_completed",
5858
"wait_completed",
59+
"select"
5960
]

python/restate/asyncio.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,38 @@ async def gather(*futures: RestateDurableFuture[Any]) -> List[RestateDurableFutu
2727
pass
2828
return list(futures)
2929

30+
async def select(**kws: RestateDurableFuture[Any]) -> Tuple[str, Any]:
31+
"""
32+
Blocks until one of the futures is completed.
33+
34+
Example:
35+
36+
who, what = await select(car=f1, hotel=f2, flight=f3)
37+
if who == "car":
38+
print(what)
39+
elif who == "hotel":
40+
print(what)
41+
elif who == "flight":
42+
print(what)
43+
44+
works the best with matching:
45+
46+
match await select(result=ctx.promise("verify.payment"), timeout=ctx.sleep(timedelta(seconds=10))):
47+
case ['result', "approved"]:
48+
return { "success" : True }
49+
case ['result', "declined"]:
50+
raise TerminalError(message="Payment declined", status_code=401)
51+
case ['timeout', _]:
52+
raise TerminalError(message="Payment verification timed out", status_code=410)
53+
54+
"""
55+
if not kws:
56+
raise ValueError("At least one future must be passed.")
57+
reverse = { f: key for key, f in kws.items() }
58+
async for f in as_completed(*kws.values()):
59+
return [reverse[f], await f]
60+
assert False, "unreachable"
61+
3062
async def as_completed(*futures: RestateDurableFuture[Any]):
3163
"""
3264
Returns an iterator that yields the futures as they are completed.

0 commit comments

Comments
 (0)