# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

-import datetime
-from dataclasses import InitVar, dataclass
-from typing import Any, ClassVar, Iterable, Mapping, MutableMapping, Optional, Union
+from typing import Any, Mapping

-from airbyte_cdk.models import SyncMode
-from airbyte_cdk.sources.declarative.incremental import Cursor
-from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
-from airbyte_cdk.sources.streams.core import Stream
+from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
+from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
+from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration
+from airbyte_cdk.sources.declarative.types import Config


-@dataclass
-class GreenHouseSlicer(Cursor):
-    parameters: InitVar[Mapping[str, Any]]
-    cursor_field: str
-    request_cursor_field: str
+class GreenhouseStateMigration(LegacyToPerPartitionStateMigration):
+    declarative_stream: DeclarativeStream
+    config: Config

-    START_DATETIME: ClassVar[str] = "1970-01-01T00:00:00.000Z"
-    DATETIME_FORMAT: ClassVar[str] = "%Y-%m-%dT%H:%M:%S.%fZ"
+    def __init__(self, declarative_stream: DeclarativeStream, config: Config):
+        self._partition_router = declarative_stream.retriever.partition_router
+        self._cursor = declarative_stream.incremental_sync
+        self._config = config
+        self._parameters = declarative_stream.parameters
+        self._partition_key_field = InterpolatedString.create(
+            self._get_partition_field(self._partition_router), parameters=self._parameters
+        ).eval(self._config)
+        self._cursor_field = InterpolatedString.create(self._cursor.cursor_field, parameters=self._parameters).eval(self._config)

-    def __post_init__(self, parameters: Mapping[str, Any]):
-        self._state = {}
-
-    def stream_slices(self) -> Iterable[StreamSlice]:
-        yield StreamSlice(partition={}, cursor_slice={self.request_cursor_field: self._state.get(self.cursor_field, self.START_DATETIME)})
-
-    def _max_dt_str(self, *args: str) -> Optional[str]:
-        new_state_candidates = list(map(lambda x: datetime.datetime.strptime(x, self.DATETIME_FORMAT), filter(None, args)))
-        if not new_state_candidates:
-            return
-        max_dt = max(new_state_candidates)
-        # `.%f` gives us microseconds, we need milliseconds
-        (dt, micro) = max_dt.strftime(self.DATETIME_FORMAT).split(".")
-        return "%s.%03dZ" % (dt, int(micro[:-1:]) / 1000)
-
-    def set_initial_state(self, stream_state: StreamState) -> None:
-        cursor_value = stream_state.get(self.cursor_field)
-        if cursor_value:
-            self._state[self.cursor_field] = cursor_value
-
-    def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
-        stream_slice_value = stream_slice.get(self.cursor_field)
-        current_state = self._state.get(self.cursor_field)
-        record_cursor_value = most_recent_record and most_recent_record[self.cursor_field]
-        max_dt = self._max_dt_str(stream_slice_value, current_state, record_cursor_value)
-        if not max_dt:
-            return
-        self._state[self.cursor_field] = max_dt
-
-    def should_be_synced(self, record: Record) -> bool:
-        """
-        As of 2023-06-28, the expectation is that this method will only be used for semi-incremental and data feed and therefore the
-        implementation is irrelevant for greenhouse
+    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
-        return True
-
-    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
+        LegacyToPerPartitionStateMigration migrates partition keys as strings, while the real type of the id in Greenhouse is integer,
+        which leads to a partition mismatch.
+        To prevent this, type casting for the partition key was added.
        """
-        Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
-        """
-        first_cursor_value = first.get(self.cursor_field, "")
-        second_cursor_value = second.get(self.cursor_field, "")
-        if first_cursor_value and second_cursor_value:
-            return first_cursor_value >= second_cursor_value
-        elif first_cursor_value:
-            return True
-        else:
-            return False
-
-    def _parse_to_datetime(self, datetime_str: str) -> datetime.datetime:
-        return datetime.datetime.strptime(datetime_str, self.DATETIME_FORMAT)
-
-    def get_stream_state(self) -> StreamState:
-        return self._state
-
-    def get_request_params(
-        self,
-        *,
-        stream_state: Optional[StreamState] = None,
-        stream_slice: Optional[StreamSlice] = None,
-        next_page_token: Optional[Mapping[str, Any]] = None,
-    ) -> MutableMapping[str, Any]:
-        return stream_slice or {}
-
-    def get_request_headers(self, *args, **kwargs) -> Mapping[str, Any]:
-        return {}
-
-    def get_request_body_data(self, *args, **kwargs) -> Optional[Union[Mapping, str]]:
-        return {}
-
-    def get_request_body_json(self, *args, **kwargs) -> Optional[Mapping]:
-        return {}
-
-
-@dataclass
-class GreenHouseSubstreamSlicer(GreenHouseSlicer):
-    parent_stream: Stream
-    stream_slice_field: str
-    parent_key: str
-
-    def stream_slices(self) -> Iterable[StreamSlice]:
-        for parent_stream_slice in self.parent_stream.stream_slices(
-            sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=self.get_stream_state()
-        ):
-            for parent_record in self.parent_stream.read_records(
-                sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
-            ):
-                parent_primary_key = parent_record.get(self.parent_key)
-
-                partition = {self.stream_slice_field: parent_primary_key}
-                cursor_slice = {
-                    self.request_cursor_field: self._state.get(str(parent_primary_key), {}).get(self.cursor_field, self.START_DATETIME)
-                }
-
-                yield StreamSlice(partition=partition, cursor_slice=cursor_slice)
-
-    def set_initial_state(self, stream_state: StreamState) -> None:
-        if self.stream_slice_field in stream_state:
-            return
-        substream_ids = map(lambda x: str(x), set(stream_state.keys()) | set(self._state.keys()))
-        for id_ in substream_ids:
-            self._state[id_] = {
-                self.cursor_field: self._max_dt_str(
-                    stream_state.get(id_, {}).get(self.cursor_field), self._state.get(id_, {}).get(self.cursor_field)
-                )
-            }
-
-    def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
-        if most_recent_record:
-            substream_id = str(stream_slice[self.stream_slice_field])
-            current_state = self._state.get(substream_id, {}).get(self.cursor_field)
-            last_state = most_recent_record[self.cursor_field]
-            max_dt = self._max_dt_str(last_state, current_state)
-            self._state[substream_id] = {self.cursor_field: max_dt}
-        return
-
-    def should_be_synced(self, record: Record) -> bool:
-        """
-        As of 2023-06-28, the expectation is that this method will only be used for semi-incremental and data feed and therefore the
-        implementation is irrelevant for greenhouse
-        """
-        return True
-
-    def get_request_params(
-        self,
-        *,
-        stream_state: Optional[StreamState] = None,
-        stream_slice: Optional[StreamSlice] = None,
-        next_page_token: Optional[Mapping[str, Any]] = None,
-    ) -> MutableMapping[str, Any]:
-        # ignore other fields in a slice
-        return {self.request_cursor_field: stream_slice[self.request_cursor_field]}
+        states = [
+            {"partition": {self._partition_key_field: int(key), "parent_slice": {}}, "cursor": value} for key, value in stream_state.items()
+        ]
+        return {"states": states}
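
For context, migrate reshapes the legacy Greenhouse state, which mapped each parent id (stored as a string) to its cursor, into the per-partition format the low-code CDK expects, casting the key back to an integer so it matches the values emitted by the partition router. Below is a minimal standalone sketch of that transformation; the partition key "job_id" and cursor field "updated_at" are illustrative assumptions, not values read from the connector's manifest.

from typing import Any, Mapping

def migrate_legacy_state(stream_state: Mapping[str, Any], partition_key: str = "job_id") -> Mapping[str, Any]:
    # Legacy state keys are stringified parent ids; Greenhouse ids are integers,
    # so the key is cast back to int to line up with the partition router output.
    states = [
        {"partition": {partition_key: int(key), "parent_slice": {}}, "cursor": value}
        for key, value in stream_state.items()
    ]
    return {"states": states}

legacy_state = {"1234": {"updated_at": "2023-01-01T00:00:00.000Z"}}
print(migrate_legacy_state(legacy_state))
# {'states': [{'partition': {'job_id': 1234, 'parent_slice': {}},
#              'cursor': {'updated_at': '2023-01-01T00:00:00.000Z'}}]}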