Skip to content

Commit 5399436

Browse files
author
maxime.c
committed
have bland stream be instantiated as DefaultStream
1 parent 689e792 commit 5399436

File tree

13 files changed

+174
-193
lines changed

13 files changed

+174
-193
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,10 @@ def _group_streams(
209209
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
210210
# so we need to treat them as synchronous
211211

212+
if isinstance(declarative_stream, AbstractStream):
213+
concurrent_streams.append(declarative_stream)
214+
continue
215+
212216
supports_file_transfer = (
213217
isinstance(declarative_stream, DeclarativeStream)
214218
and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
@@ -278,7 +282,7 @@ def _group_streams(
278282
partition_generator = StreamSlicerPartitionGenerator(
279283
partition_factory=DeclarativePartitionFactory(
280284
declarative_stream.name,
281-
declarative_stream.get_json_schema(),
285+
declarative_stream.schema_loader,
282286
retriever,
283287
self.message_repository,
284288
),
@@ -309,7 +313,7 @@ def _group_streams(
309313
partition_generator = StreamSlicerPartitionGenerator(
310314
partition_factory=DeclarativePartitionFactory(
311315
declarative_stream.name,
312-
declarative_stream.get_json_schema(),
316+
declarative_stream.schema_loader,
313317
retriever,
314318
self.message_repository,
315319
),
@@ -339,7 +343,7 @@ def _group_streams(
339343
partition_generator = StreamSlicerPartitionGenerator(
340344
DeclarativePartitionFactory(
341345
declarative_stream.name,
342-
declarative_stream.get_json_schema(),
346+
declarative_stream.schema_loader,
343347
declarative_stream.retriever,
344348
self.message_repository,
345349
),
@@ -399,7 +403,7 @@ def _group_streams(
399403
partition_generator = StreamSlicerPartitionGenerator(
400404
DeclarativePartitionFactory(
401405
declarative_stream.name,
402-
declarative_stream.get_json_schema(),
406+
declarative_stream.schema_loader,
403407
retriever,
404408
self.message_repository,
405409
),

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from copy import deepcopy
99
from importlib import metadata
1010
from types import ModuleType
11-
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
11+
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Union
1212

1313
import orjson
1414
import yaml
@@ -66,6 +66,7 @@
6666
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
6767
from airbyte_cdk.sources.declarative.spec.spec import Spec
6868
from airbyte_cdk.sources.message import MessageRepository
69+
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
6970
from airbyte_cdk.sources.streams.core import Stream
7071
from airbyte_cdk.sources.types import Config, ConnectionDefinition
7172
from airbyte_cdk.sources.utils.slice_logger import (
@@ -297,7 +298,12 @@ def connection_checker(self) -> ConnectionChecker:
297298
f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
298299
)
299300

300-
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
301+
def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:
302+
"""
303+
As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
304+
Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
305+
fully decouple this from the AbstractSource.
306+
"""
301307
if self._spec_component:
302308
self._spec_component.validate_config(config)
303309

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,7 @@
598598
Rate,
599599
UnlimitedCallRatePolicy,
600600
)
601+
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
601602
from airbyte_cdk.sources.streams.concurrent.clamping import (
602603
ClampingEndProvider,
603604
ClampingStrategy,
@@ -1920,8 +1921,8 @@ def create_datetime_based_cursor(
19201921
)
19211922

19221923
def create_declarative_stream(
1923-
self, model: DeclarativeStreamModel, config: Config, **kwargs: Any
1924-
) -> DeclarativeStream:
1924+
self, model: DeclarativeStreamModel, config: Config, is_parent=False, **kwargs: Any
1925+
) -> Union[DeclarativeStream, AbstractStream]:
19251926
# When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field
19261927
# components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the
19271928
# Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in
@@ -2065,8 +2066,38 @@ def create_declarative_stream(
20652066
options["name"] = model.name
20662067
schema_loader = DefaultSchemaLoader(config=config, parameters=options)
20672068

2068-
cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
2069+
if isinstance(combined_slicers, PartitionRouter) and not is_parent and not self._emit_connector_builder_messages:
2070+
# We are starting to migrate streams to instantiate directly the DefaultStream instead of instantiating the
2071+
# DeclarativeStream and assembling the DefaultStream from that. The plan is the following:
2072+
# * Streams without partition router nor cursors and streams with only partition router. This is the `isinstance(combined_slicers, PartitionRouter)` condition as the first kind with have a SinglePartitionRouter
2073+
# * Streams without partition router but with cursor
2074+
# * Streams with both partition router and cursor
2075+
# We specifically exclude parent streams here because SubstreamPartitionRouter has not been updated yet
2076+
# We specifically exclude Connector Builder stuff for now as Brian is working on this anyway
2077+
stream_name = model.name or ""
2078+
partition_generator = StreamSlicerPartitionGenerator(
2079+
DeclarativePartitionFactory(
2080+
stream_name,
2081+
schema_loader,
2082+
retriever,
2083+
self._message_repository,
2084+
),
2085+
combined_slicers,
2086+
)
2087+
FinalStateCursor(stream_name, None, self._message_repository)
2088+
return DefaultStream(
2089+
partition_generator=partition_generator,
2090+
name=stream_name,
2091+
json_schema=schema_loader.get_json_schema,
2092+
primary_key=get_primary_key_from_stream(primary_key),
2093+
cursor_field=None,
2094+
# FIXME we should have the cursor field has part of the interface of cursor
2095+
logger=logging.getLogger(f"airbyte.{stream_name}"),
2096+
# FIXME this is a breaking change compared to the old implementation,
2097+
cursor=FinalStateCursor(stream_name, None, self._message_repository),
2098+
)
20692099

2100+
cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
20702101
if model.state_migrations:
20712102
state_transformations = [
20722103
self._create_component_from_model(state_migration, config, declarative_stream=model)
@@ -2094,7 +2125,7 @@ def _build_stream_slicer_from_partition_router(
20942125
],
20952126
config: Config,
20962127
stream_name: Optional[str] = None,
2097-
) -> Optional[PartitionRouter]:
2128+
) -> PartitionRouter:
20982129
if (
20992130
hasattr(model, "partition_router")
21002131
and isinstance(model, SimpleRetrieverModel | AsyncRetrieverModel)
@@ -2115,15 +2146,15 @@ def _build_stream_slicer_from_partition_router(
21152146
return self._create_component_from_model( # type: ignore[no-any-return] # Will be created PartitionRouter as stream_slicer_model is model.partition_router
21162147
model=stream_slicer_model, config=config, stream_name=stream_name or ""
21172148
)
2118-
return None
2149+
return SinglePartitionRouter(parameters={})
21192150

21202151
def _build_incremental_cursor(
21212152
self,
21222153
model: DeclarativeStreamModel,
21232154
stream_slicer: Optional[PartitionRouter],
21242155
config: Config,
21252156
) -> Optional[StreamSlicer]:
2126-
if model.incremental_sync and stream_slicer:
2157+
if model.incremental_sync and (stream_slicer and not isinstance(stream_slicer, SinglePartitionRouter)):
21272158
if model.retriever.type == "AsyncRetriever":
21282159
stream_name = model.name or ""
21292160
stream_namespace = None
@@ -2871,7 +2902,7 @@ def create_parent_stream_config(
28712902
self, model: ParentStreamConfigModel, config: Config, **kwargs: Any
28722903
) -> ParentStreamConfig:
28732904
declarative_stream = self._create_component_from_model(
2874-
model.stream, config=config, **kwargs
2905+
model.stream, config=config, is_parent=True, **kwargs,
28752906
)
28762907
request_option = (
28772908
self._create_component_from_model(model.request_option, config=config)

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Any, Iterable, Mapping, Optional
44

55
from airbyte_cdk.sources.declarative.retrievers import Retriever
6+
from airbyte_cdk.sources.declarative.schema import SchemaLoader
67
from airbyte_cdk.sources.message import MessageRepository
78
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
89
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
@@ -11,11 +12,23 @@
1112
from airbyte_cdk.utils.slice_hasher import SliceHasher
1213

1314

15+
class SchemaLoaderCachingDecorator(SchemaLoader):
16+
17+
def __init__(self, schema_loader: SchemaLoader):
18+
self._decorated = schema_loader
19+
self._loaded_schema = None
20+
21+
def get_json_schema(self) -> Mapping[str, Any]:
22+
if self._loaded_schema is None:
23+
self._loaded_schema = self._decorated.get_json_schema()
24+
return self._loaded_schema
25+
26+
1427
class DeclarativePartitionFactory:
1528
def __init__(
1629
self,
1730
stream_name: str,
18-
json_schema: Mapping[str, Any],
31+
schema_loader: SchemaLoader,
1932
retriever: Retriever,
2033
message_repository: MessageRepository,
2134
) -> None:
@@ -25,14 +38,14 @@ def __init__(
2538
In order to avoid these problems, we will create one retriever per thread which should make the processing thread-safe.
2639
"""
2740
self._stream_name = stream_name
28-
self._json_schema = json_schema
41+
self._schema_loader = SchemaLoaderCachingDecorator(schema_loader)
2942
self._retriever = retriever
3043
self._message_repository = message_repository
3144

3245
def create(self, stream_slice: StreamSlice) -> Partition:
3346
return DeclarativePartition(
3447
self._stream_name,
35-
self._json_schema,
48+
self._schema_loader,
3649
self._retriever,
3750
self._message_repository,
3851
stream_slice,
@@ -43,20 +56,20 @@ class DeclarativePartition(Partition):
4356
def __init__(
4457
self,
4558
stream_name: str,
46-
json_schema: Mapping[str, Any],
59+
schema_loader: SchemaLoader,
4760
retriever: Retriever,
4861
message_repository: MessageRepository,
4962
stream_slice: StreamSlice,
5063
):
5164
self._stream_name = stream_name
52-
self._json_schema = json_schema
65+
self._schema_loader = schema_loader
5366
self._retriever = retriever
5467
self._message_repository = message_repository
5568
self._stream_slice = stream_slice
5669
self._hash = SliceHasher.hash(self._stream_name, self._stream_slice)
5770

5871
def read(self) -> Iterable[Record]:
59-
for stream_data in self._retriever.read_records(self._json_schema, self._stream_slice):
72+
for stream_data in self._retriever.read_records(self._schema_loader.get_json_schema(), self._stream_slice):
6073
if isinstance(stream_data, Mapping):
6174
record = (
6275
stream_data

airbyte_cdk/sources/streams/concurrent/adapters.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
import copy
66
import json
77
import logging
8-
from functools import lru_cache
9-
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
8+
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
109

1110
from typing_extensions import deprecated
1211

@@ -196,7 +195,6 @@ def cursor_field(self) -> Union[str, List[str]]:
196195
def cursor(self) -> Optional[Cursor]: # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
197196
return self._cursor
198197

199-
@lru_cache(maxsize=None)
200198
def get_json_schema(self) -> Mapping[str, Any]:
201199
return self._abstract_stream.get_json_schema()
202200

airbyte_cdk/sources/streams/concurrent/default_stream.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5-
from functools import lru_cache
65
from logging import Logger
7-
from typing import Any, Iterable, List, Mapping, Optional
6+
from typing import Any, Iterable, List, Mapping, Optional, Union, Callable
87

98
from airbyte_cdk.models import AirbyteStream, SyncMode
109
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
@@ -20,7 +19,7 @@ def __init__(
2019
self,
2120
partition_generator: PartitionGenerator,
2221
name: str,
23-
json_schema: Mapping[str, Any],
22+
json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
2423
primary_key: List[str],
2524
cursor_field: Optional[str],
2625
logger: Logger,
@@ -53,14 +52,15 @@ def namespace(self) -> Optional[str]:
5352
def cursor_field(self) -> Optional[str]:
5453
return self._cursor_field
5554

56-
@lru_cache(maxsize=None)
5755
def get_json_schema(self) -> Mapping[str, Any]:
56+
if isinstance(self._json_schema, Callable):
57+
return self._json_schema()
5858
return self._json_schema
5959

6060
def as_airbyte_stream(self) -> AirbyteStream:
6161
stream = AirbyteStream(
6262
name=self.name,
63-
json_schema=dict(self._json_schema),
63+
json_schema=dict(self.get_json_schema()),
6464
supported_sync_modes=[SyncMode.full_refresh],
6565
is_resumable=False,
6666
is_file_based=self._supports_file_transfer,

unit_tests/connector_builder/test_connector_builder_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -780,7 +780,7 @@ def test_config_update() -> None:
780780
"client_secret": "a client secret",
781781
"refresh_token": "a refresh token",
782782
}
783-
source = ManifestDeclarativeSource(source_config=manifest)
783+
source = ManifestDeclarativeSource(source_config=manifest, emit_connector_builder_messages=True)
784784

785785
refresh_request_response = {
786786
"access_token": "an updated access token",

0 commit comments

Comments
 (0)