Skip to content

Commit 53a99ed

Browse files
SNOW-2166197: add support for jdbc (#3591)
1 parent 5987d35 commit 53a99ed

File tree

10 files changed

+1893
-51
lines changed

10 files changed

+1893
-51
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
- `credentials`
2121
- `encryption`
2222
- Added support for `Session.directory` and `Session.read.directory` to retrieve the list of all files on a stage with metadata.
23+
- Added support for `DataFrameReader.jdbc` (PrPr), which allows ingesting data from external data sources through a JDBC driver.
2324

2425
- Added support for the following scalar functions in `functions.py`:
2526
- `all_user_names`

src/snowflake/snowpark/_internal/data_source/__init__.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@
22
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
33
#
44

5-
__all__ = [
6-
"DataSourceReader",
7-
"DataSourcePartitioner",
8-
]
5+
__all__ = ["DataSourceReader", "DataSourcePartitioner", "JDBC"]
96

107
from snowflake.snowpark._internal.data_source.datasource_reader import DataSourceReader
118
from snowflake.snowpark._internal.data_source.datasource_partitioner import (
129
DataSourcePartitioner,
1310
)
11+
from snowflake.snowpark._internal.data_source.jdbc import JDBC

src/snowflake/snowpark/_internal/data_source/datasource_partitioner.py

Lines changed: 68 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -116,22 +116,9 @@ def schema(self) -> StructType:
116116
if self.custom_schema is None:
117117
return auto_infer_schema
118118
else:
119-
if isinstance(self.custom_schema, str):
120-
custom_schema = type_string_to_type_object(self.custom_schema)
121-
if not isinstance(custom_schema, StructType):
122-
raise ValueError(
123-
f"Invalid schema string: {self.custom_schema}. "
124-
f"You should provide a valid schema string representing a struct type."
125-
'For example: "id INTEGER, int_col INTEGER, text_col STRING".'
126-
)
127-
elif isinstance(self.custom_schema, StructType):
128-
custom_schema = self.custom_schema
129-
else:
130-
raise ValueError(
131-
f"Invalid schema type: {type(self.custom_schema)}."
132-
'The schema should be either a valid schema string, for example: "id INTEGER, int_col INTEGER, text_col STRING".'
133-
'or a valid StructType, for example: StructType([StructField("ID", IntegerType(), False)])'
134-
)
119+
custom_schema = DataSourcePartitioner.formatting_custom_schema(
120+
self.custom_schema
121+
)
135122

136123
if not auto_infer_successful:
137124
return custom_schema
@@ -164,63 +151,82 @@ def partitions(self) -> List[str]:
164151
self._query_input_alias,
165152
)
166153
logger.debug(f"Generated select query: {select_query}")
167-
if self.column is None:
154+
155+
return DataSourcePartitioner.generate_partitions(
156+
select_query,
157+
self.schema,
158+
self.predicates,
159+
self.column,
160+
self.lower_bound,
161+
self.upper_bound,
162+
self.num_partitions,
163+
)
164+
165+
@staticmethod
166+
def generate_partitions(
167+
select_query: str,
168+
schema: StructType,
169+
predicates: Optional[List[str]] = None,
170+
column: Optional[str] = None,
171+
lower_bound: Optional[Union[str, int]] = None,
172+
upper_bound: Optional[Union[str, int]] = None,
173+
num_partitions: Optional[int] = None,
174+
):
175+
if column is None:
168176
if (
169-
self.lower_bound is not None
170-
or self.upper_bound is not None
171-
or self.num_partitions is not None
177+
lower_bound is not None
178+
or upper_bound is not None
179+
or num_partitions is not None
172180
):
173181
raise ValueError(
174182
"when column is not specified, lower_bound, upper_bound, num_partitions are expected to be None"
175183
)
176-
if self.predicates is None:
184+
if predicates is None:
177185
partitioned_queries = [select_query]
178186
else:
179-
partitioned_queries = self.generate_partition_with_predicates(
180-
select_query, self.predicates
187+
partitioned_queries = (
188+
DataSourcePartitioner.generate_partition_with_predicates(
189+
select_query, predicates
190+
)
181191
)
182192
else:
183-
if (
184-
self.lower_bound is None
185-
or self.upper_bound is None
186-
or self.num_partitions is None
187-
):
193+
if lower_bound is None or upper_bound is None or num_partitions is None:
188194
raise ValueError(
189195
"when column is specified, lower_bound, upper_bound, num_partitions must be specified"
190196
)
191197

192198
column_type = None
193-
for field in self.schema.fields:
199+
for field in schema.fields:
194200
col = (
195-
self.column
196-
if self.column[0] == '"' and self.column[-1] == '"'
197-
else self.column.upper()
201+
column if column[0] == '"' and column[-1] == '"' else column.upper()
198202
)
199203
if field.name == col:
200204
column_type = field.datatype
201205
break
202206
if column_type is None:
203-
raise ValueError(f"Specified column {self.column} does not exist")
207+
raise ValueError(f"Specified column {column} does not exist")
204208

205209
if not isinstance(column_type, (_NumericType, DateType)):
206210
raise ValueError(
207211
f"unsupported type {column_type}, column must be a numeric type like int and float, or date type"
208212
)
209-
partitioned_queries = self.generate_partition(
210-
select_query,
211-
column_type,
212-
self.column,
213-
self.lower_bound,
214-
self.upper_bound,
215-
self.num_partitions,
213+
partitioned_queries = (
214+
DataSourcePartitioner.generate_partition_with_column_name(
215+
select_query,
216+
column_type,
217+
column,
218+
lower_bound,
219+
upper_bound,
220+
num_partitions,
221+
)
216222
)
217223
return partitioned_queries
218224

219225
@staticmethod
220-
def generate_partition(
226+
def generate_partition_with_column_name(
221227
select_query: str,
222228
column_type: DataType,
223-
column: Optional[str] = None,
229+
column: str,
224230
lower_bound: Optional[Union[str, int]] = None,
225231
upper_bound: Optional[Union[str, int]] = None,
226232
num_partitions: Optional[int] = None,
@@ -297,6 +303,26 @@ def generate_partition_with_predicates(
297303
) -> List[str]:
298304
return [select_query + f" WHERE {predicate}" for predicate in predicates]
299305

306+
@staticmethod
307+
def formatting_custom_schema(custom_schema: Union[str, StructType]) -> StructType:
308+
if isinstance(custom_schema, str):
309+
schema = type_string_to_type_object(custom_schema)
310+
if not isinstance(schema, StructType):
311+
raise ValueError(
312+
f"Invalid schema string: {custom_schema}. "
313+
f"You should provide a valid schema string representing a struct type."
314+
'For example: "id INTEGER, int_col INTEGER, text_col STRING".'
315+
)
316+
elif isinstance(custom_schema, StructType):
317+
schema = custom_schema
318+
else:
319+
raise ValueError(
320+
f"Invalid schema type: {type(custom_schema)}."
321+
'The schema should be either a valid schema string, for example: "id INTEGER, int_col INTEGER, text_col STRING".'
322+
'or a valid StructType, for example: StructType([StructField("ID", IntegerType(), False)])'
323+
)
324+
return schema
325+
300326

301327
def to_internal_value(value: Union[int, str, float], column_type: DataType):
302328
if isinstance(column_type, _NumericType):

0 commit comments

Comments
 (0)