Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
13 commits
Select commit Hold shift + click to select a range
c9285f8
feat(destinations): add universal destination using PyAirbyte caches
devin-ai-integration[bot] Jan 29, 2026
dbe9884
refactor: move universal destination to airbyte/cli/universal_connect…
devin-ai-integration[bot] Jan 29, 2026
561093b
feat(sources): add universal source connector using PyAirbyte
devin-ai-integration[bot] Jan 29, 2026
9e19587
fix: add spec() method to destination and fix source read() for CLI p…
devin-ai-integration[bot] Jan 29, 2026
4e326cd
ci: add TK-TODO comment for STATE handling and import TK-TODO CI check
devin-ai-integration[bot] Jan 29, 2026
bc64448
Merge remote-tracking branch 'origin/main' into devin/1769653021-dest…
devin-ai-integration[bot] Feb 26, 2026
ab6ed52
fix: add resource cleanup (try/finally) to check() and write() methods
devin-ai-integration[bot] Feb 26, 2026
74dbb59
fix: use record's original emitted_at timestamp instead of current time
devin-ai-integration[bot] Feb 26, 2026
f405021
refactor: rename CLI entrypoints to destination-smoke-test and source…
devin-ai-integration[bot] Feb 26, 2026
a3ebe6e
fix: update Dockerfile entrypoint to destination-smoke-test
devin-ai-integration[bot] Feb 26, 2026
ef338c4
revert: restore CLI entrypoints to destination-pyairbyte/source-pyair…
devin-ai-integration[bot] Feb 26, 2026
60c502b
feat(connectors): add smoke test source for destination regression te…
aaronsteers Feb 26, 2026
271673c
docs: add experimental warning to universal connector modules
devin-ai-integration[bot] Feb 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airbyte/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""CLI modules for PyAirbyte."""
11 changes: 11 additions & 0 deletions airbyte/cli/universal_connector/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Base image pinned by digest for reproducible builds.
FROM docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73

WORKDIR /airbyte/integration_code

# NOTE(review): uv.lock is copied but `pip install` does not read it, so the
# image is not actually pinned to the lockfile — confirm whether the build
# should use uv (e.g. `uv pip install --system .`) instead.
COPY pyproject.toml uv.lock ./
COPY airbyte ./airbyte

RUN pip install --no-cache-dir .

# The Airbyte platform reads AIRBYTE_ENTRYPOINT; keep it in sync with the
# ENTRYPOINT below (both point at the destination CLI script).
ENV AIRBYTE_ENTRYPOINT="destination-pyairbyte"
ENTRYPOINT ["destination-pyairbyte"]
11 changes: 11 additions & 0 deletions airbyte/cli/universal_connector/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Universal connectors using PyAirbyte as backends."""

from airbyte.cli.universal_connector.destination import DestinationPyAirbyteUniversal
from airbyte.cli.universal_connector.source import SourcePyAirbyteUniversal


__all__ = [
"DestinationPyAirbyteUniversal",
"SourcePyAirbyteUniversal",
]
320 changes: 320 additions & 0 deletions airbyte/cli/universal_connector/destination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Universal destination implementation using PyAirbyte caches."""

from __future__ import annotations

import datetime
import json
import logging
import uuid
from collections import defaultdict
from typing import TYPE_CHECKING, Any

from sqlalchemy import text

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
ConfiguredAirbyteCatalog,
ConnectorSpecification,
DestinationSyncMode,
Status,
Type,
)

from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.duckdb import DuckDBCache
from airbyte.caches.motherduck import MotherDuckCache
from airbyte.caches.postgres import PostgresCache
from airbyte.caches.snowflake import SnowflakeCache
from airbyte.secrets.base import SecretString


if TYPE_CHECKING:
from collections.abc import Iterable, Mapping

from sqlalchemy.engine import Engine

from airbyte.caches.base import CacheBase


# Module-level logger shared by the destination implementation.
logger = logging.getLogger("airbyte")


# Supported values for the `destination_type` config field. Each constant
# maps to one PyAirbyte cache backend selected in `_get_cache`, and must
# match the `enum` advertised in `spec()`.
DESTINATION_TYPE_DUCKDB = "duckdb"
DESTINATION_TYPE_POSTGRES = "postgres"
DESTINATION_TYPE_SNOWFLAKE = "snowflake"
DESTINATION_TYPE_BIGQUERY = "bigquery"
DESTINATION_TYPE_MOTHERDUCK = "motherduck"


class DestinationPyAirbyteUniversal(Destination):
"""Universal destination that writes to any PyAirbyte-supported cache backend."""

def spec(self, logger: logging.Logger) -> ConnectorSpecification:  # noqa: ARG002
    """Return the connector specification.

    The spec declares a single required ``destination_type`` selector plus
    one optional configuration object per supported cache backend. The
    ``enum`` values must stay in sync with the ``DESTINATION_TYPE_*``
    constants used by ``_get_cache``.
    """
    # Per-backend configuration sub-schemas, assembled below.
    duckdb_schema = {
        "type": "object",
        "title": "DuckDB Configuration",
        "properties": {
            "db_path": {"type": "string", "default": "/local/pyairbyte.duckdb"},
            "schema_name": {"type": "string", "default": "main"},
        },
    }
    postgres_schema = {
        "type": "object",
        "title": "PostgreSQL Configuration",
        "properties": {
            "host": {"type": "string", "default": "localhost"},
            "port": {"type": "integer", "default": 5432},
            "username": {"type": "string"},
            "password": {"type": "string", "airbyte_secret": True},
            "database": {"type": "string"},
            "schema_name": {"type": "string", "default": "public"},
        },
    }
    snowflake_schema = {
        "type": "object",
        "title": "Snowflake Configuration",
        "properties": {
            "account": {"type": "string"},
            "username": {"type": "string"},
            "password": {"type": "string", "airbyte_secret": True},
            "warehouse": {"type": "string"},
            "database": {"type": "string"},
            "schema_name": {"type": "string", "default": "PUBLIC"},
            "role": {"type": "string"},
        },
    }
    bigquery_schema = {
        "type": "object",
        "title": "BigQuery Configuration",
        "properties": {
            "project_name": {"type": "string"},
            "dataset_name": {"type": "string"},
            "credentials_path": {"type": "string"},
        },
    }
    motherduck_schema = {
        "type": "object",
        "title": "MotherDuck Configuration",
        "properties": {
            "database": {"type": "string", "default": "my_db"},
            "schema_name": {"type": "string", "default": "main"},
            "api_key": {"type": "string", "airbyte_secret": True},
        },
    }

    connection_specification = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "PyAirbyte Universal Destination Spec",
        "type": "object",
        "required": ["destination_type"],
        "properties": {
            "destination_type": {
                "type": "string",
                "title": "Destination Type",
                "description": "The type of destination to write to.",
                "enum": ["duckdb", "postgres", "snowflake", "bigquery", "motherduck"],
            },
            "duckdb": duckdb_schema,
            "postgres": postgres_schema,
            "snowflake": snowflake_schema,
            "bigquery": bigquery_schema,
            "motherduck": motherduck_schema,
        },
    }
    return ConnectorSpecification(
        documentationUrl="https://docs.airbyte.com/integrations/destinations/pyairbyte-universal",
        connectionSpecification=connection_specification,
    )

def _get_cache(self, config: Mapping[str, Any]) -> CacheBase:
"""Create and return the appropriate cache based on configuration.

