-
Notifications
You must be signed in to change notification settings - Fork 5.1k
fix(source-greenhouse): remove custom cursors #54702
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
509f134
fce6712
402b385
cb8cfce
d371fe7
7664473
82618e6
f53a74b
65456ae
a546642
bb9bdb9
cf7c54b
6bea957
fb4e711
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,154 +2,35 @@ | |
| # Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
| # | ||
|
|
||
| import datetime | ||
| from dataclasses import InitVar, dataclass | ||
| from typing import Any, ClassVar, Iterable, Mapping, MutableMapping, Optional, Union | ||
| from typing import Any, Mapping | ||
|
|
||
| from airbyte_cdk.models import SyncMode | ||
| from airbyte_cdk.sources.declarative.incremental import Cursor | ||
| from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState | ||
| from airbyte_cdk.sources.streams.core import Stream | ||
| from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream | ||
| from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString | ||
| from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration | ||
| from airbyte_cdk.sources.declarative.types import Config | ||
|
|
||
|
|
||
| @dataclass | ||
| class GreenHouseSlicer(Cursor): | ||
| parameters: InitVar[Mapping[str, Any]] | ||
| cursor_field: str | ||
| request_cursor_field: str | ||
| class GreenhouseStateMigration(LegacyToPerPartitionStateMigration): | ||
| declarative_stream: DeclarativeStream | ||
| config: Config | ||
|
|
||
| START_DATETIME: ClassVar[str] = "1970-01-01T00:00:00.000Z" | ||
| DATETIME_FORMAT: ClassVar[str] = "%Y-%m-%dT%H:%M:%S.%fZ" | ||
| def __init__(self, declarative_stream: DeclarativeStream, config: Config): | ||
| self._partition_router = declarative_stream.retriever.partition_router | ||
| self._cursor = declarative_stream.incremental_sync | ||
| self._config = config | ||
| self._parameters = declarative_stream.parameters | ||
| self._partition_key_field = InterpolatedString.create( | ||
| self._get_partition_field(self._partition_router), parameters=self._parameters | ||
| ).eval(self._config) | ||
| self._cursor_field = InterpolatedString.create(self._cursor.cursor_field, parameters=self._parameters).eval(self._config) | ||
|
|
||
| def __post_init__(self, parameters: Mapping[str, Any]): | ||
| self._state = {} | ||
|
|
||
| def stream_slices(self) -> Iterable[StreamSlice]: | ||
| yield StreamSlice(partition={}, cursor_slice={self.request_cursor_field: self._state.get(self.cursor_field, self.START_DATETIME)}) | ||
|
|
||
| def _max_dt_str(self, *args: str) -> Optional[str]: | ||
| new_state_candidates = list(map(lambda x: datetime.datetime.strptime(x, self.DATETIME_FORMAT), filter(None, args))) | ||
| if not new_state_candidates: | ||
| return | ||
| max_dt = max(new_state_candidates) | ||
| # `.%f` gives us microseconds, we need milliseconds | ||
| (dt, micro) = max_dt.strftime(self.DATETIME_FORMAT).split(".") | ||
| return "%s.%03dZ" % (dt, int(micro[:-1:]) / 1000) | ||
|
|
||
| def set_initial_state(self, stream_state: StreamState) -> None: | ||
| cursor_value = stream_state.get(self.cursor_field) | ||
| if cursor_value: | ||
| self._state[self.cursor_field] = cursor_value | ||
|
|
||
| def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None: | ||
| stream_slice_value = stream_slice.get(self.cursor_field) | ||
| current_state = self._state.get(self.cursor_field) | ||
| record_cursor_value = most_recent_record and most_recent_record[self.cursor_field] | ||
| max_dt = self._max_dt_str(stream_slice_value, current_state, record_cursor_value) | ||
| if not max_dt: | ||
| return | ||
| self._state[self.cursor_field] = max_dt | ||
|
|
||
| def should_be_synced(self, record: Record) -> bool: | ||
| """ | ||
| As of 2023-06-28, the expectation is that this method will only be used for semi-incremental and data feed and therefore the | ||
| implementation is irrelevant for greenhouse | ||
| def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: | ||
| """ | ||
| return True | ||
|
|
||
| def is_greater_than_or_equal(self, first: Record, second: Record) -> bool: | ||
| LegacyToPerPartitionStateMigration migrates partition keys as strings, while the real type of the id in Greenhouse is integer, | ||
| which leads to a partition mismatch. | ||
| To prevent this, type casting for the partition key was added. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for adding the comment for why we needed the custom component. Can you explain to me a little more about why this is needed? I think the explanation mostly makes sense, but I noticed that in the previous state format, the state was stored with a string as the key, followed by that partition's state value. Why didn't we run into the state-key string vs. Greenhouse id string mismatch in the old state format when we were using strings? Not strictly a blocker, but it just seems interesting that we now have to convert state keys to integers when strings used to work.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was working before because custom StreamSlicer overrides
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That makes things clearer. I missed the part where the old custom component was doing the casting. Thank you. |
||
| """ | ||
| Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice | ||
| """ | ||
| first_cursor_value = first.get(self.cursor_field, "") | ||
| second_cursor_value = second.get(self.cursor_field, "") | ||
| if first_cursor_value and second_cursor_value: | ||
| return first_cursor_value >= second_cursor_value | ||
| elif first_cursor_value: | ||
| return True | ||
| else: | ||
| return False | ||
|
|
||
| def _parse_to_datetime(self, datetime_str: str) -> datetime.datetime: | ||
| return datetime.datetime.strptime(datetime_str, self.DATETIME_FORMAT) | ||
|
|
||
| def get_stream_state(self) -> StreamState: | ||
| return self._state | ||
|
|
||
| def get_request_params( | ||
| self, | ||
| *, | ||
| stream_state: Optional[StreamState] = None, | ||
| stream_slice: Optional[StreamSlice] = None, | ||
| next_page_token: Optional[Mapping[str, Any]] = None, | ||
| ) -> MutableMapping[str, Any]: | ||
| return stream_slice or {} | ||
|
|
||
| def get_request_headers(self, *args, **kwargs) -> Mapping[str, Any]: | ||
| return {} | ||
|
|
||
| def get_request_body_data(self, *args, **kwargs) -> Optional[Union[Mapping, str]]: | ||
| return {} | ||
|
|
||
| def get_request_body_json(self, *args, **kwargs) -> Optional[Mapping]: | ||
| return {} | ||
|
|
||
|
|
||
| @dataclass | ||
| class GreenHouseSubstreamSlicer(GreenHouseSlicer): | ||
| parent_stream: Stream | ||
| stream_slice_field: str | ||
| parent_key: str | ||
|
|
||
| def stream_slices(self) -> Iterable[StreamSlice]: | ||
| for parent_stream_slice in self.parent_stream.stream_slices( | ||
| sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=self.get_stream_state() | ||
| ): | ||
| for parent_record in self.parent_stream.read_records( | ||
| sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None | ||
| ): | ||
| parent_primary_key = parent_record.get(self.parent_key) | ||
|
|
||
| partition = {self.stream_slice_field: parent_primary_key} | ||
| cursor_slice = { | ||
| self.request_cursor_field: self._state.get(str(parent_primary_key), {}).get(self.cursor_field, self.START_DATETIME) | ||
| } | ||
|
|
||
| yield StreamSlice(partition=partition, cursor_slice=cursor_slice) | ||
|
|
||
| def set_initial_state(self, stream_state: StreamState) -> None: | ||
| if self.stream_slice_field in stream_state: | ||
| return | ||
| substream_ids = map(lambda x: str(x), set(stream_state.keys()) | set(self._state.keys())) | ||
| for id_ in substream_ids: | ||
| self._state[id_] = { | ||
| self.cursor_field: self._max_dt_str( | ||
| stream_state.get(id_, {}).get(self.cursor_field), self._state.get(id_, {}).get(self.cursor_field) | ||
| ) | ||
| } | ||
|
|
||
| def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None: | ||
| if most_recent_record: | ||
| substream_id = str(stream_slice[self.stream_slice_field]) | ||
| current_state = self._state.get(substream_id, {}).get(self.cursor_field) | ||
| last_state = most_recent_record[self.cursor_field] | ||
| max_dt = self._max_dt_str(last_state, current_state) | ||
| self._state[substream_id] = {self.cursor_field: max_dt} | ||
| return | ||
|
|
||
| def should_be_synced(self, record: Record) -> bool: | ||
| """ | ||
| As of 2023-06-28, the expectation is that this method will only be used for semi-incremental and data feed and therefore the | ||
| implementation is irrelevant for greenhouse | ||
| """ | ||
| return True | ||
|
|
||
| def get_request_params( | ||
| self, | ||
| *, | ||
| stream_state: Optional[StreamState] = None, | ||
| stream_slice: Optional[StreamSlice] = None, | ||
| next_page_token: Optional[Mapping[str, Any]] = None, | ||
| ) -> MutableMapping[str, Any]: | ||
| # ignore other fields in a slice | ||
| return {self.request_cursor_field: stream_slice[self.request_cursor_field]} | ||
| states = [ | ||
| {"partition": {self._partition_key_field: int(key), "parent_slice": {}}, "cursor": value} for key, value in stream_state.items() | ||
| ] | ||
| return {"states": states} | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| version: "0.70.0" | ||
| version: "6.33.6" | ||
|
|
||
| definitions: | ||
| selector: | ||
|
|
@@ -61,23 +61,29 @@ definitions: | |
| $ref: "#/definitions/retriever" | ||
| requester: "#/definitions/requester" | ||
| incremental_sync: | ||
| type: CustomIncrementalSync | ||
| class_name: source_greenhouse.components.GreenHouseSlicer | ||
| request_cursor_field: "updated_after" | ||
| cursor_field: "updated_at" | ||
| type: DatetimeBasedCursor | ||
| cursor_field: "{{ parameters.get('cursor_field', 'updated_at') }}" | ||
| cursor_datetime_formats: | ||
| - "%Y-%m-%dT%H:%M:%S.%_msZ" | ||
| datetime_format: "%Y-%m-%dT%H:%M:%S.%_msZ" | ||
| start_datetime: | ||
| type: MinMaxDatetime | ||
| datetime: "1970-01-01T00:00:00.000Z" | ||
| datetime_format: "%Y-%m-%dT%H:%M:%S.%_msZ" | ||
| start_time_option: | ||
| type: RequestOption | ||
| inject_into: request_parameter | ||
| field_name: "{{ parameters.get('cursor_request_option', 'updated_after') }}" | ||
| applications_stream: | ||
| $ref: "#/definitions/base_incremental_stream" | ||
| $parameters: | ||
| name: "applications" | ||
| path: "applications" | ||
| cursor_field: "applied_at" | ||
| cursor_request_option: "created_after" | ||
|
||
| retriever: | ||
| $ref: "#/definitions/retriever" | ||
| requester: "#/definitions/requester" | ||
| incremental_sync: | ||
| type: CustomIncrementalSync | ||
| class_name: source_greenhouse.components.GreenHouseSlicer | ||
| request_cursor_field: "created_after" | ||
| cursor_field: "applied_at" | ||
| schema_loader: | ||
| type: InlineSchemaLoader | ||
| schema: | ||
|
|
@@ -967,22 +973,26 @@ definitions: | |
| - "null" | ||
| - string | ||
| applications_demographics_answers_stream: | ||
| $ref: "#/definitions/base_stream" | ||
| $ref: "#/definitions/base_incremental_stream" | ||
| $parameters: | ||
| name: "applications_demographics_answers" | ||
| retriever: | ||
| $ref: "#/definitions/retriever" | ||
| requester: | ||
| $ref: "#/definitions/requester" | ||
| path: "applications/{{ stream_slice.parent_id }}/demographics/answers" | ||
| incremental_sync: | ||
| type: CustomIncrementalSync | ||
| class_name: source_greenhouse.components.GreenHouseSubstreamSlicer | ||
| parent_stream: "#/definitions/applications_stream" | ||
| request_cursor_field: "updated_after" | ||
| stream_slice_field: "parent_id" | ||
| cursor_field: "updated_at" | ||
| parent_key: "id" | ||
| path: "applications/{{ stream_partition.application_id }}/demographics/answers" | ||
| partition_router: | ||
| type: SubstreamPartitionRouter | ||
| parent_stream_configs: | ||
| - type: ParentStreamConfig | ||
| parent_key: id | ||
| partition_field: application_id | ||
| stream: | ||
| $ref: "#/definitions/applications_stream" | ||
| incremental_dependency: true | ||
| state_migrations: | ||
| - type: CustomStateMigration | ||
| class_name: source_greenhouse.components.GreenhouseStateMigration | ||
| schema_loader: | ||
| type: InlineSchemaLoader | ||
| schema: | ||
|
|
@@ -1033,22 +1043,26 @@ definitions: | |
| - string | ||
| format: date-time | ||
| applications_interviews_stream: | ||
| $ref: "#/definitions/base_stream" | ||
| $ref: "#/definitions/base_incremental_stream" | ||
| $parameters: | ||
| name: "applications_interviews" | ||
| retriever: | ||
| $ref: "#/definitions/retriever" | ||
| requester: | ||
| $ref: "#/definitions/requester" | ||
| path: "applications/{{ stream_slice.parent_id }}/scheduled_interviews" | ||
| incremental_sync: | ||
| type: CustomIncrementalSync | ||
| class_name: source_greenhouse.components.GreenHouseSubstreamSlicer | ||
| parent_stream: "#/definitions/applications_stream" | ||
| request_cursor_field: "updated_after" | ||
| stream_slice_field: "parent_id" | ||
| cursor_field: "updated_at" | ||
| parent_key: "id" | ||
| path: "applications/{{ stream_slice.application_id }}/scheduled_interviews" | ||
darynaishchenko marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| partition_router: | ||
| type: SubstreamPartitionRouter | ||
| parent_stream_configs: | ||
| - type: ParentStreamConfig | ||
| parent_key: id | ||
| partition_field: application_id | ||
| stream: | ||
| $ref: "#/definitions/applications_stream" | ||
| incremental_dependency: true | ||
| state_migrations: | ||
| - type: CustomStateMigration | ||
| class_name: source_greenhouse.components.GreenhouseStateMigration | ||
| schema_loader: | ||
| type: InlineSchemaLoader | ||
| schema: | ||
|
|
@@ -1951,23 +1965,26 @@ definitions: | |
| - "null" | ||
| - array | ||
| jobs_stages_stream: | ||
| $ref: "#/definitions/base_stream" | ||
| $ref: "#/definitions/base_incremental_stream" | ||
| $parameters: | ||
| name: "jobs_stages" | ||
| path: "jobs/{{ stream_slice.parent_id }}/stages" | ||
| retriever: | ||
| $ref: "#/definitions/retriever" | ||
| requester: | ||
| $ref: "#/definitions/requester" | ||
| path: "jobs/{{ stream_slice.parent_id }}/stages" | ||
| incremental_sync: | ||
| type: CustomIncrementalSync | ||
| class_name: source_greenhouse.components.GreenHouseSubstreamSlicer | ||
| parent_stream: "#/definitions/jobs_stream" | ||
| request_cursor_field: "updated_after" | ||
| stream_slice_field: "parent_id" | ||
| cursor_field: "updated_at" | ||
| parent_key: "id" | ||
| path: "jobs/{{ stream_slice.job_id }}/stages" | ||
darynaishchenko marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| partition_router: | ||
| type: SubstreamPartitionRouter | ||
| parent_stream_configs: | ||
| - type: ParentStreamConfig | ||
| parent_key: id | ||
| partition_field: job_id | ||
| stream: | ||
| $ref: "#/definitions/jobs_stream" | ||
| incremental_dependency: true | ||
| state_migrations: | ||
| - type: CustomStateMigration | ||
| class_name: source_greenhouse.components.GreenhouseStateMigration | ||
| schema_loader: | ||
| type: InlineSchemaLoader | ||
| schema: | ||
|
|
@@ -3200,19 +3217,12 @@ definitions: | |
| $parameters: | ||
| name: "eeoc" | ||
| path: "eeoc" | ||
| cursor_field: "submitted_at" | ||
| cursor_request_option: "submitted_after" | ||
| primary_key: "application_id" | ||
| retriever: | ||
| $ref: "#/definitions/retriever" | ||
| requester: "#/definitions/requester" | ||
| stream_slicer: | ||
| request_cursor_field: "submitted_after" | ||
| cursor_field: "submitted_at" | ||
| class_name: source_greenhouse.components.GreenHouseSlicer | ||
| incremental_sync: | ||
| type: CustomIncrementalSync | ||
| class_name: source_greenhouse.components.GreenHouseSlicer | ||
| request_cursor_field: "submitted_after" | ||
| cursor_field: "submitted_at" | ||
| schema_loader: | ||
| type: InlineSchemaLoader | ||
| schema: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.