Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions google/cloud/dataproc_spark_connect/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,10 @@ def handler(
total_tasks += stage.num_tasks
completed_tasks += stage.num_completed_tasks

# Don't show progress bar till we receive some tasks
if total_tasks == 0:
return

tqdm_pbar = notebook_tqdm
if environment.is_interactive_terminal():
tqdm_pbar = cli_tqdm
Expand Down Expand Up @@ -1044,13 +1048,11 @@ def handler(
@staticmethod
def _sql_lazy_transformation(req):
# Select SQL command
if req.plan and req.plan.command and req.plan.command.sql_command:
return (
"select"
in req.plan.command.sql_command.sql.strip().lower().split()
)

return False
try:
query = req.plan.command.sql_command.input.sql.query
return "select" in query.strip().lower().split()
except AttributeError:
return False

def _repr_html_(self) -> str:
if not self._active_s8s_session_id:
Expand Down
20 changes: 16 additions & 4 deletions tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
)

from pyspark.sql.connect.client.core import ConfigResult
from pyspark.sql.connect.proto import Command, ConfigResponse, ExecutePlanRequest, Plan, SqlCommand, UserContext
from pyspark.sql.connect.proto import Command, ConfigResponse, ExecutePlanRequest, Plan, Relation, SQL, SqlCommand, UserContext
from unittest import mock


Expand Down Expand Up @@ -1680,7 +1680,13 @@ def test_sql_lazy_transformation(self):
test_execute_plan_request_1: ExecutePlanRequest = ExecutePlanRequest(
session_id="mock-session_id-from-super",
client_type="mock-client_type-from-super",
plan=Plan(command=Command(sql_command=SqlCommand(sql="SELECT 1"))),
plan=Plan(
command=Command(
sql_command=SqlCommand(
input=Relation(sql=SQL(query="SELECT 1"))
)
)
),
tags=["mock-tag-from-super"],
user_context=UserContext(user_id="mock-user-from-super"),
operation_id=test_uuid,
Expand All @@ -1690,7 +1696,11 @@ def test_sql_lazy_transformation(self):
client_type="mock-client_type-from-super",
plan=Plan(
command=Command(
sql_command=SqlCommand(sql="INSERT INTO test_table_2 ...")
sql_command=SqlCommand(
input=Relation(
sql=SQL(query="INSERT INTO test_table_2 ...")
)
)
)
),
tags=["mock-tag-from-super"],
Expand All @@ -1703,7 +1713,9 @@ def test_sql_lazy_transformation(self):
plan=Plan(
command=Command(
sql_command=SqlCommand(
sql="DROP TABLE IF EXISTS selections"
input=Relation(
sql=SQL(query="DROP TABLE IF EXISTS selections")
)
)
)
),
Expand Down