Skip to content

Commit 87e97e5

Browse files
authored
Merge branch 'main' into pedro/add-manifest-runner
2 parents ef43855 + cd48741 commit 87e97e5

File tree

66 files changed

+1396
-1384
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+1396
-1384
lines changed

.github/pr-welcome-internal.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Airbyte Maintainers can execute the following slash commands on your PR:
2626
- `/autofix` - Fixes most formatting and linting issues
2727
- `/poetry-lock` - Updates poetry.lock file
2828
- `/test` - Runs connector tests with the updated CDK
29+
- `/poe build` - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
2930
- `/poe <command>` - Runs any poe command in the CDK environment
3031

3132
[📝 _Edit this welcome message._](https://github.com/airbytehq/airbyte-python-cdk/blob/main/.github/pr-welcome-internal.md)

airbyte_cdk/cli/airbyte_cdk/_connector.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,18 @@ def connector_cli_group() -> None:
123123
multiple=True,
124124
help="Additional argument(s) to pass to pytest. Can be specified multiple times.",
125125
)
126+
@click.option(
127+
"--no-creds",
128+
is_flag=True,
129+
default=False,
130+
help="Skip tests that require credentials (marked with 'requires_creds').",
131+
)
126132
def connector_test(
127133
connector: str | Path | None = None,
128134
*,
129135
collect_only: bool = False,
130136
pytest_args: list[str] | None = None,
137+
no_creds: bool = False,
131138
) -> None:
132139
"""Run connector tests.
133140
@@ -147,6 +154,9 @@ def connector_test(
147154
if collect_only:
148155
pytest_args.append("--collect-only")
149156

157+
if no_creds:
158+
pytest_args.extend(["-m", "not requires_creds"])
159+
150160
run_connector_tests(
151161
connector_name=connector_name,
152162
connector_directory=connector_directory,

airbyte_cdk/cli/airbyte_cdk/_image.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,17 @@ def build(
100100
"--image",
101101
help="Image to test, instead of building a new one.",
102102
)
103+
@click.option(
104+
"--no-creds",
105+
is_flag=True,
106+
default=False,
107+
help="Skip tests that require credentials (marked with 'requires_creds').",
108+
)
103109
def image_test( # "image test" command
104110
connector: str | None = None,
105111
*,
106112
image: str | None = None,
113+
no_creds: bool = False,
107114
) -> None:
108115
"""Test a connector Docker image.
109116
@@ -124,7 +131,11 @@ def image_test( # "image test" command
124131
connector_name, connector_directory = resolve_connector_name_and_directory(connector)
125132

126133
# Select only tests with the 'image_tests' mark
127-
pytest_args = ["-m", "image_tests"]
134+
pytest_filter = "image_tests"
135+
if no_creds:
136+
pytest_filter += " and not requires_creds"
137+
138+
pytest_args = ["-m", pytest_filter]
128139
if not image:
129140
metadata_file_path: Path = connector_directory / "metadata.yaml"
130141
try:

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,12 @@ def read_stream(
108108
stream_name = configured_catalog.streams[0].stream.name
109109

110110
stream_read = test_read_handler.run_test_read(
111-
source, config, configured_catalog, state, limits.max_records
111+
source,
112+
config,
113+
configured_catalog,
114+
stream_name,
115+
state,
116+
limits.max_records,
112117
)
113118

114119
return AirbyteMessage(

airbyte_cdk/connector_builder/test_reader/helpers.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,37 @@ def should_close_page_for_slice(at_least_one_page_in_group: bool, message: Airby
269269
return at_least_one_page_in_group and should_process_slice_descriptor(message)
270270

271271

272+
def is_page_http_request_for_different_stream(
273+
json_message: Optional[Dict[str, Any]], stream_name: str
274+
) -> bool:
275+
"""
276+
Determines whether a given JSON message represents a page HTTP request for a different stream.
277+
278+
This function checks if the provided JSON message is a page HTTP request, and if the stream name in the log is
279+
different from the provided stream name.
280+
281+
This is needed because dynamic streams result in extra page HTTP requests for the dynamic streams that we want to ignore
282+
when they do not match the stream that is being read.
283+
284+
Args:
285+
json_message (Optional[Dict[str, Any]]): The JSON message to evaluate.
286+
stream_name (str): The name of the stream to compare against.
287+
288+
Returns:
289+
bool: True if the JSON message is a page HTTP request for a different stream, False otherwise.
290+
"""
291+
if not json_message or not is_page_http_request(json_message):
292+
return False
293+
294+
message_stream_name: str | None = (
295+
json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", None)
296+
)
297+
if message_stream_name is None:
298+
return False
299+
300+
return message_stream_name != stream_name
301+
302+
272303
def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool:
273304
"""
274305
Determines whether a given JSON message represents a page HTTP request.

airbyte_cdk/connector_builder/test_reader/message_grouper.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
is_async_auxiliary_request,
2929
is_config_update_message,
3030
is_log_message,
31+
is_page_http_request_for_different_stream,
3132
is_record_message,
3233
is_state_message,
3334
is_trace_with_error,
@@ -44,6 +45,7 @@ def get_message_groups(
4445
schema_inferrer: SchemaInferrer,
4546
datetime_format_inferrer: DatetimeFormatInferrer,
4647
limit: int,
48+
stream_name: str,
4749
) -> MESSAGE_GROUPS:
4850
"""
4951
Processes an iterator of AirbyteMessage objects to group and yield messages based on their type and sequence.
@@ -96,6 +98,9 @@ def get_message_groups(
9698
while records_count < limit and (message := next(messages, None)):
9799
json_message = airbyte_message_to_json(message)
98100

101+
if is_page_http_request_for_different_stream(json_message, stream_name):
102+
continue
103+
99104
if should_close_page(at_least_one_page_in_group, message, json_message):
100105
current_page_request, current_page_response = handle_current_page(
101106
current_page_request,

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def run_test_read(
8686
source: DeclarativeSource,
8787
config: Mapping[str, Any],
8888
configured_catalog: ConfiguredAirbyteCatalog,
89+
stream_name: str,
8990
state: List[AirbyteStateMessage],
9091
record_limit: Optional[int] = None,
9192
) -> StreamRead:
@@ -112,14 +113,21 @@ def run_test_read(
112113

113114
record_limit = self._check_record_limit(record_limit)
114115
# The connector builder currently only supports reading from a single stream at a time
115-
stream = source.streams(config)[0]
116+
streams = source.streams(config)
117+
stream = next((stream for stream in streams if stream.name == stream_name), None)
116118

117119
# get any deprecation warnings during the component creation
118120
deprecation_warnings: List[LogMessage] = source.deprecation_warnings()
119121

120122
schema_inferrer = SchemaInferrer(
121-
self._pk_to_nested_and_composite_field(stream.primary_key),
122-
self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
123+
self._pk_to_nested_and_composite_field(
124+
stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key # type: ignore # We are accessing the private property here as the primary key is not exposed. We should either expose it or use `as_airbyte_stream` to retrieve it as this is the "official" way where it is exposed in the Airbyte protocol
125+
)
126+
if stream
127+
else None,
128+
self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
129+
if stream
130+
else None,
123131
)
124132
datetime_format_inferrer = DatetimeFormatInferrer()
125133

@@ -128,6 +136,7 @@ def run_test_read(
128136
schema_inferrer,
129137
datetime_format_inferrer,
130138
record_limit,
139+
stream_name,
131140
)
132141

133142
slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(

airbyte_cdk/entrypoint.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
from airbyte_cdk.connector import TConfig
2424
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
25-
from airbyte_cdk.logger import PRINT_BUFFER, init_logger
25+
from airbyte_cdk.logger import PRINT_BUFFER, init_logger, is_platform_debug_log_enabled
2626
from airbyte_cdk.models import (
2727
AirbyteConnectionStatus,
2828
AirbyteMessage,
@@ -158,7 +158,7 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
158158
if not cmd:
159159
raise Exception("No command passed")
160160

161-
if hasattr(parsed_args, "debug") and parsed_args.debug:
161+
if (hasattr(parsed_args, "debug") and parsed_args.debug) or is_platform_debug_log_enabled():
162162
self.logger.setLevel(logging.DEBUG)
163163
logger.setLevel(logging.DEBUG)
164164
self.logger.debug("Debug logs enabled")

airbyte_cdk/logger.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
54
import json
65
import logging
76
import logging.config
7+
import os
88
from typing import Any, Callable, Mapping, Optional, Tuple
99

1010
import orjson
@@ -40,6 +40,10 @@
4040
}
4141

4242

43+
def is_platform_debug_log_enabled() -> bool:
44+
return os.environ.get("LOG_LEVEL", "info").lower() == "debug"
45+
46+
4347
def init_logger(name: Optional[str] = None) -> logging.Logger:
4448
"""Initial set up of logger"""
4549
logger = logging.getLogger(name)
@@ -73,8 +77,22 @@ def format(self, record: logging.LogRecord) -> str:
7377
airbyte_level = self.level_mapping.get(record.levelno, "INFO")
7478
if airbyte_level == Level.DEBUG:
7579
extras = self.extract_extra_args_from_record(record)
76-
debug_dict = {"type": "DEBUG", "message": record.getMessage(), "data": extras}
77-
return filter_secrets(json.dumps(debug_dict))
80+
if is_platform_debug_log_enabled():
81+
# We have a different behavior between debug logs enabled through `--debug` argument and debug logs
82+
# enabled through environment variable. The reason is that for platform logs, we need to have these
83+
# printed as AirbyteMessage which is not the case with the current previous implementation.
84+
# Why not migrate both to AirbyteMessages then? AirbyteMessages do not support having structured logs.
85+
# which means that the DX would be degraded compared to the current solution (devs will need to identify
86+
# the `log.message` field and figure out where in this field is the response while the current solution
87+
# have a specific field that is structured for extras.
88+
message = f"{filter_secrets(record.getMessage())} ///\nExtra logs: {filter_secrets(json.dumps(extras))}"
89+
log_message = AirbyteMessage(
90+
type=Type.LOG, log=AirbyteLogMessage(level=airbyte_level, message=message)
91+
)
92+
return orjson.dumps(AirbyteMessageSerializer.dump(log_message)).decode()
93+
else:
94+
debug_dict = {"type": "DEBUG", "message": record.getMessage(), "data": extras}
95+
return filter_secrets(json.dumps(debug_dict))
7896
else:
7997
message = super().format(record)
8098
message = filter_secrets(message)

airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
#
44

55
import logging
6-
import traceback
76
from dataclasses import InitVar, dataclass
8-
from typing import Any, List, Mapping, Tuple
7+
from typing import Any, List, Mapping, Tuple, Union
98

10-
from airbyte_cdk import AbstractSource
9+
from airbyte_cdk.sources.abstract_source import AbstractSource
10+
from airbyte_cdk.sources.declarative.checks.check_stream import evaluate_availability
1111
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
12+
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
1213
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1314

1415

@@ -34,20 +35,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3435
def check_connection(
3536
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
3637
) -> Tuple[bool, Any]:
37-
streams = source.streams(config=config)
38+
streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
3839

3940
if len(streams) == 0:
4041
return False, f"No streams to connect to from source {source}"
4142
if not self.use_check_availability:
4243
return True, None
4344

44-
availability_strategy = HttpAvailabilityStrategy()
45-
4645
try:
4746
for stream in streams[: min(self.stream_count, len(streams))]:
48-
stream_is_available, reason = availability_strategy.check_availability(
49-
stream, logger
50-
)
47+
stream_is_available, reason = evaluate_availability(stream, logger)
5148
if not stream_is_available:
5249
logger.warning(f"Stream {stream.name} is not available: {reason}")
5350
return False, reason

0 commit comments

Comments
 (0)