-
Notifications
You must be signed in to change notification settings - Fork 40
Expand file tree
/
Copy pathasync_retriever.py
More file actions
101 lines (80 loc) · 3.45 KB
/
async_retriever.py
File metadata and controls
101 lines (80 loc) · 3.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, Optional
from typing_extensions import deprecated
from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
AsyncJobPartitionRouter,
)
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class AsyncRetriever(Retriever):
    """Retriever for asynchronous (job-based) API endpoints.

    Job creation and polling are delegated to the configured
    ``AsyncJobPartitionRouter``; this class fetches the records of the
    completed jobs attached to each stream slice and runs them through the
    record selector.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_selector: RecordSelector
    stream_slicer: AsyncJobPartitionRouter
    # Always emits a slice log message so the connector builder's TestRead can
    # surface slice boundaries; fixed implementation, hence init=False.
    slice_logger: AlwaysLogSliceLogger = field(
        init=False,
        default_factory=lambda: AlwaysLogSliceLogger(),
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        pass

    def _get_stream_state(self) -> StreamState:
        """
        Gets the current state of the stream.

        Returns:
            StreamState: Mapping[str, Any]
        """
        return self.state

    def _validate_and_get_stream_slice_jobs(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[AsyncJob]:
        """
        Extracts the async jobs attached to a stream slice.

        Args:
            stream_slice (Optional[StreamSlice]): The slice whose
                ``extra_fields["jobs"]`` holds the jobs to read from.

        Returns:
            Iterable[AsyncJob]: The jobs for this slice; empty when the slice
            is ``None`` or carries no ``"jobs"`` entry.
        """
        # NOTE: despite the name, no validation error is raised here — a
        # missing slice or missing "jobs" key simply yields no jobs.
        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Delegates slice generation to the async job partition router."""
        yield from self.stream_slicer.stream_slices()

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Reads the records produced by the jobs attached to ``stream_slice``,
        filtered and transformed by the record selector.

        Args:
            records_schema (Mapping[str, Any]): The schema used by the record selector.
            stream_slice (Optional[StreamSlice]): The slice whose jobs to read.

        Yields:
            StreamData: A slice-descriptor log message followed by the selected records.
        """
        # emit the slice_descriptor log message, for connector builder TestRead
        # (guarded: stream_slice defaults to None, and accessing .cursor_slice
        # on None would raise AttributeError)
        if stream_slice is not None:
            yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)
        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )