|
46 | 46 | from airbyte_cdk.sources.source import TState |
47 | 47 | from airbyte_cdk.sources.streams import Stream |
48 | 48 | from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream |
| 49 | +from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade |
49 | 50 | from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( |
50 | 51 | AlwaysAvailableAvailabilityStrategy, |
51 | 52 | ) |
52 | 53 | from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor |
53 | 54 | from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream |
54 | 55 | from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream |
55 | | -from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade |
56 | 56 |
|
57 | 57 |
|
58 | 58 | class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]): |
@@ -381,7 +381,10 @@ def _group_streams( |
381 | 381 | # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state. |
382 | 382 | # Condition below needs to ensure that concurrent support is not lost for sources that already support |
383 | 383 | # it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe). |
384 | | - elif isinstance(declarative_stream, AbstractStreamFacade) and self.is_partially_declarative: |
| 384 | + elif ( |
| 385 | + isinstance(declarative_stream, AbstractStreamFacade) |
| 386 | + and self.is_partially_declarative |
| 387 | + ): |
385 | 388 | concurrent_streams.append(declarative_stream.get_underlying_stream()) |
386 | 389 | else: |
387 | 390 | synchronous_streams.append(declarative_stream) |
|
0 commit comments