Skip to content

Commit 81d1b9c

Browse files
authored
Feat: Replace Parquet File Writer with Gzipped Jsonl File Writer (#60)
1 parent b0a98fe commit 81d1b9c

File tree

11 files changed

+253
-72
lines changed

11 files changed

+253
-72
lines changed

airbyte/_file_writers/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
from __future__ import annotations
22

33
from .base import FileWriterBase, FileWriterBatchHandle, FileWriterConfigBase
4+
from .jsonl import JsonlWriter, JsonlWriterConfig
45
from .parquet import ParquetWriter, ParquetWriterConfig
56

67

78
__all__ = [
89
"FileWriterBatchHandle",
910
"FileWriterBase",
1011
"FileWriterConfigBase",
12+
"JsonlWriter",
13+
"JsonlWriterConfig",
1114
"ParquetWriter",
1215
"ParquetWriterConfig",
1316
]

airbyte/_file_writers/jsonl.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
"""A Parquet cache implementation."""
4+
from __future__ import annotations
5+
6+
import gzip
7+
from pathlib import Path
8+
from typing import TYPE_CHECKING, cast
9+
10+
import orjson
11+
import ulid
12+
from overrides import overrides
13+
14+
from airbyte._file_writers.base import (
15+
FileWriterBase,
16+
FileWriterBatchHandle,
17+
FileWriterConfigBase,
18+
)
19+
20+
21+
if TYPE_CHECKING:
22+
import pyarrow as pa
23+
24+
25+
class JsonlWriterConfig(FileWriterConfigBase):
    """Configuration for the gzipped JSONL file writer."""

    # Inherits `cache_dir` from base class
31+
class JsonlWriter(FileWriterBase):
    """A Jsonl cache implementation."""

    config_class = JsonlWriterConfig

    def get_new_cache_file_path(
        self,
        stream_name: str,
        batch_id: str | None = None,  # ULID of the batch
    ) -> Path:
        """Return a new cache file path for the given stream."""
        # Fall back to a freshly generated ULID when no batch id is supplied.
        writer_config = cast(JsonlWriterConfig, self.config)
        dest_dir = Path(writer_config.cache_dir)
        dest_dir.mkdir(parents=True, exist_ok=True)
        file_name = f"{stream_name}_{batch_id or str(ulid.ULID())}.jsonl.gz"
        return dest_dir / file_name

    @overrides
    def _write_batch(
        self,
        stream_name: str,
        batch_id: str,
        record_batch: pa.Table,
    ) -> FileWriterBatchHandle:
        """Process a record batch.

        Return the path to the cache file.
        """
        _ = batch_id  # unused
        file_path = self.get_new_cache_file_path(stream_name)

        # Serialize each row as one JSON line inside a gzip stream.
        with gzip.open(file_path, "w") as gz_file:
            for row in record_batch.to_pylist():
                gz_file.write(orjson.dumps(row) + b"\n")

        handle = FileWriterBatchHandle()
        handle.files.append(file_path)
        return handle

airbyte/_file_writers/parquet.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22

3-
"""A Parquet cache implementation."""
3+
"""A Parquet cache implementation.
4+
5+
NOTE: Parquet is a strongly typed columnar storage format, which has known issues when applied to
6+
variable schemas, schemas with indeterminate types, and schemas that have empty data nodes.
7+
This implementation is deprecated for now in favor of jsonl.gz, and may be removed or revamped in
8+
the future.
9+
"""
410
from __future__ import annotations
511

612
from pathlib import Path
@@ -83,8 +89,20 @@ def _write_batch(
8389
for col in missing_columns:
8490
record_batch = record_batch.append_column(col, null_array)
8591

86-
with parquet.ParquetWriter(output_file_path, schema=record_batch.schema) as writer:
87-
writer.write_table(record_batch)
92+
try:
93+
with parquet.ParquetWriter(output_file_path, schema=record_batch.schema) as writer:
94+
writer.write_table(record_batch)
95+
except Exception as e:
96+
raise exc.AirbyteLibInternalError(
97+
message=f"Failed to write record batch to Parquet file: {e}",
98+
context={
99+
"stream_name": stream_name,
100+
"batch_id": batch_id,
101+
"output_file_path": output_file_path,
102+
"schema": record_batch.schema,
103+
"record_batch": record_batch,
104+
},
105+
) from e
88106

89107
batch_handle = FileWriterBatchHandle()
90108
batch_handle.files.append(output_file_path)

airbyte/_processors.py

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from collections import defaultdict
1818
from typing import TYPE_CHECKING, Any, cast, final
1919

20+
import pandas as pd
2021
import pyarrow as pa
2122
import ulid
2223

@@ -35,6 +36,7 @@
3536
from airbyte._util import protocol_util
3637
from airbyte.progress import progress
3738
from airbyte.strategies import WriteStrategy
39+
from airbyte.types import _get_pyarrow_type
3840

3941

4042
if TYPE_CHECKING:
@@ -177,17 +179,16 @@ def process_airbyte_messages(
177179
)
178180

179181
stream_batches: dict[str, list[dict]] = defaultdict(list, {})
180-
181182
# Process messages, writing to batches as we go
182183
for message in messages:
183184
if message.type is Type.RECORD:
184185
record_msg = cast(AirbyteRecordMessage, message.record)
185186
stream_name = record_msg.stream
186187
stream_batch = stream_batches[stream_name]
187188
stream_batch.append(protocol_util.airbyte_record_message_to_dict(record_msg))
188-
189189
if len(stream_batch) >= max_batch_size:
190-
record_batch = pa.Table.from_pylist(stream_batch)
190+
batch_df = pd.DataFrame(stream_batch)
191+
record_batch = pa.Table.from_pandas(batch_df)
191192
self._process_batch(stream_name, record_batch)
192193
progress.log_batch_written(stream_name, len(stream_batch))
193194
stream_batch.clear()
@@ -206,21 +207,23 @@ def process_airbyte_messages(
206207
# Type.LOG, Type.TRACE, Type.CONTROL, etc.
207208
pass
208209

209-
# Add empty streams to the dictionary, so we create a destination table for it
210-
for stream_name in self._expected_streams:
211-
if stream_name not in stream_batches:
212-
if DEBUG_MODE:
213-
print(f"Stream {stream_name} has no data")
214-
stream_batches[stream_name] = []
215-
216210
# We are at the end of the stream. Process whatever else is queued.
217211
for stream_name, stream_batch in stream_batches.items():
218-
record_batch = pa.Table.from_pylist(stream_batch)
212+
batch_df = pd.DataFrame(stream_batch)
213+
record_batch = pa.Table.from_pandas(batch_df)
219214
self._process_batch(stream_name, record_batch)
220215
progress.log_batch_written(stream_name, len(stream_batch))
221216

217+
all_streams = list(self._pending_batches.keys())
218+
# Add empty streams to the streams list, so we create a destination table for it
219+
for stream_name in self._expected_streams:
220+
if stream_name not in all_streams:
221+
if DEBUG_MODE:
222+
print(f"Stream {stream_name} has no data")
223+
all_streams.append(stream_name)
224+
222225
# Finalize any pending batches
223-
for stream_name in list(self._pending_batches.keys()):
226+
for stream_name in all_streams:
224227
self._finalize_batches(stream_name, write_strategy=write_strategy)
225228
progress.log_stream_finalized(stream_name)
226229

@@ -394,3 +397,17 @@ def _get_stream_json_schema(
394397
) -> dict[str, Any]:
395398
"""Return the column definitions for the given stream."""
396399
return self._get_stream_config(stream_name).stream.json_schema
400+
401+
def _get_stream_pyarrow_schema(
402+
self,
403+
stream_name: str,
404+
) -> pa.Schema:
405+
"""Return the column definitions for the given stream."""
406+
return pa.schema(
407+
fields=[
408+
pa.field(prop_name, _get_pyarrow_type(prop_def))
409+
for prop_name, prop_def in self._get_stream_json_schema(stream_name)[
410+
"properties"
411+
].items()
412+
]
413+
)

airbyte/caches/base.py

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from typing import TYPE_CHECKING, cast, final
1010

1111
import pandas as pd
12-
import pyarrow as pa
1312
import sqlalchemy
1413
import ulid
1514
from overrides import overrides
@@ -42,6 +41,7 @@
4241
from collections.abc import Generator, Iterator
4342
from pathlib import Path
4443

44+
import pyarrow as pa
4545
from sqlalchemy.engine import Connection, Engine
4646
from sqlalchemy.engine.cursor import CursorResult
4747
from sqlalchemy.engine.reflection import Inspector
@@ -545,17 +545,6 @@ def _finalize_batches(
545545
although this is a fairly rare edge case we can ignore in V1.
546546
"""
547547
with self._finalizing_batches(stream_name) as batches_to_finalize:
548-
if not batches_to_finalize:
549-
return {}
550-
551-
files: list[Path] = []
552-
# Get a list of all files to finalize from all pending batches.
553-
for batch_handle in batches_to_finalize.values():
554-
batch_handle = cast(FileWriterBatchHandle, batch_handle)
555-
files += batch_handle.files
556-
# Use the max batch ID as the batch ID for table names.
557-
max_batch_id = max(batches_to_finalize.keys())
558-
559548
# Make sure the target schema and target table exist.
560549
self._ensure_schema_exists()
561550
final_table_name = self._ensure_final_table_exists(
@@ -567,6 +556,18 @@ def _finalize_batches(
567556
raise_on_error=True,
568557
)
569558

559+
if not batches_to_finalize:
560+
# If there are no batches to finalize, return after ensuring the table exists.
561+
return {}
562+
563+
files: list[Path] = []
564+
# Get a list of all files to finalize from all pending batches.
565+
for batch_handle in batches_to_finalize.values():
566+
batch_handle = cast(FileWriterBatchHandle, batch_handle)
567+
files += batch_handle.files
568+
# Use the max batch ID as the batch ID for table names.
569+
max_batch_id = max(batches_to_finalize.keys())
570+
570571
temp_table_name = self._write_files_to_new_table(
571572
files=files,
572573
stream_name=stream_name,
@@ -659,27 +660,25 @@ def _write_files_to_new_table(
659660
"""
660661
temp_table_name = self._create_table_for_loading(stream_name, batch_id)
661662
for file_path in files:
662-
with pa.parquet.ParquetFile(file_path) as pf:
663-
record_batch = pf.read()
664-
dataframe = record_batch.to_pandas()
665-
666-
# Pandas will auto-create the table if it doesn't exist, which we don't want.
667-
if not self._table_exists(temp_table_name):
668-
raise exc.AirbyteLibInternalError(
669-
message="Table does not exist after creation.",
670-
context={
671-
"temp_table_name": temp_table_name,
672-
},
673-
)
674-
675-
dataframe.to_sql(
676-
temp_table_name,
677-
self.get_sql_alchemy_url(),
678-
schema=self.config.schema_name,
679-
if_exists="append",
680-
index=False,
681-
dtype=self._get_sql_column_definitions(stream_name),
663+
dataframe = pd.read_json(file_path, lines=True)
664+
665+
# Pandas will auto-create the table if it doesn't exist, which we don't want.
666+
if not self._table_exists(temp_table_name):
667+
raise exc.AirbyteLibInternalError(
668+
message="Table does not exist after creation.",
669+
context={
670+
"temp_table_name": temp_table_name,
671+
},
682672
)
673+
674+
dataframe.to_sql(
675+
temp_table_name,
676+
self.get_sql_alchemy_url(),
677+
schema=self.config.schema_name,
678+
if_exists="append",
679+
index=False,
680+
dtype=self._get_sql_column_definitions(stream_name),
681+
)
683682
return temp_table_name
684683

685684
@final
@@ -959,6 +958,11 @@ def register_source(
959958
This method is called by the source when it is initialized.
960959
"""
961960
self._source_name = source_name
961+
self.file_writer.register_source(
962+
source_name,
963+
incoming_source_catalog,
964+
stream_names=stream_names,
965+
)
962966
self._ensure_schema_exists()
963967
super().register_source(
964968
source_name,

airbyte/caches/duckdb.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from overrides import overrides
1313

14-
from airbyte._file_writers import ParquetWriter, ParquetWriterConfig
14+
from airbyte._file_writers import JsonlWriter, JsonlWriterConfig
1515
from airbyte.caches.base import SQLCacheBase, SQLCacheConfigBase
1616
from airbyte.telemetry import CacheTelemetryInfo
1717

@@ -24,10 +24,10 @@
2424
)
2525

2626

27-
class DuckDBCacheConfig(SQLCacheConfigBase, ParquetWriterConfig):
27+
class DuckDBCacheConfig(SQLCacheConfigBase, JsonlWriterConfig):
2828
"""Configuration for the DuckDB cache.
2929
30-
Also inherits config from the ParquetWriter, which is responsible for writing files to disk.
30+
Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
3131
"""
3232

3333
db_path: Path | str
@@ -88,7 +88,7 @@ class DuckDBCache(DuckDBCacheBase):
8888
so we insert as values instead.
8989
"""
9090

91-
file_writer_class = ParquetWriter
91+
file_writer_class = JsonlWriter
9292

9393
# TODO: Delete or rewrite this method after DuckDB adds support for primary key inspection.
9494
# @overrides
@@ -181,12 +181,22 @@ def _write_files_to_new_table(
181181
stream_name=stream_name,
182182
batch_id=batch_id,
183183
)
184-
columns_list = [
185-
self._quote_identifier(c)
186-
for c in list(self._get_sql_column_definitions(stream_name).keys())
187-
]
188-
columns_list_str = indent("\n, ".join(columns_list), " ")
184+
columns_list = list(self._get_sql_column_definitions(stream_name=stream_name).keys())
185+
columns_list_str = indent(
186+
"\n, ".join([self._quote_identifier(c) for c in columns_list]),
187+
" ",
188+
)
189189
files_list = ", ".join([f"'{f!s}'" for f in files])
190+
columns_type_map = indent(
191+
"\n, ".join(
192+
[
193+
f"{self._quote_identifier(c)}: "
194+
f"{self._get_sql_column_definitions(stream_name)[c]!s}"
195+
for c in columns_list
196+
]
197+
),
198+
" ",
199+
)
190200
insert_statement = dedent(
191201
f"""
192202
INSERT INTO {self.config.schema_name}.{temp_table_name}
@@ -195,9 +205,11 @@ def _write_files_to_new_table(
195205
)
196206
SELECT
197207
{columns_list_str}
198-
FROM read_parquet(
208+
FROM read_json_auto(
199209
[{files_list}],
200-
union_by_name = true
210+
format = 'newline_delimited',
211+
union_by_name = true,
212+
columns = {{ { columns_type_map } }}
201213
)
202214
"""
203215
)

0 commit comments

Comments
 (0)