Skip to content

Commit 31e6c83

Browse files
cleanup
Signed-off-by: varun-edachali-dbx <[email protected]>
1 parent 686ade4 commit 31e6c83

File tree

3 files changed

+65
-72
lines changed

3 files changed

+65
-72
lines changed

src/databricks/sql/backend/thrift_backend.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -913,7 +913,10 @@ def get_query_state(self, command_id: CommandId) -> CommandState:
913913
poll_resp = self._poll_for_status(thrift_handle)
914914
operation_state = poll_resp.operationState
915915
self._check_command_not_in_error_or_closed_state(thrift_handle, poll_resp)
916-
return CommandState.from_thrift_state(operation_state)
916+
state = CommandState.from_thrift_state(operation_state)
917+
if state is None:
918+
raise ValueError(f"Invalid operation state: {operation_state}")
919+
return state
917920

918921
@staticmethod
919922
def _check_direct_results_for_error(t_spark_direct_results):
@@ -1175,7 +1178,6 @@ def _handle_execute_response(self, resp, cursor):
11751178
execute_response,
11761179
arrow_schema_bytes,
11771180
) = self._results_message_to_execute_response(resp, final_operation_state)
1178-
execute_response.command_id = command_id
11791181
return execute_response, arrow_schema_bytes
11801182

11811183
def _handle_execute_response_async(self, resp, cursor):
@@ -1237,7 +1239,7 @@ def cancel_command(self, command_id: CommandId) -> None:
12371239
if not thrift_handle:
12381240
raise ValueError("Not a valid Thrift command ID")
12391241

1240-
logger.debug("Cancelling command {}".format(command_id.guid))
1242+
logger.debug("Cancelling command {}".format(guid_to_hex_id(command_id.guid)))
12411243
req = ttypes.TCancelOperationReq(thrift_handle)
12421244
self.make_request(self._client.CancelOperation, req)
12431245

src/databricks/sql/backend/types.py

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,6 @@ def __init__(
285285
backend_type: BackendType,
286286
guid: Any,
287287
secret: Optional[Any] = None,
288-
operation_type: Optional[int] = None,
289-
has_result_set: bool = False,
290-
modified_row_count: Optional[int] = None,
291288
):
292289
"""
293290
Initialize a CommandId.
@@ -296,17 +293,34 @@ def __init__(
296293
backend_type: The type of backend (THRIFT or SEA)
297294
guid: The primary identifier for the command
298295
secret: The secret part of the identifier (only used for Thrift)
299-
operation_type: The operation type (only used for Thrift)
300-
has_result_set: Whether the command has a result set
301-
modified_row_count: The number of rows modified by the command
302296
"""
303297

304298
self.backend_type = backend_type
305299
self.guid = guid
306300
self.secret = secret
307-
self.operation_type = operation_type
308-
self.has_result_set = has_result_set
309-
self.modified_row_count = modified_row_count
301+
302+
303+
def __str__(self) -> str:
304+
"""
305+
Return a string representation of the CommandId.
306+
307+
For SEA backend, returns the guid.
308+
For Thrift backend, returns a format like "guid|secret".
309+
310+
Returns:
311+
A string representation of the command ID
312+
"""
313+
314+
if self.backend_type == BackendType.SEA:
315+
return str(self.guid)
316+
elif self.backend_type == BackendType.THRIFT:
317+
secret_hex = (
318+
guid_to_hex_id(self.secret)
319+
if isinstance(self.secret, bytes)
320+
else str(self.secret)
321+
)
322+
return f"{self.to_hex_guid()}|{secret_hex}"
323+
return str(self.guid)
310324

311325
@classmethod
312326
def from_thrift_handle(cls, operation_handle):
@@ -329,9 +343,6 @@ def from_thrift_handle(cls, operation_handle):
329343
BackendType.THRIFT,
330344
guid_bytes,
331345
secret_bytes,
332-
operation_handle.operationType,
333-
operation_handle.hasResultSet,
334-
operation_handle.modifiedRowCount,
335346
)
336347

337348
@classmethod
@@ -364,9 +375,6 @@ def to_thrift_handle(self):
364375
handle_identifier = ttypes.THandleIdentifier(guid=self.guid, secret=self.secret)
365376
return ttypes.TOperationHandle(
366377
operationId=handle_identifier,
367-
operationType=self.operation_type,
368-
hasResultSet=self.has_result_set,
369-
modifiedRowCount=self.modified_row_count,
370378
)
371379

372380
def to_sea_statement_id(self):

src/databricks/sql/result_set.py

Lines changed: 37 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import time
66
import pandas
77

8+
from databricks.sql.backend.sea.backend import SeaDatabricksClient
9+
810
try:
911
import pyarrow
1012
except ImportError:
@@ -13,6 +15,7 @@
1315
if TYPE_CHECKING:
1416
from databricks.sql.backend.thrift_backend import ThriftDatabricksClient
1517
from databricks.sql.client import Connection
18+
from databricks.sql.backend.databricks_client import DatabricksClient
1619
from databricks.sql.thrift_api.TCLIService import ttypes
1720
from databricks.sql.types import Row
1821
from databricks.sql.exc import Error, RequestError, CursorAlreadyClosedError
@@ -31,21 +34,37 @@ class ResultSet(ABC):
3134

