Skip to content

Commit 3d48eb0

Browse files
Merge branch 'main' into daryna/low-code/refactor-model-to-component-factory
2 parents 1b02cdc + 1c9049a commit 3d48eb0

27 files changed

+238
-317
lines changed

.github/pr-welcome-internal.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Airbyte Maintainers can execute the following slash commands on your PR:
2626
- `/autofix` - Fixes most formatting and linting issues
2727
- `/poetry-lock` - Updates poetry.lock file
2828
- `/test` - Runs connector tests with the updated CDK
29+
- `/poe build` - Regenerates git-committed build artifacts, such as the Pydantic models generated from the manifest JSON schema (defined in YAML).
2930
- `/poe <command>` - Runs any poe command in the CDK environment
3031

3132
[📝 _Edit this welcome message._](https://github.com/airbytehq/airbyte-python-cdk/blob/main/.github/pr-welcome-internal.md)

airbyte_cdk/cli/airbyte_cdk/_connector.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,18 @@ def connector_cli_group() -> None:
123123
multiple=True,
124124
help="Additional argument(s) to pass to pytest. Can be specified multiple times.",
125125
)
126+
@click.option(
127+
"--no-creds",
128+
is_flag=True,
129+
default=False,
130+
help="Skip tests that require credentials (marked with 'requires_creds').",
131+
)
126132
def connector_test(
127133
connector: str | Path | None = None,
128134
*,
129135
collect_only: bool = False,
130136
pytest_args: list[str] | None = None,
137+
no_creds: bool = False,
131138
) -> None:
132139
"""Run connector tests.
133140
@@ -147,6 +154,9 @@ def connector_test(
147154
if collect_only:
148155
pytest_args.append("--collect-only")
149156

157+
if no_creds:
158+
pytest_args.extend(["-m", "not requires_creds"])
159+
150160
run_connector_tests(
151161
connector_name=connector_name,
152162
connector_directory=connector_directory,

airbyte_cdk/cli/airbyte_cdk/_image.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,17 @@ def build(
100100
"--image",
101101
help="Image to test, instead of building a new one.",
102102
)
103+
@click.option(
104+
"--no-creds",
105+
is_flag=True,
106+
default=False,
107+
help="Skip tests that require credentials (marked with 'requires_creds').",
108+
)
103109
def image_test( # "image test" command
104110
connector: str | None = None,
105111
*,
106112
image: str | None = None,
113+
no_creds: bool = False,
107114
) -> None:
108115
"""Test a connector Docker image.
109116
@@ -124,7 +131,11 @@ def image_test( # "image test" command
124131
connector_name, connector_directory = resolve_connector_name_and_directory(connector)
125132

126133
# Select only tests with the 'image_tests' mark
127-
pytest_args = ["-m", "image_tests"]
134+
pytest_filter = "image_tests"
135+
if no_creds:
136+
pytest_filter += " and not requires_creds"
137+
138+
pytest_args = ["-m", pytest_filter]
128139
if not image:
129140
metadata_file_path: Path = connector_directory / "metadata.yaml"
130141
try:

airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
#
44

55
import logging
6-
import traceback
76
from dataclasses import InitVar, dataclass
8-
from typing import Any, List, Mapping, Tuple
7+
from typing import Any, List, Mapping, Tuple, Union
98

10-
from airbyte_cdk import AbstractSource
9+
from airbyte_cdk.sources.abstract_source import AbstractSource
10+
from airbyte_cdk.sources.declarative.checks.check_stream import evaluate_availability
1111
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
12+
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
1213
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1314

1415

@@ -34,20 +35,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3435
def check_connection(
3536
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
3637
) -> Tuple[bool, Any]:
37-
streams = source.streams(config=config)
38+
streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
3839

3940
if len(streams) == 0:
4041
return False, f"No streams to connect to from source {source}"
4142
if not self.use_check_availability:
4243
return True, None
4344

44-
availability_strategy = HttpAvailabilityStrategy()
45-
4645
try:
4746
for stream in streams[: min(self.stream_count, len(streams))]:
48-
stream_is_available, reason = availability_strategy.check_availability(
49-
stream, logger
50-
)
47+
stream_is_available, reason = evaluate_availability(stream, logger)
5148
if not stream_is_available:
5249
logger.warning(f"Stream {stream.name} is not available: {reason}")
5350
return False, reason

airbyte_cdk/sources/declarative/checks/check_stream.py

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,30 @@
55
import logging
66
import traceback
77
from dataclasses import InitVar, dataclass
8-
from typing import Any, Dict, List, Mapping, Optional, Tuple
8+
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union
99

10-
from airbyte_cdk import AbstractSource
10+
from airbyte_cdk.sources.abstract_source import AbstractSource
1111
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
12+
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
13+
from airbyte_cdk.sources.streams.core import Stream
1214
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1315

1416

17+
def evaluate_availability(
18+
stream: Union[Stream, AbstractStream], logger: logging.Logger
19+
) -> Tuple[bool, Optional[str]]:
20+
"""
21+
During the transition period, support both Stream and AbstractStream until everything has been migrated to AbstractStream.
22+
"""
23+
if isinstance(stream, Stream):
24+
return HttpAvailabilityStrategy().check_availability(stream, logger)
25+
elif isinstance(stream, AbstractStream):
26+
availability = stream.check_availability()
27+
return availability.is_available, availability.reason
28+
else:
29+
raise ValueError(f"Unsupported stream type {type(stream)}")
30+
31+
1532
@dataclass(frozen=True)
1633
class DynamicStreamCheckConfig:
1734
"""Defines the configuration for dynamic stream during connection checking. This class specifies
@@ -51,7 +68,7 @@ def check_connection(
5168
) -> Tuple[bool, Any]:
5269
"""Checks the connection to the source and its streams."""
5370
try:
54-
streams = source.streams(config=config)
71+
streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
5572
if not streams:
5673
return False, f"No streams to connect to from source {source}"
5774
except Exception as error:
@@ -82,13 +99,15 @@ def check_connection(
8299
return True, None
83100

84101
def _check_stream_availability(
85-
self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger
102+
self,
103+
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
104+
stream_name: str,
105+
logger: logging.Logger,
86106
) -> Tuple[bool, Any]:
87107
"""Checks if streams are available."""
88-
availability_strategy = HttpAvailabilityStrategy()
89108
try:
90109
stream = stream_name_to_stream[stream_name]
91-
stream_is_available, reason = availability_strategy.check_availability(stream, logger)
110+
stream_is_available, reason = evaluate_availability(stream, logger)
92111
if not stream_is_available:
93112
message = f"Stream {stream_name} is not available: {reason}"
94113
logger.warning(message)
@@ -98,7 +117,10 @@ def _check_stream_availability(
98117
return True, None
99118

100119
def _check_dynamic_streams_availability(
101-
self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger
120+
self,
121+
source: AbstractSource,
122+
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
123+
logger: logging.Logger,
102124
) -> Tuple[bool, Any]:
103125
"""Checks the availability of dynamic streams."""
104126
dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
@@ -135,18 +157,15 @@ def _map_generated_streams(
135157
def _check_generated_streams_availability(
136158
self,
137159
generated_streams: List[Dict[str, Any]],
138-
stream_name_to_stream: Dict[str, Any],
160+
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
139161
logger: logging.Logger,
140162
max_count: int,
141163
) -> Tuple[bool, Any]:
142164
"""Checks availability of generated dynamic streams."""
143-
availability_strategy = HttpAvailabilityStrategy()
144165
for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
145166
stream = stream_name_to_stream[declarative_stream["name"]]
146167
try:
147-
stream_is_available, reason = availability_strategy.check_availability(
148-
stream, logger
149-
)
168+
stream_is_available, reason = evaluate_availability(stream, logger)
150169
if not stream_is_available:
151170
message = f"Dynamic Stream {stream.name} is not available: {reason}"
152171
logger.warning(message)

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,6 @@
5252
from airbyte_cdk.sources.streams import Stream
5353
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
5454
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
55-
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
56-
AlwaysAvailableAvailabilityStrategy,
57-
)
5855
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
5956
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
6057
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
@@ -325,7 +322,6 @@ def _group_streams(
325322
partition_generator=partition_generator,
326323
name=declarative_stream.name,
327324
json_schema=declarative_stream.get_json_schema(),
328-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
329325
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
330326
cursor_field=cursor.cursor_field.cursor_field_key
331327
if hasattr(cursor, "cursor_field")
@@ -362,7 +358,6 @@ def _group_streams(
362358
partition_generator=partition_generator,
363359
name=declarative_stream.name,
364360
json_schema=declarative_stream.get_json_schema(),
365-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
366361
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
367362
cursor_field=None,
368363
logger=self.logger,
@@ -417,7 +412,6 @@ def _group_streams(
417412
partition_generator=partition_generator,
418413
name=declarative_stream.name,
419414
json_schema=declarative_stream.get_json_schema(),
420-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
421415
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
422416
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
423417
logger=self.logger,

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1838,6 +1838,7 @@ definitions:
18381838
"$ref": "#/definitions/RequestOption"
18391839
page_token_option:
18401840
title: Inject Page Token Into Outgoing HTTP Request
1841+
description: Inject the page token into the outgoing HTTP requests by inserting it into either the request URL path or a field on the request.
18411842
anyOf:
18421843
- "$ref": "#/definitions/RequestOption"
18431844
- "$ref": "#/definitions/RequestPath"
@@ -3486,7 +3487,7 @@ definitions:
34863487
- [["content", "html"], ["content", "plain_text"]]
34873488
RequestPath:
34883489
title: Request Path
3489-
description: Specifies where in the request path a component's value should be inserted.
3490+
description: The URL path to be used for the HTTP request.
34903491
type: object
34913492
required:
34923493
- type

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2002,7 +2002,9 @@ class DefaultPaginator(BaseModel):
20022002
None, title="Inject Page Size Into Outgoing HTTP Request"
20032003
)
20042004
page_token_option: Optional[Union[RequestOption, RequestPath]] = Field(
2005-
None, title="Inject Page Token Into Outgoing HTTP Request"
2005+
None,
2006+
description="Inject the page token into the outgoing HTTP requests by inserting it into either the request URL path or a field on the request.",
2007+
title="Inject Page Token Into Outgoing HTTP Request",
20062008
)
20072009
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20082010

airbyte_cdk/sources/declarative/requesters/http_requester.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,13 @@ def _get_url(
168168
next_page_token=next_page_token,
169169
)
170170

171-
full_url = self._join_url(url_base, path) if url_base else url + path if path else url
171+
full_url = (
172+
self._join_url(url_base, path)
173+
if url_base
174+
else self._join_url(url, path)
175+
if path
176+
else url
177+
)
172178

173179
return full_url
174180

airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,7 @@ def path(
159159
) -> Optional[str]:
160160
token = next_page_token.get("next_page_token") if next_page_token else None
161161
if token and self.page_token_option and isinstance(self.page_token_option, RequestPath):
162-
# make additional interpolation context
163-
interpolation_context = get_interpolation_context(
164-
stream_state=stream_state,
165-
stream_slice=stream_slice,
166-
next_page_token=next_page_token,
167-
)
168-
# Replace url base to only return the path
169-
return str(token).replace(self.url_base.eval(self.config, **interpolation_context), "") # type: ignore # url_base is casted to a InterpolatedString in __post_init__
162+
return str(token)
170163
else:
171164
return None
172165

0 commit comments

Comments
 (0)