Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 59f1e50a-331f-4f09-b3e8-2e8d4d355f44
dockerImageTag: 0.5.32
dockerImageTag: 0.6.0
dockerRepository: airbyte/source-greenhouse
documentationUrl: https://docs.airbyte.com/integrations/sources/greenhouse
githubIssueLabel: source-greenhouse
Expand Down
1,579 changes: 1,298 additions & 281 deletions airbyte-integrations/connectors/source-greenhouse/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.5.32"
version = "0.6.0"
name = "source-greenhouse"
description = "Source implementation for Greenhouse."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -16,9 +16,8 @@ repository = "https://github.com/airbytehq/airbyte"
include = "source_greenhouse"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.80.0"
dataclasses-jsonschema = "==2.15.1"
python = "^3.10,<3.12"
airbyte-cdk = "^6"

[tool.poetry.scripts]
source-greenhouse = "source_greenhouse.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,154 +2,35 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
from dataclasses import InitVar, dataclass
from typing import Any, ClassVar, Iterable, Mapping, MutableMapping, Optional, Union
from typing import Any, Mapping

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.incremental import Cursor
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration
from airbyte_cdk.sources.declarative.types import Config


@dataclass
class GreenHouseSlicer(Cursor):
parameters: InitVar[Mapping[str, Any]]
cursor_field: str
request_cursor_field: str
class GreenhouseStateMigration(LegacyToPerPartitionStateMigration):
declarative_stream: DeclarativeStream
config: Config

START_DATETIME: ClassVar[str] = "1970-01-01T00:00:00.000Z"
DATETIME_FORMAT: ClassVar[str] = "%Y-%m-%dT%H:%M:%S.%fZ"
def __init__(self, declarative_stream: DeclarativeStream, config: Config):
self._partition_router = declarative_stream.retriever.partition_router
self._cursor = declarative_stream.incremental_sync
self._config = config
self._parameters = declarative_stream.parameters
self._partition_key_field = InterpolatedString.create(
self._get_partition_field(self._partition_router), parameters=self._parameters
).eval(self._config)
self._cursor_field = InterpolatedString.create(self._cursor.cursor_field, parameters=self._parameters).eval(self._config)

def __post_init__(self, parameters: Mapping[str, Any]):
self._state = {}

def stream_slices(self) -> Iterable[StreamSlice]:
yield StreamSlice(partition={}, cursor_slice={self.request_cursor_field: self._state.get(self.cursor_field, self.START_DATETIME)})

def _max_dt_str(self, *args: str) -> Optional[str]:
new_state_candidates = list(map(lambda x: datetime.datetime.strptime(x, self.DATETIME_FORMAT), filter(None, args)))
if not new_state_candidates:
return
max_dt = max(new_state_candidates)
# `.%f` gives us microseconds, we need milliseconds
(dt, micro) = max_dt.strftime(self.DATETIME_FORMAT).split(".")
return "%s.%03dZ" % (dt, int(micro[:-1:]) / 1000)

def set_initial_state(self, stream_state: StreamState) -> None:
cursor_value = stream_state.get(self.cursor_field)
if cursor_value:
self._state[self.cursor_field] = cursor_value

def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
stream_slice_value = stream_slice.get(self.cursor_field)
current_state = self._state.get(self.cursor_field)
record_cursor_value = most_recent_record and most_recent_record[self.cursor_field]
max_dt = self._max_dt_str(stream_slice_value, current_state, record_cursor_value)
if not max_dt:
return
self._state[self.cursor_field] = max_dt

def should_be_synced(self, record: Record) -> bool:
"""
As of 2023-06-28, the expectation is that this method will only be used for semi-incremental and data feed and therefore the
implementation is irrelevant for greenhouse
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
"""
return True

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
LegacyToPerPartitionStateMigration migrates partition keys as strings, while the real type of id in Greenhouse is integer,
which leads to a partition mismatch.
To prevent this, type casting for the partition key was added.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for adding the comment for why we needed the custom component

Can you explain to me a little more about why this is needed? I think the explanation mostly makes sense, but I noticed that in the previous state format, the key was stored as a string, followed by that partition's state value. Why didn't we run into the state key string vs. greenhouse id string problem in the old state format when we were using strings?

Not strictly a blocker, but it just seems interesting that we now have to convert state to integers when strings used to work

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was working before because the custom StreamSlicer overrides the stream_slices method:
so it has the right partition, where the parent primary key (id) is an integer: here
and it converts the parent primary key to a string to access the cursor value from state: here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes things clearer. I missed the part where the old custom component was casting. Thank you.

