Skip to content

Commit 4de5793

Browse files
authored
Feat: Add experimental support for low-code source execution via manifest YAML (#175)
1 parent 68da09d commit 4de5793

File tree

15 files changed

+828
-144
lines changed

15 files changed

+828
-144
lines changed

airbyte/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
documents,
1717
exceptions, # noqa: ICN001 # No 'exc' alias for top-level module
1818
experimental,
19+
records,
1920
results,
2021
secrets,
2122
sources,

airbyte/_processors/sql/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from __future__ import annotations
55

6+
from airbyte._processors.sql import snowflakecortex
67
from airbyte._processors.sql.snowflakecortex import (
78
SnowflakeCortexSqlProcessor,
89
SnowflakeCortexTypeConverter,

airbyte/caches/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from __future__ import annotations
55

6-
from airbyte.caches import bigquery, duckdb, motherduck, postgres, snowflake, util
6+
from airbyte.caches import base, bigquery, duckdb, motherduck, postgres, snowflake, util
77
from airbyte.caches.base import CacheBase
88
from airbyte.caches.bigquery import BigQueryCache
99
from airbyte.caches.duckdb import DuckDBCache

airbyte/sources/declarative.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
"""Support for declarative yaml source testing."""
3+
4+
from __future__ import annotations
5+
6+
import json
7+
from pathlib import Path
8+
from typing import TYPE_CHECKING, cast
9+
10+
from airbyte_cdk.entrypoint import AirbyteEntrypoint
11+
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
12+
13+
from airbyte._executor import Executor
14+
from airbyte.exceptions import PyAirbyteInternalError
15+
from airbyte.sources.base import Source
16+
17+
18+
if TYPE_CHECKING:
19+
from collections.abc import Iterator
20+
21+
22+
class DeclarativeExecutor(Executor):
    """An executor for declarative sources.

    The manifest is wrapped in a `ManifestDeclarativeSource` and run through
    `AirbyteEntrypoint`; the install-related methods are no-ops.
    """

    def __init__(
        self,
        manifest: str | dict | Path,
    ) -> None:
        """Initialize a declarative executor.

        - If `manifest` is a `Path`, the file is read and parsed as JSON.
        - If `manifest` is a string, it is reserved for HTTP path handling, which is
          not implemented yet.
        - If `manifest` is a dict, it will be used as is.

        Raises:
            NotImplementedError: If `manifest` is a string.
            PyAirbyteInternalError: If `manifest` has an unsupported type, or if the
                parsed file content is not a dict.
        """
        self._manifest_dict: dict
        if isinstance(manifest, Path):
            loaded = json.loads(manifest.read_text())

        elif isinstance(manifest, str):
            # TODO: Implement HTTP path parsing
            raise NotImplementedError("HTTP path parsing is not yet implemented.")

        elif isinstance(manifest, dict):
            loaded = manifest

        else:
            # Without this branch an unsupported type would surface later as an
            # AttributeError on the never-assigned `_manifest_dict`.
            raise PyAirbyteInternalError(message="Manifest must be a dict.")

        # A JSON file may legally hold a non-dict top-level value (e.g. a list).
        if not isinstance(loaded, dict):
            raise PyAirbyteInternalError(message="Manifest must be a dict.")

        self._manifest_dict = cast(dict, loaded)
        self.declarative_source = ManifestDeclarativeSource(source_config=self._manifest_dict)
        self.reported_version: str | None = None  # TODO: Consider adding version detection

    def execute(self, args: list[str]) -> Iterator[str]:
        """Execute the declarative source.

        Args:
            args: Command-line style arguments, parsed by `AirbyteEntrypoint.parse_args`.

        Yields:
            Each item produced by `AirbyteEntrypoint.run` for the parsed arguments.
        """
        source_entrypoint = AirbyteEntrypoint(self.declarative_source)
        parsed_args = source_entrypoint.parse_args(args)
        yield from source_entrypoint.run(parsed_args)

    def ensure_installation(self, *, auto_fix: bool = True) -> None:
        """No-op. The declarative source is included with PyAirbyte."""
        _ = auto_fix  # Accepted for interface compatibility; intentionally unused.

    def install(self) -> None:
        """No-op. The declarative source is included with PyAirbyte."""
        pass

    def uninstall(self) -> None:
        """No-op. The declarative source is included with PyAirbyte."""
        pass
70+
71+
72+
class DeclarativeSource(Source):
    """A declarative source using Airbyte's Yaml low-code/no-code framework."""

    def __init__(
        self,
        manifest: str | dict | Path,
    ) -> None:
        """Initialize a declarative source.

        The manifest is stored on the instance unchanged and handed to a new
        `DeclarativeExecutor`, which performs all parsing and validation.

        Sample usage:
        ```python
        manifest_path = "path/to/manifest.json"

        source_a = DeclarativeSource(manifest=Path(manifest_path))
        source_b = DeclarativeSource(manifest=json.loads(Path(manifest_path).read_text()))
        ```

        Args:
            manifest: The manifest for the declarative source. `DeclarativeExecutor`
                reads a `Path` as a JSON file and uses a dict as is; string input is
                not implemented there yet and raises `NotImplementedError`.
        """
        # TODO: Conform manifest to a dict or str (TBD)
        self.manifest = manifest

        # Build the collaborators first, then defer to the base class initializer.
        source_config = {  # TODO: Put 'real' config here
            "manifest": manifest,
        }
        declarative_executor = DeclarativeExecutor(manifest)
        super().__init__(
            name="Declarative",  # TODO: Get name from manifest
            config=source_config,
            executor=declarative_executor,
        )

airbyte/sources/registry.py

Lines changed: 151 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33

44
import json
55
import os
6+
import warnings
67
from copy import copy
78
from dataclasses import dataclass
9+
from enum import Enum
810
from pathlib import Path
911

1012
import requests
@@ -19,6 +21,89 @@
1921
_REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
2022
_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
2123

24+
_LOWCODE_LABEL = "cdk:low-code"
25+
26+
_LOWCODE_CONNECTORS_NEEDING_PYTHON = [
27+
"source-alpha-vantage",
28+
"source-amplitude",
29+
"source-apify-dataset",
30+
"source-avni",
31+
"source-bamboo-hr",
32+
"source-braintree",
33+
"source-braze",
34+
"source-chargebee",
35+
"source-close-com",
36+
"source-commercetools",
37+
"source-facebook-pages",
38+
"source-fastbill",
39+
"source-freshdesk",
40+
"source-gitlab",
41+
"source-gnews",
42+
"source-greenhouse",
43+
"source-instatus",
44+
"source-intercom",
45+
"source-iterable",
46+
"source-jira",
47+
"source-klaviyo",
48+
"source-mailchimp",
49+
"source-mixpanel",
50+
"source-monday",
51+
"source-my-hours",
52+
"source-notion",
53+
"source-okta",
54+
"source-outreach",
55+
"source-partnerstack",
56+
"source-paypal-transaction",
57+
"source-pinterest",
58+
"source-pipedrive",
59+
"source-pocket",
60+
"source-posthog",
61+
"source-prestashop",
62+
"source-public-apis",
63+
"source-qualaroo",
64+
"source-quickbooks",
65+
"source-railz",
66+
"source-recharge",
67+
"source-retently",
68+
"source-rss",
69+
"source-slack",
70+
"source-surveymonkey",
71+
"source-the-guardian-api",
72+
"source-trello",
73+
"source-typeform",
74+
"source-xero",
75+
"source-younium",
76+
"source-zendesk-chat",
77+
"source-zendesk-sunshine",
78+
"source-zendesk-support",
79+
"source-zendesk-talk",
80+
"source-zenloop",
81+
"source-zoom",
82+
]
83+
_LOWCODE_CONNECTORS_FAILING_VALIDATION = [
84+
"source-amazon-ads",
85+
]
86+
_LOWCODE_CONNECTORS_404 = [
87+
"source-unleash",
88+
]
89+
_LOWCODE_CONNECTORS_EXCLUDED: list[str] = [
90+
*_LOWCODE_CONNECTORS_FAILING_VALIDATION,
91+
*_LOWCODE_CONNECTORS_404,
92+
*_LOWCODE_CONNECTORS_NEEDING_PYTHON,
93+
]
94+
95+
96+
class InstallType(str, Enum):
    """The ways a connector can be installed.

    Members are also `str` instances, so they compare equal to their plain
    string values.
    """

    YAML = "yaml"
    PYTHON = "python"
    DOCKER = "docker"
    JAVA = "java"
101+
102+
103+
class Language(str, Enum):
    """The implementation languages recognized for a connector.

    Values are taken from the matching `InstallType` members so the two enums
    cannot drift apart.
    """

    PYTHON = InstallType.PYTHON.value
    JAVA = InstallType.JAVA.value
106+
22107

23108
@dataclass
24109
class ConnectorMetadata:
@@ -33,6 +118,12 @@ class ConnectorMetadata:
33118
pypi_package_name: str | None
34119
"""The name of the PyPI package for the connector, if it exists."""
35120

121+
language: Language | None
122+
"""The language of the connector."""
123+
124+
install_types: set[InstallType]
125+
"""The supported install types for the connector."""
126+
36127

37128
def _get_registry_url() -> str:
38129
if _REGISTRY_ENV_VAR in os.environ:
@@ -43,14 +134,36 @@ def _get_registry_url() -> str:
43134

44135
def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
    """Convert one raw registry entry into a `ConnectorMetadata` object.

    Args:
        entry: A connector entry from the registry. It must contain
            "dockerRepository"; "language", "remoteRegistries", "dockerImageTag"
            and "tags" are optional and may be null.

    Returns:
        The connector metadata, including its language (if recognized) and the set
        of install types derived from the entry.
    """
    name = entry["dockerRepository"].replace("airbyte/", "")

    language: Language | None = None
    raw_language = entry.get("language")
    if raw_language is not None:
        try:
            language = Language(raw_language)
        except ValueError:
            # Unknown languages are reported but do not prevent loading the entry.
            warnings.warn(
                message=f"Invalid language for connector {name}: {raw_language}",
                stacklevel=2,
            )

    # `or {}` also covers keys that are present but explicitly null.
    remote_registries: dict = entry.get("remoteRegistries") or {}
    pypi_registry: dict = remote_registries.get("pypi") or {}
    pypi_package_name: str | None = pypi_registry.get("packageName", None)
    pypi_enabled: bool = pypi_registry.get("enabled", False)

    install_types: set[InstallType] = set()
    if entry.get("dockerImageTag"):
        install_types.add(InstallType.DOCKER)
    if pypi_enabled:
        install_types.add(InstallType.PYTHON)
    if language == Language.JAVA:
        install_types.add(InstallType.JAVA)
    if _LOWCODE_LABEL in (entry.get("tags") or []):
        install_types.add(InstallType.YAML)

    return ConnectorMetadata(
        name=name,
        latest_available_version=entry.get("dockerImageTag", None),
        # The package name is only exposed when the PyPI registry is enabled.
        pypi_package_name=pypi_package_name if pypi_enabled else None,
        language=language,
        install_types=install_types,
    )
55168

56169

@@ -114,11 +227,45 @@ def get_connector_metadata(name: str) -> ConnectorMetadata:
114227
return cache[name]
115228

116229

117-
def get_available_connectors(install_type: InstallType | str = InstallType.PYTHON) -> list[str]:
    """Return the names of all connectors available for the given install type.

    Names are returned in alphabetical order.

    Args:
        install_type: An `InstallType` member or its string value. Defaults to
            `InstallType.PYTHON`.

    Selection per install type:
        - PYTHON: connectors that have a PyPI package name.
        - JAVA: connectors whose language is Java (a warning is emitted first, since
          Java connectors are not yet supported).
        - DOCKER: every connector in the registry.
        - YAML: connectors with YAML among their install types, minus the excluded
          low-code connectors.
    """
    requested = install_type if isinstance(install_type, InstallType) else InstallType(install_type)

    def _sorted_names(keep) -> list[str]:
        # The registry is read only when a branch needs it, after any warning.
        return sorted(conn.name for conn in _get_registry_cache().values() if keep(conn))

    if requested == InstallType.PYTHON:
        return _sorted_names(lambda conn: conn.pypi_package_name is not None)

    if requested == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return _sorted_names(lambda conn: conn.language == Language.JAVA)

    if requested == InstallType.DOCKER:
        return _sorted_names(lambda conn: True)

    if requested == InstallType.YAML:
        return _sorted_names(
            lambda conn: InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": requested,
        },
    )

0 commit comments

Comments
 (0)