Skip to content

Commit 4ce32b0

Browse files
feat(UNTRACKED): snowflake uploader executemany (#599)
1 parent 05c7e51 commit 4ce32b0

File tree

4 files changed

+24
-21
lines changed

4 files changed

+24
-21
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 1.2.17
2+
3+
* **Enhancement: Use a single `executemany` instead of a per-element `execute` in Snowflake Uploader**
4+
15
## 1.2.16
26

37
* **Fix: Catch databricks client auth errors that were being missed**

test/unit/connectors/sql/test_snowflake.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -110,29 +110,29 @@ def test_embeddings_dimension_non_vector_column(
110110
mock_cursor.execute.assert_called_once_with("SHOW COLUMNS LIKE 'embeddings' IN test_table")
111111

112112

113-
def test_parse_values_with_vector_columns(snowflake_uploader: SnowflakeUploader):
113+
def test_parse_select_with_vector_columns(snowflake_uploader: SnowflakeUploader):
114114
snowflake_uploader._embeddings_dimension = 128
115115
columns = ["embeddings", "languages", "other_column"]
116116

117-
result = snowflake_uploader._parse_values(columns)
117+
result = snowflake_uploader._parse_select(columns)
118118

119-
expected_result = "PARSE_JSON(?)::VECTOR(FLOAT,128),PARSE_JSON(?),?"
119+
expected_result = "PARSE_JSON($1)::VECTOR(FLOAT,128),PARSE_JSON($2),$3"
120120
assert result == expected_result
121121

122122

123-
def test_parse_values_with_vector_columns_embedding_dimension_zero(
123+
def test_parse_select_with_vector_columns_embedding_dimension_zero(
124124
snowflake_uploader: SnowflakeUploader,
125125
):
126126
snowflake_uploader._embeddings_dimension = 0
127127
columns = ["embeddings", "languages", "other_column"]
128128

129-
result = snowflake_uploader._parse_values(columns)
129+
result = snowflake_uploader._parse_select(columns)
130130

131-
expected_result = "PARSE_JSON(?),PARSE_JSON(?),?"
131+
expected_result = "PARSE_JSON($1),PARSE_JSON($2),$3"
132132
assert result == expected_result
133133

134134

135-
def test_parse_values_with_vector_columns_embedding_dimension_none(
135+
def test_parse_select_with_vector_columns_embedding_dimension_none(
136136
mocker: MockerFixture, snowflake_uploader: SnowflakeUploader
137137
):
138138
mock_embeddings_dimension = mocker.PropertyMock
@@ -143,7 +143,7 @@ def test_parse_values_with_vector_columns_embedding_dimension_none(
143143
)
144144
columns = ["embeddings", "languages", "other_column"]
145145

146-
result = snowflake_uploader._parse_values(columns)
146+
result = snowflake_uploader._parse_select(columns)
147147

148-
expected_result = "PARSE_JSON(?),PARSE_JSON(?),?"
148+
expected_result = "PARSE_JSON($1),PARSE_JSON($2),$3"
149149
assert result == expected_result

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.2.16" # pragma: no cover
1+
__version__ = "1.2.17" # pragma: no cover

unstructured_ingest/processes/connectors/sql/snowflake.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -233,18 +233,19 @@ def prepare_data(
233233
output.append(tuple(parsed))
234234
return output
235235

236-
def _parse_values(self, columns: list[str]) -> str:
236+
def _parse_select(self, columns: list[str]) -> str:
237237
embeddings_dimension = self.embeddings_dimension
238238
parsed_values = []
239-
for col in columns:
239+
for i, col in enumerate(columns):
240+
argument_selector = f"${i + 1}"
240241
if col in _VECTOR_COLUMNS and embeddings_dimension:
241242
parsed_values.append(
242-
f"PARSE_JSON({self.values_delimiter})::VECTOR(FLOAT,{embeddings_dimension})"
243+
f"PARSE_JSON({argument_selector})::VECTOR(FLOAT,{embeddings_dimension})"
243244
)
244245
elif col in _ARRAY_COLUMNS or col in _VECTOR_COLUMNS:
245-
parsed_values.append(f"PARSE_JSON({self.values_delimiter})")
246+
parsed_values.append(f"PARSE_JSON({argument_selector})")
246247
else:
247-
parsed_values.append(self.values_delimiter)
248+
parsed_values.append(argument_selector)
248249
return ",".join(parsed_values)
249250

250251
def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
@@ -262,10 +263,11 @@ def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
262263
df.replace({np.nan: None}, inplace=True)
263264

264265
columns = list(df.columns)
265-
stmt = "INSERT INTO {table_name} ({columns}) SELECT {values}".format(
266+
stmt = "INSERT INTO {table_name} ({columns}) SELECT {select} FROM VALUES ({values})".format(
266267
table_name=self.upload_config.table_name,
267268
columns=",".join(columns),
268-
values=self._parse_values(columns),
269+
select=self._parse_select(columns),
270+
values=",".join([self.values_delimiter for _ in columns]),
269271
)
270272
logger.info(
271273
f"writing a total of {len(df)} elements via"
@@ -276,10 +278,7 @@ def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
276278
for rows in split_dataframe(df=df, chunk_size=self.upload_config.batch_size):
277279
with self.connection_config.get_cursor() as cursor:
278280
values = self.prepare_data(columns, tuple(rows.itertuples(index=False, name=None)))
279-
# TODO: executemany break on 'Binding data in type (list) is not supported'
280-
for val in values:
281-
logger.debug(f"running query: {stmt}\nwith values: {val}")
282-
cursor.execute(stmt, val)
281+
cursor.executemany(stmt, values)
283282

284283

285284
snowflake_source_entry = SourceRegistryEntry(

0 commit comments

Comments
 (0)