
Commit 16e186f

Merge branch 'main' into brian/merge_concurrent_declarative_source

2 parents: 0a58453 + 2b07f93

25 files changed: +543 additions, -631 deletions

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 6 additions & 2 deletions

@@ -122,10 +122,14 @@ def run_test_read(
         deprecation_warnings: List[LogMessage] = source.deprecation_warnings()

         schema_inferrer = SchemaInferrer(
-            self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
-            self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
+            self._pk_to_nested_and_composite_field(
+                stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key  # type: ignore # We are accessing the private property here as the primary key is not exposed. We should either expose it or use `as_airbyte_stream` to retrieve it as this is the "official" way where it is exposed in the Airbyte protocol
+            )
             if stream
             else None,
+            self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
+            if stream and stream.cursor_field
+            else None,
         )
         datetime_format_inferrer = DatetimeFormatInferrer()
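
Note: the hunk above makes the Connector Builder's test reader tolerate concurrent streams whose primary key is not exposed publicly. A minimal sketch of the same fallback pattern, with hypothetical LegacyStream/ConcurrentStream classes standing in for the CDK types:

from typing import Any, List, Optional, Union


class LegacyStream:
    """Stands in for a legacy stream that exposes `primary_key` publicly."""

    primary_key: Union[str, List[str]] = "id"


class ConcurrentStream:
    """Stands in for a concurrent stream that only keeps a private `_primary_key`."""

    def __init__(self, primary_key: Union[str, List[str]]) -> None:
        self._primary_key = primary_key


def resolve_primary_key(stream: Optional[Any]) -> Optional[Union[str, List[str]]]:
    # Mirror the hasattr fallback from the hunk above: prefer the public
    # attribute, fall back to the private one, and tolerate a missing stream.
    if stream is None:
        return None
    return stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key


assert resolve_primary_key(LegacyStream()) == "id"
assert resolve_primary_key(ConcurrentStream(["account_id", "date"])) == ["account_id", "date"]
assert resolve_primary_key(None) is None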

airbyte_cdk/entrypoint.py

Lines changed: 2 additions & 2 deletions

@@ -22,7 +22,7 @@

 from airbyte_cdk.connector import TConfig
 from airbyte_cdk.exception_handler import init_uncaught_exception_handler
-from airbyte_cdk.logger import PRINT_BUFFER, init_logger
+from airbyte_cdk.logger import PRINT_BUFFER, init_logger, is_platform_debug_log_enabled
 from airbyte_cdk.models import (
     AirbyteConnectionStatus,
     AirbyteMessage,

@@ -158,7 +158,7 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
         if not cmd:
             raise Exception("No command passed")

-        if hasattr(parsed_args, "debug") and parsed_args.debug:
+        if (hasattr(parsed_args, "debug") and parsed_args.debug) or is_platform_debug_log_enabled():
            self.logger.setLevel(logging.DEBUG)
            logger.setLevel(logging.DEBUG)
            self.logger.debug("Debug logs enabled")
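
Note: with this change, debug logging can be enabled either by the --debug CLI flag or by the platform via an environment variable. A self-contained sketch of the combined check; configure_log_level is a hypothetical helper, while the env-var convention matches the new is_platform_debug_log_enabled in airbyte_cdk/logger.py:

import argparse
import logging
import os


def is_platform_debug_log_enabled() -> bool:
    # Same convention as the new helper in airbyte_cdk/logger.py: the platform
    # requests debug logs by setting LOG_LEVEL=debug (case-insensitive).
    return os.environ.get("LOG_LEVEL", "info").lower() == "debug"


def configure_log_level(parsed_args: argparse.Namespace, logger: logging.Logger) -> None:
    # Hypothetical helper: debug wins if either the CLI flag or the env var
    # asks for it, matching the updated condition in Entrypoint.run.
    if getattr(parsed_args, "debug", False) or is_platform_debug_log_enabled():
        logger.setLevel(logging.DEBUG)
        logger.debug("Debug logs enabled")


parser = argparse.ArgumentParser()
parser.add_argument("--debug", action="store_true")
configure_log_level(parser.parse_args(["--debug"]), logging.getLogger("airbyte"))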

airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py

Lines changed: 8 additions & 2 deletions

@@ -8,7 +8,7 @@
 from copy import deepcopy
 from importlib import metadata
 from types import ModuleType
-from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
+from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Union

 import orjson
 import yaml

@@ -66,6 +66,7 @@
 from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 from airbyte_cdk.sources.declarative.spec.spec import Spec
 from airbyte_cdk.sources.message import MessageRepository
+from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 from airbyte_cdk.sources.streams.core import Stream
 from airbyte_cdk.sources.types import Config, ConnectionDefinition
 from airbyte_cdk.sources.utils.slice_logger import (

@@ -297,7 +298,12 @@ def connection_checker(self) -> ConnectionChecker:
             f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
         )

-    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
+    def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
+        """
+        As a migration step, this method will return both legacy streams (Stream) and concurrent streams (AbstractStream).
+        Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
+        fully decouple this from the AbstractSource.
+        """
         if self._spec_component:
             self._spec_component.validate_config(config)
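
Note: while the migration is underway, callers of streams() receive a mix of legacy Stream and concurrent AbstractStream objects and have to branch on the type, as _group_streams does later in this commit. A rough sketch with a hypothetical describe_streams helper:

from typing import List, Union

from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.core import Stream


def describe_streams(streams: List[Union[Stream, AbstractStream]]) -> List[str]:
    # Hypothetical helper: report which code path each stream takes while the
    # two stream models coexist. Both types expose a `name` property.
    return [
        f"{stream.name}: concurrent" if isinstance(stream, AbstractStream) else f"{stream.name}: legacy (synchronous)"
        for stream in streams
    ]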

airbyte_cdk/logger.py

Lines changed: 21 additions & 3 deletions

@@ -1,10 +1,10 @@
 #
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
-
 import json
 import logging
 import logging.config
+import os
 from typing import Any, Callable, Mapping, Optional, Tuple

 import orjson

@@ -40,6 +40,10 @@
 }


+def is_platform_debug_log_enabled() -> bool:
+    return os.environ.get("LOG_LEVEL", "info").lower() == "debug"
+
+
 def init_logger(name: Optional[str] = None) -> logging.Logger:
     """Initial set up of logger"""
     logger = logging.getLogger(name)

@@ -73,8 +77,22 @@ def format(self, record: logging.LogRecord) -> str:
         airbyte_level = self.level_mapping.get(record.levelno, "INFO")
         if airbyte_level == Level.DEBUG:
             extras = self.extract_extra_args_from_record(record)
-            debug_dict = {"type": "DEBUG", "message": record.getMessage(), "data": extras}
-            return filter_secrets(json.dumps(debug_dict))
+            if is_platform_debug_log_enabled():
+                # We have different behavior between debug logs enabled through the `--debug` argument and debug logs
+                # enabled through the environment variable. The reason is that for platform logs, we need these
+                # printed as AirbyteMessage, which was not the case with the previous implementation.
+                # Why not migrate both to AirbyteMessages then? AirbyteMessages do not support structured logs,
+                # which means the DX would be degraded compared to the current solution (devs would need to identify
+                # the `log.message` field and figure out where in that field the response is, while the current solution
+                # has a specific field that is structured for extras).
+                message = f"{filter_secrets(record.getMessage())} ///\nExtra logs: {filter_secrets(json.dumps(extras))}"
+                log_message = AirbyteMessage(
+                    type=Type.LOG, log=AirbyteLogMessage(level=airbyte_level, message=message)
+                )
+                return orjson.dumps(AirbyteMessageSerializer.dump(log_message)).decode()
+            else:
+                debug_dict = {"type": "DEBUG", "message": record.getMessage(), "data": extras}
+                return filter_secrets(json.dumps(debug_dict))
         else:
             message = super().format(record)
             message = filter_secrets(message)
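
Note: after this change the formatter emits one of two shapes for debug records: a structured JSON dict for local --debug runs, and an AirbyteMessage-style LOG line when the platform requested debug logs. A rough sketch of the two output shapes only, with plain dicts standing in for the CDK's AirbyteMessage serializer and secret filtering omitted:

import json
from typing import Any, Mapping


def format_debug_local(message: str, extras: Mapping[str, Any]) -> str:
    # Local --debug path: structured JSON, so devs find extras in a dedicated field.
    return json.dumps({"type": "DEBUG", "message": message, "data": extras})


def format_debug_platform(message: str, extras: Mapping[str, Any]) -> str:
    # Platform path: everything folded into log.message of an AirbyteMessage-shaped
    # record (a plain dict here), since AirbyteMessage log lines are unstructured.
    folded = f"{message} ///\nExtra logs: {json.dumps(extras)}"
    return json.dumps({"type": "LOG", "log": {"level": "DEBUG", "message": folded}})


extras = {"response_status": 429}
print(format_debug_local("rate limited", extras))
print(format_debug_platform("rate limited", extras))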

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 1 addition & 1 deletion

@@ -2,6 +2,7 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 import logging
+import os
 from typing import Dict, Iterable, List, Optional, Set

 from airbyte_cdk.exception_handler import generate_failed_streams_error_message

@@ -153,7 +154,6 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
                 stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
             )
         self._record_counter[stream.name] += 1
-        stream.cursor.observe(record)
         yield message
         yield from self._message_repository.consume_queue()

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 1 addition & 0 deletions

@@ -1,6 +1,7 @@
 #
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
+
 import concurrent
 import logging
 from queue import Queue

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 10 additions & 10 deletions

@@ -19,6 +19,7 @@
     Optional,
     Set,
     Tuple,
+    Union,
 )

 import orjson

@@ -51,10 +52,6 @@
 from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
 from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
-from airbyte_cdk.sources.declarative.extractors import RecordSelector
-from airbyte_cdk.sources.declarative.extractors.record_filter import (
-    ClientSideIncrementalRecordFilterDecorator,
-)
 from airbyte_cdk.sources.declarative.incremental import (
     ConcurrentPerPartitionCursor,
     GlobalSubstreamCursor,

@@ -205,7 +202,6 @@ def __init__(
         # incremental streams running in full refresh.
         component_factory = ModelToComponentFactory(
             emit_connector_builder_messages=emit_connector_builder_messages,
-            disable_resumable_full_refresh=True,
             message_repository=ConcurrentMessageRepository(queue, message_repository),
             connector_state_manager=self._connector_state_manager,
             max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),

@@ -459,7 +455,7 @@ def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> Airbyte
             ]
         )

-    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
+    def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
         """
         The `streams` method is used as part of the AbstractSource in the following cases:
         * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams

@@ -622,6 +618,10 @@ def _group_streams(
             # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
             # so we need to treat them as synchronous

+            if isinstance(declarative_stream, AbstractStream):
+                concurrent_streams.append(declarative_stream)
+                continue
+
             supports_file_transfer = (
                 isinstance(declarative_stream, DeclarativeStream)
                 and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
@@ -691,7 +691,7 @@ def _group_streams(
                 partition_generator = StreamSlicerPartitionGenerator(
                     partition_factory=DeclarativePartitionFactory(
                         stream_name=declarative_stream.name,
-                        json_schema=declarative_stream.get_json_schema(),
+                        schema_loader=declarative_stream._schema_loader,  # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
                         retriever=retriever,
                         message_repository=self._message_repository,
                         max_records_limit=self._limits.max_records

@@ -728,7 +728,7 @@ def _group_streams(
                 partition_generator = StreamSlicerPartitionGenerator(
                     partition_factory=DeclarativePartitionFactory(
                         stream_name=declarative_stream.name,
-                        json_schema=declarative_stream.get_json_schema(),
+                        schema_loader=declarative_stream._schema_loader,  # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
                         retriever=retriever,
                         message_repository=self._message_repository,
                         max_records_limit=self._limits.max_records

@@ -762,7 +762,7 @@ def _group_streams(
                 partition_generator = StreamSlicerPartitionGenerator(
                     DeclarativePartitionFactory(
                         stream_name=declarative_stream.name,
-                        json_schema=declarative_stream.get_json_schema(),
+                        schema_loader=declarative_stream._schema_loader,  # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
                         retriever=declarative_stream.retriever,
                         message_repository=self._message_repository,
                         max_records_limit=self._limits.max_records if self._limits else None,

@@ -826,7 +826,7 @@ def _group_streams(
                 partition_generator = StreamSlicerPartitionGenerator(
                     DeclarativePartitionFactory(
                         stream_name=declarative_stream.name,
-                        json_schema=declarative_stream.get_json_schema(),
+                        schema_loader=declarative_stream._schema_loader,  # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
                         retriever=retriever,
                         message_repository=self._message_repository,
                         max_records_limit=self._limits.max_records if self._limits else None,
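
Note: the four hunks above replace an eagerly computed json_schema with the stream's schema loader, which lets the partition factory resolve schemas on demand. A minimal before/after sketch under that assumption, with hypothetical factory classes; the real DeclarativePartitionFactory signature may differ:

from typing import Any, Mapping


class SchemaLoader:
    """Stands in for a declarative schema loader; loading may be expensive."""

    def get_json_schema(self) -> Mapping[str, Any]:
        return {"type": "object", "properties": {"id": {"type": "string"}}}


class EagerPartitionFactory:
    # Before: the schema is computed once, up front, even if it is never needed.
    def __init__(self, stream_name: str, json_schema: Mapping[str, Any]) -> None:
        self._stream_name = stream_name
        self._json_schema = json_schema


class LazyPartitionFactory:
    # After: the loader is passed in and the schema is resolved on demand.
    def __init__(self, stream_name: str, schema_loader: SchemaLoader) -> None:
        self._stream_name = stream_name
        self._schema_loader = schema_loader

    def json_schema(self) -> Mapping[str, Any]:
        return self._schema_loader.get_json_schema()


loader = SchemaLoader()
eager = EagerPartitionFactory("users", loader.get_json_schema())
lazy = LazyPartitionFactory("users", loader)
assert lazy.json_schema() == loader.get_json_schema()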