Dispatches on ``config["destination_type"]`` to one of the PyAirbyte
cache backends; backend-specific settings are read from the sub-object
named after the destination type (e.g. ``config["postgres"]``).
"""
destination_type = config.get("destination_type")

if destination_type == DESTINATION_TYPE_DUCKDB:
duckdb_config = config.get("duckdb", {})
return DuckDBCache(
db_path=duckdb_config.get("db_path", "/local/pyairbyte.duckdb"),
schema_name=duckdb_config.get("schema_name", "main"),
)

if destination_type == DESTINATION_TYPE_POSTGRES:
pg_config = config.get("postgres", {})
return PostgresCache(
host=pg_config.get("host", "localhost"),
port=pg_config.get("port", 5432),
username=pg_config.get("username", "postgres"),
# A missing password becomes an empty SecretString; auth then fails
# at connect time rather than here.
password=SecretString(pg_config.get("password", "")),
database=pg_config.get("database", "postgres"),
schema_name=pg_config.get("schema_name", "public"),
)

if destination_type == DESTINATION_TYPE_SNOWFLAKE:
sf_config = config.get("snowflake", {})
password = sf_config.get("password")
# NOTE(review): empty-string defaults for account/username/warehouse/
# database/role are not rejected here, so a misconfiguration only
# surfaces at connection time — consider validating required fields
# early with a clear error message.
return SnowflakeCache(
account=sf_config.get("account", ""),
username=sf_config.get("username", ""),
password=SecretString(password) if password else None,
warehouse=sf_config.get("warehouse", ""),
database=sf_config.get("database", ""),
schema_name=sf_config.get("schema_name", "PUBLIC"),
role=sf_config.get("role", ""),
)
Comment on lines +153 to +164
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Find the SnowflakeCache class definition
find . -type f -name "*.py" | xargs rg "class SnowflakeCache" -A 50

Repository: airbytehq/PyAirbyte

Length of output: 1430


🏁 Script executed:

# Also check if there's any validation of these parameters
rg "SnowflakeCache" --type py -B 2 -A 10 | head -100

Repository: airbytehq/PyAirbyte

Length of output: 7486


🏁 Script executed:

# Search for snowflake cache files
fd "snowflake" --type f --extension py | grep -i cache

Repository: airbytehq/PyAirbyte

Length of output: 90


🏁 Script executed:

# Find SnowflakeConfig definition
rg "class SnowflakeConfig" -A 50 airbyte/caches/

Repository: airbytehq/PyAirbyte

Length of output: 45


🏁 Script executed:

# Check if there's validation in SnowflakeConfig or SnowflakeCache
rg -A 5 -B 5 "validator\|root_validator\|field_validator" airbyte/caches/snowflake.py

Repository: airbytehq/PyAirbyte

Length of output: 45


🏁 Script executed:

# Look at the full SnowflakeConfig definition to understand defaults
cat -n airbyte/caches/snowflake.py | head -100

Repository: airbytehq/PyAirbyte

Length of output: 3232


🏁 Script executed:

# Find the SnowflakeConfig definition
find . -path "*/_processors/sql/snowflake.py" -type f

Repository: airbytehq/PyAirbyte

Length of output: 101


🏁 Script executed:

# Search for SnowflakeConfig class
rg "class SnowflakeConfig" -A 100 --type py

Repository: airbytehq/PyAirbyte

Length of output: 7591


🏁 Script executed:

# Check SqlConfig for any validation
rg "class SqlConfig" -A 30 airbyte/_processors/sql/

Repository: airbytehq/PyAirbyte

Length of output: 45


🏁 Script executed:

# Look at the full _get_cache method in destination.py for context
rg -B 10 -A 40 "def _get_cache" airbyte/cli/universal_connector/destination.py

Repository: airbytehq/PyAirbyte

Length of output: 2399


Add early validation to reject empty strings for required Snowflake fields, wdyt?

The constructor accepts all these parameters, but passing empty strings for required fields like account, username, warehouse, database, and role will slip through validation and only fail at runtime during connection. Consider adding validation in SnowflakeConfig to catch these misconfigurations early with clear error messages.

🤖 Prompt for AI Agents
In `@airbyte/cli/universal_connector/destination.py` around lines 76 - 87, The
code currently constructs a SnowflakeCache when destination_type ==
DESTINATION_TYPE_SNOWFLAKE without rejecting empty strings for required fields;
add early validation to fail fast by checking required Snowflake fields
(account, username, warehouse, database, role) are non-empty before constructing
SnowflakeCache (or implement the same checks in SnowflakeConfig/SnowflakeCache
initializer/constructor), and raise a clear ValueError indicating which field is
missing; preserve the existing SecretString wrapping for password and include
that validation logic near the SnowflakeCache creation site or inside
SnowflakeCache/SnowflakeConfig (functions/classes to touch: destination_type
check, SnowflakeCache constructor, or SnowflakeConfig class) so
misconfigurations are caught immediately with explicit error messages.


if destination_type == DESTINATION_TYPE_BIGQUERY:
bq_config = config.get("bigquery", {})
return BigQueryCache(
project_name=bq_config.get("project_name", ""),
dataset_name=bq_config.get("dataset_name", ""),
# credentials_path may be None; presumably the BigQuery client then
# falls back to default credential resolution — TODO confirm.
credentials_path=bq_config.get("credentials_path"),
)

if destination_type == DESTINATION_TYPE_MOTHERDUCK:
md_config = config.get("motherduck", {})
return MotherDuckCache(
database=md_config.get("database", "my_db"),
schema_name=md_config.get("schema_name", "main"),
api_key=SecretString(md_config.get("api_key", "")),
)

# Unknown or missing destination_type: fail fast with a clear error.
raise ValueError(f"Unsupported destination type: {destination_type}")

def check(
    self,
    logger: logging.Logger,  # noqa: ARG002
    config: Mapping[str, Any],
) -> AirbyteConnectionStatus:
    """Test connectivity to the configured destination.

    Builds the cache for ``config``, opens a SQL connection, and runs a
    trivial ``SELECT 1``. Any exception (invalid config, unreachable host,
    auth failure) is reported as a FAILED status rather than raised, per
    the Airbyte CHECK contract.

    Args:
        logger: Unused; required by the CDK ``Destination`` interface.
        config: The destination configuration mapping.

    Returns:
        An ``AirbyteConnectionStatus`` with SUCCEEDED or FAILED.
    """
    engine = None
    try:
        cache = self._get_cache(config)
        engine = cache.get_sql_engine()
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
    except Exception as e:
        return AirbyteConnectionStatus(
            status=Status.FAILED, message=f"Connection failed: {e!r}"
        )
    finally:
        # Dispose the engine's connection pool so repeated CHECK calls do
        # not leak database connections (the original leaked the engine).
        if engine is not None:
            engine.dispose()

def write(
self,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
input_messages: Iterable[AirbyteMessage],
) -> Iterable[AirbyteMessage]:
"""Write data to the destination using PyAirbyte cache.