"""
Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
"""
first_cursor_value = first.get(self.cursor_field, "")
second_cursor_value = second.get(self.cursor_field, "")
if first_cursor_value and second_cursor_value:
return first_cursor_value >= second_cursor_value
elif first_cursor_value:
return True
else:
return False

def _parse_to_datetime(self, datetime_str: str) -> datetime.datetime:
return datetime.datetime.strptime(datetime_str, self.DATETIME_FORMAT)

def get_stream_state(self) -> StreamState:
return self._state

def get_request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
return stream_slice or {}

def get_request_headers(self, *args, **kwargs) -> Mapping[str, Any]:
return {}

def get_request_body_data(self, *args, **kwargs) -> Optional[Union[Mapping, str]]:
return {}

def get_request_body_json(self, *args, **kwargs) -> Optional[Mapping]:
return {}


@dataclass
class GreenHouseSubstreamSlicer(GreenHouseSlicer):
parent_stream: Stream
stream_slice_field: str
parent_key: str

def stream_slices(self) -> Iterable[StreamSlice]:
for parent_stream_slice in self.parent_stream.stream_slices(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=self.get_stream_state()
):
for parent_record in self.parent_stream.read_records(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
):
parent_primary_key = parent_record.get(self.parent_key)

partition = {self.stream_slice_field: parent_primary_key}
cursor_slice = {
self.request_cursor_field: self._state.get(str(parent_primary_key), {}).get(self.cursor_field, self.START_DATETIME)
}

yield StreamSlice(partition=partition, cursor_slice=cursor_slice)

def set_initial_state(self, stream_state: StreamState) -> None:
if self.stream_slice_field in stream_state:
return
substream_ids = map(lambda x: str(x), set(stream_state.keys()) | set(self._state.keys()))
for id_ in substream_ids:
self._state[id_] = {
self.cursor_field: self._max_dt_str(
stream_state.get(id_, {}).get(self.cursor_field), self._state.get(id_, {}).get(self.cursor_field)
)
}

def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
if most_recent_record:
substream_id = str(stream_slice[self.stream_slice_field])
current_state = self._state.get(substream_id, {}).get(self.cursor_field)
last_state = most_recent_record[self.cursor_field]
max_dt = self._max_dt_str(last_state, current_state)
self._state[substream_id] = {self.cursor_field: max_dt}
return

def should_be_synced(self, record: Record) -> bool:
"""
As of 2023-06-28, the expectation is that this method will only be used for semi-incremental and data feed and therefore the
implementation is irrelevant for greenhouse
"""
return True

def get_request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
# ignore other fields in a slice
return {self.request_cursor_field: stream_slice[self.request_cursor_field]}
states = [
{"partition": {self._partition_key_field: int(key), "parent_slice": {}}, "cursor": value} for key, value in stream_state.items()
]
return {"states": states}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: "0.70.0"
version: "6.33.6"

