Skip to content

Commit b49cea1

Browse files
authored
feat: Snowflake source. fetch MAX in a single query (#5387)
fetch MAX in a single query
Signed-off-by: Artem Petrov <[email protected]>
1 parent 0a75696 commit b49cea1

File tree

1 file changed

+33
-28
lines changed

1 file changed

+33
-28
lines changed

sdk/python/feast/infra/offline_stores/snowflake_source.py

Lines changed: 33 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -245,6 +245,38 @@ def get_table_column_names_and_types(
245245
"The following source:\n" + query + "\n ... is empty"
246246
)
247247

248+
high_precision_number_columns = [
249+
col["column_name"]
250+
for col in metadata
251+
if col["type_code"] == 0 and col["scale"] == 0 and col["precision"] > 19
252+
]
253+
254+
if high_precision_number_columns:
255+
max_selects = [
256+
f'MAX("{col}") AS "{col}"' for col in high_precision_number_columns
257+
]
258+
query = (
259+
f"SELECT {', '.join(max_selects)} FROM {self.get_table_query_string()}"
260+
)
261+
262+
with GetSnowflakeConnection(config.offline_store) as conn:
263+
result = execute_snowflake_statement(conn, query).fetch_pandas_all()
264+
265+
for col in high_precision_number_columns:
266+
max_value = result[col].iloc[0]
267+
if max_value is not None:
268+
str_length = len(str(int(max_value)))
269+
for row in metadata:
270+
if row["column_name"] == col:
271+
if str_length <= 9:
272+
row["snowflake_type"] = "NUMBER32"
273+
elif str_length <= 19:
274+
row["snowflake_type"] = "NUMBER64"
275+
else:
276+
raise NotImplementedError(
277+
f"Number in column {col} larger than INT64 is not supported"
278+
)
279+
248280
for row in metadata:
249281
if row["type_code"] == 0:
250282
if row["scale"] == 0:
@@ -253,34 +285,7 @@ def get_table_column_names_and_types(
253285
elif row["precision"] <= 18: # max precision size to ensure INT64
254286
row["snowflake_type"] = "NUMBER64"
255287
else:
256-
column = row["column_name"]
257-
258-
with GetSnowflakeConnection(config.offline_store) as conn:
259-
query = f'SELECT MAX("{column}") AS "{column}" FROM {self.get_table_query_string()}'
260-
result = execute_snowflake_statement(
261-
conn, query
262-
).fetch_pandas_all()
263-
if (
264-
result.dtypes[column].name
265-
in python_int_to_snowflake_type_map
266-
):
267-
row["snowflake_type"] = python_int_to_snowflake_type_map[
268-
result.dtypes[column].name
269-
]
270-
else:
271-
if len(result) > 0:
272-
max_value = result.iloc[0][0]
273-
if max_value is not None and len(str(max_value)) <= 9:
274-
row["snowflake_type"] = "NUMBER32"
275-
continue
276-
elif (
277-
max_value is not None and len(str(max_value)) <= 18
278-
):
279-
row["snowflake_type"] = "NUMBER64"
280-
continue
281-
raise NotImplementedError(
282-
"NaNs or Numbers larger than INT64 are not supported"
283-
)
288+
continue
284289
else:
285290
row["snowflake_type"] = "NUMBERwSCALE"
286291

0 commit comments

Comments
 (0)