Skip to content

Commit 01524f3

Browse files
authored
feat: improve task polling (#409)
Implements an improved task polling mechanism with configurable wait time. This helps to minimize resource consumption on the Connect server while still providing responsive feedback. Resolves #184
1 parent 1486559 commit 01524f3

File tree

2 files changed

+66
-51
lines changed

2 files changed

+66
-51
lines changed

src/posit/connect/tasks.py

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
from __future__ import annotations
44

5-
import time
6-
75
from typing_extensions import overload
86

97
from . import resources
@@ -97,39 +95,53 @@ def update(self, *args, **kwargs) -> None:
9795
result = response.json()
9896
super().update(**result)
9997

100-
def wait_for(self, *, initial_wait: int = 1, max_wait: int = 10, backoff: float = 1.5) -> None:
98+
def wait_for(self, *, wait: int = 1, max_attempts: int | None = None) -> None:
10199
"""Wait for the task to finish.
102100
103101
Parameters
104102
----------
105-
initial_wait : int, default 1
106-
Initial wait time in seconds. First API request will use this as the wait parameter.
107-
max_wait : int, default 10
103+
wait : int, default 1
108104
Maximum wait time in seconds between polling requests.
109-
backoff : float, default 1.5
110-
Backoff multiplier for increasing wait times.
105+
max_attempts : int | None, default None
106+
Maximum number of polling attempts. If None, polling will continue indefinitely.
107+
108+
Raises
109+
------
110+
TimeoutError
111+
If the task does not finish within the maximum attempts.
112+
113+
Notes
114+
-----
115+
If the task finishes before the wait time or maximum attempts are reached, the function will return immediately. For example, if the wait time is set to 5 seconds and the task finishes in 2 seconds, the function will return after 2 seconds.
116+
117+
If the task does not finished after the maximum attempts, a TimeoutError will be raised. By default, the maximum attempts is None, which means the function will wait indefinitely until the task finishes.
111118
112119
Examples
113120
--------
114121
>>> task.wait_for()
115122
None
116123
117-
Notes
118-
-----
119-
This method implements an exponential backoff strategy to reduce the number of API calls
120-
while waiting for long-running tasks. The first request uses the initial_wait value,
121-
and subsequent requests increase the wait time by the backoff factor, up to max_wait. To disable exponential backoff, set backoff to 1.0.
122-
"""
123-
wait_time = initial_wait
124+
Waiting for a task to finish with a custom wait time.
124125
125-
while not self.is_finished:
126-
self.update()
126+
>>> task.wait_for(wait=5)
127+
None
127128
128-
# Wait client-side
129-
time.sleep(wait_time)
129+
Waiting for a task with a maximum number of attempts.
130130
131-
# Calculate next wait time with backoff
132-
wait_time = min(wait_time * backoff, max_wait)
131+
>>> task.wait_for(max_attempts=3)
132+
None
133+
"""
134+
attempts = 0
135+
while not self.is_finished:
136+
if max_attempts is not None and attempts >= max_attempts:
137+
break
138+
self.update(wait=wait)
139+
attempts += 1
140+
141+
if not self.is_finished:
142+
raise TimeoutError(
143+
f"Task {self['id']} did not finish within the specified wait time or maximum attempts."
144+
)
133145

134146

135147
class Tasks(resources.Resources):

tests/posit/connect/test_tasks.py

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from unittest import mock
22

3+
import pytest
34
import responses
45
from responses import BaseResponse, matchers
56

@@ -118,6 +119,7 @@ def test(self):
118119
responses.get(
119120
f"https://connect.example/__api__/v1/tasks/{uid}",
120121
json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": True},
122+
match=[matchers.query_param_matcher({"wait": 1})],
121123
),
122124
]
123125

@@ -127,35 +129,27 @@ def test(self):
127129
assert not task.is_finished
128130

129131
# invoke
130-
task.wait_for()
132+
task.wait_for(wait=1)
131133

132134
# assert
133135
assert task.is_finished
134136
assert mock_tasks_get[0].call_count == 1
135137
assert mock_tasks_get[1].call_count == 1
136138

137139
@responses.activate
138-
@mock.patch("time.sleep", autospec=True)
139-
def test_exponential_backoff(self, mock_sleep):
140+
def test_with_custom_wait(self):
140141
uid = "jXhOhdm5OOSkGhJw"
141142

142143
# behavior
143144
mock_tasks_get = [
144-
responses.get(
145-
f"https://connect.example/__api__/v1/tasks/{uid}",
146-
json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False},
147-
),
148-
responses.get(
149-
f"https://connect.example/__api__/v1/tasks/{uid}",
150-
json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False},
151-
),
152145
responses.get(
153146
f"https://connect.example/__api__/v1/tasks/{uid}",
154147
json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False},
155148
),
156149
responses.get(
157150
f"https://connect.example/__api__/v1/tasks/{uid}",
158151
json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": True},
152+
match=[matchers.query_param_matcher({"wait": 5})],
159153
),
160154
]
161155

@@ -165,31 +159,19 @@ def test_exponential_backoff(self, mock_sleep):
165159
assert not task.is_finished
166160

167161
# invoke
168-
task.wait_for(initial_wait=1, max_wait=5, backoff=2.0)
162+
task.wait_for(wait=5)
169163

170164
# assert
171165
assert task.is_finished
172166
assert mock_tasks_get[0].call_count == 1
173167
assert mock_tasks_get[1].call_count == 1
174168

175-
# Verify sleep calls
176-
mock_sleep.assert_has_calls([mock.call(1), mock.call(2), mock.call(4)], any_order=False)
177-
178169
@responses.activate
179-
@mock.patch("time.sleep", autospec=True)
180-
def test_no_backoff(self, mock_sleep):
170+
def test_immediate_completion(self):
181171
uid = "jXhOhdm5OOSkGhJw"
182172

183173
# behavior
184174
mock_tasks_get = [
185-
responses.get(
186-
f"https://connect.example/__api__/v1/tasks/{uid}",
187-
json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False},
188-
),
189-
responses.get(
190-
f"https://connect.example/__api__/v1/tasks/{uid}",
191-
json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False},
192-
),
193175
responses.get(
194176
f"https://connect.example/__api__/v1/tasks/{uid}",
195177
json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": True},
@@ -199,18 +181,39 @@ def test_no_backoff(self, mock_sleep):
199181
# setup
200182
c = connect.Client("https://connect.example", "12345")
201183
task = c.tasks.get(uid)
202-
assert not task.is_finished
184+
assert task.is_finished
203185

204186
# invoke
205-
task.wait_for(initial_wait=2, max_wait=5, backoff=1.0)
187+
task.wait_for(wait=1)
206188

207189
# assert
208190
assert task.is_finished
209191
assert mock_tasks_get[0].call_count == 1
210-
assert mock_tasks_get[1].call_count == 1
211192

212-
# Verify sleep calls
213-
mock_sleep.assert_has_calls([mock.call(2), mock.call(2)], any_order=False)
193+
@responses.activate
194+
def test_maximum_attempts(self):
195+
uid = "jXhOhdm5OOSkGhJw"
196+
197+
# behavior
198+
mock_tasks_get = [
199+
responses.get(
200+
f"https://connect.example/__api__/v1/tasks/{uid}",
201+
json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False},
202+
),
203+
]
204+
205+
# setup
206+
c = connect.Client("https://connect.example", "12345")
207+
task = c.tasks.get(uid)
208+
assert not task.is_finished
209+
210+
# invoke and assert
211+
with pytest.raises(TimeoutError):
212+
task.wait_for(wait=1, max_attempts=1)
213+
214+
# assert
215+
assert not task.is_finished
216+
assert mock_tasks_get[0].call_count == 2 # 1 for initial check, 1 for timeout check
214217

215218

216219
class TestTasksGet:

0 commit comments

Comments
 (0)