
Commit f27c71c

bugfix/add fallback when reading data based on extension (#332)

* Add fallback when reading data based on extension
* Generalize approach to a single function, re-use elsewhere
* Fix test
1 parent 3a5d824 commit f27c71c
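
For orientation, a minimal usage sketch of the shared reader this commit converges on (assuming unstructured-ingest 0.3.13 and the ndjson package are installed; the temporary fixture below is purely illustrative):

```python
from pathlib import Path
from tempfile import NamedTemporaryFile

from unstructured_ingest.utils.data_prep import get_data

# Write a small ndjson fixture to read back through the shared helper.
with NamedTemporaryFile(mode="w", suffix=".ndjson", delete=False) as f:
    f.write('{"text": "a"}\n{"text": "b"}\n')

# Callers now pass `path` to this module-level helper instead of the
# per-stager `self.get_data(elements_filepath=...)` removed in this commit.
elements = get_data(path=Path(f.name))
assert elements == [{"text": "a"}, {"text": "b"}]
```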

File tree

7 files changed, +41 −31 lines

* CHANGELOG.md
* test/integration/connectors/utils/validation/destination.py
* unstructured_ingest/__version__.py
* unstructured_ingest/utils/data_prep.py
* unstructured_ingest/v2/interfaces/upload_stager.py
* unstructured_ingest/v2/processes/connectors/duckdb/base.py
* unstructured_ingest/v2/processes/connectors/sql/sql.py


CHANGELOG.md

Lines changed: 2 additions & 1 deletion
```diff
@@ -1,10 +1,11 @@
-## 0.3.13-dev4
+## 0.3.13
 
 ### Fixes
 
 * **Fix Snowflake Uploader error**
 * **Fix SQL Uploader Stager timestamp error**
 * **Migrate Discord Sourced Connector to v2**
+* **Add read data fallback** When reading data that could be json or ndjson, if extension is missing, fallback to trying to read it as json.
 
 ### Enhancements
```

test/integration/connectors/utils/validation/destination.py

Lines changed: 2 additions & 15 deletions
```diff
@@ -1,11 +1,9 @@
-import json
 import os
 import shutil
 from pathlib import Path
 
-import ndjson
-
 from test.integration.connectors.utils.validation.utils import ValidationConfig
+from unstructured_ingest.utils.data_prep import get_data
 from unstructured_ingest.v2.interfaces import FileData, SourceIdentifiers, UploadStager
 
 
@@ -28,7 +26,7 @@ def run_all_stager_validations(
     assert input_file.suffix == staged_filepath.suffix
 
     # Validate length
-    staged_data = get_data(staged_filepath=staged_filepath)
+    staged_data = get_data(path=staged_filepath)
     assert len(staged_data) == configs.expected_count
 
     # Validate file
@@ -46,17 +44,6 @@ def update_stager_fixtures(stager_output_path: Path, staged_filepath: Path):
     shutil.copy(staged_filepath, copied_filepath)
 
 
-def get_data(staged_filepath: Path) -> list[dict]:
-    if staged_filepath.suffix == ".json":
-        with staged_filepath.open() as f:
-            return json.load(f)
-    elif staged_filepath.suffix == ".ndjson":
-        with staged_filepath.open() as f:
-            return ndjson.load(f)
-    else:
-        raise ValueError(f"Unsupported file type: {staged_filepath.suffix}")
-
-
 def stager_validation(
     stager: UploadStager,
     tmp_dir: Path,
```

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -1 +1 @@
-__version__ = "0.3.13-dev4" # pragma: no cover
+__version__ = "0.3.13" # pragma: no cover
```

unstructured_ingest/utils/data_prep.py

Lines changed: 32 additions & 1 deletion
```diff
@@ -7,6 +7,8 @@
 import ndjson
 import pandas as pd
 
+from unstructured_ingest.v2.logger import logger
+
 DATE_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d+%H:%M:%S", "%Y-%m-%dT%H:%M:%S%z")
 
 T = TypeVar("T")
@@ -135,7 +137,7 @@ def validate_date_args(date: Optional[str] = None) -> bool:
     )
 
 
-def get_data(path: Path) -> list[dict]:
+def get_data_by_suffix(path: Path) -> list[dict]:
     with path.open() as f:
         if path.suffix == ".json":
             return json.load(f)
@@ -151,6 +153,35 @@ def get_data(path: Path) -> list[dict]:
         raise ValueError(f"Unsupported file type: {path}")
 
 
+def get_data(path: Path) -> list[dict]:
+    try:
+        return get_data_by_suffix(path=path)
+    except Exception as e:
+        logger.warning(f"failed to read {path} by extension: {e}")
+    # Fall back
+    with path.open() as f:
+        try:
+            return json.load(f)
+        except Exception as e:
+            logger.warning(f"failed to read {path} as json: {e}")
+        try:
+            return ndjson.load(f)
+        except Exception as e:
+            logger.warning(f"failed to read {path} as ndjson: {e}")
+    try:
+        df = pd.read_csv(path)
+        return df.to_dict(orient="records")
+    except Exception as e:
+        logger.warning(f"failed to read {path} as csv: {e}")
+    try:
+        df = pd.read_parquet(path)
+        return df.to_dict(orient="records")
+    except Exception as e:
+        logger.warning(f"failed to read {path} as parquet: {e}")
+
+    raise IOError(f"File could not be parsed: {path}")
+
+
 def get_data_df(path: Path) -> pd.DataFrame:
     with path.open() as f:
         if path.suffix == ".json":
```
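
A quick sketch of the fallback path the new get_data adds, assuming a staged file that holds plain JSON but carries no extension (the directory and filename below are made up for the example):

```python
import json
import tempfile
from pathlib import Path

from unstructured_ingest.utils.data_prep import get_data

# Hypothetical staged file: JSON content, but no suffix for get_data_by_suffix
# to dispatch on.
path = Path(tempfile.mkdtemp()) / "elements_without_extension"
path.write_text(json.dumps([{"text": "a"}, {"text": "b"}]))

# get_data_by_suffix() raises on the unknown suffix, so get_data() logs a
# warning and falls back to plain json.load, which succeeds here.
records = get_data(path=path)
assert records == [{"text": "a"}, {"text": "b"}]
```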

unstructured_ingest/v2/interfaces/upload_stager.py

Lines changed: 0 additions & 10 deletions
```diff
@@ -32,16 +32,6 @@ def write_output(self, output_path: Path, data: list[dict]) -> None:
         else:
             raise ValueError(f"Unsupported output format: {output_path}")
 
-    def get_data(self, elements_filepath: Path) -> list[dict]:
-        if elements_filepath.suffix == ".json":
-            with elements_filepath.open() as f:
-                return json.load(f)
-        elif elements_filepath.suffix == ".ndjson":
-            with elements_filepath.open() as f:
-                return ndjson.load(f)
-        else:
-            raise ValueError(f"Unsupported input format: {elements_filepath}")
-
     def conform_dict(self, element_dict: dict, file_data: FileData) -> dict:
         return element_dict
 
```

unstructured_ingest/v2/processes/connectors/duckdb/base.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -4,6 +4,7 @@
 
 import pandas as pd
 
+from unstructured_ingest.utils.data_prep import get_data
 from unstructured_ingest.v2.interfaces import FileData, UploadStager
 from unstructured_ingest.v2.utils import get_enhanced_element_id
 
@@ -79,7 +80,7 @@ def run(
         output_filename: str,
         **kwargs: Any,
     ) -> Path:
-        elements_contents = self.get_data(elements_filepath=elements_filepath)
+        elements_contents = get_data(path=elements_filepath)
         output_path = self.get_output_path(output_filename=output_filename, output_dir=output_dir)
 
         output = [
```

unstructured_ingest/v2/processes/connectors/sql/sql.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -14,7 +14,7 @@
 from pydantic import BaseModel, Field, Secret
 
 from unstructured_ingest.error import DestinationConnectionError, SourceConnectionError
-from unstructured_ingest.utils.data_prep import get_data_df, split_dataframe
+from unstructured_ingest.utils.data_prep import get_data, get_data_df, split_dataframe
 from unstructured_ingest.v2.constants import RECORD_ID_LABEL
 from unstructured_ingest.v2.interfaces import (
     AccessConfig,
@@ -290,7 +290,7 @@ def run(
         output_filename: str,
         **kwargs: Any,
     ) -> Path:
-        elements_contents = self.get_data(elements_filepath=elements_filepath)
+        elements_contents = get_data(path=elements_filepath)
 
         df = pd.DataFrame(
             data=[
```
