Skip to content

Commit 12a266f

Browse files
authored
fix(trino): update query progress using cursor stats (#36872)
1 parent 5909e90 commit 12a266f

File tree

2 files changed

+358
-12
lines changed

2 files changed

+358
-12
lines changed

superset/db_engine_specs/trino.py

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import contextlib
2020
import logging
21+
import math
2122
import threading
2223
import time
2324
from typing import Any, TYPE_CHECKING
@@ -29,6 +30,7 @@
2930
from sqlalchemy.exc import NoSuchTableError
3031

3132
from superset import db
33+
from superset.common.db_query_status import QueryStatus
3234
from superset.constants import QUERY_CANCEL_KEY, QUERY_EARLY_CANCEL_KEY
3335
from superset.db_engine_specs.base import BaseEngineSpec, convert_inspector_columns
3436
from superset.db_engine_specs.exceptions import (
@@ -177,6 +179,9 @@ def handle_cursor(cls, cursor: Cursor, query: Query) -> None:
177179
`execute_with_cursor` instead, to handle this asynchronously.
178180
"""
179181

182+
execute_result = getattr(cursor, "_execute_result", None)
183+
execute_event = getattr(cursor, "_execute_event", None)
184+
180185
# Adds the executed query id to the extra payload so the query can be cancelled
181186
cancel_query_id = cursor.query_id
182187
logger.debug("Query %d: queryId %s found in cursor", query.id, cancel_query_id)
@@ -187,17 +192,51 @@ def handle_cursor(cls, cursor: Cursor, query: Query) -> None:
187192

188193
db.session.commit() # pylint: disable=consider-using-transaction
189194

190-
# if query cancelation was requested prior to the handle_cursor call, but
191-
# the query was still executed, trigger the actual query cancelation now
192-
if query.extra.get(QUERY_EARLY_CANCEL_KEY):
193-
cls.cancel_query(
194-
cursor=cursor,
195-
query=query,
196-
cancel_query_id=cancel_query_id,
197-
)
198-
199195
super().handle_cursor(cursor=cursor, query=query)
200196

197+
terminal_states = {"FINISHED", "FAILED", "CANCELED"}
198+
state = "QUEUED"
199+
progress = 0.0
200+
poll_interval = app.config["DB_POLL_INTERVAL_SECONDS"].get(cls.engine, 1)
201+
max_wait_time = app.config.get("SQLLAB_ASYNC_TIME_LIMIT_SEC", 21600)
202+
start_time = time.time()
203+
while state not in terminal_states:
204+
if time.time() - start_time > max_wait_time:
205+
logger.warning("Query %d: Progress polling timed out", query.id)
206+
break
207+
# Check for errors raised in execute_thread
208+
if execute_result is not None and execute_result.get("error"):
209+
break
210+
211+
# Check if execute_event is set (thread completed)
212+
if execute_event is not None and execute_event.is_set():
213+
break
214+
215+
# Cancel the running query if cancellation was requested before handle_cursor
216+
# was called (early cancel), or if the query has since been stopped or timed out
217+
if query.extra.get(QUERY_EARLY_CANCEL_KEY) or query.status in [
218+
QueryStatus.STOPPED,
219+
QueryStatus.TIMED_OUT,
220+
]:
221+
cls.cancel_query(
222+
cursor=cursor,
223+
query=query,
224+
cancel_query_id=cancel_query_id,
225+
)
226+
break
227+
228+
info = getattr(cursor, "stats", {}) or {}
229+
state = info.get("state", "UNKNOWN")
230+
completed_splits = float(info.get("completedSplits", 0))
231+
total_splits = float(info.get("totalSplits", 1) or 1)
232+
progress = math.floor((completed_splits / (total_splits or 1)) * 100)
233+
234+
if progress != query.progress:
235+
query.progress = progress
236+
db.session.commit() # pylint: disable=consider-using-transaction
237+
238+
time.sleep(poll_interval)
239+
201240
@classmethod
202241
def execute_with_cursor(
203242
cls,
@@ -262,6 +301,10 @@ def _execute(
262301
while not cursor.query_id and not execute_event.is_set():
263302
time.sleep(0.1)
264303

304+
# Attach the execute thread's result and completion event to the cursor so that
305+
# handle_cursor can detect an error or completion there while polling progress.
306+
cursor._execute_result = execute_result
307+
cursor._execute_event = execute_event
265308
logger.debug("Query %d: Handling cursor", query_id)
266309
cls.handle_cursor(cursor, query)
267310

0 commit comments

Comments
 (0)