Skip to content

Commit af4e258

Browse files
fix: Snowflake Destination schema example and vector type embeddings (#502)
1. Fixes table schema example 2. Fixes issue with embeddings column when using vector type 3. Fixes issue with dropping/adding case insensitive column names
1 parent 5e85748 commit af4e258

File tree

7 files changed

+265
-35
lines changed

7 files changed

+265
-35
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
## 1.0.27
2+
3+
### Fixes
4+
5+
* **Fix table schema example for Snowflake Destination connector**
6+
* **Fix Snowflake Destination issue with dropping/removing case insensitive column names when populating the table**
7+
* **Fix Snowflake Destination issue with `embeddings` column when using `VECTOR` type**
8+
19
## 1.0.26
210

311
* **Fix Notion connector error with FileIcons**

test/integration/connectors/env_setup/sql/snowflake/destination/snowflake-schema.sql

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,46 @@
11
CREATE TABLE elements (
2-
id UUID PRIMARY KEY,
2+
id VARCHAR(36) PRIMARY KEY NOT NULL DEFAULT UUID_STRING(),
33
record_id VARCHAR,
44
element_id VARCHAR,
5-
text TEXT,
6-
embeddings ARRAY(FLOAT),
5+
text VARCHAR,
6+
embeddings VECTOR(FLOAT, 1536),
77
type VARCHAR,
88
system VARCHAR,
99
layout_width DECIMAL,
1010
layout_height DECIMAL,
11-
points TEXT,
12-
url TEXT,
11+
points VARCHAR,
12+
url VARCHAR,
1313
version VARCHAR,
1414
date_created TIMESTAMP_TZ,
1515
date_modified TIMESTAMP_TZ,
1616
date_processed TIMESTAMP_TZ,
17-
permissions_data TEXT,
18-
record_locator TEXT,
17+
permissions_data VARCHAR,
18+
record_locator VARCHAR,
1919
category_depth INTEGER,
2020
parent_id VARCHAR,
2121
attached_filename VARCHAR,
2222
filetype VARCHAR,
2323
last_modified TIMESTAMP_TZ,
2424
file_directory VARCHAR,
2525
filename VARCHAR,
26-
languages ARRAY(VARCHAR),
26+
languages ARRAY,
2727
page_number VARCHAR,
28-
links TEXT,
28+
links VARCHAR,
2929
page_name VARCHAR,
30-
link_urls ARRAY(VARCHAR),
31-
link_texts ARRAY(VARCHAR),
32-
sent_from ARRAY(VARCHAR),
33-
sent_to ARRAY(VARCHAR),
30+
link_urls ARRAY,
31+
link_texts ARRAY,
32+
sent_from ARRAY,
33+
sent_to ARRAY,
3434
subject VARCHAR,
3535
section VARCHAR,
3636
header_footer_type VARCHAR,
37-
emphasized_text_contents ARRAY(VARCHAR),
38-
emphasized_text_tags ARRAY(VARCHAR),
39-
text_as_html TEXT,
40-
regex_metadata TEXT,
41-
detection_class_prob DECIMAL
37+
emphasized_text_contents ARRAY,
38+
emphasized_text_tags ARRAY,
39+
text_as_html VARCHAR,
40+
regex_metadata VARCHAR,
41+
detection_class_prob DECIMAL,
42+
image_base64 VARCHAR,
43+
image_mime_type VARCHAR,
44+
orig_elements VARCHAR,
45+
is_continuation BOOLEAN
4246
);
43-
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
from unittest.mock import MagicMock
2+
3+
import pytest
4+
from pydantic import Secret
5+
from pytest_mock import MockerFixture
6+
7+
from unstructured_ingest.processes.connectors.sql.snowflake import (
8+
SnowflakeAccessConfig,
9+
SnowflakeConnectionConfig,
10+
SnowflakeUploader,
11+
)
12+
13+
14+
@pytest.fixture
15+
def snowflake_access_config():
16+
return SnowflakeAccessConfig(
17+
password="password",
18+
)
19+
20+
21+
@pytest.fixture
22+
def snowflake_connection_config(snowflake_access_config: SnowflakeAccessConfig):
23+
return SnowflakeConnectionConfig(
24+
account="account",
25+
user="user",
26+
host="host",
27+
port=1234,
28+
database="database",
29+
schema="schema",
30+
role="role",
31+
access_config=Secret(snowflake_access_config),
32+
)
33+
34+
35+
@pytest.fixture
36+
def snowflake_uploader(snowflake_connection_config: SnowflakeConnectionConfig):
37+
uploader = SnowflakeUploader(
38+
connection_config=snowflake_connection_config,
39+
upload_config=MagicMock(table_name="test_table"),
40+
)
41+
uploader._embeddings_dimension = None
42+
return uploader
43+
44+
45+
@pytest.fixture
46+
def mock_cursor(mocker: MockerFixture):
47+
return mocker.MagicMock()
48+
49+
50+
@pytest.fixture
51+
def mock_get_cursor(mocker: MockerFixture, mock_cursor: MagicMock):
52+
mock = mocker.patch(
53+
"unstructured_ingest.processes.connectors.sql.snowflake.SnowflakeConnectionConfig.get_cursor",
54+
autospec=True,
55+
)
56+
mock.return_value.__enter__.return_value = mock_cursor
57+
return mock
58+
59+
60+
def test_embeddings_dimension_vector_column(
61+
mock_cursor: MagicMock, snowflake_uploader: SnowflakeUploader, mock_get_cursor: MagicMock
62+
):
63+
mock_cursor.execute.return_value.fetchone.return_value = (
64+
"",
65+
"",
66+
"",
67+
'{"type": "VECTOR", "dimension": 128}',
68+
)
69+
70+
dimension = snowflake_uploader.embeddings_dimension
71+
72+
assert dimension == 128
73+
mock_cursor.execute.assert_called_once_with("SHOW COLUMNS LIKE 'embeddings' IN test_table")
74+
75+
76+
def test_embeddings_dimension_vector_column_dict(
77+
mock_cursor: MagicMock, snowflake_uploader: SnowflakeUploader, mock_get_cursor: MagicMock
78+
):
79+
mock_cursor.execute.return_value.fetchone.return_value = {
80+
"col_1": "test_1",
81+
"col_2": "test_2",
82+
"data_type": '{"type": "VECTOR", "dimension": 64}',
83+
}
84+
85+
dimension = snowflake_uploader.embeddings_dimension
86+
87+
assert dimension == 64
88+
mock_cursor.execute.assert_called_once_with("SHOW COLUMNS LIKE 'embeddings' IN test_table")
89+
90+
91+
def test_embeddings_dimension_no_column(
92+
mock_cursor: MagicMock, snowflake_uploader: SnowflakeUploader, mock_get_cursor: MagicMock
93+
):
94+
mock_cursor.execute.return_value.fetchone.return_value = None
95+
96+
dimension = snowflake_uploader.embeddings_dimension
97+
98+
assert dimension == 0
99+
mock_cursor.execute.assert_called_once_with("SHOW COLUMNS LIKE 'embeddings' IN test_table")
100+
101+
102+
def test_embeddings_dimension_non_vector_column(
103+
mock_cursor: MagicMock, snowflake_uploader: SnowflakeUploader, mock_get_cursor: MagicMock
104+
):
105+
mock_cursor.execute.return_value.fetchone.return_value = ("", "", "", '{"type": "STRING"}')
106+
107+
dimension = snowflake_uploader.embeddings_dimension
108+
109+
assert dimension == 0
110+
mock_cursor.execute.assert_called_once_with("SHOW COLUMNS LIKE 'embeddings' IN test_table")
111+
112+
113+
def test_parse_values_with_vector_columns(snowflake_uploader: SnowflakeUploader):
114+
snowflake_uploader._embeddings_dimension = 128
115+
columns = ["embeddings", "languages", "other_column"]
116+
117+
result = snowflake_uploader._parse_values(columns)
118+
119+
expected_result = "PARSE_JSON(?)::VECTOR(FLOAT,128),PARSE_JSON(?),?"
120+
assert result == expected_result
121+
122+
123+
def test_parse_values_with_vector_columns_embedding_dimension_zero(
124+
snowflake_uploader: SnowflakeUploader,
125+
):
126+
snowflake_uploader._embeddings_dimension = 0
127+
columns = ["embeddings", "languages", "other_column"]
128+
129+
result = snowflake_uploader._parse_values(columns)
130+
131+
expected_result = "PARSE_JSON(?),PARSE_JSON(?),?"
132+
assert result == expected_result
133+
134+
135+
def test_parse_values_with_vector_columns_embedding_dimension_none(
136+
mocker: MockerFixture, snowflake_uploader: SnowflakeUploader
137+
):
138+
mock_embeddings_dimension = mocker.PropertyMock
139+
mock_embeddings_dimension.return_value = None
140+
mocker.patch(
141+
"unstructured_ingest.processes.connectors.sql.snowflake.SnowflakeUploader.embeddings_dimension",
142+
new_callable=mock_embeddings_dimension,
143+
)
144+
columns = ["embeddings", "languages", "other_column"]
145+
146+
result = snowflake_uploader._parse_values(columns)
147+
148+
expected_result = "PARSE_JSON(?),PARSE_JSON(?),?"
149+
assert result == expected_result

test/unit/connectors/sql/test_sql.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,3 +150,39 @@ def test_fit_to_schema_no_add_missing_columns(mocker: MockerFixture, mock_upload
150150

151151
assert "col2" not in result.columns
152152
assert "col1" in result.columns
153+
154+
155+
def test_fit_to_schema_case_sensitive(mocker: MockerFixture, mock_uploader: SQLUploader):
156+
df = pd.DataFrame(
157+
{
158+
"col1": [1, 2],
159+
"col2": [3, 4],
160+
"col3": [5, 6],
161+
}
162+
)
163+
mocker.patch.object(mock_uploader, "get_table_columns", return_value=["COL1", "COL2", "col3"])
164+
165+
result = mock_uploader._fit_to_schema(df)
166+
167+
assert "col1" not in result.columns
168+
assert "col2" not in result.columns
169+
assert "col3" in result.columns
170+
assert "COL1" in result.columns
171+
assert "COL2" in result.columns
172+
173+
174+
def test_fit_to_schema_not_case_sensitive(mocker: MockerFixture, mock_uploader: SQLUploader):
175+
df = pd.DataFrame(
176+
{
177+
"col1": [1, 2],
178+
"col2": [3, 4],
179+
"col3": [5, 6],
180+
}
181+
)
182+
mocker.patch.object(mock_uploader, "get_table_columns", return_value=["COL1", "COL2"])
183+
184+
result = mock_uploader._fit_to_schema(df, add_missing_columns=False, case_sensitive=False)
185+
186+
assert "col3" not in result.columns
187+
assert "col1" in result.columns
188+
assert "col2" in result.columns

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.0.26" # pragma: no cover
1+
__version__ = "1.0.27" # pragma: no cover

unstructured_ingest/processes/connectors/sql/snowflake.py

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@
3737

3838
CONNECTOR_TYPE = "snowflake"
3939

40+
EMBEDDINGS_COLUMN = "embeddings"
4041
_ARRAY_COLUMNS = (
41-
"embeddings",
4242
"languages",
4343
"link_urls",
4444
"link_texts",
@@ -47,6 +47,7 @@
4747
"emphasized_text_contents",
4848
"emphasized_text_tags",
4949
)
50+
_VECTOR_COLUMNS = (EMBEDDINGS_COLUMN,)
5051

5152

5253
class SnowflakeAccessConfig(SQLAccessConfig):
@@ -174,6 +175,33 @@ class SnowflakeUploader(SQLUploader):
174175
connector_type: str = CONNECTOR_TYPE
175176
values_delimiter: str = "?"
176177

178+
_embeddings_dimension: Optional[int] = None
179+
180+
@property
181+
def embeddings_dimension(self) -> Optional[int]:
182+
"""
183+
Get the dimension of the embeddings column in the Snowflake table.
184+
If the column is not present or is not of type VECTOR, returns None.
185+
"""
186+
if self._embeddings_dimension is None:
187+
with self.connection_config.get_cursor() as cursor:
188+
embeddings_column = cursor.execute(
189+
f"SHOW COLUMNS LIKE '{EMBEDDINGS_COLUMN}' IN {self.upload_config.table_name}"
190+
).fetchone()
191+
if embeddings_column:
192+
data_type = {}
193+
if isinstance(embeddings_column, dict):
194+
data_type = json.loads(embeddings_column.get("data_type", "{}"))
195+
elif isinstance(embeddings_column, tuple):
196+
data_type = json.loads(embeddings_column[3] or "{}")
197+
if isinstance(data_type, dict) and data_type.get("type") == "VECTOR":
198+
self._embeddings_dimension = data_type.get("dimension")
199+
# If the _embeddings_dimension is still None, it means the column
200+
# is not present or not a VECTOR type
201+
if self._embeddings_dimension is None:
202+
self._embeddings_dimension = 0
203+
return self._embeddings_dimension
204+
177205
@requires_dependencies(["pandas"], extras="snowflake")
178206
def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
179207
super().run(path=path, file_data=file_data, **kwargs)
@@ -193,7 +221,7 @@ def prepare_data(
193221
parsed.append(None)
194222
else:
195223
parsed.append(parse_date_string(value))
196-
elif column_name in _ARRAY_COLUMNS:
224+
elif column_name in _ARRAY_COLUMNS or column_name in _VECTOR_COLUMNS:
197225
if not isinstance(value, list) and (
198226
value is None or pd.isna(value)
199227
): # pandas is nan
@@ -206,16 +234,18 @@ def prepare_data(
206234
return output
207235

208236
def _parse_values(self, columns: list[str]) -> str:
209-
return ",".join(
210-
[
211-
(
212-
f"PARSE_JSON({self.values_delimiter})"
213-
if col in _ARRAY_COLUMNS
214-
else self.values_delimiter
237+
embeddings_dimension = self.embeddings_dimension
238+
parsed_values = []
239+
for col in columns:
240+
if col in _VECTOR_COLUMNS and embeddings_dimension:
241+
parsed_values.append(
242+
f"PARSE_JSON({self.values_delimiter})::VECTOR(FLOAT,{embeddings_dimension})"
215243
)
216-
for col in columns
217-
]
218-
)
244+
elif col in _ARRAY_COLUMNS or col in _VECTOR_COLUMNS:
245+
parsed_values.append(f"PARSE_JSON({self.values_delimiter})")
246+
else:
247+
parsed_values.append(self.values_delimiter)
248+
return ",".join(parsed_values)
219249

220250
def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
221251
import numpy as np
@@ -228,8 +258,8 @@ def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
228258
f"record id column "
229259
f"{self.upload_config.record_id_key}, skipping delete"
230260
)
261+
df = self._fit_to_schema(df=df, add_missing_columns=True, case_sensitive=False)
231262
df.replace({np.nan: None}, inplace=True)
232-
self._fit_to_schema(df=df)
233263

234264
columns = list(df.columns)
235265
stmt = "INSERT INTO {table_name} ({columns}) SELECT {values}".format(

unstructured_ingest/processes/connectors/sql/sql.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -339,12 +339,16 @@ def prepare_data(
339339
output.append(tuple(parsed))
340340
return output
341341

342-
def _fit_to_schema(self, df: "DataFrame", add_missing_columns: bool = True) -> "DataFrame":
342+
def _fit_to_schema(
343+
self, df: "DataFrame", add_missing_columns: bool = True, case_sensitive: bool = True
344+
) -> "DataFrame":
343345
import pandas as pd
344346

345347
table_columns = self.get_table_columns()
346-
columns = set(df.columns)
347-
schema_fields = set(table_columns)
348+
columns = set(df.columns if case_sensitive else df.columns.str.lower())
349+
schema_fields = set(
350+
table_columns if case_sensitive else {col.lower() for col in table_columns}
351+
)
348352
columns_to_drop = columns - schema_fields
349353
missing_columns = schema_fields - columns
350354

0 commit comments

Comments
 (0)