Skip to content

Commit 670eecc

Browse files
feat: Add Catalog and Table implementations for PostgreSQL (#5487)
## Changes Made Allows us to do ``` catalog = daft.catalog.Catalog.from_postgres("postgresql://user:pass@uri:5432/postgres") table = catalog.get_table("myschema.mytable") df = table.read() ... output_table = catalog.create_table_if_not_exists("myschema.output") output_table.write(df) ``` Depends on #5471 for distributed writes and the `df.write_sql()` interface. ## Key decisions, limitations, and notes - Embeddings support is currently provided via the pgvector extension and the pgvector python client library. - The following types are currently not supported: - Duration - Interval - The following types currently fallback to JSONB: - Multidimensional lists - Structs - Maps - Images - Tensors - No connection pooling - Writes happen on a single node (#5471 implements distributed writes) - Uses psycopg and `COPY` for writes. ADBC would potentially be a better choice here but we ignore it for now due to its limited type support. - Instead of assuming that `public` is the default schema, we delegate table resolution to Postgres' [Schema Search Path](https://www.postgresql.org/docs/current/ddl-schemas.html#DDL-SCHEMAS-PATH) --------- Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
1 parent 8d62057 commit 670eecc

File tree

5 files changed

+1413
-5
lines changed

5 files changed

+1413
-5
lines changed

daft/catalog/__init__.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,35 @@ def from_glue(
312312
except ImportError:
313313
raise ImportError("AWS Glue support not installed: pip install -U 'daft[aws]'")
314314

315+
@staticmethod
316+
def from_postgres(connection_string: str, extensions: list[str] | None = None) -> Catalog:
317+
"""Create a Daft Catalog from a PostgreSQL connection string.
318+
319+
Note::
320+
This is an experimental feature and the API may change in the future.
321+
322+
Args:
323+
connection_string (str): a PostgreSQL connection string
324+
extensions (list[str], optional): List of PostgreSQL extensions to create if they don't exist.
325+
For each extension, "CREATE EXTENSION IF NOT EXISTS <extension>" will be executed.
326+
Defaults to None (no extensions).
327+
328+
Returns:
329+
Catalog: a new Catalog instance to a PostgreSQL database.
330+
331+
Examples:
332+
>>> catalog = Catalog.from_postgres("postgresql://user:password@host:port/database")
333+
>>> catalog = Catalog.from_postgres(
334+
... "postgresql://user:password@host:port/database", extensions=["vector", "pg_stat_statements"]
335+
... )
336+
"""
337+
try:
338+
from daft.catalog.__postgres import PostgresCatalog
339+
340+
return PostgresCatalog.from_uri(connection_string, extensions)
341+
except ImportError:
342+
raise ImportError("PostgreSQL support not installed: pip install -U 'daft[postgres]'")
343+
315344
@staticmethod
316345
def _from_obj(obj: object) -> Catalog:
317346
"""Returns a Daft Catalog from a supported object type or raises a ValueError."""
@@ -326,6 +355,22 @@ def _from_obj(obj: object) -> Catalog:
326355
f"Unsupported catalog type: {type(obj)}; please ensure all required extra dependencies are installed."
327356
)
328357

358+
@staticmethod
359+
def _validate_options(method: str, input: dict[str, Any], valid: set[str]) -> None:
360+
"""Validates input options against a set of valid options.
361+
362+
Args:
363+
method (str): The method name to include in the error message
364+
input (dict[str, Any]): The input options dictionary
365+
valid (set[str]): Set of valid option keys
366+
367+
Raises:
368+
ValueError: If any input options are not in the valid set
369+
"""
370+
invalid_options = set(input.keys()) - valid
371+
if invalid_options:
372+
raise ValueError(f"Unsupported option(s) for {method}, found {invalid_options!s} not in {valid!s}")
373+
329374
###
330375
# create_*
331376
###
@@ -796,6 +841,8 @@ def write(self, df: DataFrame, mode: Literal["append", "overwrite"] = "append",
796841
"""
797842
if mode == "append":
798843
return self.append(df, **options)
844+
else:
845+
return self.overwrite(df, **options)
799846

800847
@abstractmethod
801848
def append(self, df: DataFrame, **options: Any) -> None:

0 commit comments

Comments
 (0)