Skip to content

Commit 1d5b468

Browse files
author
maxime.c
committed
remove
1 parent e4cbaaf commit 1d5b468

File tree

12 files changed

+4
-158
lines changed

12 files changed

+4
-158
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,6 @@
5252
from airbyte_cdk.sources.streams import Stream
5353
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
5454
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
55-
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
56-
AlwaysAvailableAvailabilityStrategy,
57-
)
5855
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
5956
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
6057
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
@@ -325,7 +322,6 @@ def _group_streams(
325322
partition_generator=partition_generator,
326323
name=declarative_stream.name,
327324
json_schema=declarative_stream.get_json_schema(),
328-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
329325
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
330326
cursor_field=cursor.cursor_field.cursor_field_key
331327
if hasattr(cursor, "cursor_field")
@@ -362,7 +358,6 @@ def _group_streams(
362358
partition_generator=partition_generator,
363359
name=declarative_stream.name,
364360
json_schema=declarative_stream.get_json_schema(),
365-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
366361
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
367362
cursor_field=None,
368363
logger=self.logger,
@@ -417,7 +412,6 @@ def _group_streams(
417412
partition_generator=partition_generator,
418413
name=declarative_stream.name,
419414
json_schema=declarative_stream.get_json_schema(),
420-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
421415
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
422416
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
423417
logger=self.logger,
Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
from .abstract_file_based_availability_strategy import (
2-
AbstractFileBasedAvailabilityStrategy,
3-
AbstractFileBasedAvailabilityStrategyWrapper,
4-
)
1+
from .abstract_file_based_availability_strategy import AbstractFileBasedAvailabilityStrategy
52
from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
63

74
__all__ = [
85
"AbstractFileBasedAvailabilityStrategy",
9-
"AbstractFileBasedAvailabilityStrategyWrapper",
106
"DefaultFileBasedAvailabilityStrategy",
117
]

airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,6 @@
1010

1111
from airbyte_cdk.sources import Source
1212
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
13-
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
14-
AbstractAvailabilityStrategy,
15-
StreamAvailability,
16-
StreamAvailable,
17-
StreamUnavailable,
18-
)
1913
from airbyte_cdk.sources.streams.core import Stream
2014

2115
if TYPE_CHECKING:
@@ -28,7 +22,7 @@ def check_availability( # type: ignore[override] # Signature doesn't match bas
2822
self,
2923
stream: Stream,
3024
logger: logging.Logger,
31-
_: Optional[Source],
25+
source: Optional[Source] = None,
3226
) -> Tuple[bool, Optional[str]]:
3327
"""
3428
Perform a connection check for the stream.
@@ -51,23 +45,3 @@ def check_availability_and_parsability(
5145
Returns (True, None) if successful, otherwise (False, <error message>).
5246
"""
5347
...
54-
55-
56-
class AbstractFileBasedAvailabilityStrategyWrapper(AbstractAvailabilityStrategy):
57-
def __init__(self, stream: AbstractFileBasedStream) -> None:
58-
self.stream = stream
59-
60-
def check_availability(self, logger: logging.Logger) -> StreamAvailability:
61-
is_available, reason = self.stream.availability_strategy.check_availability(
62-
self.stream, logger, None
63-
)
64-
if is_available:
65-
return StreamAvailable()
66-
return StreamUnavailable(reason or "")
67-
68-
def check_availability_and_parsability(
69-
self, logger: logging.Logger
70-
) -> Tuple[bool, Optional[str]]:
71-
return self.stream.availability_strategy.check_availability_and_parsability(
72-
self.stream, logger, None
73-
)

airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
179179
)
180180

181181
@cached_property
182-
@deprecated("Deprecated as of CDK version 3.7.0.")
183182
def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
184183
return self._availability_strategy
185184

airbyte_cdk/sources/file_based/stream/concurrent/adapters.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919
)
2020
from airbyte_cdk.sources import AbstractSource
2121
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
22-
from airbyte_cdk.sources.file_based.availability_strategy import (
23-
AbstractFileBasedAvailabilityStrategy,
24-
AbstractFileBasedAvailabilityStrategyWrapper,
25-
)
22+
from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy
2623
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
2724
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
2825
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
@@ -97,7 +94,6 @@ def create_from_stream(
9794
),
9895
name=stream.name,
9996
json_schema=stream.get_json_schema(),
100-
availability_strategy=AbstractFileBasedAvailabilityStrategyWrapper(stream),
10197
primary_key=pk,
10298
cursor_field=cursor_field,
10399
logger=logger,

airbyte_cdk/sources/streams/availability_strategy.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from airbyte_cdk.sources import Source
1515

1616

17+
# FIXME this
1718
class AvailabilityStrategy(ABC):
1819
"""
1920
Abstract base class for checking stream availability.

airbyte_cdk/sources/streams/concurrent/abstract_stream.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
from airbyte_cdk.models import AirbyteStream
1111
from airbyte_cdk.sources.source import ExperimentalClassWarning
12-
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
1312
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
1413
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1514

@@ -64,12 +63,6 @@ def cursor_field(self) -> Optional[str]:
6463
:return: The name of the field used as a cursor. Nested cursor fields are not supported.
6564
"""
6665

