Commit 287a5c6

SNOW-2241405: dbapi ingest as stringtype when came across unsupported type (#3751)
1 parent 6bf28d1 commit 287a5c6

11 files changed: +69 additions, -52 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@
 
 #### Improvements
 
+- Unsupported types in `DataFrameReader.dbapi`(PuPr) are ingested as `StringType` now.
 - Improved error message to list available columns when dataframe cannot resolve given column name.
 - Added a new option `cacheResult` to `DataFrameReader.xml` that allows users to cache the result of the XML reader to a temporary table after calling `xml`. It helps improve performance when subsequent operations are performed on the same DataFrame.
 
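In user-facing terms, a source column whose type has no Snowpark mapping no longer aborts `DataFrameReader.dbapi`; it is simply read back as a string column. A minimal sketch of that behavior, assuming the PuPr `dbapi` reader signature (a `create_connection` callable plus a `table` argument); the table name, driver choice, and all credentials below are placeholders, not part of this commit:

```python
# Hedged sketch of the user-facing effect of this commit; MY_TABLE and all
# connection parameters are placeholders.
from snowflake.snowpark import Session
from snowflake.snowpark.types import StringType


def create_connection():
    # Any supported DBAPI driver works here; oracledb is only an example.
    import oracledb

    return oracledb.connect(user="...", password="...", dsn="...")


session = Session.builder.configs(
    {"account": "...", "user": "...", "password": "..."}
).create()

# Before this commit, schema inference raised NotImplementedError for a column
# whose source type has no Snowpark mapping; now that column arrives as StringType.
df = session.read.dbapi(create_connection, table="MY_TABLE")
print([field for field in df.schema.fields if isinstance(field.datatype, StringType)])
```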
src/snowflake/snowpark/_internal/data_source/drivers/databricks_driver.py

Lines changed: 5 additions & 1 deletion
@@ -18,6 +18,7 @@
     VariantType,
     TimestampType,
     TimestampTimeZone,
+    StringType,
 )
 
 if TYPE_CHECKING:
@@ -57,7 +58,10 @@ def to_snow_type(self, schema: List[Any]) -> StructType:
         all_columns = []
         for column_name, column_type, _ in schema:
             column_type = convert_map_to_use.get(column_type, column_type)
-            data_type = type_string_to_type_object(column_type)
+            try:
+                data_type = type_string_to_type_object(column_type)
+            except ValueError:
+                data_type = StringType()
             if column_type.lower() == "timestamp":
                 # by default https://docs.databricks.com/aws/en/sql/language-manual/data-types/timestamp-type
                 data_type = TimestampType(TimestampTimeZone.LTZ)
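The try/except fallback above can be exercised on its own. A minimal sketch, assuming `type_string_to_type_object` is importable from `snowflake.snowpark._internal.type_utils` (the internal helper the driver calls) and using made-up type strings for illustration:

```python
# Illustrative sketch of the fallback: an unknown type string becomes StringType
# instead of letting ValueError escape schema inference.
from snowflake.snowpark._internal.type_utils import type_string_to_type_object
from snowflake.snowpark.types import StringType

for column_type in ("int", "string", "some_future_databricks_type"):
    try:
        data_type = type_string_to_type_object(column_type)
    except ValueError:
        data_type = StringType()  # unsupported type string -> ingest as string
    print(column_type, "->", data_type)
```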

src/snowflake/snowpark/_internal/data_source/drivers/oracledb_driver.py

Lines changed: 2 additions & 5 deletions
@@ -67,7 +67,7 @@ def to_snow_type(self, schema: List[Any]) -> StructType:
             oracledb.DB_TYPE_XMLTYPE: StringType,
             oracledb.DB_TYPE_OBJECT: VariantType,
             oracledb.DB_TYPE_VECTOR: VectorType,
-            oracledb.DB_TYPE_CURSOR: None,  # NOT SUPPORTED
+            # oracledb.DB_TYPE_CURSOR: None,  # NOT SUPPORTED
         }
 
         fields = []
@@ -77,10 +77,7 @@ def to_snow_type(self, schema: List[Any]) -> StructType:
             precision = column.precision
             scale = column.scale
             null_ok = column.null_ok
-            snow_type = convert_map_to_use.get(type_code, None)
-            if snow_type is None:
-                # TODO: SNOW-1912068 support types that we don't have now
-                raise NotImplementedError(f"oracledb type not supported: {type_code}")
+            snow_type = convert_map_to_use.get(type_code, StringType)
             if type_code == oracledb.DB_TYPE_TIMESTAMP_TZ:
                 data_type = snow_type(TimestampTimeZone.TZ)
             elif type_code == oracledb.DB_TYPE_TIMESTAMP_LTZ:
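Note that the values in the driver's mapping are Snowpark type classes rather than instances, so `StringType` as the `.get` default drops straight into the later `snow_type(...)` instantiation. A standalone sketch of that lookup-with-default pattern; the string keys stand in for the real `oracledb.DB_TYPE_*` codes and are illustrative only:

```python
# Sketch of the class-valued lookup with a StringType default, mirroring the
# oracledb driver; keys are illustrative stand-ins for oracledb.DB_TYPE_* codes.
from snowflake.snowpark.types import DecimalType, StringType, TimestampType

convert_map = {
    "DB_TYPE_NUMBER": DecimalType,
    "DB_TYPE_TIMESTAMP": TimestampType,
}

for type_code in ("DB_TYPE_NUMBER", "DB_TYPE_UNKNOWN"):
    snow_type = convert_map.get(type_code, StringType)  # unmapped -> StringType
    print(type_code, "->", snow_type())  # classes are instantiated later, as above
```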

src/snowflake/snowpark/_internal/data_source/drivers/psycopg2_driver.py

Lines changed: 4 additions & 9 deletions
@@ -189,15 +189,10 @@ def to_snow_type(self, schema: List[Any]) -> StructType:
             try:
                 type_code = Psycopg2TypeCode(type_code)
             except ValueError:
-                raise NotImplementedError(
-                    f"Postgres type not supported: {type_code} for column: {name}"
-                )
-            snow_type = BASE_POSTGRES_TYPE_TO_SNOW_TYPE.get(type_code)
-            if snow_type is None:
-                raise NotImplementedError(
-                    f"Postgres type not supported: {type_code} for column: {name}"
-                )
-            if Psycopg2TypeCode(type_code) == Psycopg2TypeCode.NUMERICOID:
+                # not supported type is now handled as string type in below code
+                type_code = None
+            snow_type = BASE_POSTGRES_TYPE_TO_SNOW_TYPE.get(type_code, StringType)
+            if type_code == Psycopg2TypeCode.NUMERICOID:
                 if not self.validate_numeric_precision_scale(precision, scale):
                     logger.debug(
                         f"Snowpark does not support column"

src/snowflake/snowpark/_internal/data_source/drivers/pymsql_driver.py

Lines changed: 1 addition & 3 deletions
@@ -157,9 +157,7 @@ def to_snow_type(self, schema: List[Any]) -> StructType:
                 scale,
                 null_ok,
             ) = col
-            snow_type = BASE_PYMYSQL_TYPE_TO_SNOW_TYPE.get(type_code, None)
-            if snow_type is None:
-                raise NotImplementedError(f"mysql type not supported: {type_code}")
+            snow_type = BASE_PYMYSQL_TYPE_TO_SNOW_TYPE.get(type_code, StringType)
             if type_code in (PymysqlTypeCode.DECIMAL, PymysqlTypeCode.NEWDECIMAL):
                 # we did -2 here because what driver returned is precision + 2, mysql store + 2 precision internally
                 precision -= 2

src/snowflake/snowpark/_internal/data_source/drivers/pyodbc_driver.py

Lines changed: 1 addition & 3 deletions
@@ -59,9 +59,7 @@ def to_snow_type(self, schema: List[Any]) -> StructType:
                 scale,
                 null_ok,
             ) = column
-            snow_type = BASE_PYODBC_TYPE_TO_SNOW_TYPE.get(type_code, None)
-            if snow_type is None:
-                raise NotImplementedError(f"sql server type not supported: {type_code}")
+            snow_type = BASE_PYODBC_TYPE_TO_SNOW_TYPE.get(type_code, StringType)
             if type_code in (int, decimal.Decimal):
                 if not self.validate_numeric_precision_scale(precision, scale):
                     logger.debug(

tests/integ/datasource/test_databricks.py

Lines changed: 8 additions & 0 deletions
@@ -250,3 +250,11 @@ def test_unit_udtf_ingestion():
         else:
             # Keep other types as is
             assert value == expected_row[index]
+
+
+def test_unsupported_type():
+
+    schema = DatabricksDriver(
+        create_databricks_connection, DBMS_TYPE.DATABRICKS_DB
+    ).to_snow_type([("test_col", "unsupported_type", True)])
+    assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)])

tests/integ/datasource/test_mysql.py

Lines changed: 9 additions & 8 deletions
@@ -14,6 +14,7 @@
     PymysqlTypeCode,
 )
 from snowflake.snowpark._internal.data_source.utils import DBMS_TYPE
+from snowflake.snowpark.types import StructType, StructField, StringType
 from tests.resources.test_data_source_dir.test_mysql_data import (
     mysql_real_data,
     MysqlType,
@@ -140,14 +141,6 @@ def test_dbapi_batch_fetch(
     assert df.order_by("ID").collect() == expected_result
 
 
-def test_type_conversion():
-    invalid_type = MysqlType("ID", "UNKNOWN", None, None, None, None, False)
-    with pytest.raises(NotImplementedError, match="mysql type not supported"):
-        PymysqlDriver(create_connection_mysql, DBMS_TYPE.MYSQL_DB).to_snow_type(
-            [invalid_type]
-        )
-
-
 def test_pymysql_driver_coverage(caplog):
     mysql_driver = PymysqlDriver(create_connection_mysql, DBMS_TYPE.MYSQL_DB)
     mysql_driver.to_snow_type(
@@ -296,3 +289,11 @@ def test_server_side_cursor():
     assert isinstance(cursor, pymysql.cursors.SSCursor)
     cursor.close()
     conn.close()
+
+
+def test_unsupported_type():
+
+    schema = PymysqlDriver(create_connection_mysql, DBMS_TYPE.MYSQL_DB).to_snow_type(
+        [("test_col", "unsupported_type", None, None, 0, 0, True)]
+    )
+    assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)])

tests/integ/datasource/test_oracledb.py

Lines changed: 14 additions & 8 deletions
@@ -5,6 +5,7 @@
 import logging
 import math
 import sys
+from collections import namedtuple
 
 import pytest
 
@@ -18,6 +19,7 @@
 from snowflake.snowpark._internal.data_source.utils import (
     DBMS_TYPE,
 )
+from snowflake.snowpark.types import StructType, StructField, StringType
 from tests.parameters import ORACLEDB_CONNECTION_PARAMETERS
 from tests.resources.test_data_source_dir.test_data_source_data import (
     OracleDBType,
@@ -135,14 +137,6 @@ def test_dbapi_batch_fetch(
     assert df.order_by("ID").collect() == expected_result
 
 
-def test_type_conversion():
-    invalid_type = OracleDBType("ID", "UNKNOWN", None, None, False)
-    with pytest.raises(NotImplementedError, match="oracledb type not supported"):
-        OracledbDriver(create_connection_oracledb, DBMS_TYPE.ORACLE_DB).to_snow_type(
-            [invalid_type]
-        )
-
-
 def test_oracledb_driver_coverage(caplog):
     oracledb_driver = OracledbDriver(create_connection_oracledb, DBMS_TYPE.ORACLE_DB)
     conn = oracledb_driver.prepare_connection(oracledb_driver.create_connection(), 0)
@@ -242,3 +236,15 @@ def test_double_quoted_column_name_oracledb(session, custom_schema):
         )
     ]
     assert df.schema == oracledb_double_quoted_schema
+
+
+def test_unsupported_type():
+    invalid_type = OracleDBType("ID", "UNKNOWN", None, None, False)
+    MockDescription = namedtuple(
+        "mock_description", ["name", "type_code", "precision", "scale", "null_ok"]
+    )
+
+    schema = OracledbDriver(
+        create_connection_oracledb, DBMS_TYPE.ORACLE_DB
+    ).to_snow_type([MockDescription("test_col", invalid_type, 0, 0, True)])
+    assert schema == StructType([StructField("TEST_COL", StringType(), nullable=True)])

tests/integ/datasource/test_postgres.py

Lines changed: 18 additions & 10 deletions
@@ -369,18 +369,26 @@ def test_unit_psycopg2_driver_to_snow_type_mapping():
     assert result.fields[3].datatype.scale == 0
 
     # Test unsupported type code
-    with pytest.raises(NotImplementedError, match="Postgres type not supported"):
-        nonexisting_type_code = -1
-        Psycopg2Driver(create_postgres_connection, DBMS_TYPE.POSTGRES_DB).to_snow_type(
-            [("UNSUPPORTED_COL", nonexisting_type_code, None, None, None, None, True)]
-        )
+    nonexisting_type_code = -1
+    schema = Psycopg2Driver(
+        create_postgres_connection, DBMS_TYPE.POSTGRES_DB
+    ).to_snow_type(
+        [("UNSUPPORTED_COL", nonexisting_type_code, None, None, None, None, True)]
+    )
+    assert schema == StructType(
+        [StructField("UNSUPPORTED_COL", StringType(), nullable=True)]
+    )
 
     # Test unsupported type code
-    with pytest.raises(NotImplementedError, match="Postgres type not supported"):
-        unimplemented_code = Psycopg2TypeCode.ACLITEMOID
-        Psycopg2Driver(create_postgres_connection, DBMS_TYPE.POSTGRES_DB).to_snow_type(
-            [("UNSUPPORTED_COL", unimplemented_code, None, None, None, None, True)]
-        )
+    unimplemented_code = Psycopg2TypeCode.ACLITEMOID
+    schema = Psycopg2Driver(
+        create_postgres_connection, DBMS_TYPE.POSTGRES_DB
+    ).to_snow_type(
+        [("UNSUPPORTED_COL", unimplemented_code, None, None, None, None, True)]
+    )
+    assert schema == StructType(
+        [StructField("UNSUPPORTED_COL", StringType(), nullable=True)]
+    )
 
 
 def test_unit_generate_select_query():
