Skip to content

Commit 3774c62

Browse files
authored
Set upper bound for dj.data polling and fix intermittent no data errors (#1857)
* Set upper bound to data polling Add retry to get final set of data after successful polling * Add tests
1 parent fd78188 commit 3774c62

File tree

2 files changed

+82
-3
lines changed

2 files changed

+82
-3
lines changed

datajunction-clients/python/datajunction/client.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -417,12 +417,21 @@ def _data( # pylint: disable=too-many-arguments,too-many-locals
417417
printed_links = True
418418
progress_bar.title = f"Status: {job_state.value}"
419419

420-
# Update the polling interval
420+
# Update the polling interval (cap at 10s to avoid long waits)
421421
time.sleep(poll_interval)
422-
poll_interval *= 2
422+
poll_interval = min(poll_interval * 2, 10)
423423

424-
# Return results if the job has finished
424+
# Return results if the job has finished. If the server returned
425+
# FINISHED with empty results, then re-poll a few
426+
# times before giving up.
425427
if job_state == models.QueryState.FINISHED:
428+
if results and not results.get("results"):
429+
for attempt in range(3):
430+
time.sleep(2**attempt)
431+
response = self._session.get(path, params=params)
432+
results = response.json()
433+
if results.get("results"):
434+
break
426435
return self.process_results(results)
427436
if job_state == models.QueryState.CANCELED: # pragma: no cover
428437
raise DJClientException("Query execution was canceled!")

datajunction-clients/python/tests/test_client.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,76 @@ def test_data(self, client):
399399
async_=True,
400400
)
401401

402+
# FINISHED with empty results triggers re-poll retry; data arrives on second poll
403+
finished_empty = type(
404+
"R",
405+
(),
406+
{
407+
"json": lambda self: {
408+
"state": "FINISHED",
409+
"results": [],
410+
"errors": [],
411+
"links": [],
412+
},
413+
"status_code": 200,
414+
},
415+
)()
416+
finished_with_data = type(
417+
"R",
418+
(),
419+
{
420+
"json": lambda self: {
421+
"state": "FINISHED",
422+
"results": [
423+
{
424+
"columns": [
425+
{
426+
"name": "default_DOT_hard_hat_DOT_city",
427+
"type": "str",
428+
"semantic_type": "dimension",
429+
"semantic_entity": "default.hard_hat.city",
430+
"semantic_name": "default.hard_hat.city",
431+
"node": "default.hard_hat",
432+
},
433+
{
434+
"name": "default_DOT_avg_repair_price",
435+
"type": "float",
436+
"semantic_type": "metric",
437+
"semantic_name": "default.avg_repair_price",
438+
"node": "default.avg_repair_price",
439+
},
440+
],
441+
"rows": [["Foo", 1.0], ["Bar", 2.0]],
442+
},
443+
],
444+
"errors": [],
445+
"links": [],
446+
},
447+
"status_code": 200,
448+
},
449+
)()
450+
original_get = client._session.get
451+
call_count = [0]
452+
453+
def mock_get(path, params=None, **kwargs):
454+
if "/data/" in str(path):
455+
call_count[0] += 1
456+
if call_count[0] == 1:
457+
return finished_empty
458+
return finished_with_data
459+
return original_get(path, params=params, **kwargs)
460+
461+
client._session.get = mock_get
462+
result = client.data(
463+
metrics=["default.avg_repair_price"],
464+
dimensions=["default.hard_hat.city"],
465+
)
466+
client._session.get = original_get
467+
assert list(result.columns) == [
468+
"default.hard_hat.city",
469+
"default.avg_repair_price",
470+
]
471+
402472
# Error propagation
403473
# with pytest.raises(DJClientException) as exc_info:
404474
# client.data(

0 commit comments

Comments
 (0)