diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index c28c8f4da..dd70929b9 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1449,6 +1449,15 @@ definitions: anyOf: - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" + filename_extractor: + description: Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided. + type: string + interpolation_context: + - config + - record + examples: + - "{{ record.id }}/{{ record.file_name }}/" + - "{{ record.id }}_{{ record.file_name }}/" $parameters: type: object additional_properties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index ae9ffd637..8ccf92caa 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2003,6 +2003,15 @@ class FileUploader(BaseModel): None, description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content", ) + filename_extractor: Optional[str] = Field( + None, + description="Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.", + examples=[ + "{{ record.id }}/{{ record.file_name }}/", + "{{ record.id }}_{{ record.file_name }}/", + ], + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class DeclarativeStream(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c521be7e8..434ad7f0e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3348,7 +3348,13 @@ def create_file_uploader( name=name, **kwargs, ) - return FileUploader(requester, download_target_extractor) + return FileUploader( + requester=requester, + download_target_extractor=download_target_extractor, + config=config, + parameters=model.parameters or {}, + filename_extractor=model.filename_extractor if model.filename_extractor else None, + ) def create_moving_window_call_rate_policy( self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py index ef7d9174b..2a4c5aace 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py @@ -1,8 +1,17 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + import json import logging +import uuid +from dataclasses import InitVar, dataclass, field from pathlib import Path -from typing import Optional +from typing import Optional, Mapping, Union, Any +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import ( + InterpolatedString, +) from airbyte_cdk.models import AirbyteRecordMessageFileReference from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( @@ -10,43 +19,56 @@ ) from airbyte_cdk.sources.declarative.requesters import Requester from airbyte_cdk.sources.declarative.types import Record, StreamSlice +from airbyte_cdk.sources.types import Config from airbyte_cdk.sources.utils.files_directory import get_files_directory logger = logging.getLogger("airbyte") + +@dataclass class FileUploader: - def __init__( - self, - requester: Requester, - download_target_extractor: RecordExtractor, - content_extractor: Optional[RecordExtractor] = None, - ) -> None: - self._requester = requester - self._download_target_extractor = download_target_extractor - self._content_extractor = content_extractor + requester: Requester + download_target_extractor: RecordExtractor + config: Config + parameters: InitVar[Mapping[str, Any]] + + filename_extractor: Optional[Union[InterpolatedString, str]] = None + content_extractor: Optional[RecordExtractor] = None + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + if self.filename_extractor: + self.filename_extractor = InterpolatedString.create( + self.filename_extractor, + parameters=parameters, + ) def upload(self, record: Record) -> None: mocked_response = SafeResponse() mocked_response.content = json.dumps(record.data).encode("utf-8") - download_target = list(self._download_target_extractor.extract_records(mocked_response))[0] + download_target = list(self.download_target_extractor.extract_records(mocked_response))[0] if not isinstance(download_target, str): raise ValueError( f"download_target is expected to be a str but was {type(download_target)}: {download_target}" ) - response = self._requester.send_request( + response = self.requester.send_request( stream_slice=StreamSlice( partition={}, cursor_slice={}, extra_fields={"download_target": download_target} ), ) - if self._content_extractor: + if self.content_extractor: raise NotImplementedError("TODO") else: files_directory = Path(get_files_directory()) - # TODO:: we could either interpolate record data if some relative_path is provided or - # use partition_field value in the slice {"partition_field": some_value_id} to create a path - file_relative_path = Path(record.stream_name) / record.data["file_name"] + + file_name = ( + self.filename_extractor.eval(self.config, record=record) + if self.filename_extractor + else str(uuid.uuid4()) + ) + file_name = file_name.lstrip("/") + file_relative_path = Path(record.stream_name) / Path(file_name) full_path = files_directory / file_relative_path full_path.parent.mkdir(parents=True, exist_ok=True) @@ -56,7 +78,7 @@ def upload(self, record: Record) -> None: file_size_bytes = full_path.stat().st_size logger.info("File uploaded successfully") - logger.info(f"File url: {str(full_path)} ") + logger.info(f"File url: {str(full_path)}") logger.info(f"File size: {file_size_bytes / 1024} KB") logger.info(f"File relative path: {str(file_relative_path)}") diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index 1c0547830..1f3fbe26e 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -1,3 +1,5 @@ +import re + from pathlib import Path from typing import Any, Dict, List, Optional from unittest import TestCase @@ -29,9 +31,12 @@ def _source( catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[List[AirbyteStateMessage]] = None, + yaml_file: Optional[str] = None, ) -> YamlDeclarativeSource: + if not yaml_file: + yaml_file = "file_stream_manifest.yaml" return YamlDeclarativeSource( - path_to_yaml=str(Path(__file__).parent / "file_stream_manifest.yaml"), + path_to_yaml=str(Path(__file__).parent / yaml_file), catalog=catalog, config=config, state=state, @@ -43,11 +48,12 @@ def read( catalog: ConfiguredAirbyteCatalog, state_builder: Optional[StateBuilder] = None, expecting_exception: bool = False, + yaml_file: Optional[str] = None, ) -> EntrypointOutput: config = config_builder.build() state = state_builder.build() if state_builder else StateBuilder().build() return entrypoint_read( - _source(catalog, config, state), config, catalog, state, expecting_exception + _source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception ) @@ -96,7 +102,32 @@ def test_get_article_attachments(self) -> None: file_reference = output.records[0].record.file_reference assert file_reference assert file_reference.file_url + assert re.match(r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_url) assert file_reference.file_relative_path + assert re.match( + r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_relative_path + ) + assert file_reference.file_size_bytes + + def test_get_article_attachments_with_filename_extractor(self) -> None: + output = read( + self._config(), + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) + .build(), + yaml_file="test_file_stream_with_filename_extractor.yaml", + ) + + assert output.records + file_reference = output.records[0].record.file_reference + assert file_reference + assert file_reference.file_url + # todo: once we finally mock the response update to check file name + assert not re.match(r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_url) + assert file_reference.file_relative_path + assert not re.match( + r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_relative_path + ) assert file_reference.file_size_bytes def test_discover_article_attachments(self) -> None: diff --git a/unit_tests/sources/declarative/file/test_file_stream_with_filename_extractor.yaml b/unit_tests/sources/declarative/file/test_file_stream_with_filename_extractor.yaml new file mode 100644 index 000000000..d827189ab --- /dev/null +++ b/unit_tests/sources/declarative/file/test_file_stream_with_filename_extractor.yaml @@ -0,0 +1,191 @@ +version: 2.0.0 + +type: DeclarativeSource + +check: + type: CheckStream + stream_names: + - "articles" + +definitions: + bearer_authenticator: + type: BearerAuthenticator + api_token: "{{ config['credentials']['access_token'] }}" + basic_authenticator: + type: BasicHttpAuthenticator + username: "{{ config['credentials']['email'] + '/token' }}" + password: "{{ config['credentials']['api_token'] }}" + + retriever: + type: SimpleRetriever + requester: + type: HttpRequester + url_base: https://{{ config['subdomain'] }}.zendesk.com/api/v2/ + http_method: GET + authenticator: + type: SelectiveAuthenticator + authenticator_selection_path: ["credentials", "credentials"] + authenticators: + oauth2.0: "#/definitions/bearer_authenticator" + api_token: "#/definitions/basic_authenticator" + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: + ["{{ parameters.get('data_path') or parameters.get('name') }}"] + schema_normalization: Default + paginator: + type: DefaultPaginator + page_size_option: + type: RequestOption + field_name: "per_page" + inject_into: request_parameter + pagination_strategy: + type: CursorPagination + page_size: 100 + cursor_value: '{{ response.get("next_page", {}) }}' + stop_condition: "{{ last_page_size == 0 }}" + page_token_option: + type: RequestPath + + base_stream: + type: DeclarativeStream + schema_loader: + type: JsonFileSchemaLoader + retriever: + $ref: "#/definitions/retriever" + + cursor_incremental_sync: + type: DatetimeBasedCursor + cursor_datetime_formats: + - "%s" + - "%Y-%m-%dT%H:%M:%SZ" + - "%Y-%m-%dT%H:%M:%S%z" + datetime_format: "%s" + cursor_field: "{{ parameters.get('cursor_field', 'updated_at') }}" + start_datetime: + datetime: "{{ timestamp(config.get('start_date')) | int if config.get('start_date') else day_delta(-730, '%s') }}" + start_time_option: + inject_into: request_parameter + field_name: "{{ parameters['cursor_filter'] }}" + type: RequestOption + + base_incremental_stream: + $ref: "#/definitions/base_stream" + incremental_sync: + $ref: "#/definitions/cursor_incremental_sync" + + # Incremental cursor-based streams + articles_stream: + $ref: "#/definitions/base_incremental_stream" + name: "articles" + primary_key: "id" + schema_loader: + type: InlineSchemaLoader + schema: + type: object + $schema: http://json-schema.org/schema# + properties: + id: + type: integer + additionalProperties: true + incremental_sync: + $ref: "#/definitions/cursor_incremental_sync" + start_time_option: + $ref: "#/definitions/cursor_incremental_sync/start_time_option" + field_name: "start_time" + retriever: + $ref: "#/definitions/retriever" + ignore_stream_slicer_parameters_on_paginated_requests: true + requester: + $ref: "#/definitions/retriever/requester" + path: "help_center/incremental/articles" + paginator: + type: DefaultPaginator + pagination_strategy: + type: CursorPagination + cursor_value: '{{ response.get("next_page", {}) }}' + stop_condition: "{{ config.get('ignore_pagination', False) or last_page_size == 0 }}" + page_token_option: + type: RequestPath + record_selector: + extractor: + type: DpathExtractor + field_path: ["articles"] + + article_attachments_stream: + $ref: "#/definitions/base_incremental_stream" + name: "article_attachments" + primary_key: "id" + schema_loader: + type: InlineSchemaLoader + schema: + type: object + $schema: http://json-schema.org/schema# + properties: + id: + type: integer + additionalProperties: true + retriever: + $ref: "#/definitions/retriever" + ignore_stream_slicer_parameters_on_paginated_requests: true + requester: + $ref: "#/definitions/retriever/requester" + path: "help_center/articles/{{ stream_partition.article_id }}/attachments" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - type: ParentStreamConfig + parent_key: "id" + partition_field: "article_id" + stream: + $ref: "#/definitions/articles_stream" + incremental_dependency: true + record_selector: + extractor: + type: DpathExtractor + field_path: ["article_attachments"] + file_uploader: + type: FileUploader + requester: + type: HttpRequester + url_base: "{{download_target}}" + http_method: GET + authenticator: + type: SelectiveAuthenticator + authenticator_selection_path: [ "credentials", "credentials" ] + authenticators: + oauth2.0: "#/definitions/bearer_authenticator" + api_token: "#/definitions/basic_authenticator" + download_target_extractor: + type: DpathExtractor + field_path: [ "content_url" ] + filename_extractor: "{{ record.id }}/{{ record.file_name }}/" + + +streams: + - $ref: "#/definitions/articles_stream" + - $ref: "#/definitions/article_attachments_stream" + +spec: + type: Spec + connection_specification: + type: object + $schema: http://json-schema.org/draft-07/schema# + required: + - subdomain + - start_date + properties: + subdomain: + type: string + name: subdomain + order: 0 + title: Subdomain + start_date: + type: string + order: 1 + title: Start date + format: date-time + pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$ + additionalProperties: true