From ba231d74ecf91db1f65d55848b83b420f989ebf2 Mon Sep 17 00:00:00 2001 From: lmossman Date: Sun, 13 Apr 2025 14:23:02 -0700 Subject: [PATCH 01/10] Correct fields in declarative_component_schema --- .../declarative_component_schema.yaml | 53 +++++++++++++------ 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 2f09579ba..0ce1c44ad 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -440,7 +440,9 @@ definitions: description: Backoff time in seconds. anyOf: - type: number + title: Number of seconds - type: string + title: Interpolated Value interpolation_context: - config examples: @@ -1057,15 +1059,18 @@ definitions: type: string enum: [JwtAuthenticator] secret_key: + title: Secret Key type: string description: Secret used to sign the JSON web token. examples: - "{{ config['secret_key'] }}" base64_encode_secret_key: + title: Base64-encode Secret Key type: boolean description: When set to true, the secret key will be base64 encoded prior to being encoded as part of the JWT. Only set to "true" when required by the API. default: False algorithm: + title: Algorithm type: string description: Algorithm used to sign the JSON web token. enum: @@ -1673,13 +1678,15 @@ definitions: title: Pagination Strategy description: Strategy defining how records are paginated. anyOf: + - "$ref": "#/definitions/PageIncrement" + - "$ref": "#/definitions/OffsetIncrement" - "$ref": "#/definitions/CursorPagination" - "$ref": "#/definitions/CustomPaginationStrategy" - - "$ref": "#/definitions/OffsetIncrement" - - "$ref": "#/definitions/PageIncrement" page_size_option: + title: Inject Page Size Into Outgoing HTTP Request "$ref": "#/definitions/RequestOption" page_token_option: + title: Inject Page Token Into Outgoing HTTP Request anyOf: - "$ref": "#/definitions/RequestOption" - "$ref": "#/definitions/RequestPath" @@ -1777,6 +1784,8 @@ definitions: type: object additionalProperties: true SessionTokenAuthenticator: + title: Session Token Authenticator + description: Authenticator for requests using the session token as an API key that's injected into the request. type: object required: - type @@ -1907,23 +1916,23 @@ definitions: title: Authenticator description: Authentication method to use for requests sent to the API. anyOf: + - "$ref": "#/definitions/NoAuth" - "$ref": "#/definitions/ApiKeyAuthenticator" - "$ref": "#/definitions/BasicHttpAuthenticator" - "$ref": "#/definitions/BearerAuthenticator" - - "$ref": "#/definitions/CustomAuthenticator" - "$ref": "#/definitions/OAuthAuthenticator" - "$ref": "#/definitions/JwtAuthenticator" - - "$ref": "#/definitions/NoAuth" - "$ref": "#/definitions/SessionTokenAuthenticator" - - "$ref": "#/definitions/LegacySessionTokenAuthenticator" - "$ref": "#/definitions/SelectiveAuthenticator" + - "$ref": "#/definitions/CustomAuthenticator" + - "$ref": "#/definitions/LegacySessionTokenAuthenticator" error_handler: title: Error Handler description: Error handler component that defines how to handle errors. anyOf: - "$ref": "#/definitions/DefaultErrorHandler" - - "$ref": "#/definitions/CustomErrorHandler" - "$ref": "#/definitions/CompositeErrorHandler" + - "$ref": "#/definitions/CustomErrorHandler" http_method: title: HTTP Method description: The HTTP method used to fetch data from the source (can be GET or POST). @@ -1993,11 +2002,13 @@ definitions: description: Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. anyOf: - type: string + title: Interpolated Value - type: object + title: Key/Value Pairs additionalProperties: anyOf: - type: string - - $ref": "#/definitions/QueryProperties" + - $ref: "#/definitions/QueryProperties" interpolation_context: - next_page_token - stream_interval @@ -2877,7 +2888,9 @@ definitions: description: The number of records to include in each pages. anyOf: - type: integer + title: Number of Records - type: string + title: Interpolated Value interpolation_context: - config - response @@ -2909,7 +2922,9 @@ definitions: - config anyOf: - type: integer + title: Number of Records - type: string + title: Interpolated Value examples: - 100 - "100" @@ -3116,6 +3131,7 @@ definitions: type: string enum: [RecordFilter] condition: + title: Condition description: The predicate to filter a record. Records will be removed if evaluated to False. type: string default: "" @@ -3145,25 +3161,24 @@ definitions: enum: [RecordSelector] extractor: anyOf: - - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" + - "$ref": "#/definitions/CustomRecordExtractor" record_filter: title: Record Filter description: Responsible for filtering records to be emitted by the Source. anyOf: - - "$ref": "#/definitions/CustomRecordFilter" - "$ref": "#/definitions/RecordFilter" + - "$ref": "#/definitions/CustomRecordFilter" schema_normalization: title: Schema Normalization description: Responsible for normalization according to the schema. anyOf: - "$ref": "#/definitions/SchemaNormalization" - "$ref": "#/definitions/CustomSchemaNormalization" - default: None transform_before_filtering: + title: Transform Before Filtering description: If true, transformation will be applied before record filtering. type: boolean - default: false $parameters: type: object additionalProperties: true @@ -3172,11 +3187,11 @@ definitions: description: Responsible for normalization according to the schema. type: string enum: - - None - Default - examples: - None + examples: - Default + - None RemoveFields: title: Remove Fields description: A transformation which removes fields from a record. The fields removed are designated using FieldPointers. During transformation, if a field or any of its parents does not exist in the record, no error is thrown. @@ -3274,6 +3289,7 @@ definitions: title: Session Token Authenticator description: Deprecated - use SessionTokenAuthenticator instead. Authenticator for requests authenticated using session tokens. A session token is a random value generated by a server to identify a specific user for the duration of one interaction session. type: object + deprecated: true required: - type - header @@ -3332,6 +3348,7 @@ definitions: type: object additionalProperties: true StateDelegatingStream: + title: State Delegating Stream description: (This component is experimental. Use at your own risk.) Orchestrate the retriever's usage based on the state value. type: object required: @@ -3362,6 +3379,7 @@ definitions: type: object additionalProperties: true SimpleRetriever: + title: Synchronous Retriever description: Retrieves records by synchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router. type: object required: @@ -3378,8 +3396,8 @@ definitions: requester: description: Requester component that describes how to prepare HTTP requests to send to the source API. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" paginator: description: Paginator component that describes how to navigate through the API's pages. anyOf: @@ -3482,6 +3500,7 @@ definitions: items: type: string AsyncRetriever: + title: Asynchronous Retriever description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router." type: object required: @@ -3507,18 +3526,18 @@ definitions: status_extractor: description: Responsible for fetching the actual status of the async job. anyOf: - - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" + - "$ref": "#/definitions/CustomRecordExtractor" download_target_extractor: description: Responsible for fetching the final result `urls` provided by the completed / finished / ready async job. anyOf: - - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" + - "$ref": "#/definitions/CustomRecordExtractor" download_extractor: description: Responsible for fetching the records from provided urls. anyOf: - - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" + - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/ResponseToFileExtractor" creation_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job. From ec426de772f5bbcb6a9d6a248a2f8841042a84a5 Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 18 Apr 2025 11:18:26 -0700 Subject: [PATCH 02/10] reorder anyOf options and add titles --- .../declarative_component_schema.yaml | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 0ce1c44ad..a7272ebdb 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1948,10 +1948,10 @@ definitions: title: Request Body Payload (Non-JSON) description: Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form. anyOf: - - type: string - type: object additionalProperties: type: string + - type: string interpolation_context: - next_page_token - stream_interval @@ -1966,9 +1966,9 @@ definitions: title: Request Body JSON Payload description: Specifies how to populate the body of the request with a JSON payload. Can contain nested objects. anyOf: - - type: string - type: object additionalProperties: true + - type: string interpolation_context: - next_page_token - stream_interval @@ -1985,10 +1985,10 @@ definitions: title: Request Headers description: Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. anyOf: - - type: string - type: object additionalProperties: type: string + - type: string interpolation_context: - next_page_token - stream_interval @@ -2001,14 +2001,14 @@ definitions: title: Query Parameters description: Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. anyOf: - - type: string - title: Interpolated Value - type: object title: Key/Value Pairs additionalProperties: anyOf: - type: string - $ref: "#/definitions/QueryProperties" + - type: string + title: Interpolated Value interpolation_context: - next_page_token - stream_interval @@ -3018,10 +3018,13 @@ definitions: description: The stream field to be used to distinguish unique records. Can either be a single field, an array of fields representing a composite key, or an array of arrays representing a composite key where the fields are nested fields. anyOf: - type: string + title: Single Key - type: array + title: Composite Key items: type: string - type: array + title: Composite Key of Nested Fields items: type: array items: @@ -3412,22 +3415,21 @@ definitions: description: PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing. default: [] anyOf: - - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" - "$ref": "#/definitions/GroupingPartitionRouter" + - "$ref": "#/definitions/CustomPartitionRouter" - type: array items: anyOf: - - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" - "$ref": "#/definitions/GroupingPartitionRouter" + - "$ref": "#/definitions/CustomPartitionRouter" decoder: title: Decoder description: Component decoding the response so records can be extracted. anyOf: - - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" @@ -3435,6 +3437,7 @@ definitions: - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/ZipfileDecoder" + - "$ref": "#/definitions/CustomDecoder" $parameters: type: object additionalProperties: true @@ -3542,13 +3545,13 @@ definitions: creation_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" polling_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" polling_job_timeout: description: The time in minutes after which the single Async Job should be considered as Timed Out. anyOf: @@ -3559,13 +3562,13 @@ definitions: download_target_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" download_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" download_paginator: description: Paginator component that describes how to navigate through the API's pages during download. anyOf: @@ -3574,34 +3577,33 @@ definitions: abort_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to abort a job once it is timed out from the source's perspective. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" delete_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to delete a job once the records are extracted. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" partition_router: title: Partition Router description: PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing. default: [] anyOf: - - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" - "$ref": "#/definitions/GroupingPartitionRouter" + - "$ref": "#/definitions/CustomPartitionRouter" - type: array items: anyOf: - - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" - "$ref": "#/definitions/GroupingPartitionRouter" + - "$ref": "#/definitions/CustomPartitionRouter" decoder: title: Decoder description: Component decoding the response so records can be extracted. anyOf: - - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" @@ -3609,11 +3611,11 @@ definitions: - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/ZipfileDecoder" + - "$ref": "#/definitions/CustomDecoder" download_decoder: title: Download Decoder description: Component decoding the download response so records can be extracted. anyOf: - - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" @@ -3621,6 +3623,7 @@ definitions: - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/ZipfileDecoder" + - "$ref": "#/definitions/CustomDecoder" $parameters: type: object additionalProperties: true From c922da02a98ad0a82b4991322a18711463536527 Mon Sep 17 00:00:00 2001 From: lmossman Date: Mon, 21 Apr 2025 09:44:12 -0700 Subject: [PATCH 03/10] add more titles --- .../sources/declarative/declarative_component_schema.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index a7272ebdb..ee65b679b 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1986,9 +1986,11 @@ definitions: description: Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. anyOf: - type: object + title: Key/Value Pairs additionalProperties: type: string - type: string + title: Interpolated Value interpolation_context: - next_page_token - stream_interval From 7d0b2b806a88afdfcf1e318317ae1a6a1c60ed2b Mon Sep 17 00:00:00 2001 From: lmossman Date: Mon, 21 Apr 2025 20:15:42 -0700 Subject: [PATCH 04/10] reorder fields --- .../declarative_component_schema.yaml | 84 +++++++++---------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index ee65b679b..97c72efda 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1394,27 +1394,27 @@ definitions: type: type: string enum: [DeclarativeStream] + name: + title: Name + description: The stream name. + type: string + default: "" + example: + - "Users" retriever: title: Retriever description: Component used to coordinate how records are extracted across stream slices and request pages. anyOf: + - "$ref": "#/definitions/SimpleRetriever" - "$ref": "#/definitions/AsyncRetriever" - "$ref": "#/definitions/CustomRetriever" - - "$ref": "#/definitions/SimpleRetriever" incremental_sync: title: Incremental Sync description: Component used to fetch data incrementally based on a time field in the data. anyOf: - - "$ref": "#/definitions/CustomIncrementalSync" - "$ref": "#/definitions/DatetimeBasedCursor" - "$ref": "#/definitions/IncrementingCountCursor" - name: - title: Name - description: The stream name. - type: string - default: "" - example: - - "Users" + - "$ref": "#/definitions/CustomIncrementalSync" primary_key: title: Primary Key description: The primary key of the stream. @@ -1912,6 +1912,17 @@ definitions: - "/products" - "/quotes/{{ stream_partition['id'] }}/quote_line_groups" - "/trades/{{ config['symbol_id'] }}/history" + http_method: + title: HTTP Method + description: The HTTP method used to fetch data from the source (can be GET or POST). + type: string + enum: + - GET + - POST + default: GET + examples: + - GET + - POST authenticator: title: Authenticator description: Authentication method to use for requests sent to the API. @@ -1926,24 +1937,6 @@ definitions: - "$ref": "#/definitions/SelectiveAuthenticator" - "$ref": "#/definitions/CustomAuthenticator" - "$ref": "#/definitions/LegacySessionTokenAuthenticator" - error_handler: - title: Error Handler - description: Error handler component that defines how to handle errors. - anyOf: - - "$ref": "#/definitions/DefaultErrorHandler" - - "$ref": "#/definitions/CompositeErrorHandler" - - "$ref": "#/definitions/CustomErrorHandler" - http_method: - title: HTTP Method - description: The HTTP method used to fetch data from the source (can be GET or POST). - type: string - enum: - - GET - - POST - default: GET - examples: - - GET - - POST request_body_data: title: Request Body Payload (Non-JSON) description: Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form. @@ -2021,6 +2014,13 @@ definitions: - query: 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - searchIn: "{{ ','.join(config.get('search_in', [])) }}" - sort_by[asc]: updated_at + error_handler: + title: Error Handler + description: Error handler component that defines how to handle errors. + anyOf: + - "$ref": "#/definitions/DefaultErrorHandler" + - "$ref": "#/definitions/CompositeErrorHandler" + - "$ref": "#/definitions/CustomErrorHandler" use_cache: title: Use Cache description: Enables stream requests caching. This field is automatically set by the CDK. @@ -3395,14 +3395,26 @@ definitions: type: type: string enum: [SimpleRetriever] - record_selector: - description: Component that describes how to extract records from a HTTP response. - "$ref": "#/definitions/RecordSelector" requester: description: Requester component that describes how to prepare HTTP requests to send to the source API. anyOf: - "$ref": "#/definitions/HttpRequester" - "$ref": "#/definitions/CustomRequester" + decoder: + title: Decoder + description: Component decoding the response so records can be extracted. + anyOf: + - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" + - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonlDecoder" + - "$ref": "#/definitions/IterableDecoder" + - "$ref": "#/definitions/XmlDecoder" + - "$ref": "#/definitions/ZipfileDecoder" + - "$ref": "#/definitions/CustomDecoder" + record_selector: + description: Component that describes how to extract records from a HTTP response. + "$ref": "#/definitions/RecordSelector" paginator: description: Paginator component that describes how to navigate through the API's pages. anyOf: @@ -3428,18 +3440,6 @@ definitions: - "$ref": "#/definitions/SubstreamPartitionRouter" - "$ref": "#/definitions/GroupingPartitionRouter" - "$ref": "#/definitions/CustomPartitionRouter" - decoder: - title: Decoder - description: Component decoding the response so records can be extracted. - anyOf: - - "$ref": "#/definitions/CsvDecoder" - - "$ref": "#/definitions/GzipDecoder" - - "$ref": "#/definitions/JsonDecoder" - - "$ref": "#/definitions/JsonlDecoder" - - "$ref": "#/definitions/IterableDecoder" - - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/ZipfileDecoder" - - "$ref": "#/definitions/CustomDecoder" $parameters: type: object additionalProperties: true From 99a1b9b5f16301e793236ad94b8b7193bd20bd2c Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 22 Apr 2025 21:30:57 -0700 Subject: [PATCH 05/10] more rearranging --- .../sources/declarative/declarative_component_schema.yaml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 97c72efda..ce3fef512 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1424,8 +1424,8 @@ definitions: title: Schema Loader description: Component used to retrieve the schema for the current stream. anyOf: - - "$ref": "#/definitions/DynamicSchemaLoader" - "$ref": "#/definitions/InlineSchemaLoader" + - "$ref": "#/definitions/DynamicSchemaLoader" - "$ref": "#/definitions/JsonFileSchemaLoader" - "$ref": "#/definitions/CustomSchemaLoader" # TODO we have move the transformation to the RecordSelector level in the code but kept this here for @@ -2235,6 +2235,7 @@ definitions: title: Schema description: Describes a streams' schema. Refer to the Data Types documentation for more details on which types are valid. type: object + additionalProperties: true JsonFileSchemaLoader: title: Json File Schema Loader description: Loads the schema from a json file. @@ -3373,11 +3374,11 @@ definitions: example: - "Users" full_refresh_stream: - title: Retriever + title: Full Refresh Stream description: Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided. "$ref": "#/definitions/DeclarativeStream" incremental_stream: - title: Retriever + title: Incremental Stream description: Component used to coordinate how records are extracted across stream slices and request pages when the state provided. "$ref": "#/definitions/DeclarativeStream" $parameters: From e7f9848a24e2d38f02fbcbf0a524e3e8f9747168 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 23 Apr 2025 08:00:24 +0300 Subject: [PATCH 06/10] generated updated models --- .../models/declarative_component_schema.py | 166 +++++++++--------- 1 file changed, 85 insertions(+), 81 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index f2fb6ccf9..d8db49722 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -417,15 +415,18 @@ class JwtAuthenticator(BaseModel): ..., description="Secret used to sign the JSON web token.", examples=["{{ config['secret_key'] }}"], + title="Secret Key", ) base64_encode_secret_key: Optional[bool] = Field( False, description='When set to true, the secret key will be base64 encoded prior to being encoded as part of the JWT. Only set to "true" when required by the API.', + title="Base64-encode Secret Key", ) algorithm: Algorithm = Field( ..., description="Algorithm used to sign the JSON web token.", examples=["ES256", "HS256", "RS256", "{{ config['algorithm'] }}"], + title="Algorithm", ) token_duration: Optional[int] = Field( 1200, @@ -880,20 +881,17 @@ class FlattenFields(BaseModel): class KeyTransformation(BaseModel): - prefix: Optional[Union[str, None]] = Field( + type: Literal["KeyTransformation"] + prefix: Optional[str] = Field( None, description="Prefix to add for object keys. If not provided original keys remain unchanged.", - examples=[ - "flattened_", - ], + examples=["flattened_"], title="Key Prefix", ) - suffix: Optional[Union[str, None]] = Field( + suffix: Optional[str] = Field( None, description="Suffix to add for object keys. If not provided original keys remain unchanged.", - examples=[ - "_flattened", - ], + examples=["_flattened"], title="Key Suffix", ) @@ -916,7 +914,7 @@ class DpathFlattenFields(BaseModel): description="Whether to replace the origin record or not. Default is False.", title="Replace Origin Record", ) - key_transformation: Optional[Union[KeyTransformation, None]] = Field( + key_transformation: Optional[KeyTransformation] = Field( None, description="Transformation for object keys. If not provided, original key will be used.", title="Key transformation", @@ -1260,13 +1258,14 @@ class RecordFilter(BaseModel): "{{ record['created_at'] >= stream_interval['start_time'] }}", "{{ record.status in ['active', 'expired'] }}", ], + title="Condition", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class SchemaNormalization(Enum): - None_ = "None" Default = "Default" + None_ = "None" class RemoveFields(BaseModel): @@ -1802,14 +1801,18 @@ class DefaultErrorHandler(BaseModel): class DefaultPaginator(BaseModel): type: Literal["DefaultPaginator"] pagination_strategy: Union[ - CursorPagination, CustomPaginationStrategy, OffsetIncrement, PageIncrement + PageIncrement, OffsetIncrement, CursorPagination, CustomPaginationStrategy ] = Field( ..., description="Strategy defining how records are paginated.", title="Pagination Strategy", ) - page_size_option: Optional[RequestOption] = None - page_token_option: Optional[Union[RequestOption, RequestPath]] = None + page_size_option: Optional[RequestOption] = Field( + None, title="Inject Page Size Into Outgoing HTTP Request" + ) + page_token_option: Optional[Union[RequestOption, RequestPath]] = Field( + None, title="Inject Page Token Into Outgoing HTTP Request" + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -1850,20 +1853,21 @@ class ListPartitionRouter(BaseModel): class RecordSelector(BaseModel): type: Literal["RecordSelector"] - extractor: Union[CustomRecordExtractor, DpathExtractor] - record_filter: Optional[Union[CustomRecordFilter, RecordFilter]] = Field( + extractor: Union[DpathExtractor, CustomRecordExtractor] + record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( None, description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( - SchemaNormalization.None_, + None, description="Responsible for normalization according to the schema.", title="Schema Normalization", ) transform_before_filtering: Optional[bool] = Field( - False, + None, description="If true, transformation will be applied before record filtering.", + title="Transform Before Filtering", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2088,7 +2092,6 @@ class FileUploader(BaseModel): "{{ record.id }}_{{ record.file_name }}/", ], ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class DeclarativeStream(BaseModel): @@ -2096,26 +2099,26 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field( + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") + retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) incremental_sync: Optional[ - Union[CustomIncrementalSync, DatetimeBasedCursor, IncrementingCountCursor] + Union[DatetimeBasedCursor, IncrementingCountCursor, CustomIncrementalSync] ] = Field( None, description="Component used to fetch data incrementally based on a time field in the data.", title="Incremental Sync", ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) schema_loader: Optional[ Union[ - DynamicSchemaLoader, InlineSchemaLoader, + DynamicSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader, ] @@ -2224,38 +2227,31 @@ class HttpRequester(BaseModel): ], title="URL Path", ) + http_method: Optional[HttpMethod] = Field( + HttpMethod.GET, + description="The HTTP method used to fetch data from the source (can be GET or POST).", + examples=["GET", "POST"], + title="HTTP Method", + ) authenticator: Optional[ Union[ + NoAuth, ApiKeyAuthenticator, BasicHttpAuthenticator, BearerAuthenticator, - CustomAuthenticator, OAuthAuthenticator, JwtAuthenticator, - NoAuth, SessionTokenAuthenticator, - LegacySessionTokenAuthenticator, SelectiveAuthenticator, + CustomAuthenticator, + LegacySessionTokenAuthenticator, ] ] = Field( None, description="Authentication method to use for requests sent to the API.", title="Authenticator", ) - error_handler: Optional[ - Union[DefaultErrorHandler, CustomErrorHandler, CompositeErrorHandler] - ] = Field( - None, - description="Error handler component that defines how to handle errors.", - title="Error Handler", - ) - http_method: Optional[HttpMethod] = Field( - HttpMethod.GET, - description="The HTTP method used to fetch data from the source (can be GET or POST).", - examples=["GET", "POST"], - title="HTTP Method", - ) - request_body_data: Optional[Union[str, Dict[str, str]]] = Field( + request_body_data: Optional[Union[Dict[str, str], str]] = Field( None, description="Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form.", examples=[ @@ -2263,7 +2259,7 @@ class HttpRequester(BaseModel): ], title="Request Body Payload (Non-JSON)", ) - request_body_json: Optional[Union[str, Dict[str, Any]]] = Field( + request_body_json: Optional[Union[Dict[str, Any], str]] = Field( None, description="Specifies how to populate the body of the request with a JSON payload. Can contain nested objects.", examples=[ @@ -2273,13 +2269,13 @@ class HttpRequester(BaseModel): ], title="Request Body JSON Payload", ) - request_headers: Optional[Union[str, Dict[str, str]]] = Field( + request_headers: Optional[Union[Dict[str, str], str]] = Field( None, description="Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.", examples=[{"Output-Format": "JSON"}, {"Version": "{{ config['version'] }}"}], title="Request Headers", ) - request_parameters: Optional[Union[str, Dict[str, Union[str, Any]]]] = Field( + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( None, description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", examples=[ @@ -2292,6 +2288,13 @@ class HttpRequester(BaseModel): ], title="Query Parameters", ) + error_handler: Optional[ + Union[DefaultErrorHandler, CompositeErrorHandler, CustomErrorHandler] + ] = Field( + None, + description="Error handler component that defines how to handle errors.", + title="Error Handler", + ) use_cache: Optional[bool] = Field( False, description="Enables stream requests caching. This field is automatically set by the CDK.", @@ -2409,25 +2412,41 @@ class StateDelegatingStream(BaseModel): full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", - title="Retriever", + title="Full Refresh Stream", ) incremental_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state provided.", - title="Retriever", + title="Incremental Stream", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class SimpleRetriever(BaseModel): type: Literal["SimpleRetriever"] - record_selector: RecordSelector = Field( + requester: Union[HttpRequester, CustomRequester] = Field( ..., - description="Component that describes how to extract records from a HTTP response.", + description="Requester component that describes how to prepare HTTP requests to send to the source API.", ) - requester: Union[CustomRequester, HttpRequester] = Field( + decoder: Optional[ + Union[ + CsvDecoder, + GzipDecoder, + JsonDecoder, + JsonlDecoder, + IterableDecoder, + XmlDecoder, + ZipfileDecoder, + CustomDecoder, + ] + ] = Field( + None, + description="Component decoding the response so records can be extracted.", + title="Decoder", + ) + record_selector: RecordSelector = Field( ..., - description="Requester component that describes how to prepare HTTP requests to send to the source API.", + description="Component that describes how to extract records from a HTTP response.", ) paginator: Optional[Union[DefaultPaginator, NoPagination]] = Field( None, @@ -2439,16 +2458,16 @@ class SimpleRetriever(BaseModel): ) partition_router: Optional[ Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, GroupingPartitionRouter, + CustomPartitionRouter, List[ Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, GroupingPartitionRouter, + CustomPartitionRouter, ] ], ] @@ -2457,22 +2476,6 @@ class SimpleRetriever(BaseModel): description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.", title="Partition Router", ) - decoder: Optional[ - Union[ - CustomDecoder, - CsvDecoder, - GzipDecoder, - JsonDecoder, - JsonlDecoder, - IterableDecoder, - XmlDecoder, - ZipfileDecoder, - ] - ] = Field( - None, - description="Component decoding the response so records can be extracted.", - title="Decoder", - ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2485,21 +2488,21 @@ class AsyncRetriever(BaseModel): status_mapping: AsyncJobStatusMap = Field( ..., description="Async Job Status to Airbyte CDK Async Job Status mapping." ) - status_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( + status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( + download_target_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ - Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] + Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] ] = Field(None, description="Responsible for fetching the records from provided urls.") - creation_requester: Union[CustomRequester, HttpRequester] = Field( + creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", ) - polling_requester: Union[CustomRequester, HttpRequester] = Field( + polling_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.", ) @@ -2507,11 +2510,11 @@ class AsyncRetriever(BaseModel): None, description="The time in minutes after which the single Async Job should be considered as Timed Out.", ) - download_target_requester: Optional[Union[CustomRequester, HttpRequester]] = Field( + download_target_requester: Optional[Union[HttpRequester, CustomRequester]] = Field( None, description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.", ) - download_requester: Union[CustomRequester, HttpRequester] = Field( + download_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job.", ) @@ -2519,26 +2522,26 @@ class AsyncRetriever(BaseModel): None, description="Paginator component that describes how to navigate through the API's pages during download.", ) - abort_requester: Optional[Union[CustomRequester, HttpRequester]] = Field( + abort_requester: Optional[Union[HttpRequester, CustomRequester]] = Field( None, description="Requester component that describes how to prepare HTTP requests to send to the source API to abort a job once it is timed out from the source's perspective.", ) - delete_requester: Optional[Union[CustomRequester, HttpRequester]] = Field( + delete_requester: Optional[Union[HttpRequester, CustomRequester]] = Field( None, description="Requester component that describes how to prepare HTTP requests to send to the source API to delete a job once the records are extracted.", ) partition_router: Optional[ Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, GroupingPartitionRouter, + CustomPartitionRouter, List[ Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, GroupingPartitionRouter, + CustomPartitionRouter, ] ], ] @@ -2549,7 +2552,6 @@ class AsyncRetriever(BaseModel): ) decoder: Optional[ Union[ - CustomDecoder, CsvDecoder, GzipDecoder, JsonDecoder, @@ -2557,6 +2559,7 @@ class AsyncRetriever(BaseModel): IterableDecoder, XmlDecoder, ZipfileDecoder, + CustomDecoder, ] ] = Field( None, @@ -2565,7 +2568,6 @@ class AsyncRetriever(BaseModel): ) download_decoder: Optional[ Union[ - CustomDecoder, CsvDecoder, GzipDecoder, JsonDecoder, @@ -2573,6 +2575,7 @@ class AsyncRetriever(BaseModel): IterableDecoder, XmlDecoder, ZipfileDecoder, + CustomDecoder, ] ] = Field( None, @@ -2650,6 +2653,7 @@ class DynamicDeclarativeStream(BaseModel): FileUploader.update_forward_refs() DeclarativeStream.update_forward_refs() SessionTokenAuthenticator.update_forward_refs() +HttpRequester.update_forward_refs() DynamicSchemaLoader.update_forward_refs() ParentStreamConfig.update_forward_refs() PropertiesFromEndpoint.update_forward_refs() From 68b4118377ad4e7c38664fc7df953740639d5a54 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 23 Apr 2025 09:10:15 +0300 Subject: [PATCH 07/10] fixed a portion of tests, by adding transformation_before_filtering to be False if None --- .../declarative/parsers/model_to_component_factory.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 466b661f1..9e95a82ca 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2818,6 +2818,10 @@ def create_record_selector( else None ) + if model.transform_before_filtering is None: + # default to False if not set + model.transform_before_filtering = False + assert model.transform_before_filtering is not None # for mypy transform_before_filtering = model.transform_before_filtering @@ -2832,6 +2836,10 @@ def create_record_selector( ) transform_before_filtering = True + if model.schema_normalization is None: + # default to no schema normalization if not set + model.schema_normalization = SchemaNormalizationModel.None_ + schema_normalization = ( TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization]) if isinstance(model.schema_normalization, SchemaNormalizationModel) From 3e2dc96561eff8979a8a5271fe71a59a5012b728 Mon Sep 17 00:00:00 2001 From: brianjlai Date: Wed, 23 Apr 2025 11:11:11 -0700 Subject: [PATCH 08/10] fix the model to component factory to reference the QueryProperties model type instead of generic object --- .../parsers/model_to_component_factory.py | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 9e95a82ca..4d4cd9440 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2194,11 +2194,20 @@ def create_http_requester( api_budget = self._api_budget + # Removes QueryProperties components from the interpolated mappings because it has been designed + # to be used by the SimpleRetriever and will be resolved from the provider from the slice directly + # instead of through jinja interpolation + request_parameters: Optional[Union[str, Mapping[str, str]]] + if isinstance(model.request_parameters, Mapping): + request_parameters = self._remove_query_properties(model.request_parameters) + else: + request_parameters = model.request_parameters + request_options_provider = InterpolatedRequestOptionsProvider( request_body_data=model.request_body_data, request_body_json=model.request_body_json, request_headers=model.request_headers, - request_parameters=model.request_parameters, + request_parameters=request_parameters, query_properties_key=query_properties_key, config=config, parameters=model.parameters or {}, @@ -2946,16 +2955,9 @@ def create_simple_retriever( # When translating JSON schema into Pydantic models, enforcing types for arrays containing both # concrete string complex object definitions like QueryProperties would get resolved to Union[str, Any]. # This adds the extra validation that we couldn't get for free in Pydantic model generation - if ( - isinstance(request_parameter, Mapping) - and request_parameter.get("type") == "QueryProperties" - ): + if isinstance(request_parameter, QueryPropertiesModel): query_properties_key = key query_properties_definitions.append(request_parameter) - elif not isinstance(request_parameter, str): - raise ValueError( - f"Each element of request_parameters should be of type str or QueryProperties, but received {request_parameter.get('type')}" - ) if len(query_properties_definitions) > 1: raise ValueError( @@ -2963,17 +2965,8 @@ def create_simple_retriever( ) if len(query_properties_definitions) == 1: - query_properties = self.create_component( - model_type=QueryPropertiesModel, - component_definition=query_properties_definitions[0], - config=config, - ) - - # Removes QueryProperties components from the interpolated mappings because it will be resolved in - # the provider from the slice directly instead of through jinja interpolation - if isinstance(model.requester.request_parameters, Mapping): - model.requester.request_parameters = self._remove_query_properties( - model.requester.request_parameters + query_properties = self._create_component_from_model( + model=query_properties_definitions[0], config=config ) requester = self._create_component_from_model( @@ -3096,13 +3089,12 @@ def create_simple_retriever( @staticmethod def _remove_query_properties( - request_parameters: Mapping[str, Union[Any, str]], - ) -> Mapping[str, Union[Any, str]]: + request_parameters: Mapping[str, Union[str, QueryPropertiesModel]], + ) -> Mapping[str, str]: return { parameter_field: request_parameter for parameter_field, request_parameter in request_parameters.items() - if not isinstance(request_parameter, Mapping) - or not request_parameter.get("type") == "QueryProperties" + if not isinstance(request_parameter, QueryPropertiesModel) } def create_state_delegating_stream( From 3309380a293a6b12ce3d50e72d77f8a2a26ae3a7 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Thu, 24 Apr 2025 17:49:52 +0300 Subject: [PATCH 09/10] fixed file_uploader model issue --- .../sources/declarative/declarative_component_schema.yaml | 3 +++ .../sources/declarative/models/declarative_component_schema.py | 1 + 2 files changed, 4 insertions(+) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index bd0af086f..6ae8bba62 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1489,6 +1489,9 @@ definitions: examples: - "{{ record.id }}/{{ record.file_name }}/" - "{{ record.id }}_{{ record.file_name }}/" + $parameters: + type: object + additionalProperties: true $parameters: type: object additional_properties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index d8db49722..2762a1f70 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2092,6 +2092,7 @@ class FileUploader(BaseModel): "{{ record.id }}_{{ record.file_name }}/", ], ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class DeclarativeStream(BaseModel): From a147ba1c820688ac8d768ea99a7511ff05767080 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 25 Apr 2025 11:44:24 +0300 Subject: [PATCH 10/10] removed redundant deprecated: true to LegacySessionTokenAuthenticator --- .../sources/declarative/declarative_component_schema.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 6ae8bba62..2de404598 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3360,7 +3360,6 @@ definitions: title: Session Token Authenticator description: Deprecated - use SessionTokenAuthenticator instead. Authenticator for requests authenticated using session tokens. A session token is a random value generated by a server to identify a specific user for the duration of one interaction session. type: object - deprecated: true required: - type - header