fix(source-greenhouse): remove custom cursors#54702

Merged
Daryna Ishchenko (darynaishchenko) merged 14 commits into master from
daryna/source-greenhouce/milliseconds-cursor
Mar 14, 2025
@darynaishchenko Daryna Ishchenko (darynaishchenko) commented Feb 27, 2025

What

https://github.com/airbytehq/airbyte-internal-issues/issues/11527

How

Based on the CDK changes in airbytehq/airbyte-python-cdk#369, we are now able to use %_ms as the milliseconds identifier.
Removed the custom cursors and added incremental sync based on DatetimeBasedCursor.
State migrations from the legacy format to per-partition state were added to make the state change non-breaking.
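
To illustrate what a millisecond directive adds over the standard `%f` (six-digit microseconds), here is a minimal sketch in plain Python. `format_with_ms` is a hypothetical helper written for this example, not the CDK's datetime parser, and the format string is only an example. The one assumption carried over from the description above is that `%_ms` stands for three-digit milliseconds.

```python
from datetime import datetime, timezone


def format_with_ms(dt: datetime, fmt: str) -> str:
    """Hypothetical helper: expand a `%_ms` token to zero-padded milliseconds.

    Standard strftime only offers `%f` (microseconds), so the millisecond
    token is substituted first and the rest is delegated to strftime.
    """
    millis = f"{dt.microsecond // 1000:03d}"
    return dt.strftime(fmt.replace("%_ms", millis))


ts = datetime(2025, 2, 27, 10, 15, 30, 123456, tzinfo=timezone.utc)

# Millisecond precision via the illustrative `%_ms` token.
formatted_ms = format_with_ms(ts, "%Y-%m-%dT%H:%M:%S.%_msZ")

# The same instant with plain strftime and `%f`, for comparison.
formatted_us = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
```

Here `formatted_ms` keeps three fractional digits while `formatted_us` keeps six, which is the difference that matters when an API expects or returns millisecond timestamps.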

Review guide

User Impact

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

Fume OSS (fumeoss) commented Feb 27, 2025

Summary

  • Removed custom cursors in the Greenhouse source integration and implemented incremental sync using DatetimeBasedCursor.
  • Introduced state migrations from legacy to per-partition storage to ensure non-breaking changes.
  • Updated the connector's version from 0.5.32 to 0.6.0 in the metadata and manifest files.
  • Modified the GreenhouseStateMigration class to handle partition key migrations correctly.
  • Updated documentation to reflect the changes made in the connector.
🧪 3 passed

🟢 Test Sub-stream State Migration for source-greenhouse

🟢 Test Legacy State Migration for Greenhouse Connector - Jira Issue #11527

🟢 Test Millisecond Precision Datetime Format for DatetimeBasedCursor Implementation

@darynaishchenko (Collaborator, Author) commented

Regression tests:

test_catalog_are_the_same [failed]

is_resumable is now true (due to the CDK update)

TestDataIntegrity.test_record_schema_match_with_state [failed]

Null fields are not in records (due to the CDK update)

Record count mismatch

Some incremental streams have more records in the target version. Data could have been added between reads, so we should be fine with this.

@natikgadzhi Natik Gadzhi (natikgadzhi) left a comment

The code changes look good to me, granted I'm still not fully an expert in the area. Given the CDK change landed, I'd love to merge this, do a single regression test (if we haven't already) and start a progressive rollout.

The only ask I have is to bump pytest to 8, deps hygiene.

Daryna Ishchenko (@darynaishchenko) take this away, let's get this shipped.

def run():
    source = SourceGreenhouse()
    launch(source, sys.argv[1:])

def _get_source(args: List[str]):

Nit, non-blocking: I think I've seen conversations about our approach to run.py. This seems like boilerplate and feels off. Is there a reason we have to have this code here, as opposed to having it inside a generic source init?

Natik Gadzhi (@natikgadzhi) I agree that we should ideally have this generified (is that a word?) into the CDK implementation, but I think the issue when we first built this was that run.py isn't even a thing in the CDK itself. So we were left with a bit of a manual process.

And while this is annoying, all these individual run.py definitions should ultimately get removed as we move to manifest-only connectors, where all of this is already addressed within source-declarative-manifest: https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-declarative-manifest/source_declarative_manifest/run.py#L152-L160.

Unless we find free cycles to make this easier in the CDK, I don't think this is a super high priority, because it will be throwaway work once we treat everything as manifest-only.

@brianjlai Brian Lai (brianjlai) left a comment

overall looks good. if you have a chance can you rebase this w/ the latest master and re-run regression tests and I can do a final approval

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
LegacyToPerPartitionStateMigration migrates partition keys as strings, while the real type of id in Greenhouse is integer,
which leads to a partition mismatch.
To prevent this, type casting for the partition key was added.

thanks for adding the comment for why we needed the custom component

Can you explain to me a little more about why this is needed? I think the explanation mostly makes sense, but I noticed that in the previous state format, the key was stored as a string, followed by that partition's state value. Why didn't we run into the state key string vs. greenhouse id mismatch in the old state format when we were using strings?

Not strictly a blocker, but it just seems interesting that we now have to convert state to integers when strings used to work

@darynaishchenko (Collaborator, Author) replied:

It was working before because the custom StreamSlicer overrides the stream_slices method:
it yields the right partition, where the parent primary key (id) is an integer: here
and it converts the parent primary key to a string to access the cursor value from state: here
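
The mismatch discussed in this thread can be sketched in plain Python. This is an illustration under assumptions, not the connector's or the CDK's code: the field names `parent_id` and `updated_at`, the sample ids, and both helper functions are hypothetical, and the per-partition shape (`states` entries with `partition` and `cursor`) is assumed here for the example.

```python
from typing import Any, Dict, Optional

# Hypothetical legacy state: parent id (always a string key) -> partition cursor.
legacy_state: Dict[str, Dict[str, str]] = {
    "101": {"updated_at": "2025-01-01T00:00:00.000Z"},
    "202": {"updated_at": "2025-02-01T00:00:00.000Z"},
}


def migrate_to_per_partition(
    state: Dict[str, Dict[str, str]], partition_field: str, cast_to_int: bool = False
) -> Dict[str, Any]:
    """Illustrative migration: one entry per legacy key, optionally casting the key."""
    states = []
    for key, cursor in state.items():
        value: Any = int(key) if cast_to_int else key
        states.append({"partition": {partition_field: value}, "cursor": cursor})
    return {"states": states}


def find_cursor(migrated: Dict[str, Any], partition: Dict[str, Any]) -> Optional[Dict[str, str]]:
    """Return the cursor whose partition equals the given one, else None."""
    for entry in migrated["states"]:
        if entry["partition"] == partition:
            return entry["cursor"]
    return None


# A partition as produced from the parent stream, where id is an integer.
live_partition = {"parent_id": 101}

uncast = migrate_to_per_partition(legacy_state, "parent_id")
cast = migrate_to_per_partition(legacy_state, "parent_id", cast_to_int=True)

missed = find_cursor(uncast, live_partition)  # "101" != 101, so nothing matches
found = find_cursor(cast, live_partition)     # matches after the cast
```

Without the cast, the lookup finds no saved cursor for the partition, so the saved state for that parent would effectively be ignored. With the cast, the migrated partition compares equal to the live one.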

that makes things more clear. I missed the part where the old custom component was casting. thank you

name: "applications"
path: "applications"
cursor_field: "applied_at"
cursor_request_option: "created_after"

small suggestion: instead of adding these two arbitrary parameters that we then need to account for using field_name: "{{ parameters.get('cursor_request_option', 'updated_after') }}", I would suggest that we instead just define a new:

incremental_sync:
  type: DatetimeBasedCursor
  ...
  start_time_option:
    type: RequestOption
    inject_into: request_parameter
    field_name: "created_after"

I think that reads more easily than the extra parameter injection because at first I thought it was named wrong as cursor_request_option. wdyt?

@darynaishchenko (Collaborator, Author) replied:

Removed cursor_request_option and cursor_field from parameters. Updated incremental_sync to use a $ref to the basic incremental_sync instead, and overrode start_time_option and cursor_field.
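
The effect of referencing a shared definition and overriding two keys can be sketched with plain dictionaries. This is only an illustration of the resulting configuration, not how the CDK resolves `$ref`. The base `cursor_field` value (`updated_at`) and the helper `with_overrides` are assumptions made for the example. The `applied_at`, `created_after`, and `updated_after` values come from the review thread above.

```python
from copy import deepcopy
from typing import Any, Dict

# Assumed shape of the shared (basic) incremental_sync definition.
base_incremental_sync: Dict[str, Any] = {
    "type": "DatetimeBasedCursor",
    "cursor_field": "updated_at",  # assumption for illustration
    "start_time_option": {
        "type": "RequestOption",
        "inject_into": "request_parameter",
        "field_name": "updated_after",
    },
}


def with_overrides(base: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: copy the base definition and replace top-level keys."""
    merged = deepcopy(base)
    merged.update(overrides)
    return merged


# The applications stream reuses the base and overrides only what differs.
applications_incremental_sync = with_overrides(
    base_incremental_sync,
    {
        "cursor_field": "applied_at",
        "start_time_option": {
            "type": "RequestOption",
            "inject_into": "request_parameter",
            "field_name": "created_after",
        },
    },
)
```

Compared with threading `cursor_request_option` through `parameters`, the override keeps the request option name next to the cursor it belongs to, which is the readability point raised in the suggestion.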

@brianjlai Brian Lai (brianjlai) left a comment

just a few small suggestions and tweaks, but after those are fixed, this should be ready to go. approved ✅

@darynaishchenko Daryna Ishchenko (darynaishchenko) merged commit 3b18ecd into master Mar 14, 2025
27 checks passed
@darynaishchenko Daryna Ishchenko (darynaishchenko) deleted the daryna/source-greenhouce/milliseconds-cursor branch March 14, 2025 17:47
Sven Pöche (Valgard) pushed a commit to mayflower/airbyte that referenced this pull request Mar 25, 2025

Labels

area/connectors (Connector related issues), area/documentation (Improvements or additions to documentation), connectors/source/greenhouse

5 participants