definitions:
selector:
Expand Down Expand Up @@ -61,23 +61,33 @@ definitions:
$ref: "#/definitions/retriever"
requester: "#/definitions/requester"
incremental_sync:
type: CustomIncrementalSync
class_name: source_greenhouse.components.GreenHouseSlicer
request_cursor_field: "updated_after"
type: DatetimeBasedCursor
cursor_field: "updated_at"
cursor_datetime_formats:
- "%Y-%m-%dT%H:%M:%S.%_msZ"
datetime_format: "%Y-%m-%dT%H:%M:%S.%_msZ"
start_datetime:
type: MinMaxDatetime
datetime: "1970-01-01T00:00:00.000Z"
datetime_format: "%Y-%m-%dT%H:%M:%S.%_msZ"
start_time_option:
type: RequestOption
inject_into: request_parameter
field_name: "updated_after"
applications_stream:
$ref: "#/definitions/base_incremental_stream"
$parameters:
name: "applications"
path: "applications"
incremental_sync:
$ref: "#/definitions/base_incremental_stream/incremental_sync"
cursor_field: "applied_at"
start_time_option:
$ref: "#/definitions/base_incremental_stream/incremental_sync/start_time_option"
field_name: "created_after"
retriever:
$ref: "#/definitions/retriever"
requester: "#/definitions/requester"
incremental_sync:
type: CustomIncrementalSync
class_name: source_greenhouse.components.GreenHouseSlicer
request_cursor_field: "created_after"
cursor_field: "applied_at"
schema_loader:
type: InlineSchemaLoader
schema:
Expand Down Expand Up @@ -967,22 +977,26 @@ definitions:
- "null"
- string
applications_demographics_answers_stream:
$ref: "#/definitions/base_stream"
$ref: "#/definitions/base_incremental_stream"
$parameters:
name: "applications_demographics_answers"
retriever:
$ref: "#/definitions/retriever"
requester:
$ref: "#/definitions/requester"
path: "applications/{{ stream_slice.parent_id }}/demographics/answers"
incremental_sync:
type: CustomIncrementalSync
class_name: source_greenhouse.components.GreenHouseSubstreamSlicer
parent_stream: "#/definitions/applications_stream"
request_cursor_field: "updated_after"
stream_slice_field: "parent_id"
cursor_field: "updated_at"
parent_key: "id"
path: "applications/{{ stream_partition.application_id }}/demographics/answers"
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: id
partition_field: application_id
stream:
$ref: "#/definitions/applications_stream"
incremental_dependency: true
state_migrations:
- type: CustomStateMigration
class_name: source_greenhouse.components.GreenhouseStateMigration
schema_loader:
type: InlineSchemaLoader
schema:
Expand Down Expand Up @@ -1033,22 +1047,26 @@ definitions:
- string
format: date-time
applications_interviews_stream:
$ref: "#/definitions/base_stream"
$ref: "#/definitions/base_incremental_stream"
$parameters:
name: "applications_interviews"
retriever:
$ref: "#/definitions/retriever"
requester:
$ref: "#/definitions/requester"
path: "applications/{{ stream_slice.parent_id }}/scheduled_interviews"
incremental_sync:
type: CustomIncrementalSync
class_name: source_greenhouse.components.GreenHouseSubstreamSlicer
parent_stream: "#/definitions/applications_stream"
request_cursor_field: "updated_after"
stream_slice_field: "parent_id"
cursor_field: "updated_at"
parent_key: "id"
path: "applications/{{ stream_slice.application_id }}/scheduled_interviews"
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: id
partition_field: application_id
stream:
$ref: "#/definitions/applications_stream"
incremental_dependency: true
state_migrations:
- type: CustomStateMigration
class_name: source_greenhouse.components.GreenhouseStateMigration
schema_loader:
type: InlineSchemaLoader
schema:
Expand Down Expand Up @@ -1951,23 +1969,26 @@ definitions:
- "null"
- array
jobs_stages_stream:
$ref: "#/definitions/base_stream"
$ref: "#/definitions/base_incremental_stream"
$parameters:
name: "jobs_stages"
path: "jobs/{{ stream_slice.parent_id }}/stages"
retriever:
$ref: "#/definitions/retriever"
requester:
$ref: "#/definitions/requester"
path: "jobs/{{ stream_slice.parent_id }}/stages"
incremental_sync:
type: CustomIncrementalSync
class_name: source_greenhouse.components.GreenHouseSubstreamSlicer
parent_stream: "#/definitions/jobs_stream"
request_cursor_field: "updated_after"
stream_slice_field: "parent_id"
cursor_field: "updated_at"
parent_key: "id"
path: "jobs/{{ stream_slice.job_id }}/stages"
partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- type: ParentStreamConfig
parent_key: id
partition_field: job_id
stream:
$ref: "#/definitions/jobs_stream"
incremental_dependency: true
state_migrations:
- type: CustomStateMigration
class_name: source_greenhouse.components.GreenhouseStateMigration
schema_loader:
type: InlineSchemaLoader
schema:
Expand Down Expand Up @@ -3204,15 +3225,12 @@ definitions:
retriever:
$ref: "#/definitions/retriever"
requester: "#/definitions/requester"
stream_slicer:
request_cursor_field: "submitted_after"
cursor_field: "submitted_at"
class_name: source_greenhouse.components.GreenHouseSlicer
incremental_sync:
type: CustomIncrementalSync
class_name: source_greenhouse.components.GreenHouseSlicer
request_cursor_field: "submitted_after"
$ref: "#/definitions/base_incremental_stream/incremental_sync"
cursor_field: "submitted_at"
start_time_option:
$ref: "#/definitions/base_incremental_stream/incremental_sync/start_time_option"
field_name: "submitted_after"
schema_loader:
type: InlineSchemaLoader
schema:
Expand Down
Loading
Loading