Skip to content

Commit 9fb6a76

Browse files
Merge branch 'main' into backend-refactors
2 parents fe3acb1 + b3a6f55 commit 9fb6a76

File tree

6 files changed

+121
-262
lines changed

6 files changed

+121
-262
lines changed

src/databricks/sql/client.py

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -282,13 +282,7 @@ def __enter__(self) -> "Connection":
282282
return self
283283

284284
def __exit__(self, exc_type, exc_value, traceback):
285-
try:
286-
self.close()
287-
except BaseException as e:
288-
logger.warning(f"Exception during connection close in __exit__: {e}")
289-
if exc_type is None:
290-
raise
291-
return False
285+
self.close()
292286

293287
def __del__(self):
294288
if self.open:
@@ -418,14 +412,7 @@ def __enter__(self) -> "Cursor":
418412
return self
419413

420414
def __exit__(self, exc_type, exc_value, traceback):
421-
try:
422-
logger.debug("Cursor context manager exiting, calling close()")
423-
self.close()
424-
except BaseException as e:
425-
logger.warning(f"Exception during cursor close in __exit__: {e}")
426-
if exc_type is None:
427-
raise
428-
return False
415+
self.close()
429416

430417
def __iter__(self):
431418
if self.active_result_set:
@@ -1092,21 +1079,7 @@ def cancel(self) -> None:
10921079
def close(self) -> None:
10931080
"""Close cursor"""
10941081
self.open = False
1095-
1096-
# Close active operation handle if it exists
1097-
if self.active_command_id:
1098-
try:
1099-
self.backend.close_command(self.active_command_id)
1100-
except RequestError as e:
1101-
if isinstance(e.args[1], CursorAlreadyClosedError):
1102-
logger.info("Operation was canceled by a prior request")
1103-
else:
1104-
logging.warning(f"Error closing operation handle: {e}")
1105-
except Exception as e:
1106-
logging.warning(f"Error closing operation handle: {e}")
1107-
finally:
1108-
self.active_command_id = None
1109-
1082+
self.active_op_handle = None
11101083
if self.active_result_set:
11111084
self._close_and_clear_active_result_set()
11121085

tests/e2e/common/large_queries_mixin.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ def test_query_with_large_narrow_result_set(self):
8383
assert row[0] == row_id
8484