67-
@abstractmethod
68-
def check_availability(self) -> StreamAvailability:
69-
"""
70-
:return: The stream's availability
71-
"""
72-
7366
@abstractmethod
7467
def get_json_schema(self) -> Mapping[str, Any]:
7568
"""

airbyte_cdk/sources/streams/concurrent/adapters.py

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,7 @@
2424
from airbyte_cdk.sources.message import MessageRepository
2525
from airbyte_cdk.sources.source import ExperimentalClassWarning
2626
from airbyte_cdk.sources.streams import Stream
27-
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
2827
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
29-
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
30-
AbstractAvailabilityStrategy,
31-
AlwaysAvailableAvailabilityStrategy,
32-
)
3328
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
3429
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
3530
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
@@ -101,7 +96,6 @@ def create_from_stream(
10196
name=stream.name,
10297
namespace=stream.namespace,
10398
json_schema=stream.get_json_schema(),
104-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
10599
primary_key=pk,
106100
cursor_field=cursor_field,
107101
logger=logger,
@@ -210,18 +204,6 @@ def get_json_schema(self) -> Mapping[str, Any]:
210204
def supports_incremental(self) -> bool:
211205
return self._legacy_stream.supports_incremental
212206

213-
def check_availability(
214-
self, logger: logging.Logger, source: Optional["Source"] = None
215-
) -> Tuple[bool, Optional[str]]:
216-
"""
217-
Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters
218-
:param logger: (ignored)
219-
:param source: (ignored)
220-
:return:
221-
"""
222-
availability = self._abstract_stream.check_availability()
223-
return availability.is_available(), availability.message()
224-
225207
def as_airbyte_stream(self) -> AirbyteStream:
226208
return self._abstract_stream.as_airbyte_stream()
227209

@@ -370,28 +352,3 @@ def generate(self) -> Iterable[Partition]:
370352
self._cursor_field,
371353
self._state,
372354
)
373-
374-
375-
@deprecated(
376-
"Availability strategy has been soft deprecated. Do not use. Class is subject to removal",
377-
category=ExperimentalClassWarning,
378-
)
379-
class AvailabilityStrategyFacade(AvailabilityStrategy):
380-
def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
381-
self._abstract_availability_strategy = abstract_availability_strategy
382-
383-
def check_availability(
384-
self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
385-
) -> Tuple[bool, Optional[str]]:
386-
"""
387-
Checks stream availability.
388-
389-
Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.
390-
391-
:param stream: (unused)
392-
:param logger: logger object to use
393-
:param source: (unused)
394-
:return: A tuple of (boolean, str). If boolean is true, then the stream
395-
"""
396-
stream_availability = self._abstract_availability_strategy.check_availability(logger)
397-
return stream_availability.is_available(), stream_availability.message()

airbyte_cdk/sources/streams/concurrent/default_stream.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@
88

99
from airbyte_cdk.models import AirbyteStream, SyncMode
1010
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
11-
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
12-
AbstractAvailabilityStrategy,
13-
StreamAvailability,
14-
)
1511
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
1612
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
1713
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
@@ -23,7 +19,6 @@ def __init__(
2319
partition_generator: PartitionGenerator,
2420
name: str,
2521
json_schema: Mapping[str, Any],
26-
availability_strategy: AbstractAvailabilityStrategy,
2722
primary_key: List[str],
2823
cursor_field: Optional[str],
2924
logger: Logger,
@@ -34,7 +29,6 @@ def __init__(
3429
self._stream_partition_generator = partition_generator
3530
self._name = name
3631
self._json_schema = json_schema
37-
self._availability_strategy = availability_strategy
3832
self._primary_key = primary_key
3933
self._cursor_field = cursor_field
4034
self._logger = logger
@@ -53,9 +47,6 @@ def name(self) -> str:
5347
def namespace(self) -> Optional[str]:
5448
return self._namespace
5549

56-
def check_availability(self) -> StreamAvailability:
57-
return self._availability_strategy.check_availability(self._logger)
58-
5950
@property
6051
def cursor_field(self) -> Optional[str]:
6152
return self._cursor_field

unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
import logging
66

77
from airbyte_cdk.sources.message import InMemoryMessageRepository
8-
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
9-
AlwaysAvailableAvailabilityStrategy,
10-
)
118
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
129
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
1310
from airbyte_cdk.sources.types import Record
@@ -48,7 +45,6 @@
4845
"id": {"type": ["null", "string"]},
4946
},
5047
},
51-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
5248
primary_key=[],
5349
cursor_field=None,
5450
logger=logging.getLogger("test_logger"),
@@ -84,7 +80,6 @@
8480
"id": {"type": ["null", "string"]},
8581
},
8682
},
87-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
8883
primary_key=[],
8984
cursor_field=None,
9085
logger=logging.getLogger("test_logger"),
@@ -120,7 +115,6 @@
120115
"id": {"type": ["null", "string"]},
121116
},
122117
},
123-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
124118
primary_key=["id"],
125119
cursor_field=None,
126120
logger=logging.getLogger("test_logger"),
@@ -171,7 +165,6 @@
171165
"id": {"type": ["null", "string"]},
172166
},
173167
},
174-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
175168
primary_key=[],
176169
cursor_field=None,
177170
logger=logging.getLogger("test_logger"),
@@ -222,7 +215,6 @@
222215
"id": {"type": ["null", "string"]},
223216
},
224217
},
225-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
226218
primary_key=[],
227219
cursor_field=None,
228220
logger=logging.getLogger("test_logger"),
@@ -255,7 +247,6 @@
255247
"id": {"type": ["null", "string"]},
256248
},
257249
},
258-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
259250
primary_key=[],
260251
cursor_field=None,
261252
logger=logging.getLogger("test_logger"),
@@ -397,7 +388,6 @@
397388
"key": {"type": ["null", "string"]},
398389
},
399390
},
400-
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
401391
primary_key=[],
402392
cursor_field=None,
403393
logger=logging.getLogger("test_logger"),

0 commit comments

Comments
 (0)