Skip to content

Commit 00f8d97

Browse files
fix(concurrent_declarative_source): add default value to optional state parameter
- Add default value of None to state parameter in ConcurrentDeclarativeSource.__init__() - Remove Generic[TState] from class definition as it adds no meaningful value - Update all type annotations throughout codebase to use concrete Optional[List[AirbyteStateMessage]] type - Fix test parameter order to match updated constructor signature - Resolves breaking change introduced in PR #704 where Optional state parameter lacked default value Fixes: #704 Co-Authored-By: AJ Steers <[email protected]>
1 parent e4b34b6 commit 00f8d97

File tree

9 files changed

+15
-17
lines changed

9 files changed

+15
-17
lines changed

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def create_source(
6565
limits: TestLimits,
6666
catalog: Optional[ConfiguredAirbyteCatalog],
6767
state: Optional[List[AirbyteStateMessage]],
68-
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
68+
) -> ConcurrentDeclarativeSource:
6969
manifest = config["__injected_declarative_manifest"]
7070

7171
# We enforce a concurrency level of 1 so that the stream is processed on a single thread
@@ -88,7 +88,7 @@ def create_source(
8888

8989

9090
def read_stream(
91-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
91+
source: ConcurrentDeclarativeSource,
9292
config: Mapping[str, Any],
9393
configured_catalog: ConfiguredAirbyteCatalog,
9494
state: List[AirbyteStateMessage],
@@ -127,7 +127,7 @@ def read_stream(
127127

128128

129129
def resolve_manifest(
130-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
130+
source: ConcurrentDeclarativeSource,
131131
) -> AirbyteMessage:
132132
try:
133133
return AirbyteMessage(
@@ -146,7 +146,7 @@ def resolve_manifest(
146146

147147

148148
def full_resolve_manifest(
149-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], limits: TestLimits
149+
source: ConcurrentDeclarativeSource, limits: TestLimits
150150
) -> AirbyteMessage:
151151
try:
152152
manifest = {**source.resolved_manifest}

airbyte_cdk/connector_builder/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def get_config_and_catalog_from_args(
7070

7171

7272
def handle_connector_builder_request(
73-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
73+
source: ConcurrentDeclarativeSource,
7474
command: str,
7575
config: Mapping[str, Any],
7676
catalog: Optional[ConfiguredAirbyteCatalog],

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def __init__(
8585

8686
def run_test_read(
8787
self,
88-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
88+
source: ConcurrentDeclarativeSource,
8989
config: Mapping[str, Any],
9090
configured_catalog: ConfiguredAirbyteCatalog,
9191
stream_name: str,
@@ -383,7 +383,7 @@ def _get_latest_config_update(
383383

384384
def _read_stream(
385385
self,
386-
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
386+
source: ConcurrentDeclarativeSource,
387387
config: Mapping[str, Any],
388388
configured_catalog: ConfiguredAirbyteCatalog,
389389
state: List[AirbyteStateMessage],

airbyte_cdk/manifest_server/command_processor/processor.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,10 @@
2121

2222

2323
class ManifestCommandProcessor:
24-
_source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
24+
_source: ConcurrentDeclarativeSource
2525
_logger = logging.getLogger("airbyte.manifest-server")
2626

27-
def __init__(
28-
self, source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
29-
) -> None:
27+
def __init__(self, source: ConcurrentDeclarativeSource) -> None:
3028
self._source = source
3129

3230
def test_read(

airbyte_cdk/manifest_server/command_processor/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def build_source(
6363
record_limit: Optional[int] = None,
6464
page_limit: Optional[int] = None,
6565
slice_limit: Optional[int] = None,
66-
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
66+
) -> ConcurrentDeclarativeSource:
6767
# We enforce a concurrency level of 1 so that the stream is processed on a single thread
6868
# to retain ordering for the grouping of the builder message responses.
6969
definition = copy.deepcopy(manifest)

airbyte_cdk/manifest_server/routers/manifest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def safe_build_source(
4040
page_limit: Optional[int] = None,
4141
slice_limit: Optional[int] = None,
4242
record_limit: Optional[int] = None,
43-
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
43+
) -> ConcurrentDeclarativeSource:
4444
"""Wrapper around build_source that converts ValidationError to HTTPException."""
4545
try:
4646
return build_source(

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ def _get_declarative_component_schema() -> Dict[str, Any]:
162162
# is no longer inherited from since the only external dependency is from that class.
163163
#
164164
# todo: It is worth investigating removal of the Generic[TState] since it will always be Optional[List[AirbyteStateMessage]]
165-
class ConcurrentDeclarativeSource(AbstractSource, Generic[TState]):
165+
class ConcurrentDeclarativeSource(AbstractSource):
166166
# By default, we defer to a value of 2. A value lower than could cause a PartitionEnqueuer to be stuck in a state of deadlock
167167
# because it has hit the limit of futures but not partition reader is consuming them.
168168
_LOWEST_SAFE_CONCURRENCY_LEVEL = 2
@@ -171,8 +171,8 @@ def __init__(
171171
self,
172172
catalog: Optional[ConfiguredAirbyteCatalog],
173173
config: Optional[Mapping[str, Any]],
174-
state: TState,
175174
source_config: ConnectionDefinition,
175+
state: Optional[List[AirbyteStateMessage]] = None,
176176
debug: bool = False,
177177
emit_connector_builder_messages: bool = False,
178178
migrate_manifest: bool = False,

airbyte_cdk/sources/declarative/yaml_declarative_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from airbyte_cdk.sources.types import ConnectionDefinition
1515

1616

17-
class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
17+
class YamlDeclarativeSource(ConcurrentDeclarativeSource):
1818
"""Declarative source defined by a yaml file"""
1919

2020
def __init__(

unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2219,7 +2219,7 @@ def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMess
22192219
)
22202220
config = {}
22212221
state = {}
2222-
source = ConcurrentDeclarativeSource(catalog, config, state, manifest)
2222+
source = ConcurrentDeclarativeSource(catalog, config, manifest, state)
22232223
return list(source.read(logger, {}, catalog, state))
22242224

22252225

0 commit comments

Comments
 (0)