8585
def test_long_running_query(self):
86-
"""Incrementally increase query size until it takes at least 5 minutes,
86+
"""Incrementally increase query size until it takes at least 3 minutes,
8787
and asserts that the query completes successfully.
8888
"""
8989
minutes = 60
90-
min_duration = 5 * minutes
90+
min_duration = 3 * minutes
9191

9292
duration = -1
9393
scale0 = 10000
@@ -113,5 +113,5 @@ def test_long_running_query(self):
113113
duration = time.time() - start
114114
current_fraction = duration / min_duration
115115
print("Took {} s with scale factor={}".format(duration, scale_factor))
116-
# Extrapolate linearly to reach 5 min and add 50% padding to push over the limit
116+
# Extrapolate linearly to reach 3 min and add 50% padding to push over the limit
117117
scale_factor = math.ceil(1.5 * scale_factor / current_fraction)

tests/e2e/common/staging_ingestion_tests.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
4646
) as conn:
4747

4848
cursor = conn.cursor()
49-
query = f"PUT '{temp_path}' INTO 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
49+
query = f"PUT '{temp_path}' INTO 'stage://tmp/{ingestion_user}/tmp/11/16/file1.csv' OVERWRITE"
5050
cursor.execute(query)
5151

5252
# GET should succeed
@@ -57,7 +57,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
5757
extra_params={"staging_allowed_local_path": new_temp_path}
5858
) as conn:
5959
cursor = conn.cursor()
60-
query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
60+
query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/16/file1.csv' TO '{new_temp_path}'"
6161
cursor.execute(query)
6262

6363
with open(new_fh, "rb") as fp:
@@ -67,7 +67,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
6767

6868
# REMOVE should succeed
6969

70-
remove_query = f"REMOVE 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv'"
70+
remove_query = f"REMOVE 'stage://tmp/{ingestion_user}/tmp/11/16/file1.csv'"
7171

7272
with self.connection(extra_params={"staging_allowed_local_path": "/"}) as conn:
7373
cursor = conn.cursor()
@@ -79,7 +79,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
7979
Error, match="Staging operation over HTTP was unsuccessful: 404"
8080
):
8181
cursor = conn.cursor()
82-
query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
82+
query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/16/file1.csv' TO '{new_temp_path}'"
8383
cursor.execute(query)
8484

8585
os.remove(temp_path)

tests/e2e/test_driver.py

Lines changed: 1 addition & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151

5252
from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin
5353

54-
from databricks.sql.exc import SessionAlreadyClosedError, CursorAlreadyClosedError
54+
from databricks.sql.exc import SessionAlreadyClosedError
5555

5656
log = logging.getLogger(__name__)
5757

@@ -809,142 +809,6 @@ def test_catalogs_returns_arrow_table(self):
809809
results = cursor.fetchall_arrow()
810810
assert isinstance(results, pyarrow.Table)
811811

812-
def test_close_connection_closes_cursors(self):
813-
814-
from databricks.sql.thrift_api.TCLIService import ttypes
815-
816-
with self.connection() as conn:
817-
cursor = conn.cursor()
818-
cursor.execute(
819-
"SELECT id, id `id2`, id `id3` FROM RANGE(1000000) order by RANDOM()"
820-
)
821-
ars = cursor.active_result_set
822-
823-
# We must manually run this check because thrift_backend always forces `has_been_closed_server_side` to True
824-
# Cursor op state should be open before connection is closed
825-
status_request = ttypes.TGetOperationStatusReq(
826-
operationHandle=ars.command_id.to_thrift_handle(),
827-
getProgressUpdate=False,
828-
)
829-
op_status_at_server = ars.backend._client.GetOperationStatus(status_request)
830-
assert op_status_at_server.operationState != CommandState.CLOSED
831-
832-
conn.close()
833-
834-
# When connection closes, any cursor operations should no longer exist at the server
835-
with pytest.raises(SessionAlreadyClosedError) as cm:
836-
op_status_at_server = ars.backend._client.GetOperationStatus(
837-
status_request
838-
)
839-
840-
def test_closing_a_closed_connection_doesnt_fail(self, caplog):
841-
caplog.set_level(logging.DEBUG)
842-
# Second .close() call is when this context manager exits
843-
with self.connection() as conn:
844-
# First .close() call is explicit here
845-
conn.close()
846-
assert "Session appears to have been closed already" in caplog.text
847-
848-
conn = None
849-
try:
850-
with pytest.raises(KeyboardInterrupt):
851-
with self.connection() as c:
852-
conn = c
853-
raise KeyboardInterrupt("Simulated interrupt")
854-
finally:
855-
if conn is not None:
856-
assert (
857-
not conn.open
858-
), "Connection should be closed after KeyboardInterrupt"
859-
860-
def test_cursor_close_properly_closes_operation(self):
861-
"""Test that Cursor.close() properly closes the active operation handle on the server."""
862-
with self.connection() as conn:
863-
cursor = conn.cursor()
864-
try:
865-
cursor.execute("SELECT 1 AS test")
866-
assert cursor.active_command_id is not None
867-
cursor.close()
868-
assert cursor.active_command_id is None
869-
assert not cursor.open
870-
finally:
871-
if cursor.open:
872-
cursor.close()
873-
874-
conn = None
875-
cursor = None
876-
try:
877-
with self.connection() as c:
878-
conn = c
879-
with pytest.raises(KeyboardInterrupt):
880-
with conn.cursor() as cur:
881-
cursor = cur
882-
raise KeyboardInterrupt("Simulated interrupt")
883-
finally:
884-
if cursor is not None:
885-
assert (
886-
not cursor.open
887-
), "Cursor should be closed after KeyboardInterrupt"
888-
889-
def test_nested_cursor_context_managers(self):
890-
"""Test that nested cursor context managers properly close operations on the server."""
891-
with self.connection() as conn:
892-
with conn.cursor() as cursor1:
893-
cursor1.execute("SELECT 1 AS test1")
894-
assert cursor1.active_command_id is not None
895-
896-
with conn.cursor() as cursor2:
897-
cursor2.execute("SELECT 2 AS test2")
898-
assert cursor2.active_command_id is not None
899-
900-
# After inner context manager exit, cursor2 should be not open
901-
assert not cursor2.open
902-
assert cursor2.active_command_id is None
903-
904-
# After outer context manager exit, cursor1 should be not open
905-
assert not cursor1.open
906-
assert cursor1.active_command_id is None
907-
908-
def test_cursor_error_handling(self):
909-
"""Test that cursor close handles errors properly to prevent orphaned operations."""
910-
with self.connection() as conn:
911-
cursor = conn.cursor()
912-
913-
cursor.execute("SELECT 1 AS test")
914-
915-
op_handle = cursor.active_command_id
916-
917-
assert op_handle is not None
918-
919-
# Manually close the operation to simulate server-side closure
920-
conn.session.backend.close_command(op_handle)
921-
922-
cursor.close()
923-
924-
assert not cursor.open
925-
926-
def test_result_set_close(self):
927-
"""Test that ResultSet.close() properly closes operations on the server and handles state correctly."""
928-
with self.connection() as conn:
929-
cursor = conn.cursor()
930-
try:
931-
cursor.execute("SELECT * FROM RANGE(10)")
932-
933-
result_set = cursor.active_result_set
934-
assert result_set is not None
935-
936-
initial_op_state = result_set.op_state
937-
938-
result_set.close()
939-
940-
assert result_set.op_state == CommandState.CLOSED
941-
assert result_set.op_state != initial_op_state
942-
943-
# Closing the result set again should be a no-op and not raise exceptions
944-
result_set.close()
945-
finally:
946-
cursor.close()
947-
948812

949813
# use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
950814
# the 429/503 subsuites separate since they execute under different circumstances.

tests/unit/test_client.py

Lines changed: 13 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import databricks.sql
2424
import databricks.sql.client as client
2525
from databricks.sql import InterfaceError, DatabaseError, Error, NotSupportedError
26-
from databricks.sql.exc import RequestError, CursorAlreadyClosedError
2726
from databricks.sql.types import Row
2827
from databricks.sql.result_set import ResultSet, ThriftResultSet
2928
from databricks.sql.backend.types import CommandId, CommandState
@@ -261,15 +260,20 @@ def test_context_manager_closes_cursor(self):
261260
cursor.close = mock_close
262261
mock_close.assert_called_once_with()
263262

264-
cursor = client.Cursor(Mock(), Mock())
265-
cursor.close = Mock()
263+
@patch("%s.client.ThriftBackend" % PACKAGE_NAME)
264+
def test_context_manager_closes_connection(self, mock_client_class):
265+
instance = mock_client_class.return_value
266266

267-
try:
268-
with self.assertRaises(KeyboardInterrupt):
269-
with cursor:
270-
raise KeyboardInterrupt("Simulated interrupt")
271-
finally:
272-
cursor.close.assert_called()
267+
mock_open_session_resp = MagicMock(spec=TOpenSessionResp)()
268+
mock_open_session_resp.sessionHandle.sessionId = b"\x22"
269+
instance.open_session.return_value = mock_open_session_resp
270+
271+
with databricks.sql.connect(**self.DUMMY_CONNECTION_ARGS) as connection:
272+
pass
273+
274+
# Check the close session request has an id of x22
275+
close_session_id = instance.close_session.call_args[0][0].sessionId
276+
self.assertEqual(close_session_id, b"\x22")
273277

274278
def dict_product(self, dicts):
275279
"""
@@ -599,42 +603,6 @@ def test_access_current_query_id(self):
599603
cursor.close()
600604
self.assertIsNone(cursor.query_id)
601605

