Skip to content

Commit f05baf1

Browse files
authored
Fix: Updated handling for Manifest-only connectors; Breaks: Remove Python 3.9 support (#340)
1 parent 39fe9a3 commit f05baf1

20 files changed

+649
-484
lines changed

.github/workflows/autofix.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
- name: Set up Python
2222
uses: actions/setup-python@v5
2323
with:
24-
python-version: 3.9
24+
python-version: '3.10'  # Quoted on purpose: an unquoted 3.10 is a YAML float and loads as 3.1
2525
cache: 'poetry'
2626

2727
- name: Install dependencies

.github/workflows/python_pytest.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ jobs:
131131
strategy:
132132
matrix:
133133
python-version: [
134-
'3.9',
135134
'3.10',
136135
'3.11',
137136
]

.github/workflows/test-pr-command.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ jobs:
5454
strategy:
5555
matrix:
5656
python-version: [
57-
'3.9',
5857
'3.10',
5958
'3.11',
6059
]

airbyte/_executors/declarative.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@
99
from typing import IO, TYPE_CHECKING, cast
1010

1111
import pydantic
12+
import yaml
1213

1314
from airbyte_cdk.entrypoint import AirbyteEntrypoint
1415
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
1516

17+
from airbyte import exceptions as exc
1618
from airbyte._executors.base import Executor
17-
from airbyte.exceptions import PyAirbyteInternalError
1819

1920

2021
if TYPE_CHECKING:
@@ -40,6 +41,7 @@ class DeclarativeExecutor(Executor):
4041

4142
def __init__(
4243
self,
44+
name: str,
4345
manifest: dict | Path,
4446
) -> None:
4547
"""Initialize a declarative executor.
@@ -49,22 +51,36 @@ def __init__(
4951
- If `manifest` is a dict, it will be used as is.
5052
"""
5153
_suppress_cdk_pydantic_deprecation_warnings()
54+
55+
self.name = name
5256
self._manifest_dict: dict
5357
if isinstance(manifest, Path):
54-
self._manifest_dict = cast(dict, json.loads(manifest.read_text()))
58+
self._manifest_dict = cast(dict, yaml.safe_load(manifest.read_text()))
5559

5660
elif isinstance(manifest, dict):
5761
self._manifest_dict = manifest
5862

59-
if not isinstance(self._manifest_dict, dict):
60-
raise PyAirbyteInternalError(message="Manifest must be a dict.")
61-
63+
self._validate_manifest(self._manifest_dict)
6264
self.declarative_source = ManifestDeclarativeSource(source_config=self._manifest_dict)
6365

6466
# TODO: Consider adding version detection
6567
# https://github.com/airbytehq/airbyte/issues/318
6668
self.reported_version: str | None = None
6769

70+
def _validate_manifest(self, manifest_dict: dict) -> None:
    """Reject manifests that depend on custom Python components.

    The parsed manifest is walked structurally (nested dicts, lists and tuples)
    looking for a `class_name` key. Searching the JSON-serialized text for
    `class_name:` cannot work: JSON renders keys as `"class_name": ...`, so the
    closing quote sits between the name and the colon and the guard never fires.
    Walking the structure also avoids `json.dumps` failing on values that are
    not JSON-serializable.

    Args:
        manifest_dict: The parsed declarative manifest.

    Raises:
        exc.AirbyteConnectorInstallationError: If any mapping in the manifest
            contains a `class_name` key.
    """
    # Iterative depth-first walk; avoids recursion limits on deeply nested manifests.
    pending: list[object] = [manifest_dict]
    while pending:
        node = pending.pop()
        if isinstance(node, dict):
            if "class_name" in node:
                raise exc.AirbyteConnectorInstallationError(
                    message=(
                        "The provided manifest requires additional code files (`class_name` key "
                        "detected). This feature is not compatible with the declarative YAML "
                        "executor. To use this executor, please try again with the Python "
                        "executor."
                    ),
                    connector_name=self.name,
                )
            pending.extend(node.values())
        elif isinstance(node, (list, tuple)):
            pending.extend(node)
83+
6884
@property
6985
def _cli(self) -> list[str]:
7086
"""Not applicable."""

airbyte/_executors/util.py

Lines changed: 68 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44
import shutil
55
import sys
66
import tempfile
7-
from json import JSONDecodeError
87
from pathlib import Path
98
from typing import TYPE_CHECKING, cast
109

1110
import requests
1211
import yaml
12+
from requests import HTTPError
1313
from rich import print
1414

1515
from airbyte import exceptions as exc
@@ -25,6 +25,58 @@
2525
from airbyte._executors.base import Executor
2626

2727

28+
def _try_get_source_manifest(source_name: str, manifest_url: str | None) -> dict:
    """Download and parse a source connector's YAML manifest.

    If `manifest_url` is provided, only that URL is tried. Otherwise two default
    GitHub URLs are tried in order: the new layout (`manifest-only` connectors,
    manifest at the connector root) and then the old layout (`low-code`
    connectors, manifest inside the Python package directory).
    We can remove/refactor this once manifests are available in GCS connector registry.

    Args:
        source_name: The connector name, used to build the default URLs and to
            label errors.
        manifest_url: Explicit URL of the manifest, or `None` to use the defaults.

    Returns:
        The parsed manifest as a dict.

    Raises:
        requests.HTTPError: If the download returned an error status. When the
            default URLs are used and both fail, the error from the first
            (new-layout) URL is raised.
        exc.AirbyteConnectorInstallationError: If the downloaded text is not
            valid YAML, or does not parse to a mapping.
    """
    if manifest_url:
        # A timeout keeps an unresponsive host from hanging the install forever.
        response = requests.get(url=manifest_url, timeout=30)
        response.raise_for_status()  # Raise HTTPError exception if the download failed
        try:
            manifest = yaml.safe_load(response.text)
        except yaml.YAMLError as ex:
            raise exc.AirbyteConnectorInstallationError(
                message="Failed to parse the connector manifest YAML.",
                connector_name=source_name,
                context={
                    "manifest_url": manifest_url,
                },
            ) from ex

        # `cast` does not validate: an empty document parses to `None`, and a
        # scalar or list document would otherwise be returned as if it were a dict.
        if not isinstance(manifest, dict):
            raise exc.AirbyteConnectorInstallationError(
                message="The connector manifest is not a YAML mapping.",
                connector_name=source_name,
                context={
                    "manifest_url": manifest_url,
                },
            )

        return cast(dict, manifest)

    # No manifest URL was provided. We'll try a couple of default URLs.
    connector_root_url = (
        "https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
        f"/connectors/{source_name}"
    )
    # New URL format (language='manifest-only'):
    new_format_url = f"{connector_root_url}/manifest.yaml"
    # Old URL format (language='low-code'):
    old_format_url = f"{connector_root_url}/{source_name.replace('-', '_')}/manifest.yaml"

    try:
        return _try_get_source_manifest(
            source_name=source_name,
            manifest_url=new_format_url,
        )
    except HTTPError as new_format_error:
        # The new URL could not be downloaded, so fall back to the old URL format.
        try:
            return _try_get_source_manifest(
                source_name=source_name,
                manifest_url=old_format_url,
            )
        except HTTPError:
            # Raise the first exception, since that represents the new default URL
            raise new_format_error from None
78+
79+
2880
def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too complex
2981
name: str,
3082
*,
@@ -143,49 +195,23 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too complex
143195
)
144196

