Skip to content

Commit 797e657

Browse files
authored
Feat: Add MotherDuck cache support 🦆 (#43)
1 parent 7b527d0 commit 797e657

File tree

7 files changed

+254
-140
lines changed

7 files changed

+254
-140
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
"""A MotherDuck implementation of the cache, built on the DuckDB implementation."""
3+
4+
from __future__ import annotations
5+
6+
import warnings
7+
from typing import TYPE_CHECKING
8+
9+
from overrides import overrides
10+
11+
from airbyte._processors.file import JsonlWriter
12+
from airbyte._processors.sql.duckdb import DuckDBSqlProcessor
13+
14+
15+
if TYPE_CHECKING:
16+
from airbyte.caches.motherduck import MotherDuckCache
17+
18+
19+
# Suppress warnings from DuckDB about reflection on indices.
20+
# https://github.com/Mause/duckdb_engine/issues/905
21+
warnings.filterwarnings(
22+
"ignore",
23+
message="duckdb-engine doesn't yet support reflection on indices",
24+
)
25+
26+
27+
class MotherDuckSqlProcessor(DuckDBSqlProcessor):
28+
"""A cache implementation for MotherDuck."""
29+
30+
supports_merge_insert = False
31+
file_writer_class = JsonlWriter
32+
cache: MotherDuckCache
33+
34+
@overrides
35+
def _setup(self) -> None:
36+
"""Do any necessary setup, if applicable.
37+
38+
Note: The DuckDB parent class requires pre-creation of local directory structure. We
39+
don't need to do that here so we override the method be a no-op.
40+
"""
41+
# No setup to do and no need to pre-create local file storage.
42+
pass

airbyte/caches/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from airbyte.caches.base import CacheBase
66
from airbyte.caches.duckdb import DuckDBCache
77
from airbyte.caches.factories import get_default_cache, new_local_cache
8+
from airbyte.caches.motherduck import MotherDuckCache
89
from airbyte.caches.postgres import PostgresCache
910
from airbyte.caches.snowflake import SnowflakeCache
1011

@@ -15,8 +16,9 @@
1516
"get_default_cache",
1617
"new_local_cache",
1718
# Classes
19+
"CacheBase",
1820
"DuckDBCache",
21+
"MotherDuckCache",
1922
"PostgresCache",
20-
"CacheBase",
2123
"SnowflakeCache",
2224
]

airbyte/caches/motherduck.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""A cache implementation for the MotherDuck service, built on DuckDB."""
2+
from __future__ import annotations
3+
4+
from overrides import overrides
5+
from pydantic import Field
6+
7+
from airbyte._processors.sql.motherduck import MotherDuckSqlProcessor
8+
from airbyte.caches.duckdb import DuckDBCache
9+
10+
11+
class MotherDuckCache(DuckDBCache):
12+
"""Cache that uses MotherDuck for external persistent storage."""
13+
14+
db_path: str = Field(default="md:")
15+
database: str
16+
api_key: str
17+
18+
_sql_processor_class = MotherDuckSqlProcessor
19+
20+
@overrides
21+
def get_sql_alchemy_url(self) -> str:
22+
"""Return the SQLAlchemy URL to use."""
23+
return (
24+
f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
25+
# f"&schema={self.schema_name}" # TODO: Debug why this doesn't work
26+
)
27+
28+
@overrides
29+
def get_database_name(self) -> str:
30+
"""Return the name of the database."""
31+
return self.database

airbyte/progress.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,24 @@
99
import time
1010
from contextlib import suppress
1111
from enum import Enum, auto
12-
from typing import cast
12+
from typing import TYPE_CHECKING, cast
1313

1414
from rich.errors import LiveError
1515
from rich.live import Live as RichLive
1616
from rich.markdown import Markdown as RichMarkdown
1717

1818

19+
if TYPE_CHECKING:
20+
from types import ModuleType
21+
22+
1923
DEFAULT_REFRESHES_PER_SECOND = 2
2024
IS_REPL = hasattr(sys, "ps1") # True if we're in a Python REPL, in which case we can use Rich.
2125

26+
ipy_display: ModuleType | None
2227
try:
2328
IS_NOTEBOOK = True
24-
from IPython import display as ipy_display
29+
from IPython import display as ipy_display # type: ignore # noqa: PGH003
2530

2631
except ImportError:
2732
ipy_display = None
@@ -310,7 +315,7 @@ def update_display(self, *, force_refresh: bool = False) -> None:
310315

311316
status_message = self._get_status_message()
312317

313-
if self.style == ProgressStyle.IPYTHON:
318+
if self.style == ProgressStyle.IPYTHON and ipy_display is not None:
314319
# We're in a notebook so use the IPython display.
315320
ipy_display.clear_output(wait=True)
316321
ipy_display.display(ipy_display.Markdown(status_message))
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
"""A sample execution script which loads data from `source-faker` to a MotherDuck-backed cache.
3+
4+
Usage (from repo root):
5+
poetry install
6+
poetry run python examples/run_faker_to_motherduck.py
7+
"""
8+
9+
from __future__ import annotations
10+
11+
import airbyte as ab
12+
from airbyte.caches import MotherDuckCache
13+
14+
15+
MOTHERDUCK_API_KEY = ab.get_secret("MOTHERDUCK_API_KEY")
16+
"""This is the API key for the MotherDuck service.
17+
18+
It can be auto-detected in env vars and/or a .env file in the root of the project.
19+
20+
If will be prompted (and masked during input) if not found in either location.
21+
"""
22+
23+
24+
source = ab.get_source(
25+
"source-faker",
26+
config={"count": 10000, "seed": 0, "parallelism": 1, "always_updated": False},
27+
install_if_missing=True,
28+
)
29+
source.check()
30+
source.select_all_streams()
31+
32+
cache = MotherDuckCache(
33+
database="airbyte_test",
34+
schema_name="faker_data",
35+
api_key=MOTHERDUCK_API_KEY,
36+
)
37+
38+
result = source.read(cache=cache)
39+
40+
for name, records in result.streams.items():
41+
print(f"Stream {name}: {len(records)} records")

0 commit comments

Comments
 (0)