Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ This list is not final. If you have a driver you'd like to see added, please ope
| [`asyncpg`](https://magicstack.github.io/asyncpg/current/) | PostgreSQL | Async | ✅ |
| [`psycopg`](https://www.psycopg.org/) | PostgreSQL | Sync | ✅ |
| [`psycopg`](https://www.psycopg.org/) | PostgreSQL | Async | ✅ |
| [`psqlpy`](https://psqlpy-python.github.io/) | PostgreSQL | Async | ✅ |
| [`aiosqlite`](https://github.com/omnilib/aiosqlite) | SQLite | Async | ✅ |
| `sqlite3` | SQLite | Sync | ✅ |
| [`oracledb`](https://oracle.github.io/python-oracledb/) | Oracle | Async | ✅ |
Expand Down
1 change: 1 addition & 0 deletions docs/PYPI_README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ This list is not final. If you have a driver you'd like to see added, please ope
| [`asyncpg`](https://magicstack.github.io/asyncpg/current/) | PostgreSQL | Async | ✅ |
| [`psycopg`](https://www.psycopg.org/) | PostgreSQL | Sync | ✅ |
| [`psycopg`](https://www.psycopg.org/) | PostgreSQL | Async | ✅ |
| [`psqlpy`](https://psqlpy-python.github.io/) | PostgreSQL | Async | ✅ |
| [`aiosqlite`](https://github.com/omnilib/aiosqlite) | SQLite | Async | ✅ |
| `sqlite3` | SQLite | Sync | ✅ |
| [`oracledb`](https://oracle.github.io/python-oracledb/) | Oracle | Async | ✅ |
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ nanoid = ["fastnanoid>=0.4.1"]
oracledb = ["oracledb"]
orjson = ["orjson"]
performance = ["sqlglot[rs]"]
psqlpy = ["psqlpy"]
psycopg = ["psycopg[binary,pool]"]
pydantic = ["pydantic", "pydantic-extra-types"]
pymssql = ["pymssql"]
Expand Down Expand Up @@ -212,6 +213,7 @@ markers = [
"psycopg: marks tests using psycopg",
"pymssql: marks tests using pymssql",
"pymysql: marks tests using pymysql",
"psqlpy: marks tests using psqlpy",
]
testpaths = ["tests"]

Expand Down
7 changes: 3 additions & 4 deletions sqlspec/adapters/asyncmy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def pool_config_dict(self) -> "dict[str, Any]":
raise ImproperConfigurationError(msg)

async def create_connection(self) -> "Connection": # pyright: ignore[reportUnknownParameterType]
"""Create and return a new asyncmy connection.
"""Create and return a new asyncmy connection from the pool.

Returns:
A Connection instance.
Expand All @@ -171,9 +171,8 @@ async def create_connection(self) -> "Connection": # pyright: ignore[reportUnkn
ImproperConfigurationError: If the connection could not be created.
"""
try:
import asyncmy # pyright: ignore[reportMissingTypeStubs]

return await asyncmy.connect(**self.connection_config_dict) # pyright: ignore
async with self.provide_connection() as conn:
return conn
except Exception as e:
msg = f"Could not configure the Asyncmy connection. Error: {e!s}"
raise ImproperConfigurationError(msg) from e
Expand Down
7 changes: 3 additions & 4 deletions sqlspec/adapters/asyncpg/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def provide_pool(self, *args: "Any", **kwargs: "Any") -> "Awaitable[Pool]": # p
return self.create_pool() # pyright: ignore[reportUnknownMemberType,reportUnknownVariableType]

async def create_connection(self) -> "AsyncpgConnection":
"""Create and return a new asyncpg connection.
"""Create and return a new asyncpg connection from the pool.

Returns:
A Connection instance.
Expand All @@ -183,9 +183,8 @@ async def create_connection(self) -> "AsyncpgConnection":
ImproperConfigurationError: If the connection could not be created.
"""
try:
import asyncpg

return await asyncpg.connect(**self.connection_config_dict) # type: ignore[no-any-return]
pool = await self.provide_pool()
return await pool.acquire()
except Exception as e:
msg = f"Could not configure the asyncpg connection. Error: {e!s}"
raise ImproperConfigurationError(msg) from e
Expand Down
9 changes: 4 additions & 5 deletions sqlspec/adapters/oracledb/config/_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Optional, cast

from oracledb import create_pool_async as oracledb_create_pool # pyright: ignore[reportUnknownVariableType]
from oracledb.connection import AsyncConnection
Expand Down Expand Up @@ -112,7 +112,7 @@ def pool_config_dict(self) -> "dict[str, Any]":
raise ImproperConfigurationError(msg)

async def create_connection(self) -> "AsyncConnection":
"""Create and return a new oracledb async connection.
"""Create and return a new oracledb async connection from the pool.

Returns:
An AsyncConnection instance.
Expand All @@ -121,9 +121,8 @@ async def create_connection(self) -> "AsyncConnection":
ImproperConfigurationError: If the connection could not be created.
"""
try:
import oracledb

return await oracledb.connect_async(**self.connection_config_dict) # type: ignore[no-any-return]
pool = await self.provide_pool()
return cast("AsyncConnection", await pool.acquire()) # type: ignore[no-any-return,unused-ignore]
except Exception as e:
msg = f"Could not configure the Oracle async connection. Error: {e!s}"
raise ImproperConfigurationError(msg) from e
Expand Down
7 changes: 3 additions & 4 deletions sqlspec/adapters/oracledb/config/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def pool_config_dict(self) -> "dict[str, Any]":
raise ImproperConfigurationError(msg)

def create_connection(self) -> "Connection":
"""Create and return a new oracledb connection.
"""Create and return a new oracledb connection from the pool.

Returns:
A Connection instance.
Expand All @@ -121,9 +121,8 @@ def create_connection(self) -> "Connection":
ImproperConfigurationError: If the connection could not be created.
"""
try:
import oracledb

return oracledb.connect(**self.connection_config_dict)
pool = self.provide_pool()
return pool.acquire()
except Exception as e:
msg = f"Could not configure the Oracle connection. Error: {e!s}"
raise ImproperConfigurationError(msg) from e
Expand Down
Empty file.
258 changes: 258 additions & 0 deletions sqlspec/adapters/psqlpy/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
"""Configuration for the psqlpy PostgreSQL adapter."""

from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Optional, Union

from psqlpy import Connection, ConnectionPool

from sqlspec.adapters.psqlpy.driver import PsqlpyDriver
from sqlspec.base import AsyncDatabaseConfig, GenericPoolConfig
from sqlspec.exceptions import ImproperConfigurationError
from sqlspec.typing import Empty, EmptyType, dataclass_to_dict

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Awaitable


__all__ = (
"PsqlpyConfig",
"PsqlpyPoolConfig",
)


@dataclass
class PsqlpyPoolConfig(GenericPoolConfig):
"""Configuration for psqlpy connection pool.

Ref: https://psqlpy-python.github.io/components/connection_pool.html#all-available-connectionpool-parameters
"""

dsn: Optional[Union[str, EmptyType]] = Empty
"""DSN of the PostgreSQL."""
# Required connection parameters
username: Optional[Union[str, EmptyType]] = Empty
"""Username of the user in the PostgreSQL."""
password: Optional[Union[str, EmptyType]] = Empty
"""Password of the user in the PostgreSQL."""
db_name: Optional[Union[str, EmptyType]] = Empty
"""Name of the database in PostgreSQL."""

# Single or Multi-host parameters (mutually exclusive)
host: Optional[Union[str, EmptyType]] = Empty
"""Host of the PostgreSQL (use for single host)."""
port: Optional[Union[int, EmptyType]] = Empty
"""Port of the PostgreSQL (use for single host)."""
hosts: Optional[Union[list[str], EmptyType]] = Empty
"""List of hosts of the PostgreSQL (use for multiple hosts)."""
ports: Optional[Union[list[int], EmptyType]] = Empty
"""List of ports of the PostgreSQL (use for multiple hosts)."""

# Pool size
max_db_pool_size: int = 10
"""Maximum size of the connection pool. Defaults to 10."""

# Optional timeouts
connect_timeout_sec: Optional[Union[int, EmptyType]] = Empty
"""The time limit in seconds applied to each socket-level connection attempt."""
connect_timeout_nanosec: Optional[Union[int, EmptyType]] = Empty
"""Nanoseconds for connection timeout, can be used only with `connect_timeout_sec`."""
tcp_user_timeout_sec: Optional[Union[int, EmptyType]] = Empty
"""The time limit that transmitted data may remain unacknowledged before a connection is forcibly closed."""
tcp_user_timeout_nanosec: Optional[Union[int, EmptyType]] = Empty
"""Nanoseconds for tcp_user_timeout, can be used only with `tcp_user_timeout_sec`."""

# Optional keepalives
keepalives: bool = True
"""Controls the use of TCP keepalive. Defaults to True (on)."""
keepalives_idle_sec: Optional[Union[int, EmptyType]] = Empty
"""The number of seconds of inactivity after which a keepalive message is sent to the server."""
keepalives_idle_nanosec: Optional[Union[int, EmptyType]] = Empty
"""Nanoseconds for keepalives_idle_sec."""
keepalives_interval_sec: Optional[Union[int, EmptyType]] = Empty
"""The time interval between TCP keepalive probes."""
keepalives_interval_nanosec: Optional[Union[int, EmptyType]] = Empty
"""Nanoseconds for keepalives_interval_sec."""
keepalives_retries: Optional[Union[int, EmptyType]] = Empty
"""The maximum number of TCP keepalive probes that will be sent before dropping a connection."""

# Other optional parameters
load_balance_hosts: Optional[Union[str, EmptyType]] = Empty
"""Controls the order in which the client tries to connect to the available hosts and addresses ('disable' or 'random')."""
conn_recycling_method: Optional[Union[str, EmptyType]] = Empty
"""How a connection is recycled."""
ssl_mode: Optional[Union[str, EmptyType]] = Empty
"""SSL mode."""
ca_file: Optional[Union[str, EmptyType]] = Empty
"""Path to ca_file for SSL."""
target_session_attrs: Optional[Union[str, EmptyType]] = Empty
"""Specifies requirements of the session (e.g., 'read-write')."""
options: Optional[Union[str, EmptyType]] = Empty
"""Command line options used to configure the server."""
application_name: Optional[Union[str, EmptyType]] = Empty
"""Sets the application_name parameter on the server."""


@dataclass
class PsqlpyConfig(AsyncDatabaseConfig[Connection, ConnectionPool, PsqlpyDriver]):
"""Configuration for psqlpy database connections, managing a connection pool.

This configuration class wraps `PsqlpyPoolConfig` and manages the lifecycle
of a `psqlpy.ConnectionPool`.
"""

pool_config: Optional[PsqlpyPoolConfig] = field(default=None)
"""Psqlpy Pool configuration"""
driver_type: type[PsqlpyDriver] = field(default=PsqlpyDriver, init=False, hash=False)
"""Type of the driver object"""
connection_type: type[Connection] = field(default=Connection, init=False, hash=False)
"""Type of the connection object"""
pool_instance: Optional[ConnectionPool] = field(default=None, hash=False)
"""The connection pool instance. If set, this will be used instead of creating a new pool."""

@property
def connection_config_dict(self) -> "dict[str, Any]":
"""Return the minimal connection configuration as a dict for standalone use.

Returns:
A string keyed dict of config kwargs for a psqlpy.Connection.

Raises:
ImproperConfigurationError: If essential connection parameters are missing.
"""
if self.pool_config:
# Exclude pool-specific keys and internal metadata
pool_specific_keys = {
"max_db_pool_size",
"load_balance_hosts",
"conn_recycling_method",
"pool_instance",
"connection_type",
"driver_type",
}
return dataclass_to_dict(
self.pool_config,
exclude_empty=True,
convert_nested=False,
exclude_none=True,
exclude=pool_specific_keys,
)
msg = "You must provide a 'pool_config' for this adapter."
raise ImproperConfigurationError(msg)

@property
def pool_config_dict(self) -> "dict[str, Any]":
"""Return the pool configuration as a dict.

Raises:
ImproperConfigurationError: If no pool_config is provided but a pool_instance

Returns:
A string keyed dict of config kwargs for creating a psqlpy pool.
"""
if self.pool_config:
# Extract the config from the pool_config
return dataclass_to_dict(
self.pool_config,
exclude_empty=True,
convert_nested=False,
exclude_none=True,
exclude={"pool_instance", "connection_type", "driver_type"},
)

msg = "'pool_config' methods can not be used when a 'pool_instance' is provided."
raise ImproperConfigurationError(msg)

async def create_pool(self) -> "ConnectionPool":
"""Return a pool. If none exists yet, create one.

Ensures that the pool is initialized and returns the instance.

Returns:
The pool instance used by the plugin.

Raises:
ImproperConfigurationError: If the pool could not be configured.
"""
if self.pool_instance is not None:
return self.pool_instance

if self.pool_config is None:
msg = "One of 'pool_config' or 'pool_instance' must be provided."
raise ImproperConfigurationError(msg)

# pool_config is guaranteed to exist due to __post_init__
try:
# psqlpy ConnectionPool doesn't have an explicit async connect/startup method
# It creates connections on demand.
self.pool_instance = ConnectionPool(**self.pool_config_dict)
except Exception as e:
msg = f"Could not configure the 'pool_instance'. Error: {e!s}. Please check your configuration."
raise ImproperConfigurationError(msg) from e

return self.pool_instance

def provide_pool(self, *args: "Any", **kwargs: "Any") -> "Awaitable[ConnectionPool]":
"""Create or return the pool instance.

Returns:
An awaitable resolving to the Pool instance.
"""

async def _create() -> "ConnectionPool":
return await self.create_pool()

return _create()

def create_connection(self) -> "Awaitable[Connection]":
"""Create and return a new, standalone psqlpy connection using the configured parameters.

Note: This method is not supported by the psqlpy adapter as connection
creation is primarily handled via the ConnectionPool.
Use `provide_connection` or `provide_session` for pooled connections.

Returns:
An awaitable that resolves to a new Connection instance.

Raises:
NotImplementedError: This method is not implemented for psqlpy.
"""

async def _create() -> "Connection":
# psqlpy does not seem to offer a public API for creating
# standalone async connections easily outside the pool context.
msg = (
"Creating standalone connections is not directly supported by the psqlpy adapter. "
"Please use the pool via `provide_connection` or `provide_session`."
)
raise NotImplementedError(msg)

return _create()

@asynccontextmanager
async def provide_connection(self, *args: "Any", **kwargs: "Any") -> "AsyncGenerator[Connection, None]":
"""Acquire a connection from the pool.

Yields:
A connection instance managed by the pool.
"""
db_pool = await self.provide_pool(*args, **kwargs)
async with db_pool.acquire() as conn:
yield conn

def close_pool(self) -> None:
"""Close the connection pool."""
if self.pool_instance is not None:
# psqlpy pool close is synchronous
self.pool_instance.close()
self.pool_instance = None

@asynccontextmanager
async def provide_session(self, *args: Any, **kwargs: Any) -> "AsyncGenerator[PsqlpyDriver, None]":
"""Create and provide a database session using a pooled connection.

Yields:
A Psqlpy driver instance wrapping a pooled connection.
"""
async with self.provide_connection(*args, **kwargs) as connection:
yield self.driver_type(connection)
Loading