|
5 | 5 | import logging |
6 | 6 | from io import IOBase |
7 | 7 | from pathlib import Path |
| 8 | +import sys |
| 9 | +import tempfile |
| 10 | +import io |
8 | 11 | from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union |
| 12 | +from collections import deque |
9 | 13 |
|
10 | 14 | import orjson |
11 | 15 | import pandas as pd |
|
31 | 35 | from airbyte_cdk.sources.file_based.schema_helpers import SchemaType |
32 | 36 |
|
33 | 37 |
|
| 38 | + |
| 39 | +def iter_records_via_tempfile(df: pd.DataFrame): |
| 40 | + """ |
| 41 | + Stream records using Pandas' to_json (so datetime strings match exactly), |
| 42 | + without building a giant string in RAM. |
| 43 | +
|
| 44 | + - Writes NDJSON to a temporary file (text-wrapped over a binary file) |
| 45 | + - Reads back line-by-line and yields parsed dicts |
| 46 | + """ |
| 47 | + with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8", delete=True) as f: |
| 48 | + df.to_json(f, orient="records", lines=True, date_format="iso", date_unit="us") |
| 49 | + f.seek(0) |
| 50 | + for line in f: # line is str |
| 51 | + if line.strip(): |
| 52 | + yield orjson.loads(line) |
| 53 | + |
| 54 | + |
34 | 55 | class ExcelParser(FileTypeParser): |
35 | 56 | ENCODING = None |
36 | 57 |
|
@@ -118,9 +139,9 @@ def parse_records( |
118 | 139 | # DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson |
119 | 140 | # DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior |
120 | 141 | # see PR description: https://github.com/airbytehq/airbyte/pull/44444/ |
121 | | - yield from orjson.loads( |
122 | | - df.to_json(orient="records", date_format="iso", date_unit="us") |
123 | | - ) |
| 142 | + for index, row in df.iterrows(): |
| 143 | + # Convert each row (as a Series) to a JSON string |
| 144 | + yield orjson.loads(row.to_json(date_format="iso", date_unit="us")) |
124 | 145 |
|
125 | 146 | except Exception as exc: |
126 | 147 | # Raise a RecordParseError if any exception occurs during parsing |
|
0 commit comments