602-
def test_cursor_close_handles_exception(self):
603-
"""Test that Cursor.close() handles exceptions from close_command properly."""
604-
mock_backend = Mock()
605-
mock_connection = Mock()
606-
mock_command_id = Mock()
607-
608-
mock_backend.close_command.side_effect = Exception("Test error")
609-
610-
cursor = client.Cursor(mock_connection, mock_backend)
611-
cursor.active_command_id = mock_command_id
612-
613-
cursor.close()
614-
615-
mock_backend.close_command.assert_called_once_with(mock_command_id)
616-
617-
self.assertIsNone(cursor.active_command_id)
618-
619-
self.assertFalse(cursor.open)
620-
621-
def test_cursor_context_manager_handles_exit_exception(self):
622-
"""Test that cursor's context manager handles exceptions during __exit__."""
623-
mock_backend = Mock()
624-
mock_connection = Mock()
625-
626-
cursor = client.Cursor(mock_connection, mock_backend)
627-
original_close = cursor.close
628-
cursor.close = Mock(side_effect=Exception("Test error during close"))
629-
630-
try:
631-
with cursor:
632-
raise ValueError("Test error inside context")
633-
except ValueError:
634-
pass
635-
636-
cursor.close.assert_called_once()
637-
638606
def test_connection_close_handles_cursor_close_exception(self):
639607
"""Test that _close handles exceptions from cursor.close() properly."""
640608
cursors_closed = []
@@ -670,49 +638,6 @@ def mock_close_normal():
670638
cursors_closed, [1, 2], "Both cursors should have close called"
671639
)
672640

673-
def test_resultset_close_handles_cursor_already_closed_error(self):
674-
"""Test that ResultSet.close() handles CursorAlreadyClosedError properly."""
675-
result_set = client.ThriftResultSet.__new__(client.ThriftResultSet)
676-
result_set.backend = Mock()
677-
result_set.backend.CLOSED_OP_STATE = "CLOSED"
678-
result_set.connection = Mock()
679-
result_set.connection.open = True
680-
result_set.op_state = "RUNNING"
681-
result_set.has_been_closed_server_side = False
682-
result_set.command_id = Mock()
683-
684-
class MockRequestError(Exception):
685-
def __init__(self):
686-
self.args = ["Error message", CursorAlreadyClosedError()]
687-
688-
result_set.backend.close_command.side_effect = MockRequestError()
689-
690-
original_close = client.ResultSet.close
691-
try:
692-
try:
693-
if (
694-
result_set.op_state != result_set.backend.CLOSED_OP_STATE
695-
and not result_set.has_been_closed_server_side
696-
and result_set.connection.open
697-
):
698-
result_set.backend.close_command(result_set.command_id)
699-
except MockRequestError as e:
700-
if isinstance(e.args[1], CursorAlreadyClosedError):
701-
pass
702-
finally:
703-
result_set.has_been_closed_server_side = True
704-
result_set.op_state = result_set.backend.CLOSED_OP_STATE
705-
706-
result_set.backend.close_command.assert_called_once_with(
707-
result_set.command_id
708-
)
709-
710-
assert result_set.has_been_closed_server_side is True
711-
712-
assert result_set.op_state == result_set.backend.CLOSED_OP_STATE
713-
finally:
714-
pass
715-
716641

717642
if __name__ == "__main__":
718643
suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])

0 commit comments

Comments
 (0)