145197
if source_manifest:
146-
if source_manifest is True:
147-
# Auto-set the manifest to a valid http address URL string
148-
source_manifest = (
149-
"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
150-
f"/connectors/{name}/{name.replace('-', '_')}/manifest.yaml"
198+
if isinstance(source_manifest, (dict, Path)):
199+
return DeclarativeExecutor(
200+
name=name,
201+
manifest=source_manifest,
151202
)
152-
if isinstance(source_manifest, str):
153-
print("Installing connector from YAML manifest:", source_manifest)
154-
# Download the manifest file
155-
response = requests.get(url=source_manifest)
156-
response.raise_for_status() # Raise an exception if the download failed
157-
158-
if "class_name:" in response.text:
159-
raise exc.AirbyteConnectorInstallationError(
160-
message=(
161-
"The provided manifest requires additional code files (`class_name` key "
162-
"detected). This feature is not compatible with the declarative YAML "
163-
"executor. To use this executor, please try again with the Python "
164-
"executor."
165-
),
166-
connector_name=name,
167-
context={
168-
"manifest_url": source_manifest,
169-
},
170-
)
171203

172-
try:
173-
source_manifest = cast(dict, yaml.safe_load(response.text))
174-
except JSONDecodeError as ex:
175-
raise exc.AirbyteConnectorInstallationError(
176-
connector_name=name,
177-
context={
178-
"manifest_url": source_manifest,
179-
},
180-
) from ex
181-
182-
if isinstance(source_manifest, Path):
183-
source_manifest = cast(dict, yaml.safe_load(source_manifest.read_text()))
184-
185-
# Source manifest is a dict at this point
186-
return DeclarativeExecutor(
187-
manifest=source_manifest,
188-
)
204+
if isinstance(source_manifest, (str, bool)):
205+
# Source manifest is either a URL or a boolean (True)
206+
source_manifest = _try_get_source_manifest(
207+
source_name=name,
208+
manifest_url=None if source_manifest is True else source_manifest,
209+
)
210+
211+
return DeclarativeExecutor(
212+
name=name,
213+
manifest=source_manifest,
214+
)
189215

190216
# else: we are installing a connector in a Python virtual environment:
191217

airbyte/secrets/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
1. Environment variables.
1212
2. Variables defined in a local `.env` ("Dotenv") file.
1313
3. [Google Colab secrets](https://medium.com/@parthdasawant/how-to-use-secrets-in-google-colab-450c38e3ec75).
14-
4. Manual entry via [`getpass`](https://docs.python.org/3.9/library/getpass.html).
14+
4. Manual entry via [`getpass`](https://docs.python.org/3.10/library/getpass.html).
1515
1616
**Note:** You can also build your own secret manager by subclassing the `CustomSecretManager`
1717
implementation. For more information, see the `airbyte.secrets.CustomSecretManager` reference docs.

airbyte/sources/registry.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
_REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
2525

2626
_LOWCODE_LABEL = "cdk:low-code"
27+
_MANIFEST_ONLY_LABEL = "cdk:manifest-only"
2728

2829
_LOWCODE_CONNECTORS_NEEDING_PYTHON: list[str] = [
2930
"source-adjust",
@@ -169,7 +170,10 @@ def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
169170
InstallType.DOCKER if entry.get("dockerImageTag") else None,
170171
InstallType.PYTHON if pypi_enabled else None,
171172
InstallType.JAVA if language == Language.JAVA else None,
172-
InstallType.YAML if _LOWCODE_LABEL in entry.get("tags", []) else None,
173+
InstallType.YAML
174+
if _LOWCODE_LABEL in entry.get("tags", [])
175+
or _MANIFEST_ONLY_LABEL in entry.get("tags", [])
176+
else None,
173177
]
174178
if x
175179
}

examples/run_sync_to_destination_from_read_result.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020
def get_my_source() -> ab.Source:
2121
return ab.get_source(
2222
"source-faker",
23-
local_executable="source-faker",
2423
config={
2524
"count": SCALE,
2625
"seed": 1234,
2726
"parallelism": 16,
2827
},
29-
install_if_missing=False,
3028
streams=["purchases"],
3129
)
3230

examples/run_sync_to_destination_w_cache.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020
def get_my_source() -> ab.Source:
2121
return ab.get_source(
2222
"source-faker",
23-
local_executable="source-faker",
2423
config={
2524
"count": SCALE,
2625
"seed": 1234,
2726
"parallelism": 16,
2827
},
29-
install_if_missing=False,
3028
streams="*",
3129
)
3230

examples/run_sync_to_destination_wo_cache.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020
def get_my_source() -> ab.Source:
2121
return ab.get_source(
2222
"source-faker",
23-
local_executable="source-faker",
2423
config={
2524
"count": SCALE,
2625
"seed": 1234,
2726
"parallelism": 16,
2827
},
29-
install_if_missing=False,
3028
streams=["purchases"],
3129
)
3230

0 commit comments

Comments
 (0)