Understanding the Airbyte CDK #3: read command
#33814
Marcos Marx (marcosmarxm)
started this conversation in
Guides & Tutorials
Replies: 0 comments
How the Airbyte CDK read command works
sequenceDiagram
    autonumber
    Entrypoint ->>+ AirbyteEntrypoint: launch()
    AirbyteEntrypoint ->>- Entrypoint: Create Source Object
    loop run()
        Note left of Entrypoint: Start read operation
        AirbyteEntrypoint ->>+ Source: read_catalog()
        Source ->>- AirbyteEntrypoint: ConfiguredAirbyteCatalog
        AirbyteEntrypoint ->>+ Source: read_state()
        Source ->>- AirbyteEntrypoint: AirbyteStateMessage
        AirbyteEntrypoint ->>+ AbstractSource: read(logger, config, catalog, state)
        loop Each Stream
            Note over AbstractSource,HttpStream: (incremental or full refresh)
            AbstractSource ->>+ HttpStream: read_full_refresh
            HttpStream ->>+ StreamImplementation: read_records
            StreamImplementation ->>- AbstractSource: parse_response yield records
        end
        AbstractSource ->>- Entrypoint: return each record
    end

- The entrypoint calls the `launch()` function, which receives the source object reference.
- `read_catalog()` reads the file injected in `/tmp/workspace/<JOB_ID>/<ATTEMPT>` and maps it to an AirbyteCatalog object.
- It returns the `ConfiguredAirbyteCatalog` object.
- `read_state()` does the same, but for the state file.
- It returns the `AirbyteStateMessage` object.
- `read` first validates the config and then calls the `read` function from the Source class.
- Each stream calls `read_records` from the `HttpStream` class. (Some special connectors override this function, but most use the one from the base class.)
- The request is built from the `request_param` and `next_page_token` function output.
- `parse_response` reads the JSON object returned from the API and outputs individual records, which are written to STDOUT, read by the Airbyte Worker, and sent to the destination.

From the Entrypoint.py file in the Airbyte CDK:

- Like the `check` function, the read first masks any secrets.
- It validates that the `config` is compatible with the `spec` version of the connector.
- It calls the `read` function from the `AbstractSource` class.

The `read` function in `AbstractSource` is quite a big one.

- Like the `discover` function, it calls the `streams` function to retrieve all streams, but now it creates a dictionary to easily map each stream class by its name.
- It creates the `ConnectorStateManager`, which handles all state messages.
- It iterates over each `ConfiguredStream` (the streams selected by the user in the UI during connection creation).
- It checks availability with the `HttpAvailabilityStrategy` class, which checks the first record.
- It marks the stream status as `STARTED`.
- It calls `_read_stream`.
- Depending on whether the sync mode is `incremental` or `full_refresh`, it calls a different function to read the records.

Full Refresh
Breaking down the `full_refresh` method

The first step is to call `stream_slices`. This concept is somewhat complex and can mislead users. Let's use an example to make it easier:

In our example, the service has two endpoints: `Orders` and `OrderDetails`.

`Orders` returns only the list of orders, without any details.

Whenever we access `OrderDetails`, for example with `order_details/2`, it returns the details of that single order.

This can be translated into stream slices: each order returned by `Orders` becomes one slice of `OrderDetails`.
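A minimal sketch of that translation, using plain Python classes rather than the real CDK base classes. The `FAKE_API` dictionary, the record fields, and the `order_id` slice key are hypothetical stand-ins chosen for illustration; only the method names `stream_slices`, `path`, and `read_records` follow the vocabulary of this guide.

```python
from typing import Any, Dict, Iterable, Mapping

# Hypothetical in-memory stand-in for the service; not real Airbyte CDK code.
FAKE_API: Dict[str, Any] = {
    "orders": [{"id": 1}, {"id": 2}],
    "order_details/1": {"id": 1, "items": ["book"], "total": 10.0},
    "order_details/2": {"id": 2, "items": ["pen", "ink"], "total": 4.5},
}


class Orders:
    """Parent stream: returns the list of orders without details."""

    def read_records(self) -> Iterable[Mapping[str, Any]]:
        yield from FAKE_API["orders"]


class OrderDetails:
    """Child stream: needs one request per order returned by the parent."""

    def __init__(self, parent: Orders) -> None:
        self.parent = parent

    def stream_slices(self) -> Iterable[Mapping[str, Any]]:
        # One slice per parent record; the slice carries the order id.
        for order in self.parent.read_records():
            yield {"order_id": order["id"]}

    def path(self, stream_slice: Mapping[str, Any]) -> str:
        # The slice decides which endpoint is requested.
        return f"order_details/{stream_slice['order_id']}"

    def read_records(self, stream_slice: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
        yield FAKE_API[self.path(stream_slice)]


def read_full_refresh(stream: OrderDetails) -> Iterable[Mapping[str, Any]]:
    # Iterate over every slice and read the records of each one.
    for stream_slice in stream.stream_slices():
        yield from stream.read_records(stream_slice)
```

Calling `list(read_full_refresh(OrderDetails(Orders())))` walks the two slices in order and returns one detail record per order.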
Other situations where you can use `stream_slices` are:

- An endpoint whose records have a status such as `canceled`, `completed`, or `pending`, so you can create a parameter where the user has the option to retrieve records from each of them.

Well, returning to the full refresh reading function…
It iterates over all stream slices and calls the `read_records` function for each one of them.

This function is implemented in the `HttpStream` class (for most cases).

Stay with me! I know this one is not the most basic function 😟
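Stripped of the HTTP details, the pattern can be modeled with a toy class. This is a simplified sketch and not the CDK source: `ToyHttpStream`, `FAKE_PAGES`, and the response shape are hypothetical, and the method signatures are reduced to the minimum. Only the method names mirror the ones discussed in this guide.

```python
from typing import Any, Callable, Dict, Iterable, Mapping, Optional

# Hypothetical paginated API: each page holds records and an optional next-page marker.
FAKE_PAGES: Dict[Optional[int], Dict[str, Any]] = {
    None: {"data": [{"id": 1}, {"id": 2}], "next": 2},
    2: {"data": [{"id": 3}], "next": None},
}


class ToyHttpStream:
    def _fetch_next_page(self, next_page_token: Optional[Mapping[str, Any]]) -> Dict[str, Any]:
        # The real CDK builds and sends an HTTP request here; this toy does a dict lookup.
        page = next_page_token["page"] if next_page_token else None
        return FAKE_PAGES[page]

    def next_page_token(self, response: Dict[str, Any]) -> Optional[Mapping[str, Any]]:
        # Return a dict describing the next page, or None when there is none.
        return {"page": response["next"]} if response["next"] else None

    def parse_response(self, response: Dict[str, Any]) -> Iterable[Mapping[str, Any]]:
        # The method a connector author implements: turn one response into records.
        yield from response["data"]

    def _read_pages(
        self, records_generator_fn: Callable[[Dict[str, Any]], Iterable[Mapping[str, Any]]]
    ) -> Iterable[Mapping[str, Any]]:
        pagination_complete = False
        next_page_token = None
        while not pagination_complete:
            response = self._fetch_next_page(next_page_token)
            yield from records_generator_fn(response)
            next_page_token = self.next_page_token(response)
            if not next_page_token:
                pagination_complete = True

    def read_records(self) -> Iterable[Mapping[str, Any]]:
        # The anonymous function simply forwards each response to parse_response.
        return self._read_pages(lambda response: self.parse_response(response))
```

`list(ToyHttpStream().read_records())` reads both fake pages and yields the three records in order.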
To translate: it generates an anonymous function that calls `parse_response`.

`parse_response` is one of the functions you must implement during connector creation.

It then calls the `_read_pages` function, which iterates over all pages from our API:

- `_fetch_next_page` creates and sends the actual request to the API, using `body_json`, `body_data`, `headers`, and the `request_parameter` itself.
- `_read_pages` receives the response and passes it to the `parse_response` function to yield the records.
- If there is a `next_page_token` dict, it sends that data to the next request and keeps `pagination_complete = False`.

Incremental Reading
Incremental reading is quite similar to full refresh. The difference is that, when it starts, it retrieves the previous `stream_state` and builds the first HTTP request using this data.

For each record read, it then tries to update the state, which follows its own update logic.
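One common shape for that logic is "keep the largest cursor value seen so far". The sketch below assumes that rule. The `updated_at` cursor field, the `since` request parameter, and the `RECORDS` data are hypothetical; the name `get_updated_state` is borrowed from the CDK, but the body here is an illustrative assumption and not the library's code.

```python
from typing import Any, Dict, List, Mapping, Tuple

CURSOR_FIELD = "updated_at"  # hypothetical cursor field

# Hypothetical API data, deliberately out of order.
RECORDS: List[Dict[str, Any]] = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-03"},
    {"id": 3, "updated_at": "2024-01-02"},
]


def request_params(stream_state: Mapping[str, Any]) -> Dict[str, Any]:
    # The first request is built from the previously saved state, if any.
    cursor = stream_state.get(CURSOR_FIELD)
    return {"since": cursor} if cursor else {}


def fetch(params: Mapping[str, Any]) -> List[Dict[str, Any]]:
    # Stand-in for the HTTP call: return only records newer than `since`.
    since = params.get("since", "")
    return [r for r in RECORDS if r[CURSOR_FIELD] > since]


def get_updated_state(current_state: Mapping[str, Any], latest_record: Mapping[str, Any]) -> Dict[str, Any]:
    # Keep the largest cursor value seen so far (ISO dates compare correctly as strings).
    previous = current_state.get(CURSOR_FIELD, "")
    return {CURSOR_FIELD: max(latest_record[CURSOR_FIELD], previous)}


def read_incremental(stream_state: Mapping[str, Any]) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    state = dict(stream_state)
    records = []
    for record in fetch(request_params(state)):
        records.append(record)
        state = get_updated_state(state, record)
    return records, state
```

Running `read_incremental({})` reads all three records; feeding the returned state into a second call yields no records, because nothing is newer than the saved cursor.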
While reading, it updates a record counter and tries to checkpoint the state.
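The counter-and-checkpoint idea can be sketched as follows. This is a simplified model under stated assumptions: the function name `read_with_checkpoints`, the `checkpoint_interval` parameter, and the dictionary message shape are hypothetical and much simpler than the real Airbyte messages.

```python
from typing import Any, Dict, Iterable, Iterator, Mapping


def read_with_checkpoints(
    records: Iterable[Mapping[str, Any]],
    checkpoint_interval: int,
    cursor_field: str = "updated_at",
) -> Iterator[Dict[str, Any]]:
    state: Dict[str, Any] = {}
    record_counter = 0
    for record in records:
        yield {"type": "RECORD", "record": record}
        # Update the state after every record.
        state = {cursor_field: max(record[cursor_field], state.get(cursor_field, ""))}
        record_counter += 1
        # Every `checkpoint_interval` records, emit the state so progress is not lost.
        if checkpoint_interval and record_counter % checkpoint_interval == 0:
            yield {"type": "STATE", "state": dict(state)}
    # Always emit the final state once the stream is exhausted.
    yield {"type": "STATE", "state": dict(state)}
```

With five records and an interval of two, a state message follows the second and fourth records, plus a final one at the end of the stream.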
What customizations can you add to your connector?