Skip to content

Commit 9039719

Browse files
authored
Merge branch 'main' into catalog-fixes
2 parents b51c060 + ff20533 commit 9039719

File tree

17 files changed

+566
-310
lines changed

17 files changed

+566
-310
lines changed

.github/workflows/python_pytest.yml

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,6 @@ jobs:
4848
--durations=5 --exitfirst
4949
-m "not slow and not requires_creds and not linting and not flaky"
5050
51-
- name: Run Pytest with Coverage (Flaky Tests Only)
52-
timeout-minutes: 60
53-
continue-on-error: true
54-
env:
55-
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
56-
run: >
57-
poetry run coverage run -m pytest
58-
--durations=5 --exitfirst
59-
-m "flaky and not slow and not requires_creds"
60-
6151
- name: Print Coverage Report
6252
if: always()
6353
run: poetry run coverage report
@@ -75,6 +65,13 @@ jobs:
7565
name: fasttest-coverage
7666
path: htmlcov/
7767

68+
- name: Upload logs to GitHub Artifacts
69+
if: failure()
70+
uses: actions/upload-artifact@v4
71+
with:
72+
name: pytest-fast-test-logs
73+
path: /tmp/airbyte/logs/
74+
7875
pytest-no-creds:
7976
name: Pytest (No Creds)
8077
runs-on: ubuntu-latest
@@ -122,6 +119,13 @@ jobs:
122119
name: nocreds-test-coverage
123120
path: htmlcov/
124121

