Skip to content

Commit eb824f4

Browse files
committed
Merge branch 'main' into revert-705-revert_concurrent_changes
2 parents f73324d + 02246dc commit eb824f4

File tree

16 files changed

+324
-462
lines changed

16 files changed

+324
-462
lines changed

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ def run_test_read(
120120
deprecation_warnings: List[LogMessage] = source.deprecation_warnings()
121121

122122
schema_inferrer = SchemaInferrer(
123-
self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
123+
self._pk_to_nested_and_composite_field(
124+
stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key # type: ignore # We are accessing the private property here as the primary key is not exposed. We should either expose it or use `as_airbyte_stream` to retrieve it as this is the "official" way where it is exposed in the Airbyte protocol
125+
)
126+
if stream
127+
else None,
124128
self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
125129
if stream
126130
else None,

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,18 @@
55
import logging
66
from dataclasses import dataclass, field
77
from queue import Queue
8-
from typing import Any, ClassVar, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple
8+
from typing import (
9+
Any,
10+
ClassVar,
11+
Generic,
12+
Iterator,
13+
List,
14+
Mapping,
15+
MutableMapping,
16+
Optional,
17+
Tuple,
18+
Union,
19+
)
920

1021
from airbyte_protocol_dataclasses.models import Level
1122

@@ -19,10 +30,6 @@
1930
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
2031
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
2132
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
22-
from airbyte_cdk.sources.declarative.extractors import RecordSelector
23-
from airbyte_cdk.sources.declarative.extractors.record_filter import (
24-
ClientSideIncrementalRecordFilterDecorator,
25-
)
2633
from airbyte_cdk.sources.declarative.incremental import (
2734
ConcurrentPerPartitionCursor,
2835
GlobalSubstreamCursor,
@@ -32,7 +39,6 @@
3239
PerPartitionWithGlobalCursor,
3340
)
3441
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
35-
from airbyte_cdk.sources.declarative.models import FileUploader
3642
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3743
ConcurrencyLevel as ConcurrencyLevelModel,
3844
)
@@ -117,7 +123,6 @@ def __init__(
117123
# incremental streams running in full refresh.
118124
component_factory = ModelToComponentFactory(
119125
emit_connector_builder_messages=emit_connector_builder_messages,
120-
disable_resumable_full_refresh=True,
121126
message_repository=ConcurrentMessageRepository(queue, message_repository),
122127
connector_state_manager=self._connector_state_manager,
123128
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
@@ -223,7 +228,7 @@ def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> Airbyte
223228
]
224229
)
225230

226-
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
231+
def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
227232
"""
228233
The `streams` method is used as part of the AbstractSource in the following cases:
229234
* ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
@@ -253,6 +258,10 @@ def _group_streams(
253258
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
254259
# so we need to treat them as synchronous
255260

261+
if isinstance(declarative_stream, AbstractStream):
262+
concurrent_streams.append(declarative_stream)
263+
continue
264+
256265
supports_file_transfer = (
257266
isinstance(declarative_stream, DeclarativeStream)
258267
and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
@@ -322,7 +331,7 @@ def _group_streams(
322331
partition_generator = StreamSlicerPartitionGenerator(
323332
partition_factory=DeclarativePartitionFactory(
324333
stream_name=declarative_stream.name,
325-
json_schema=declarative_stream.get_json_schema(),
334+
schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
326335
retriever=retriever,
327336
message_repository=self.message_repository,
328337
max_records_limit=self._limits.max_records
@@ -359,7 +368,7 @@ def _group_streams(
359368
partition_generator = StreamSlicerPartitionGenerator(
360369
partition_factory=DeclarativePartitionFactory(
361370
stream_name=declarative_stream.name,
362-
json_schema=declarative_stream.get_json_schema(),
371+
schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
363372
retriever=retriever,
364373
message_repository=self.message_repository,
365374
max_records_limit=self._limits.max_records
@@ -393,7 +402,7 @@ def _group_streams(
393402
partition_generator = StreamSlicerPartitionGenerator(
394403
DeclarativePartitionFactory(
395404
stream_name=declarative_stream.name,
396-
json_schema=declarative_stream.get_json_schema(),
405+
schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
397406
retriever=declarative_stream.retriever,
398407
message_repository=self.message_repository,
399408
max_records_limit=self._limits.max_records if self._limits else None,
@@ -457,7 +466,7 @@ def _group_streams(
457466
partition_generator = StreamSlicerPartitionGenerator(
458467
DeclarativePartitionFactory(
459468
stream_name=declarative_stream.name,
460-
json_schema=declarative_stream.get_json_schema(),
469+
schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish
461470
retriever=retriever,
462471
message_repository=self.message_repository,
463472
max_records_limit=self._limits.max_records if self._limits else None,

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from copy import deepcopy
99
from importlib import metadata
1010
from types import ModuleType
11-
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
11+
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Union
1212

1313
import orjson
1414
import yaml
@@ -66,6 +66,7 @@
6666
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
6767
from airbyte_cdk.sources.declarative.spec.spec import Spec
6868
from airbyte_cdk.sources.message import MessageRepository
69+
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
6970
from airbyte_cdk.sources.streams.core import Stream
7071
from airbyte_cdk.sources.types import Config, ConnectionDefinition
7172
from airbyte_cdk.sources.utils.slice_logger import (
@@ -297,7 +298,12 @@ def connection_checker(self) -> ConnectionChecker:
297298
f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
298299
)
299300

300-
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
301+
def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
302+
"""
303+
As a migration step, this method returns both legacy streams (Stream) and concurrent streams (AbstractStream).
304+
Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
305+
fully decouple this from the AbstractSource.
306+
"""
301307
if self._spec_component:
302308
self._spec_component.validate_config(config)
303309

0 commit comments

Comments
 (0)