Skip to content

Commit ce2bcf4

Browse files
authored
Fix: Resolve conflicting schema issues by removing dependency on PyArrow (#67)
1 parent e1260c4 commit ce2bcf4

File tree

15 files changed

+451
-600
lines changed

15 files changed

+451
-600
lines changed

airbyte/_batch_handles.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
"""Batch handle class."""
3+
4+
from __future__ import annotations
5+
6+
from contextlib import suppress
7+
from typing import IO, TYPE_CHECKING, Callable
8+
9+
10+
if TYPE_CHECKING:
11+
from pathlib import Path
12+
13+
14+
class BatchHandle:
15+
"""A handle for a batch of records."""
16+
17+
def __init__(
18+
self,
19+
stream_name: str,
20+
batch_id: str,
21+
files: list[Path],
22+
file_opener: Callable[[Path], IO[bytes]],
23+
) -> None:
24+
"""Initialize the batch handle."""
25+
self._stream_name = stream_name
26+
self._batch_id = batch_id
27+
self._files = files
28+
self._record_count = 0
29+
assert self._files, "A batch must have at least one file."
30+
self._open_file_writer: IO[bytes] = file_opener(self._files[0])
31+
32+
# Marker for whether the batch has been finalized.
33+
self.finalized: bool = False
34+
35+
@property
36+
def files(self) -> list[Path]:
37+
"""Return the files."""
38+
return self._files
39+
40+
@property
41+
def batch_id(self) -> str:
42+
"""Return the batch ID."""
43+
return self._batch_id
44+
45+
@property
46+
def stream_name(self) -> str:
47+
"""Return the stream name."""
48+
return self._stream_name
49+
50+
@property
51+
def record_count(self) -> int:
52+
"""Return the record count."""
53+
return self._record_count
54+
55+
def increment_record_count(self) -> None:
56+
"""Increment the record count."""
57+
self._record_count += 1
58+
59+
@property
60+
def open_file_writer(self) -> IO[bytes] | None:
61+
"""Return the open file writer, if any, or None."""
62+
return self._open_file_writer
63+
64+
def close_files(self) -> None:
65+
"""Close the file writer."""
66+
if self.open_file_writer is None:
67+
return
68+
69+
with suppress(Exception):
70+
self.open_file_writer.close()
71+
72+
def delete_files(self) -> None:
73+
"""Delete the files.
74+
75+
If any files are open, they will be closed first.
76+
If any files are missing, they will be ignored.
77+
"""
78+
self.close_files()
79+
for file in self.files:
80+
file.unlink(missing_ok=True)
81+
82+
def __del__(self) -> None:
83+
"""Upon deletion, close the file writer."""
84+
self.close_files()

0 commit comments

Comments
 (0)