122+
- name: Upload logs to GitHub Artifacts
123+
if: failure()
124+
uses: actions/upload-artifact@v4
125+
with:
126+
name: pytest-no-creds-test-logs
127+
path: /tmp/airbyte/logs/
128+
125129
pytest:
126130
name: Pytest (All, Python ${{ matrix.python-version }}, ${{ matrix.os }})
127131
# Don't run on forks. Run on pushes to main, and on PRs that are not from forks.
@@ -133,7 +137,7 @@ jobs:
133137
python-version: [
134138
'3.10',
135139
'3.11',
136-
'3.12',
140+
# '3.12', # Commented out: Certain tests don't work in Python 3.12, although PyAirbyte itself does work on 3.12
137141
]
138142
os: [
139143
Ubuntu,
@@ -189,6 +193,13 @@ jobs:
189193
name: py${{ matrix.python-version }}-${{ matrix.os }}-test-coverage
190194
path: htmlcov/
191195

196+
- name: Upload logs to GitHub Artifacts
197+
if: failure()
198+
uses: actions/upload-artifact@v4
199+
with:
200+
name: py${{ matrix.python-version }}-${{ matrix.os }}-test-logs
201+
path: /tmp/airbyte/logs/
202+
192203
dependency-analysis:
193204
name: Dependency Analysis with Deptry
194205
runs-on: ubuntu-latest

.github/workflows/semantic_pr_check.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,13 @@ jobs:
4747
# scopes: |
4848
# core
4949
# ui
50+
51+
- name: Check for "do not merge" in PR title
52+
if: ${{ github.event.pull_request.draft == false }}
53+
uses: actions/github-script@v6
54+
with:
55+
script: |
56+
const title = context.payload.pull_request.title.toLowerCase();
57+
if (title.includes('do not merge') || title.includes('do-not-merge')) {
58+
core.setFailed('PR title contains "do not merge" or "do-not-merge". Please remove this before merging.');
59+
}

airbyte/_executors/base.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,14 @@ def _cli(self) -> list[str]:
185185
"""
186186
...
187187

188+
def map_cli_args(self, args: list[str]) -> list[str]:
189+
"""Map CLI args if needed.
190+
191+
By default, this is a no-op. Subclasses may override this method in order to
192+
map CLI args into the format expected by the connector.
193+
"""
194+
return args
195+
188196
def execute(
189197
self,
190198
args: list[str],
@@ -195,8 +203,9 @@ def execute(
195203
196204
If stdin is provided, it will be passed to the subprocess as STDIN.
197205
"""
206+
mapped_args = self.map_cli_args(args)
198207
with _stream_from_subprocess(
199-
[*self._cli, *args],
208+
[*self._cli, *mapped_args],
200209
stdin=stdin,
201210
) as stream_lines:
202211
yield from stream_lines

airbyte/_executors/declarative.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919

2020
if TYPE_CHECKING:
21+
from argparse import Namespace
2122
from collections.abc import Iterator
2223

2324
from airbyte._message_iterators import AirbyteMessageIterator
@@ -102,7 +103,9 @@ def execute(
102103
"""Execute the declarative source."""
103104
_ = stdin # Not used
104105
source_entrypoint = AirbyteEntrypoint(self.declarative_source)
105-
parsed_args = source_entrypoint.parse_args(args)
106+
107+
mapped_args: list[str] = self.map_cli_args(args)
108+
parsed_args: Namespace = source_entrypoint.parse_args(mapped_args)
106109
yield from source_entrypoint.run(parsed_args)
107110

108111
def ensure_installation(self, *, auto_fix: bool = True) -> None:

airbyte/_executors/docker.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,33 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22
from __future__ import annotations
33

4+
import logging
45
import shutil
6+
from pathlib import Path
57
from typing import NoReturn
68

79
from airbyte import exceptions as exc
810
from airbyte._executors.base import Executor
911

1012

13+
logger = logging.getLogger("airbyte")
14+
15+
16+
DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR = "/airbyte/tmp"
17+
"""Default temp dir in an Airbyte connector's Docker image."""
18+
19+
1120
class DockerExecutor(Executor):
1221
def __init__(
1322
self,
1423
name: str | None = None,
1524
*,
1625
executable: list[str],
1726
target_version: str | None = None,
27+
volumes: dict[Path, str] | None = None,
1828
) -> None:
1929
self.executable: list[str] = executable
30+
self.volumes: dict[Path, str] = volumes or {}
2031
name = name or executable[0]
2132
super().__init__(name=name, target_version=target_version)
2233

@@ -56,3 +67,40 @@ def uninstall(self) -> NoReturn:
5667
def _cli(self) -> list[str]:
5768
"""Get the base args of the CLI executable."""
5869
return self.executable
70+
71+
def map_cli_args(self, args: list[str]) -> list[str]:
72+
"""Map local file paths to the container's volume paths."""
73+
new_args = []
74+
for arg in args:
75+
if Path(arg).exists():
76+
# This is a file path and we need to map it to the same file within the
77+
# relative path of the file within the container's volume.
78+
for local_volume, container_path in self.volumes.items():
79+
if Path(arg).is_relative_to(local_volume):
80+
logger.debug(
81+
f"Found file input path `{arg}` "
82+
f"relative to container-mapped volume: {local_volume}"
83+
)
84+
mapped_path = Path(container_path) / Path(arg).relative_to(local_volume)
85+
logger.debug(f"Mapping `{arg}` -> `{mapped_path}`")
86+
new_args.append(str(mapped_path))
87+
break
88+
else:
89+
# No break reached; no volume was found for this file path
90+
logger.warning(
91+
f"File path `{arg}` is not relative to any volume path "
92+
f"in the provided volume mappings: {self.volumes}. "
93+
"The file may not be available to the container at runtime."
94+
)
95+
new_args.append(arg)
96+
97+
else:
98+
new_args.append(arg)
99+
100+
if args != new_args:
101+
logger.debug(
102+
f"Mapping local-to-container CLI args: {args} -> {new_args} "
103+
f"based upon volume definitions: {self.volumes}"
104+
)
105+
106+
return new_args

airbyte/_executors/util.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from airbyte import exceptions as exc
1313
from airbyte._executors.declarative import DeclarativeExecutor
14-
from airbyte._executors.docker import DockerExecutor
14+
from airbyte._executors.docker import DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR, DockerExecutor
1515
from airbyte._executors.local import PathExecutor
1616
from airbyte._executors.python import VenvExecutor
1717
from airbyte._util.meta import which
@@ -42,8 +42,8 @@ def _try_get_source_manifest(
4242
4343
Raises:
4444
- `PyAirbyteInputError`: If `source_name` is `None`.
45-
- `HTTPError`: If fetching the URL was unsuccessful.
46-
- `YAMLError`: If parsing the YAML failed.
45+
- `AirbyteConnectorInstallationError`: If the registry file cannot be downloaded or if the
46+
manifest YAML cannot be parsed.
4747
"""
4848
if source_name is None:
4949
raise exc.PyAirbyteInputError(
@@ -62,7 +62,16 @@ def _try_get_source_manifest(
6262
url=manifest_url,
6363
headers={"User-Agent": f"PyAirbyte/{get_version()}"},
6464
)
65-
response.raise_for_status() # Raise HTTPError exception if the download failed
65+
try:
66+
response.raise_for_status() # Raise HTTPError exception if the download failed
67+
except requests.exceptions.HTTPError as ex:
68+
raise exc.AirbyteConnectorInstallationError(
69+
message="Failed to download the connector manifest.",
70+
context={
71+
"manifest_url": manifest_url,
72+
},
73+
) from ex
74+
6675
try:
6776
return cast("dict", yaml.safe_load(response.text))
6877
except yaml.YAMLError as ex:
@@ -115,7 +124,7 @@ def _get_local_executor(
115124
)
116125

117126

118-
def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branches/arugments/statements
127+
def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branches/arguments/statements
119128
name: str,
120129
*,
121130
version: str | None = None,
@@ -217,21 +226,24 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branch
217226
if ":" not in docker_image:
218227
docker_image = f"{docker_image}:{version or 'latest'}"
219228

220-
temp_dir = TEMP_DIR_OVERRIDE or Path(tempfile.gettempdir())
229+
host_temp_dir = TEMP_DIR_OVERRIDE or Path(tempfile.gettempdir())
230+
container_temp_dir = DEFAULT_AIRBYTE_CONTAINER_TEMP_DIR
221231

222232
local_mount_dir = Path().absolute() / name
223233
local_mount_dir.mkdir(exist_ok=True)
224234

235+
volumes = {
236+
local_mount_dir: "/local",
237+
host_temp_dir: container_temp_dir,
238+
}
225239
docker_cmd = [
226240
"docker",
227241
"run",
228242
"--rm",
229243
"-i",
230-
"--volume",
231-
f"{local_mount_dir}:/local/",
232-
"--volume",
233-
f"{temp_dir}:{temp_dir}",
234244
]
245+
for local_dir, container_dir in volumes.items():
246+
docker_cmd.extend(["--volume", f"{local_dir}:{container_dir}"])
235247

236248
if use_host_network is True:
237249
docker_cmd.extend(["--network", "host"])
@@ -241,6 +253,7 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branch
241253
return DockerExecutor(
242254
name=name,
243255
executable=docker_cmd,
256+
volumes=volumes,
244257
)
245258

246259
if source_manifest:

airbyte/_util/temp_files.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from __future__ import annotations
55

66
import json
7+
import stat
78
import tempfile
89
import time
910
import warnings
@@ -36,6 +37,9 @@ def as_temp_files(files_contents: list[dict | str]) -> Generator[list[str], Any,
3637
json.dumps(content) if isinstance(content, dict) else content,
3738
)
3839
temp_file.flush()
40+
# Grant "read" permission to all users
41+
Path(temp_file.name).chmod(stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
42+
3943
# Don't close the file yet (breaks Windows)
4044
# temp_file.close()
4145
temp_files.append(temp_file)

airbyte/caches/base.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from __future__ import annotations
55

66
from pathlib import Path
7-
from typing import IO, TYPE_CHECKING, Any, ClassVar, final
7+
from typing import IO, TYPE_CHECKING, Any, ClassVar, Literal, final
88

99
import pandas as pd
1010
import pyarrow as pa
@@ -34,6 +34,7 @@
3434
from airbyte.shared.sql_processor import SqlProcessorBase
3535
from airbyte.shared.state_providers import StateProviderBase
3636
from airbyte.shared.state_writers import StateWriterBase
37+
from airbyte.sources.base import Source
3738
from airbyte.strategies import WriteStrategy
3839

3940

@@ -293,6 +294,35 @@ def register_source(
293294
incoming_stream_names=stream_names,
294295
)
295296

297+
def create_source_tables(
298+
self,
299+
source: Source,
300+
streams: Literal["*"] | list[str] | None = None,
301+
) -> None:
302+
"""Create tables in the cache for the provided source if they do not exist already.
303+
304+
Tables are created based upon the Source's catalog.
305+
306+
Args:
307+
source: The source to create tables for.
308+
streams: Stream names to create tables for. If None, use the Source's selected_streams
309+
or "*" if neither is set. If "*", all available streams will be used.
310+
"""
311+
if streams is None:
312+
streams = source.get_selected_streams() or "*"
313+
314+
catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))
315+
316+
# Ensure schema exists
317+
self.processor._ensure_schema_exists() # noqa: SLF001 # Accessing non-public member
318+
319+
# Create tables for each stream if they don't exist
320+
for stream_name in catalog_provider.stream_names:
321+
self.processor._ensure_final_table_exists( # noqa: SLF001
322+
stream_name=stream_name,
323+
create_if_missing=True,
324+
)
325+
296326
def __getitem__(self, stream: str) -> CachedDataset:
297327
"""Return a dataset by stream name."""
298328
return self.streams[stream]

airbyte/constants.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,18 @@ def _str_to_bool(value: str) -> bool:
109109
This setting helps you make informed choices about data privacy and operation in restricted and
110110
air-gapped environments.
111111
"""
112+
113+
AIRBYTE_PRINT_FULL_ERROR_LOGS: bool = _str_to_bool(
114+
os.getenv(
115+
key="AIRBYTE_PRINT_FULL_ERROR_LOGS",
116+
default=os.getenv("CI", "false"),
117+
)
118+
)
119+
"""Whether to print full error logs when an error occurs.
120+
This setting helps in debugging by providing detailed logs when errors occur. This is especially
121+
helpful in ephemeral environments like CI/CD pipelines where log files may not be persisted after
122+
the pipeline run.
123+
124+
If not set, the default value is `False` for non-CI environments.
125+
If running in a CI environment ("CI" env var is set), then the default value is `True`.
126+
"""

0 commit comments

Comments
 (0)