Skip to content

Commit 0a58453

Browse files
committed
pr feedback
1 parent 9705b8c commit 0a58453

File tree

2 files changed

+10
-42
lines changed

2 files changed

+10
-42
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ def _get_declarative_component_schema() -> Dict[str, Any]:
160160

161161
# todo: AbstractSource can be removed once we've completely moved off all legacy synchronous CDK code paths
162162
# and replaced with implementing the source.py:Source class
163+
#
164+
# todo: The `ConcurrentDeclarativeSource.message_repository()` method can also be removed once AbstractSource
165+
# is no longer inherited from since the only external dependency is from that class.
163166
class ConcurrentDeclarativeSource(AbstractSource, Generic[TState]):
164167
# By default, we defer to a value of 2. A value lower than could cause a PartitionEnqueuer to be stuck in a state of deadlock
165168
# because it has hit the limit of futures but not partition reader is consuming them.
@@ -273,7 +276,7 @@ def __init__(
273276
logger=self.logger,
274277
slice_logger=self._slice_logger,
275278
queue=queue,
276-
message_repository=self.message_repository,
279+
message_repository=self._message_repository,
277280
)
278281

279282
def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
@@ -394,6 +397,7 @@ def resolved_manifest(self) -> Mapping[str, Any]:
394397
"""
395398
return self._source_config
396399

400+
# TODO: Deprecate this class once ConcurrentDeclarativeSource no longer inherits AbstractSource
397401
@property
398402
def message_repository(self) -> MessageRepository:
399403
return self._message_repository
@@ -414,8 +418,6 @@ def read(
414418
catalog: ConfiguredAirbyteCatalog,
415419
state: Optional[List[AirbyteStateMessage]] = None,
416420
) -> Iterator[AirbyteMessage]:
417-
self._configure_logger_level(logger)
418-
419421
concurrent_streams, _ = self._group_streams(config=config)
420422

421423
# ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of
@@ -470,13 +472,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
470472
if self._spec_component:
471473
self._spec_component.validate_config(config)
472474

473-
self._emit_manifest_debug_message(
474-
extra_args={
475-
"source_name": self.name,
476-
"parsed_config": json.dumps(self._source_config),
477-
}
478-
)
479-
480475
stream_configs = (
481476
self._stream_configs(self._source_config, config=config) + self.dynamic_streams
482477
)
@@ -560,20 +555,11 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
560555
will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
561556
in the project root.
562557
"""
563-
self._configure_logger_level(logger)
564-
self._emit_manifest_debug_message(
565-
extra_args={
566-
"source_name": self.name,
567-
"parsed_config": json.dumps(self._source_config),
568-
}
569-
)
570-
571558
return (
572559
self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
573560
)
574561

575562
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
576-
self._configure_logger_level(logger)
577563
return super().check(logger, config)
578564

579565
def check_connection(
@@ -707,7 +693,7 @@ def _group_streams(
707693
stream_name=declarative_stream.name,
708694
json_schema=declarative_stream.get_json_schema(),
709695
retriever=retriever,
710-
message_repository=self.message_repository,
696+
message_repository=self._message_repository,
711697
max_records_limit=self._limits.max_records
712698
if self._limits
713699
else None,
@@ -744,7 +730,7 @@ def _group_streams(
744730
stream_name=declarative_stream.name,
745731
json_schema=declarative_stream.get_json_schema(),
746732
retriever=retriever,
747-
message_repository=self.message_repository,
733+
message_repository=self._message_repository,
748734
max_records_limit=self._limits.max_records
749735
if self._limits
750736
else None,
@@ -778,7 +764,7 @@ def _group_streams(
778764
stream_name=declarative_stream.name,
779765
json_schema=declarative_stream.get_json_schema(),
780766
retriever=declarative_stream.retriever,
781-
message_repository=self.message_repository,
767+
message_repository=self._message_repository,
782768
max_records_limit=self._limits.max_records if self._limits else None,
783769
),
784770
declarative_stream.retriever.stream_slicer,
@@ -790,7 +776,7 @@ def _group_streams(
790776
final_state_cursor = FinalStateCursor(
791777
stream_name=declarative_stream.name,
792778
stream_namespace=declarative_stream.namespace,
793-
message_repository=self.message_repository,
779+
message_repository=self._message_repository,
794780
)
795781

796782
concurrent_streams.append(
@@ -842,7 +828,7 @@ def _group_streams(
842828
stream_name=declarative_stream.name,
843829
json_schema=declarative_stream.get_json_schema(),
844830
retriever=retriever,
845-
message_repository=self.message_repository,
831+
message_repository=self._message_repository,
846832
max_records_limit=self._limits.max_records if self._limits else None,
847833
),
848834
perpartition_cursor,
@@ -1067,13 +1053,3 @@ def _migrate_state(
10671053
stream_state = dict(state_migration.migrate(stream_state))
10681054

10691055
return stream_state
1070-
1071-
def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
1072-
self.logger.debug("declarative source created from manifest", extra=extra_args)
1073-
1074-
def _configure_logger_level(self, logger: logging.Logger) -> None:
1075-
"""
1076-
Set the log level to logging.DEBUG if debug mode is enabled
1077-
"""
1078-
if self._debug:
1079-
logger.setLevel(logging.DEBUG)

unit_tests/__init__.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1 @@
11
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2-
3-
# THIS STOPS SOME MODELS TESTS FROM FALLING OVER. IT'S A HACK, WE SHOULD PIN DOWN WHAT'S ACTUALLY GOING ON HERE
4-
5-
# Import the thing that needs to be imported to stop the tests from falling over
6-
# from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
7-
8-
# "Use" the thing so that the linter doesn't complain
9-
# placeholder = ManifestDeclarativeSource

0 commit comments

Comments
 (0)