This method processes messages in a streaming fashion, buffering records
and flushing on state messages to ensure fault tolerance. State messages
are only yielded back after the preceding records have been flushed.
"""
cache = self._get_cache(config)
streams = {s.stream.name for s in configured_catalog.streams}
schema_name = cache.schema_name

logger.info(f"Starting write to PyAirbyte Universal with {len(streams)} streams")

# Get SQL engine and ensure schema exists.
# NOTE(review): schema_name is interpolated unquoted into raw SQL here
# and below; names containing dashes, dots, spaces, or SQL keywords will
# produce invalid SQL (and stream-controlled names are injectable) —
# consider quoting identifiers.
engine = cache.get_sql_engine()
with engine.connect() as conn:
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
conn.commit()  # pyrefly: ignore[missing-attribute]

# Create tables for each stream
for configured_stream in configured_catalog.streams:
name = configured_stream.stream.name
table_name = f"_airbyte_raw_{name}"

with engine.connect() as conn:
# Overwrite mode drops the existing raw table so the sync starts
# from a clean slate; append modes leave prior rows in place.
if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
logger.info(f"Dropping table for overwrite: {table_name}")
conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}"))
conn.commit()  # pyrefly: ignore[missing-attribute]
Comment on lines +241 to +249
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Unquoted SQL identifiers cause syntax errors for stream names with special characters

Stream names and schema names are interpolated directly into raw SQL strings without any identifier quoting. When a stream name contains characters that are not valid in unquoted SQL identifiers (e.g., dashes, dots, spaces, or SQL keywords), the generated SQL will fail with a syntax error or, worse, enable SQL injection.

Root Cause and Impact

The write method constructs SQL statements by f-string interpolation of schema_name and table_name (which is f"_airbyte_raw_{stream_name}") at multiple locations:

  • destination.py:221: CREATE SCHEMA IF NOT EXISTS {schema_name}
  • destination.py:232: DROP TABLE IF EXISTS {schema_name}.{table_name}
  • destination.py:237: CREATE TABLE IF NOT EXISTS {schema_name}.{table_name}
  • destination.py:319: INSERT INTO {schema_name}.{table_name}

For example, a stream named my-stream produces _airbyte_raw_my-stream, and the SQL CREATE TABLE IF NOT EXISTS main._airbyte_raw_my-stream (...) will be a syntax error because - is interpreted as the minus operator.

Similarly, a stream name containing '; DROP TABLE users; -- could lead to SQL injection.

Notably, the companion smoke test source defines scenarios like "CamelCaseStreamName" which would work (unquoted identifiers are case-insensitive but still syntactically valid), but any source with stream names containing dashes, dots, or spaces would break this destination entirely.

Impact: The destination will crash with SQL errors for any stream whose name contains characters not valid in unquoted SQL identifiers. This includes common stream naming patterns like my-table, schema.table, or table name.

Prompt for agents
In airbyte/cli/universal_connector/destination.py, all SQL identifiers (schema_name and table_name) must be properly quoted before interpolation into SQL strings. This affects lines 221, 232, 237-243, and 318-322.

One approach: add a helper method to quote identifiers, e.g.:

    def _quote_identifier(self, name: str) -> str:
        escaped = name.replace('"', '""')
        return f'"{escaped}"'

Then use it everywhere identifiers are interpolated:
- Line 221: text(f"CREATE SCHEMA IF NOT EXISTS {self._quote_identifier(schema_name)}")
- Line 232: text(f"DROP TABLE IF EXISTS {self._quote_identifier(schema_name)}.{self._quote_identifier(table_name)}")
- Lines 237-243: Same pattern for CREATE TABLE
- Lines 318-322: Same pattern for INSERT INTO in _flush_buffer method

This needs to be applied consistently across all 4 SQL construction sites in the file.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


# Create the raw table if needed (Airbyte "raw" layout: surrogate id,
# emission timestamp, and the record payload as JSON).
create_sql = f"""
CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
_airbyte_ab_id VARCHAR(36) PRIMARY KEY,
_airbyte_emitted_at TIMESTAMP,
_airbyte_data JSON
)
"""
conn.execute(text(create_sql))
conn.commit()  # pyrefly: ignore[missing-attribute]

# Buffer for records: stream name -> column name -> list of values.
buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))

for message in input_messages:
if message.type == Type.STATE:
# Flush the buffer before yielding state, so that a state message
# is only checkpointed once all preceding records are persisted.
for stream_name in list(buffer.keys()):
self._flush_buffer(
engine=engine,
buffer=buffer,
schema_name=schema_name,
stream_name=stream_name,
)
buffer = defaultdict(lambda: defaultdict(list))
yield message

elif message.type == Type.RECORD:
record = message.record
if record is None:
continue
stream_name = record.stream
if stream_name not in streams:
logger.debug(f"Stream {stream_name} not in configured streams, skipping")
continue

# Add to buffer.
buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4()))
# NOTE(review): this stamps the current wall-clock time rather than
# the record's own emitted_at — confirm which is intended (a later
# commit in this PR claims to use the record's original timestamp).
buffer[stream_name]["_airbyte_emitted_at"].append(
datetime.datetime.now(datetime.timezone.utc).isoformat()
)
buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data))

else:
# Other message types (e.g. TRACE, LOG) are dropped, not forwarded.
logger.debug(f"Message type {message.type} not handled, skipping")

# Flush any remaining records buffered after the last state message.
for stream_name in list(buffer.keys()):
self._flush_buffer(
engine=engine,
buffer=buffer,
schema_name=schema_name,
stream_name=stream_name,
)

# Close the cache.
# NOTE(review): if an exception is raised mid-iteration, this cleanup is
# skipped — consider wrapping the body in try/finally.
cache.close()

def _flush_buffer(
    self,
    *,
    engine: Engine,
    buffer: dict[str, dict[str, list[Any]]],
    schema_name: str,
    stream_name: str,
) -> None:
    """Flush buffered records for one stream into its raw table.

    Issues a single batched (executemany) INSERT inside one transaction
    instead of one statement object and one round trip per record.

    Args:
        engine: SQLAlchemy engine for the target backend.
        buffer: Per-stream column buffers (``stream -> column -> values``).
        schema_name: Target schema name.
        stream_name: Stream whose buffered rows should be written.
    """
    entries = buffer[stream_name]
    row_count = len(entries["_airbyte_ab_id"])
    if not row_count:
        return

    # NOTE(review): schema/table names are interpolated unquoted; stream
    # names containing dashes, dots, or spaces produce invalid SQL, and
    # hostile names are injectable — consider quoting identifiers.
    table_name = f"_airbyte_raw_{stream_name}"

    logger.info(f"Flushing {row_count} records to {table_name}")

    # Build the statement once (it is loop-invariant) and hand SQLAlchemy
    # the full batch of parameter dicts for an executemany execution.
    insert_sql = text(f"""
        INSERT INTO {schema_name}.{table_name}
        (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
        VALUES (:ab_id, :emitted_at, :data)
    """)
    params = [
        {
            "ab_id": entries["_airbyte_ab_id"][i],
            "emitted_at": entries["_airbyte_emitted_at"][i],
            "data": entries["_airbyte_data"][i],
        }
        for i in range(row_count)
    ]
    with engine.connect() as conn:
        conn.execute(insert_sql, params)
        conn.commit()  # pyrefly: ignore[missing-attribute]
15 changes: 15 additions & 0 deletions airbyte/cli/universal_connector/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Entry point for the PyAirbyte Universal destination."""

import sys

from airbyte.cli.universal_connector import DestinationPyAirbyteUniversal


def run() -> None:
    """Launch the universal destination CLI with the process arguments."""
    destination = DestinationPyAirbyteUniversal()
    destination.run(sys.argv[1:])


if __name__ == "__main__":
    run()
17 changes: 17 additions & 0 deletions airbyte/cli/universal_connector/run_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Entry point for the PyAirbyte Universal source."""

import sys

from airbyte_cdk.entrypoint import launch

from airbyte.cli.universal_connector import SourcePyAirbyteUniversal


def run() -> None:
    """Launch the universal source via the Airbyte CDK entrypoint."""
    cli_args = sys.argv[1:]
    launch(SourcePyAirbyteUniversal(), cli_args)


if __name__ == "__main__":
    run()
Loading
Loading