3235
def __init__(
3336
self,
34-
connection,
35-
backend,
37+
connection: "Connection",
38+
backend: "DatabricksClient",
3639
arraysize: int,
3740
buffer_size_bytes: int,
38-
command_id=None,
39-
status=None,
41+
command_id: CommandId,
42+
status: CommandState,
4043
has_been_closed_server_side: bool = False,
4144
has_more_rows: bool = False,
4245
results_queue=None,
4346
description=None,
4447
is_staging_operation: bool = False,
4548
):
46-
"""Initialize the base ResultSet with common properties."""
49+
"""
50+
A ResultSet manages the results of a single command.
51+
52+
Args:
53+
connection: The parent connection
54+
backend: The backend client
55+
arraysize: The max number of rows to fetch at a time (PEP-249)
56+
buffer_size_bytes: The size (in bytes) of the internal buffer + max fetch
57+
command_id: The command ID
58+
status: The command status
59+
has_been_closed_server_side: Whether the command has been closed on the server
60+
has_more_rows: Whether the command has more rows
61+
results_queue: The results queue
62+
description: column description of the results
63+
is_staging_operation: Whether the command is a staging operation
64+
"""
65+
4766
self.connection = connection
48-
self.backend = backend # Store the backend client directly
67+
self.backend = backend
4968
self.arraysize = arraysize
5069
self.buffer_size_bytes = buffer_size_bytes
5170
self._next_row_index = 0
@@ -240,7 +259,7 @@ def _convert_arrow_table(self, table):
240259
res = df.to_numpy(na_value=None, dtype="object")
241260
return [ResultRow(*v) for v in res]
242261

243-
def merge_columnar(self, result1, result2):
262+
def merge_columnar(self, result1, result2) -> "ColumnTable":
244263
"""
245264
Function to merge / combining the columnar results into a single result
246265
:param result1:
@@ -387,12 +406,11 @@ class SeaResultSet(ResultSet):
387406

388407
def __init__(
389408
self,
390-
connection,
391-
sea_client,
409+
connection: "Connection",
410+
execute_response: "ExecuteResponse",
411+
sea_client: "SeaDatabricksClient",
392412
buffer_size_bytes: int = 104857600,
393413
arraysize: int = 10000,
394-
execute_response=None,
395-
sea_response=None,
396414
):
397415
"""
398416
Initialize a SeaResultSet with the response from a SEA query execution.
@@ -402,56 +420,21 @@ def __init__(
402420
sea_client: The SeaDatabricksClient instance for direct access
403421
buffer_size_bytes: Buffer size for fetching results
404422
arraysize: Default number of rows to fetch
405-
execute_response: Response from the execute command (new style)
406-
sea_response: Direct SEA response (legacy style)
423+
execute_response: Response from the execute command
407424
"""
408-
# Handle both initialization styles
409-
if execute_response is not None:
410-
# New style with ExecuteResponse
411-
command_id = execute_response.command_id
412-
status = execute_response.status
413-
has_been_closed_server_side = execute_response.has_been_closed_server_side
414-
has_more_rows = execute_response.has_more_rows
415-
results_queue = execute_response.results_queue
416-
description = execute_response.description
417-
is_staging_operation = execute_response.is_staging_operation
418-
self._response = getattr(execute_response, "sea_response", {})
419-
self.statement_id = command_id.to_sea_statement_id() if command_id else None
420-
elif sea_response is not None:
421-
# Legacy style with direct sea_response
422-
self._response = sea_response
423-
# Extract values from sea_response
424-
command_id = CommandId.from_sea_statement_id(
425-
sea_response.get("statement_id", "")
426-
)
427-
self.statement_id = sea_response.get("statement_id", "")
428-
429-
# Extract status
430-
status_data = sea_response.get("status", {})
431-
status = CommandState.from_sea_state(status_data.get("state", "PENDING"))
432-
433-
# Set defaults for other fields
434-
has_been_closed_server_side = False
435-
has_more_rows = False
436-
results_queue = None
437-
description = None
438-
is_staging_operation = False
439-
else:
440-
raise ValueError("Either execute_response or sea_response must be provided")
441425

442-
# Call parent constructor with common attributes
443426
super().__init__(
444427
connection=connection,
445428
backend=sea_client,
446429
arraysize=arraysize,
447430
buffer_size_bytes=buffer_size_bytes,
448-
command_id=command_id,
449-
status=status,
450-
has_been_closed_server_side=has_been_closed_server_side,
451-
has_more_rows=has_more_rows,
452-
results_queue=results_queue,
453-
description=description,
454-
is_staging_operation=is_staging_operation,
431+
command_id=execute_response.command_id,
432+
status=execute_response.status,
433+
has_been_closed_server_side=execute_response.has_been_closed_server_side,
434+
has_more_rows=execute_response.has_more_rows,
435+
results_queue=execute_response.results_queue,
436+
description=execute_response.description,
437+
is_staging_operation=execute_response.is_staging_operation,
455438
)
456439

457440
def _fill_results_buffer(self):

0 commit comments

Comments
 (0)