
Commit 2491652

🔀 fix: snowflake destination (#314)
* Fix snowflake destination
* Fix sql uploader stager
* Snowflake fix; changelog and version update
* Rename date_to_string function to date_to_timestamp and fix its returned type
* Add SQL Upload Stager conform_dataframe method unit test
* Fix failing unit tests
* Review changes
* Add missing LOCALSTACK_AUTH_TOKEN secret to source and destination integration test workflow job env
* Revert "Add missing LOCALSTACK_AUTH_TOKEN secret to source and destination integration test workflow job env"

  This reverts commit 2a4ead7.

* Version bump
1 parent 123718a · commit 2491652

7 files changed: +90 -8 lines changed


CHANGELOG.md

Lines changed: 6 additions & 1 deletion

@@ -1,4 +1,9 @@
-## 0.3.13-dev0
+## 0.3.13-dev1
+
+### Fixes
+
+* **Fix Snowflake Uploader error**
+* **Fix SQL Uploader Stager timestamp error**
 
 ## 0.3.12
 
test/unit/connector/sql/__init__.py

Whitespace-only changes.

Lines changed: 69 additions & 0 deletions

@@ -0,0 +1,69 @@
+import json
+
+import pandas as pd
+
+from unstructured_ingest.v2.processes.connectors.sql.sql import SQLUploadStager
+
+test_element = {
+    "type": "Text",
+    "element_id": "cb869d39d5fadad791c50ef57eda8bfb",
+    "text": "some test text",
+    "file_directory": "/tmp/files",
+    "filename": "some-file.pdf",
+    "languages": ["eng"],
+    "last_modified": "2024-11-27T15:36:24",
+    "page_number": 1,
+    "filetype": "application/pdf",
+    "url": "s3://some-bucket/some-file.pdf",
+    "version": "60598d87b05db06b0f13efbbb69b7e99",
+    "record_locator": {
+        "protocol": "s3",
+        "remote_file_path": "s3://some-bucket/",
+    },
+    "date_created": "1732718184.0",
+    "date_modified": "1732718184.0",
+    "date_processed": "1734625322.9292843",
+    "points": [
+        [108.0, 74.15232159999994],
+        [108.0, 95.0239216],
+        [505.7402969717998, 95.0239216],
+        [505.7402969717998, 74.15232159999994],
+    ],
+    "system": "PixelSpace",
+    "layout_width": 612,
+    "layout_height": 792,
+    "id": "a658ea27-7c64-55b3-9111-941da4688ea8",
+    "record_id": "91c26667-5e97-5dc6-9252-cc54ec6c5cc6",
+    "permissions_data": {"read": True, "write": False},
+    "regex_metadata": "some regex metadata",
+    "parent_id": "91c26667-5e97-5dc6-9252-cc54ec6c5cc6",
+    "links": ["https://example.com"],
+}
+stager = SQLUploadStager()
+
+
+def test_sql_upload_stager_conform_dataframe_dates():
+    df = pd.DataFrame(data=[test_element.copy(), test_element.copy()])
+    conformed_df = stager.conform_dataframe(df)
+    for column in ["date_created", "date_modified", "date_processed", "last_modified"]:
+        assert conformed_df[column].apply(lambda x: isinstance(x, float)).all()
+
+
+def test_sql_upload_stager_conform_dataframe_json():
+    df = pd.DataFrame(data=[test_element.copy(), test_element.copy()])
+    conformed_df = stager.conform_dataframe(df)
+    for column in ["permissions_data", "record_locator", "points", "links"]:
+        assert conformed_df[column].apply(lambda x: isinstance(x, str)).all()
+        assert (
+            conformed_df[column]
+            .apply(lambda x: json.loads(x))
+            .apply(lambda x: isinstance(x, (list, dict)))
+            .all()
+        )
+
+
+def test_sql_upload_stager_conform_dataframe_strings():
+    df = pd.DataFrame(data=[test_element.copy(), test_element.copy()])
+    conformed_df = stager.conform_dataframe(df)
+    for column in ["version", "page_number", "regex_metadata"]:
+        assert conformed_df[column].apply(lambda x: isinstance(x, str)).all()
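
For context on the JSON assertions above, here is a minimal, self-contained sketch (not the stager's actual code) of the pattern being tested: columns holding lists or dicts are serialized with json.dumps so the database driver receives plain strings, and json.loads round-trips them back to Python objects. The df, links, and record_locator names below are illustrative only.

```python
import json

import pandas as pd

# Structured columns (lists/dicts) are dumped to JSON strings before upload;
# json.loads round-trips them back, which is what the tests above assert.
df = pd.DataFrame([{"links": ["https://example.com"], "record_locator": {"protocol": "s3"}}])
for column in ("links", "record_locator"):
    df[column] = df[column].apply(json.dumps)

assert df["links"].apply(lambda x: isinstance(x, str)).all()
assert isinstance(json.loads(df["record_locator"].iloc[0]), dict)
```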
Lines changed: 1 addition & 1 deletion

@@ -1 +1 @@
-__version__ = "0.3.13-dev0" # pragma: no cover
+__version__ = "0.3.13-dev1" # pragma: no cover

unstructured_ingest/v2/processes/connectors/sql/snowflake.py

Lines changed: 11 additions & 3 deletions

@@ -1,6 +1,5 @@
 from contextlib import contextmanager
 from dataclasses import dataclass, field
-from pathlib import Path
 from typing import TYPE_CHECKING, Generator, Optional
 
 import numpy as np
@@ -9,6 +8,7 @@
 
 from unstructured_ingest.utils.data_prep import split_dataframe
 from unstructured_ingest.utils.dep_check import requires_dependencies
+from unstructured_ingest.v2.interfaces.file_data import FileData
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.processes.connector_registry import (
     DestinationRegistryEntry,
@@ -160,9 +160,17 @@ class SnowflakeUploader(SQLUploader):
     connector_type: str = CONNECTOR_TYPE
     values_delimiter: str = "?"
 
-    def upload_contents(self, path: Path) -> None:
-        df = pd.read_json(path, orient="records", lines=True)
+    def upload_dataframe(self, df: pd.DataFrame, file_data: FileData) -> None:
+        if self.can_delete():
+            self.delete_by_record_id(file_data=file_data)
+        else:
+            logger.warning(
+                f"table doesn't contain expected "
+                f"record id column "
+                f"{self.upload_config.record_id_key}, skipping delete"
+            )
         df.replace({np.nan: None}, inplace=True)
+        self._fit_to_schema(df=df, columns=self.get_table_columns())
 
         columns = list(df.columns)
         stmt = "INSERT INTO {table_name} ({columns}) VALUES({values})".format(

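The upload_dataframe change above builds a parameterized INSERT with one "?" placeholder per column. As a rough, hedged sketch of that general pattern only (assuming a DB-API-style cursor with qmark binding; insert_dataframe, table_name, and batch_size are illustrative names, not the connector's API):

```python
import pandas as pd


def insert_dataframe(cursor, df: pd.DataFrame, table_name: str, batch_size: int = 100) -> None:
    """Insert a dataframe in fixed-size chunks, one "?" placeholder per column."""
    columns = list(df.columns)
    stmt = "INSERT INTO {table} ({cols}) VALUES({vals})".format(
        table=table_name,
        cols=", ".join(columns),
        vals=", ".join(["?"] * len(columns)),
    )
    for start in range(0, len(df), batch_size):
        chunk = df.iloc[start : start + batch_size]
        # executemany binds one row tuple per execution of the prepared statement
        cursor.executemany(stmt, [tuple(row) for row in chunk.itertuples(index=False, name=None)])
```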
unstructured_ingest/v2/processes/connectors/sql/sql.py

Lines changed: 3 additions & 3 deletions

@@ -3,7 +3,7 @@
 from abc import ABC, abstractmethod
 from contextlib import contextmanager
 from dataclasses import dataclass, field
-from datetime import date, datetime
+from datetime import datetime
 from pathlib import Path
 from time import time
 from typing import Any, Generator, Union
@@ -92,7 +92,7 @@ class SqlBatchFileData(BatchFileData):
     additional_metadata: SqlAdditionalMetadata
 
 
-def parse_date_string(date_value: Union[str, int]) -> date:
+def parse_date_string(date_value: Union[str, int]) -> datetime:
     try:
         timestamp = float(date_value) / 1000 if isinstance(date_value, int) else float(date_value)
         return datetime.fromtimestamp(timestamp)
@@ -267,7 +267,7 @@ def conform_dict(self, element_dict: dict, file_data: FileData) -> dict:
 
     def conform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
         for column in filter(lambda x: x in df.columns, _DATE_COLUMNS):
-            df[column] = df[column].apply(parse_date_string)
+            df[column] = df[column].apply(parse_date_string).apply(lambda date: date.timestamp())
         for column in filter(
             lambda x: x in df.columns,
             ("permissions_data", "record_locator", "points", "links"),

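The conform_dataframe change above converts date columns to float epoch seconds via parse_date_string(...).timestamp(). A minimal, self-contained illustration of that conversion, assuming epoch-seconds strings like the ones in the new test fixture (to_epoch_seconds is a hypothetical helper, not part of the connector):

```python
from datetime import datetime

import pandas as pd


def to_epoch_seconds(value: str) -> float:
    # Epoch-seconds strings such as "1732718184.0" are parsed to a datetime and
    # back to a float timestamp, mirroring parse_date_string -> .timestamp().
    return datetime.fromtimestamp(float(value)).timestamp()


df = pd.DataFrame({"date_created": ["1732718184.0"], "date_modified": ["1734625322.9292843"]})
for column in ("date_created", "date_modified"):
    df[column] = df[column].apply(to_epoch_seconds)

print(df.dtypes)  # both columns end up as float64
```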