diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 36d25d390..2de404598 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -440,7 +440,9 @@ definitions: description: Backoff time in seconds. anyOf: - type: number + title: Number of seconds - type: string + title: Interpolated Value interpolation_context: - config examples: @@ -1057,15 +1059,18 @@ definitions: type: string enum: [JwtAuthenticator] secret_key: + title: Secret Key type: string description: Secret used to sign the JSON web token. examples: - "{{ config['secret_key'] }}" base64_encode_secret_key: + title: Base64-encode Secret Key type: boolean description: When set to true, the secret key will be base64 encoded prior to being encoded as part of the JWT. Only set to "true" when required by the API. default: False algorithm: + title: Algorithm type: string description: Algorithm used to sign the JSON web token. enum: @@ -1389,27 +1394,27 @@ definitions: type: type: string enum: [DeclarativeStream] + name: + title: Name + description: The stream name. + type: string + default: "" + example: + - "Users" retriever: title: Retriever description: Component used to coordinate how records are extracted across stream slices and request pages. anyOf: + - "$ref": "#/definitions/SimpleRetriever" - "$ref": "#/definitions/AsyncRetriever" - "$ref": "#/definitions/CustomRetriever" - - "$ref": "#/definitions/SimpleRetriever" incremental_sync: title: Incremental Sync description: Component used to fetch data incrementally based on a time field in the data. anyOf: - - "$ref": "#/definitions/CustomIncrementalSync" - "$ref": "#/definitions/DatetimeBasedCursor" - "$ref": "#/definitions/IncrementingCountCursor" - name: - title: Name - description: The stream name. - type: string - default: "" - example: - - "Users" + - "$ref": "#/definitions/CustomIncrementalSync" primary_key: title: Primary Key description: The primary key of the stream. @@ -1419,8 +1424,8 @@ definitions: title: Schema Loader description: Component used to retrieve the schema for the current stream. anyOf: - - "$ref": "#/definitions/DynamicSchemaLoader" - "$ref": "#/definitions/InlineSchemaLoader" + - "$ref": "#/definitions/DynamicSchemaLoader" - "$ref": "#/definitions/JsonFileSchemaLoader" - "$ref": "#/definitions/CustomSchemaLoader" # TODO we have move the transformation to the RecordSelector level in the code but kept this here for @@ -1484,6 +1489,9 @@ definitions: examples: - "{{ record.id }}/{{ record.file_name }}/" - "{{ record.id }}_{{ record.file_name }}/" + $parameters: + type: object + additionalProperties: true $parameters: type: object additional_properties: true @@ -1709,13 +1717,15 @@ definitions: title: Pagination Strategy description: Strategy defining how records are paginated. anyOf: + - "$ref": "#/definitions/PageIncrement" + - "$ref": "#/definitions/OffsetIncrement" - "$ref": "#/definitions/CursorPagination" - "$ref": "#/definitions/CustomPaginationStrategy" - - "$ref": "#/definitions/OffsetIncrement" - - "$ref": "#/definitions/PageIncrement" page_size_option: + title: Inject Page Size Into Outgoing HTTP Request "$ref": "#/definitions/RequestOption" page_token_option: + title: Inject Page Token Into Outgoing HTTP Request anyOf: - "$ref": "#/definitions/RequestOption" - "$ref": "#/definitions/RequestPath" @@ -1813,6 +1823,8 @@ definitions: type: object additionalProperties: true SessionTokenAuthenticator: + title: Session Token Authenticator + description: Authenticator for requests using the session token as an API key that's injected into the request. type: object required: - type @@ -1939,27 +1951,6 @@ definitions: - "/products" - "/quotes/{{ stream_partition['id'] }}/quote_line_groups" - "/trades/{{ config['symbol_id'] }}/history" - authenticator: - title: Authenticator - description: Authentication method to use for requests sent to the API. - anyOf: - - "$ref": "#/definitions/ApiKeyAuthenticator" - - "$ref": "#/definitions/BasicHttpAuthenticator" - - "$ref": "#/definitions/BearerAuthenticator" - - "$ref": "#/definitions/CustomAuthenticator" - - "$ref": "#/definitions/OAuthAuthenticator" - - "$ref": "#/definitions/JwtAuthenticator" - - "$ref": "#/definitions/NoAuth" - - "$ref": "#/definitions/SessionTokenAuthenticator" - - "$ref": "#/definitions/LegacySessionTokenAuthenticator" - - "$ref": "#/definitions/SelectiveAuthenticator" - error_handler: - title: Error Handler - description: Error handler component that defines how to handle errors. - anyOf: - - "$ref": "#/definitions/DefaultErrorHandler" - - "$ref": "#/definitions/CustomErrorHandler" - - "$ref": "#/definitions/CompositeErrorHandler" http_method: title: HTTP Method description: The HTTP method used to fetch data from the source (can be GET or POST). @@ -1971,14 +1962,28 @@ definitions: examples: - GET - POST + authenticator: + title: Authenticator + description: Authentication method to use for requests sent to the API. + anyOf: + - "$ref": "#/definitions/NoAuth" + - "$ref": "#/definitions/ApiKeyAuthenticator" + - "$ref": "#/definitions/BasicHttpAuthenticator" + - "$ref": "#/definitions/BearerAuthenticator" + - "$ref": "#/definitions/OAuthAuthenticator" + - "$ref": "#/definitions/JwtAuthenticator" + - "$ref": "#/definitions/SessionTokenAuthenticator" + - "$ref": "#/definitions/SelectiveAuthenticator" + - "$ref": "#/definitions/CustomAuthenticator" + - "$ref": "#/definitions/LegacySessionTokenAuthenticator" request_body_data: title: Request Body Payload (Non-JSON) description: Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form. anyOf: - - type: string - type: object additionalProperties: type: string + - type: string interpolation_context: - next_page_token - stream_interval @@ -1993,9 +1998,9 @@ definitions: title: Request Body JSON Payload description: Specifies how to populate the body of the request with a JSON payload. Can contain nested objects. anyOf: - - type: string - type: object additionalProperties: true + - type: string interpolation_context: - next_page_token - stream_interval @@ -2012,10 +2017,12 @@ definitions: title: Request Headers description: Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. anyOf: - - type: string - type: object + title: Key/Value Pairs additionalProperties: type: string + - type: string + title: Interpolated Value interpolation_context: - next_page_token - stream_interval @@ -2028,12 +2035,14 @@ definitions: title: Query Parameters description: Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. anyOf: - - type: string - type: object + title: Key/Value Pairs additionalProperties: anyOf: - type: string - - $ref": "#/definitions/QueryProperties" + - $ref: "#/definitions/QueryProperties" + - type: string + title: Interpolated Value interpolation_context: - next_page_token - stream_interval @@ -2044,6 +2053,13 @@ definitions: - query: 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - searchIn: "{{ ','.join(config.get('search_in', [])) }}" - sort_by[asc]: updated_at + error_handler: + title: Error Handler + description: Error handler component that defines how to handle errors. + anyOf: + - "$ref": "#/definitions/DefaultErrorHandler" + - "$ref": "#/definitions/CompositeErrorHandler" + - "$ref": "#/definitions/CustomErrorHandler" use_cache: title: Use Cache description: Enables stream requests caching. This field is automatically set by the CDK. @@ -2258,6 +2274,7 @@ definitions: title: Schema description: Describes a streams' schema. Refer to the Data Types documentation for more details on which types are valid. type: object + additionalProperties: true JsonFileSchemaLoader: title: Json File Schema Loader description: Loads the schema from a json file. @@ -2939,7 +2956,9 @@ definitions: description: The number of records to include in each pages. anyOf: - type: integer + title: Number of Records - type: string + title: Interpolated Value interpolation_context: - config - response @@ -2971,7 +2990,9 @@ definitions: - config anyOf: - type: integer + title: Number of Records - type: string + title: Interpolated Value examples: - 100 - "100" @@ -3065,10 +3086,13 @@ definitions: description: The stream field to be used to distinguish unique records. Can either be a single field, an array of fields representing a composite key, or an array of arrays representing a composite key where the fields are nested fields. anyOf: - type: string + title: Single Key - type: array + title: Composite Key items: type: string - type: array + title: Composite Key of Nested Fields items: type: array items: @@ -3178,6 +3202,7 @@ definitions: type: string enum: [RecordFilter] condition: + title: Condition description: The predicate to filter a record. Records will be removed if evaluated to False. type: string default: "" @@ -3207,25 +3232,24 @@ definitions: enum: [RecordSelector] extractor: anyOf: - - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" + - "$ref": "#/definitions/CustomRecordExtractor" record_filter: title: Record Filter description: Responsible for filtering records to be emitted by the Source. anyOf: - - "$ref": "#/definitions/CustomRecordFilter" - "$ref": "#/definitions/RecordFilter" + - "$ref": "#/definitions/CustomRecordFilter" schema_normalization: title: Schema Normalization description: Responsible for normalization according to the schema. anyOf: - "$ref": "#/definitions/SchemaNormalization" - "$ref": "#/definitions/CustomSchemaNormalization" - default: None transform_before_filtering: + title: Transform Before Filtering description: If true, transformation will be applied before record filtering. type: boolean - default: false $parameters: type: object additionalProperties: true @@ -3234,11 +3258,11 @@ definitions: description: Responsible for normalization according to the schema. type: string enum: - - None - Default - examples: - None + examples: - Default + - None RemoveFields: title: Remove Fields description: A transformation which removes fields from a record. The fields removed are designated using FieldPointers. During transformation, if a field or any of its parents does not exist in the record, no error is thrown. @@ -3394,6 +3418,7 @@ definitions: type: object additionalProperties: true StateDelegatingStream: + title: State Delegating Stream description: (This component is experimental. Use at your own risk.) Orchestrate the retriever's usage based on the state value. type: object required: @@ -3413,17 +3438,18 @@ definitions: example: - "Users" full_refresh_stream: - title: Retriever + title: Full Refresh Stream description: Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided. "$ref": "#/definitions/DeclarativeStream" incremental_stream: - title: Retriever + title: Incremental Stream description: Component used to coordinate how records are extracted across stream slices and request pages when the state provided. "$ref": "#/definitions/DeclarativeStream" $parameters: type: object additionalProperties: true SimpleRetriever: + title: Synchronous Retriever description: Retrieves records by synchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router. type: object required: @@ -3434,14 +3460,26 @@ definitions: type: type: string enum: [SimpleRetriever] - record_selector: - description: Component that describes how to extract records from a HTTP response. - "$ref": "#/definitions/RecordSelector" requester: description: Requester component that describes how to prepare HTTP requests to send to the source API. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" + decoder: + title: Decoder + description: Component decoding the response so records can be extracted. + anyOf: + - "$ref": "#/definitions/CsvDecoder" + - "$ref": "#/definitions/GzipDecoder" + - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonlDecoder" + - "$ref": "#/definitions/IterableDecoder" + - "$ref": "#/definitions/XmlDecoder" + - "$ref": "#/definitions/ZipfileDecoder" + - "$ref": "#/definitions/CustomDecoder" + record_selector: + description: Component that describes how to extract records from a HTTP response. + "$ref": "#/definitions/RecordSelector" paginator: description: Paginator component that describes how to navigate through the API's pages. anyOf: @@ -3456,29 +3494,17 @@ definitions: description: PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing. default: [] anyOf: - - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" - "$ref": "#/definitions/GroupingPartitionRouter" + - "$ref": "#/definitions/CustomPartitionRouter" - type: array items: anyOf: - - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" - "$ref": "#/definitions/GroupingPartitionRouter" - decoder: - title: Decoder - description: Component decoding the response so records can be extracted. - anyOf: - - "$ref": "#/definitions/CustomDecoder" - - "$ref": "#/definitions/CsvDecoder" - - "$ref": "#/definitions/GzipDecoder" - - "$ref": "#/definitions/JsonDecoder" - - "$ref": "#/definitions/JsonlDecoder" - - "$ref": "#/definitions/IterableDecoder" - - "$ref": "#/definitions/XmlDecoder" - - "$ref": "#/definitions/ZipfileDecoder" + - "$ref": "#/definitions/CustomPartitionRouter" $parameters: type: object additionalProperties: true @@ -3544,6 +3570,7 @@ definitions: items: type: string AsyncRetriever: + title: Asynchronous Retriever description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router." type: object required: @@ -3569,29 +3596,29 @@ definitions: status_extractor: description: Responsible for fetching the actual status of the async job. anyOf: - - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" + - "$ref": "#/definitions/CustomRecordExtractor" download_target_extractor: description: Responsible for fetching the final result `urls` provided by the completed / finished / ready async job. anyOf: - - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" + - "$ref": "#/definitions/CustomRecordExtractor" download_extractor: description: Responsible for fetching the records from provided urls. anyOf: - - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" + - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/ResponseToFileExtractor" creation_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" polling_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" polling_job_timeout: description: The time in minutes after which the single Async Job should be considered as Timed Out. anyOf: @@ -3602,13 +3629,13 @@ definitions: download_target_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" download_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" download_paginator: description: Paginator component that describes how to navigate through the API's pages during download. anyOf: @@ -3617,34 +3644,33 @@ definitions: abort_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to abort a job once it is timed out from the source's perspective. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" delete_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to delete a job once the records are extracted. anyOf: - - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" + - "$ref": "#/definitions/CustomRequester" partition_router: title: Partition Router description: PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing. default: [] anyOf: - - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" - "$ref": "#/definitions/GroupingPartitionRouter" + - "$ref": "#/definitions/CustomPartitionRouter" - type: array items: anyOf: - - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" - "$ref": "#/definitions/GroupingPartitionRouter" + - "$ref": "#/definitions/CustomPartitionRouter" decoder: title: Decoder description: Component decoding the response so records can be extracted. anyOf: - - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" @@ -3652,11 +3678,11 @@ definitions: - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/ZipfileDecoder" + - "$ref": "#/definitions/CustomDecoder" download_decoder: title: Download Decoder description: Component decoding the download response so records can be extracted. anyOf: - - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" @@ -3664,6 +3690,7 @@ definitions: - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/ZipfileDecoder" + - "$ref": "#/definitions/CustomDecoder" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index f2fb6ccf9..2762a1f70 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -417,15 +415,18 @@ class JwtAuthenticator(BaseModel): ..., description="Secret used to sign the JSON web token.", examples=["{{ config['secret_key'] }}"], + title="Secret Key", ) base64_encode_secret_key: Optional[bool] = Field( False, description='When set to true, the secret key will be base64 encoded prior to being encoded as part of the JWT. Only set to "true" when required by the API.', + title="Base64-encode Secret Key", ) algorithm: Algorithm = Field( ..., description="Algorithm used to sign the JSON web token.", examples=["ES256", "HS256", "RS256", "{{ config['algorithm'] }}"], + title="Algorithm", ) token_duration: Optional[int] = Field( 1200, @@ -880,20 +881,17 @@ class FlattenFields(BaseModel): class KeyTransformation(BaseModel): - prefix: Optional[Union[str, None]] = Field( + type: Literal["KeyTransformation"] + prefix: Optional[str] = Field( None, description="Prefix to add for object keys. If not provided original keys remain unchanged.", - examples=[ - "flattened_", - ], + examples=["flattened_"], title="Key Prefix", ) - suffix: Optional[Union[str, None]] = Field( + suffix: Optional[str] = Field( None, description="Suffix to add for object keys. If not provided original keys remain unchanged.", - examples=[ - "_flattened", - ], + examples=["_flattened"], title="Key Suffix", ) @@ -916,7 +914,7 @@ class DpathFlattenFields(BaseModel): description="Whether to replace the origin record or not. Default is False.", title="Replace Origin Record", ) - key_transformation: Optional[Union[KeyTransformation, None]] = Field( + key_transformation: Optional[KeyTransformation] = Field( None, description="Transformation for object keys. If not provided, original key will be used.", title="Key transformation", @@ -1260,13 +1258,14 @@ class RecordFilter(BaseModel): "{{ record['created_at'] >= stream_interval['start_time'] }}", "{{ record.status in ['active', 'expired'] }}", ], + title="Condition", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class SchemaNormalization(Enum): - None_ = "None" Default = "Default" + None_ = "None" class RemoveFields(BaseModel): @@ -1802,14 +1801,18 @@ class DefaultErrorHandler(BaseModel): class DefaultPaginator(BaseModel): type: Literal["DefaultPaginator"] pagination_strategy: Union[ - CursorPagination, CustomPaginationStrategy, OffsetIncrement, PageIncrement + PageIncrement, OffsetIncrement, CursorPagination, CustomPaginationStrategy ] = Field( ..., description="Strategy defining how records are paginated.", title="Pagination Strategy", ) - page_size_option: Optional[RequestOption] = None - page_token_option: Optional[Union[RequestOption, RequestPath]] = None + page_size_option: Optional[RequestOption] = Field( + None, title="Inject Page Size Into Outgoing HTTP Request" + ) + page_token_option: Optional[Union[RequestOption, RequestPath]] = Field( + None, title="Inject Page Token Into Outgoing HTTP Request" + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -1850,20 +1853,21 @@ class ListPartitionRouter(BaseModel): class RecordSelector(BaseModel): type: Literal["RecordSelector"] - extractor: Union[CustomRecordExtractor, DpathExtractor] - record_filter: Optional[Union[CustomRecordFilter, RecordFilter]] = Field( + extractor: Union[DpathExtractor, CustomRecordExtractor] + record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( None, description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( - SchemaNormalization.None_, + None, description="Responsible for normalization according to the schema.", title="Schema Normalization", ) transform_before_filtering: Optional[bool] = Field( - False, + None, description="If true, transformation will be applied before record filtering.", + title="Transform Before Filtering", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2096,26 +2100,26 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field( + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") + retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) incremental_sync: Optional[ - Union[CustomIncrementalSync, DatetimeBasedCursor, IncrementingCountCursor] + Union[DatetimeBasedCursor, IncrementingCountCursor, CustomIncrementalSync] ] = Field( None, description="Component used to fetch data incrementally based on a time field in the data.", title="Incremental Sync", ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) schema_loader: Optional[ Union[ - DynamicSchemaLoader, InlineSchemaLoader, + DynamicSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader, ] @@ -2224,38 +2228,31 @@ class HttpRequester(BaseModel): ], title="URL Path", ) + http_method: Optional[HttpMethod] = Field( + HttpMethod.GET, + description="The HTTP method used to fetch data from the source (can be GET or POST).", + examples=["GET", "POST"], + title="HTTP Method", + ) authenticator: Optional[ Union[ + NoAuth, ApiKeyAuthenticator, BasicHttpAuthenticator, BearerAuthenticator, - CustomAuthenticator, OAuthAuthenticator, JwtAuthenticator, - NoAuth, SessionTokenAuthenticator, - LegacySessionTokenAuthenticator, SelectiveAuthenticator, + CustomAuthenticator, + LegacySessionTokenAuthenticator, ] ] = Field( None, description="Authentication method to use for requests sent to the API.", title="Authenticator", ) - error_handler: Optional[ - Union[DefaultErrorHandler, CustomErrorHandler, CompositeErrorHandler] - ] = Field( - None, - description="Error handler component that defines how to handle errors.", - title="Error Handler", - ) - http_method: Optional[HttpMethod] = Field( - HttpMethod.GET, - description="The HTTP method used to fetch data from the source (can be GET or POST).", - examples=["GET", "POST"], - title="HTTP Method", - ) - request_body_data: Optional[Union[str, Dict[str, str]]] = Field( + request_body_data: Optional[Union[Dict[str, str], str]] = Field( None, description="Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form.", examples=[ @@ -2263,7 +2260,7 @@ class HttpRequester(BaseModel): ], title="Request Body Payload (Non-JSON)", ) - request_body_json: Optional[Union[str, Dict[str, Any]]] = Field( + request_body_json: Optional[Union[Dict[str, Any], str]] = Field( None, description="Specifies how to populate the body of the request with a JSON payload. Can contain nested objects.", examples=[ @@ -2273,13 +2270,13 @@ class HttpRequester(BaseModel): ], title="Request Body JSON Payload", ) - request_headers: Optional[Union[str, Dict[str, str]]] = Field( + request_headers: Optional[Union[Dict[str, str], str]] = Field( None, description="Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.", examples=[{"Output-Format": "JSON"}, {"Version": "{{ config['version'] }}"}], title="Request Headers", ) - request_parameters: Optional[Union[str, Dict[str, Union[str, Any]]]] = Field( + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( None, description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", examples=[ @@ -2292,6 +2289,13 @@ class HttpRequester(BaseModel): ], title="Query Parameters", ) + error_handler: Optional[ + Union[DefaultErrorHandler, CompositeErrorHandler, CustomErrorHandler] + ] = Field( + None, + description="Error handler component that defines how to handle errors.", + title="Error Handler", + ) use_cache: Optional[bool] = Field( False, description="Enables stream requests caching. This field is automatically set by the CDK.", @@ -2409,25 +2413,41 @@ class StateDelegatingStream(BaseModel): full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", - title="Retriever", + title="Full Refresh Stream", ) incremental_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state provided.", - title="Retriever", + title="Incremental Stream", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class SimpleRetriever(BaseModel): type: Literal["SimpleRetriever"] - record_selector: RecordSelector = Field( + requester: Union[HttpRequester, CustomRequester] = Field( ..., - description="Component that describes how to extract records from a HTTP response.", + description="Requester component that describes how to prepare HTTP requests to send to the source API.", ) - requester: Union[CustomRequester, HttpRequester] = Field( + decoder: Optional[ + Union[ + CsvDecoder, + GzipDecoder, + JsonDecoder, + JsonlDecoder, + IterableDecoder, + XmlDecoder, + ZipfileDecoder, + CustomDecoder, + ] + ] = Field( + None, + description="Component decoding the response so records can be extracted.", + title="Decoder", + ) + record_selector: RecordSelector = Field( ..., - description="Requester component that describes how to prepare HTTP requests to send to the source API.", + description="Component that describes how to extract records from a HTTP response.", ) paginator: Optional[Union[DefaultPaginator, NoPagination]] = Field( None, @@ -2439,16 +2459,16 @@ class SimpleRetriever(BaseModel): ) partition_router: Optional[ Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, GroupingPartitionRouter, + CustomPartitionRouter, List[ Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, GroupingPartitionRouter, + CustomPartitionRouter, ] ], ] @@ -2457,22 +2477,6 @@ class SimpleRetriever(BaseModel): description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.", title="Partition Router", ) - decoder: Optional[ - Union[ - CustomDecoder, - CsvDecoder, - GzipDecoder, - JsonDecoder, - JsonlDecoder, - IterableDecoder, - XmlDecoder, - ZipfileDecoder, - ] - ] = Field( - None, - description="Component decoding the response so records can be extracted.", - title="Decoder", - ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2485,21 +2489,21 @@ class AsyncRetriever(BaseModel): status_mapping: AsyncJobStatusMap = Field( ..., description="Async Job Status to Airbyte CDK Async Job Status mapping." ) - status_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( + status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( + download_target_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ - Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] + Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] ] = Field(None, description="Responsible for fetching the records from provided urls.") - creation_requester: Union[CustomRequester, HttpRequester] = Field( + creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", ) - polling_requester: Union[CustomRequester, HttpRequester] = Field( + polling_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.", ) @@ -2507,11 +2511,11 @@ class AsyncRetriever(BaseModel): None, description="The time in minutes after which the single Async Job should be considered as Timed Out.", ) - download_target_requester: Optional[Union[CustomRequester, HttpRequester]] = Field( + download_target_requester: Optional[Union[HttpRequester, CustomRequester]] = Field( None, description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.", ) - download_requester: Union[CustomRequester, HttpRequester] = Field( + download_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job.", ) @@ -2519,26 +2523,26 @@ class AsyncRetriever(BaseModel): None, description="Paginator component that describes how to navigate through the API's pages during download.", ) - abort_requester: Optional[Union[CustomRequester, HttpRequester]] = Field( + abort_requester: Optional[Union[HttpRequester, CustomRequester]] = Field( None, description="Requester component that describes how to prepare HTTP requests to send to the source API to abort a job once it is timed out from the source's perspective.", ) - delete_requester: Optional[Union[CustomRequester, HttpRequester]] = Field( + delete_requester: Optional[Union[HttpRequester, CustomRequester]] = Field( None, description="Requester component that describes how to prepare HTTP requests to send to the source API to delete a job once the records are extracted.", ) partition_router: Optional[ Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, GroupingPartitionRouter, + CustomPartitionRouter, List[ Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, GroupingPartitionRouter, + CustomPartitionRouter, ] ], ] @@ -2549,7 +2553,6 @@ class AsyncRetriever(BaseModel): ) decoder: Optional[ Union[ - CustomDecoder, CsvDecoder, GzipDecoder, JsonDecoder, @@ -2557,6 +2560,7 @@ class AsyncRetriever(BaseModel): IterableDecoder, XmlDecoder, ZipfileDecoder, + CustomDecoder, ] ] = Field( None, @@ -2565,7 +2569,6 @@ class AsyncRetriever(BaseModel): ) download_decoder: Optional[ Union[ - CustomDecoder, CsvDecoder, GzipDecoder, JsonDecoder, @@ -2573,6 +2576,7 @@ class AsyncRetriever(BaseModel): IterableDecoder, XmlDecoder, ZipfileDecoder, + CustomDecoder, ] ] = Field( None, @@ -2650,6 +2654,7 @@ class DynamicDeclarativeStream(BaseModel): FileUploader.update_forward_refs() DeclarativeStream.update_forward_refs() SessionTokenAuthenticator.update_forward_refs() +HttpRequester.update_forward_refs() DynamicSchemaLoader.update_forward_refs() ParentStreamConfig.update_forward_refs() PropertiesFromEndpoint.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 466b661f1..4d4cd9440 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2194,11 +2194,20 @@ def create_http_requester( api_budget = self._api_budget + # Removes QueryProperties components from the interpolated mappings because it has been designed + # to be used by the SimpleRetriever and will be resolved from the provider from the slice directly + # instead of through jinja interpolation + request_parameters: Optional[Union[str, Mapping[str, str]]] + if isinstance(model.request_parameters, Mapping): + request_parameters = self._remove_query_properties(model.request_parameters) + else: + request_parameters = model.request_parameters + request_options_provider = InterpolatedRequestOptionsProvider( request_body_data=model.request_body_data, request_body_json=model.request_body_json, request_headers=model.request_headers, - request_parameters=model.request_parameters, + request_parameters=request_parameters, query_properties_key=query_properties_key, config=config, parameters=model.parameters or {}, @@ -2818,6 +2827,10 @@ def create_record_selector( else None ) + if model.transform_before_filtering is None: + # default to False if not set + model.transform_before_filtering = False + assert model.transform_before_filtering is not None # for mypy transform_before_filtering = model.transform_before_filtering @@ -2832,6 +2845,10 @@ def create_record_selector( ) transform_before_filtering = True + if model.schema_normalization is None: + # default to no schema normalization if not set + model.schema_normalization = SchemaNormalizationModel.None_ + schema_normalization = ( TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization]) if isinstance(model.schema_normalization, SchemaNormalizationModel) @@ -2938,16 +2955,9 @@ def create_simple_retriever( # When translating JSON schema into Pydantic models, enforcing types for arrays containing both # concrete string complex object definitions like QueryProperties would get resolved to Union[str, Any]. # This adds the extra validation that we couldn't get for free in Pydantic model generation - if ( - isinstance(request_parameter, Mapping) - and request_parameter.get("type") == "QueryProperties" - ): + if isinstance(request_parameter, QueryPropertiesModel): query_properties_key = key query_properties_definitions.append(request_parameter) - elif not isinstance(request_parameter, str): - raise ValueError( - f"Each element of request_parameters should be of type str or QueryProperties, but received {request_parameter.get('type')}" - ) if len(query_properties_definitions) > 1: raise ValueError( @@ -2955,17 +2965,8 @@ def create_simple_retriever( ) if len(query_properties_definitions) == 1: - query_properties = self.create_component( - model_type=QueryPropertiesModel, - component_definition=query_properties_definitions[0], - config=config, - ) - - # Removes QueryProperties components from the interpolated mappings because it will be resolved in - # the provider from the slice directly instead of through jinja interpolation - if isinstance(model.requester.request_parameters, Mapping): - model.requester.request_parameters = self._remove_query_properties( - model.requester.request_parameters + query_properties = self._create_component_from_model( + model=query_properties_definitions[0], config=config ) requester = self._create_component_from_model( @@ -3088,13 +3089,12 @@ def create_simple_retriever( @staticmethod def _remove_query_properties( - request_parameters: Mapping[str, Union[Any, str]], - ) -> Mapping[str, Union[Any, str]]: + request_parameters: Mapping[str, Union[str, QueryPropertiesModel]], + ) -> Mapping[str, str]: return { parameter_field: request_parameter for parameter_field, request_parameter in request_parameters.items() - if not isinstance(request_parameter, Mapping) - or not request_parameter.get("type") == "QueryProperties" + if not isinstance(request_parameter, QueryPropertiesModel) } def create_state_delegating_stream(