Skip to content

Commit 7b527d0

Browse files
authored
Feat: Remove need to import CacheConfig classes in addition to Cache classes (major refactor) (#59)
1 parent 81d1b9c commit 7b527d0

35 files changed

+1638
-1567
lines changed

airbyte/__init__.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
"""
66
from __future__ import annotations
77

8-
from airbyte._factories.cache_factories import get_default_cache, new_local_cache
8+
from airbyte import caches, datasets, registry, secrets
99
from airbyte._factories.connector_factories import get_source
10-
from airbyte.caches import DuckDBCache, DuckDBCacheConfig
10+
from airbyte.caches.duckdb import DuckDBCache
11+
from airbyte.caches.factories import get_default_cache, new_local_cache
1112
from airbyte.datasets import CachedDataset
1213
from airbyte.registry import get_available_connectors
1314
from airbyte.results import ReadResult
@@ -16,14 +17,20 @@
1617

1718

1819
__all__ = [
19-
"CachedDataset",
20-
"DuckDBCache",
21-
"DuckDBCacheConfig",
20+
# Modules
21+
"caches",
22+
"datasets",
23+
"registry",
24+
"secrets",
25+
# Factories
2226
"get_available_connectors",
23-
"get_source",
2427
"get_default_cache",
2528
"get_secret",
29+
"get_source",
2630
"new_local_cache",
31+
# Classes
32+
"CachedDataset",
33+
"DuckDBCache",
2734
"ReadResult",
2835
"SecretSource",
2936
"Source",

airbyte/_executor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def install(self) -> None:
6565
pass
6666

6767
@abstractmethod
68-
def get_telemetry_info(self) -> SourceTelemetryInfo:
68+
def _get_telemetry_info(self) -> SourceTelemetryInfo:
6969
pass
7070

7171
@abstractmethod
@@ -388,7 +388,7 @@ def execute(self, args: list[str]) -> Iterator[str]:
388388
with _stream_from_subprocess([str(connector_path), *args]) as stream:
389389
yield from stream
390390

391-
def get_telemetry_info(self) -> SourceTelemetryInfo:
391+
def _get_telemetry_info(self) -> SourceTelemetryInfo:
392392
return SourceTelemetryInfo(
393393
name=self.name,
394394
type=SourceType.VENV,
@@ -449,7 +449,7 @@ def execute(self, args: list[str]) -> Iterator[str]:
449449
with _stream_from_subprocess([str(self.path), *args]) as stream:
450450
yield from stream
451451

452-
def get_telemetry_info(self) -> SourceTelemetryInfo:
452+
def _get_telemetry_info(self) -> SourceTelemetryInfo:
453453
return SourceTelemetryInfo(
454454
str(self.name),
455455
SourceType.LOCAL_INSTALL,

airbyte/_file_writers/__init__.py

Lines changed: 0 additions & 16 deletions
This file was deleted.

airbyte/_processors/__init__.py

Whitespace-only changes.
Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
"""Abstract base class for Processors, including SQL and File writers.
23
3-
"""Define abstract base class for Processors, including Caches and File writers.
4-
5-
Processors can all take input from STDIN or a stream of Airbyte messages.
4+
Processors can take input from STDIN or a stream of Airbyte messages.
65
76
Caches will pass their input to the File Writer. They share a common base class so certain
87
abstractions like "write" and "finalize" can be handled in either layer, or both.
@@ -34,6 +33,7 @@
3433

3534
from airbyte import exceptions as exc
3635
from airbyte._util import protocol_util
36+
from airbyte.caches.base import CacheBase
3737
from airbyte.progress import progress
3838
from airbyte.strategies import WriteStrategy
3939
from airbyte.types import _get_pyarrow_type
@@ -43,7 +43,6 @@
4343
from collections.abc import Generator, Iterable, Iterator
4444

4545
from airbyte.caches._catalog_manager import CatalogManager
46-
from airbyte.config import CacheConfigBase
4746

4847

4948
DEFAULT_BATCH_SIZE = 10_000
@@ -61,32 +60,29 @@ class AirbyteMessageParsingError(Exception):
6160
class RecordProcessor(abc.ABC):
6261
"""Abstract base class for classes which can process input records."""
6362

64-
config_class: type[CacheConfigBase]
6563
skip_finalize_step: bool = False
66-
_expected_streams: set[str]
6764

6865
def __init__(
6966
self,
70-
config: CacheConfigBase | dict | None,
67+
cache: CacheBase,
7168
*,
7269
catalog_manager: CatalogManager | None = None,
7370
) -> None:
74-
if isinstance(config, dict):
75-
config = self.config_class(**config)
76-
77-
self.config = config or self.config_class()
78-
if not isinstance(self.config, self.config_class):
79-
err_msg = (
80-
f"Expected config class of type '{self.config_class.__name__}'. "
81-
f"Instead found '{type(self.config).__name__}'."
71+
self._expected_streams: set[str] | None = None
72+
self.cache: CacheBase = cache
73+
if not isinstance(self.cache, CacheBase):
74+
raise exc.AirbyteLibInputError(
75+
message=(
76+
f"Expected config class of type 'CacheBase'. "
77+
f"Instead received type '{type(self.cache).__name__}'."
78+
),
8279
)
83-
raise TypeError(err_msg)
8480

8581
self.source_catalog: ConfiguredAirbyteCatalog | None = None
8682
self._source_name: str | None = None
8783

88-
self._pending_batches: dict[str, dict[str, Any]] = defaultdict(lambda: {}, {})
89-
self._finalized_batches: dict[str, dict[str, Any]] = defaultdict(lambda: {}, {})
84+
self._pending_batches: dict[str, dict[str, Any]] = defaultdict(dict, {})
85+
self._finalized_batches: dict[str, dict[str, Any]] = defaultdict(dict, {})
9086

9187
self._pending_state_messages: dict[str, list[AirbyteStateMessage]] = defaultdict(list, {})
9288
self._finalized_state_messages: dict[
@@ -97,6 +93,11 @@ def __init__(
9793
self._catalog_manager: CatalogManager | None = catalog_manager
9894
self._setup()
9995

96+
@property
97+
def expected_streams(self) -> set[str]:
98+
"""Return the expected stream names."""
99+
return self._expected_streams or set()
100+
100101
def register_source(
101102
self,
102103
source_name: str,
@@ -115,11 +116,6 @@ def register_source(
115116
)
116117
self._expected_streams = stream_names
117118

118-
@property
119-
def _streams_with_data(self) -> set[str]:
120-
"""Return a list of known streams."""
121-
return self._pending_batches.keys() | self._finalized_batches.keys()
122-
123119
@final
124120
def process_stdin(
125121
self,
@@ -216,7 +212,7 @@ def process_airbyte_messages(
216212

217213
all_streams = list(self._pending_batches.keys())
218214
# Add empty streams to the streams list, so we create a destination table for it
219-
for stream_name in self._expected_streams:
215+
for stream_name in self.expected_streams:
220216
if stream_name not in all_streams:
221217
if DEBUG_MODE:
222218
print(f"Stream {stream_name} has no data")
@@ -358,6 +354,7 @@ def _setup(self) -> None: # noqa: B027 # Intentionally empty, not abstract
358354
By default this is a no-op but subclasses can override this method to prepare
359355
any necessary resources.
360356
"""
357+
pass
361358

362359
def _teardown(self) -> None:
363360
"""Teardown the processor resources.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
"""File processors."""
3+
4+
from __future__ import annotations
5+
6+
from .base import FileWriterBase, FileWriterBatchHandle
7+
from .jsonl import JsonlWriter
8+
from .parquet import ParquetWriter
9+
10+
11+
__all__ = [
12+
"FileWriterBatchHandle",
13+
"FileWriterBase",
14+
"JsonlWriter",
15+
"ParquetWriter",
16+
]
Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,20 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2-
3-
"""Define abstract base class for File Writers, which write and read from file storage."""
2+
"""Abstract base class for File Writers, which write and read from file storage."""
43

54
from __future__ import annotations
65

76
import abc
87
from dataclasses import dataclass, field
9-
from pathlib import Path
108
from typing import TYPE_CHECKING, cast, final
119

1210
from overrides import overrides
1311

14-
from airbyte._processors import BatchHandle, RecordProcessor
15-
from airbyte.config import CacheConfigBase
12+
from airbyte._processors.base import BatchHandle, RecordProcessor
1613

1714

1815
if TYPE_CHECKING:
16+
from pathlib import Path
17+
1918
import pyarrow as pa
2019

2120
from airbyte_protocol.models import (
@@ -34,21 +33,9 @@ class FileWriterBatchHandle(BatchHandle):
3433
files: list[Path] = field(default_factory=list)
3534

3635

37-
class FileWriterConfigBase(CacheConfigBase):
38-
"""Configuration for the Snowflake cache."""
39-
40-
cache_dir: Path = Path("./.cache/files/")
41-
"""The directory to store cache files in."""
42-
cleanup: bool = True
43-
"""Whether to clean up temporary files after processing a batch."""
44-
45-
4636
class FileWriterBase(RecordProcessor, abc.ABC):
4737
"""A generic base implementation for a file-based cache."""
4838

49-
config_class = FileWriterConfigBase
50-
config: FileWriterConfigBase
51-
5239
@abc.abstractmethod
5340
@overrides
5441
def _write_batch(
@@ -91,7 +78,7 @@ def _cleanup_batch(
9178
9279
This method is a no-op if the `cleanup` config option is set to False.
9380
"""
94-
if self.config.cleanup:
81+
if self.cache.cleanup:
9582
batch_handle = cast(FileWriterBatchHandle, batch_handle)
9683
_ = stream_name, batch_id
9784
for file_path in batch_handle.files:
Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,37 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2-
32
"""A Parquet cache implementation."""
3+
44
from __future__ import annotations
55

66
import gzip
77
from pathlib import Path
8-
from typing import TYPE_CHECKING, cast
8+
from typing import TYPE_CHECKING
99

1010
import orjson
1111
import ulid
1212
from overrides import overrides
1313

14-
from airbyte._file_writers.base import (
14+
from airbyte._processors.file.base import (
1515
FileWriterBase,
1616
FileWriterBatchHandle,
17-
FileWriterConfigBase,
1817
)
1918

2019

2120
if TYPE_CHECKING:
2221
import pyarrow as pa
2322

2423

25-
class JsonlWriterConfig(FileWriterConfigBase):
26-
"""Configuration for the Snowflake cache."""
27-
28-
# Inherits `cache_dir` from base class
29-
30-
3124
class JsonlWriter(FileWriterBase):
3225
"""A Jsonl cache implementation."""
3326

34-
config_class = JsonlWriterConfig
35-
3627
def get_new_cache_file_path(
3728
self,
3829
stream_name: str,
3930
batch_id: str | None = None, # ULID of the batch
4031
) -> Path:
4132
"""Return a new cache file path for the given stream."""
4233
batch_id = batch_id or str(ulid.ULID())
43-
config: JsonlWriterConfig = cast(JsonlWriterConfig, self.config)
44-
target_dir = Path(config.cache_dir)
34+
target_dir = Path(self.cache.cache_dir)
4535
target_dir.mkdir(parents=True, exist_ok=True)
4636
return target_dir / f"{stream_name}_{batch_id}.jsonl.gz"
4737

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2-
3-
"""A Parquet cache implementation.
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved
2+
"""A Parquet file writer implementation.
43
54
NOTE: Parquet is a strongly typed columnar storage format, which has known issues when applied to
65
variable schemas, schemas with indeterminate types, and schemas that have empty data nodes.
@@ -18,34 +17,24 @@
1817
from pyarrow import parquet
1918

2019
from airbyte import exceptions as exc
21-
from airbyte._file_writers.base import (
20+
from airbyte._processors.file.base import (
2221
FileWriterBase,
2322
FileWriterBatchHandle,
24-
FileWriterConfigBase,
2523
)
2624
from airbyte._util.text_util import lower_case_set
2725

2826

29-
class ParquetWriterConfig(FileWriterConfigBase):
30-
"""Configuration for the Snowflake cache."""
31-
32-
# Inherits `cache_dir` from base class
33-
34-
3527
class ParquetWriter(FileWriterBase):
3628
"""A Parquet cache implementation."""
3729

38-
config_class = ParquetWriterConfig
39-
4030
def get_new_cache_file_path(
4131
self,
4232
stream_name: str,
4333
batch_id: str | None = None, # ULID of the batch
4434
) -> Path:
4535
"""Return a new cache file path for the given stream."""
4636
batch_id = batch_id or str(ulid.ULID())
47-
config: ParquetWriterConfig = cast(ParquetWriterConfig, self.config)
48-
target_dir = Path(config.cache_dir)
37+
target_dir = Path(self.cache.cache_dir)
4938
target_dir.mkdir(parents=True, exist_ok=True)
5039
return target_dir / f"{stream_name}_{batch_id}.parquet"
5140

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
"""SQL processors."""

0 commit comments

Comments (0)