From 2e6d92e1f0381d3da2adb783cf855ee8c323f5c0 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 27 Mar 2025 16:17:53 -0600 Subject: [PATCH 1/3] file-mode-api: move file uploader to record selector level. --- .../concurrent_declarative_source.py | 26 +++-------- .../declarative/extractors/record_selector.py | 7 ++- .../models/declarative_component_schema.py | 43 ++++++++++--------- .../parsers/model_to_component_factory.py | 10 +++++ .../declarative/retrievers/file_uploader.py | 1 - .../declarative_partition_generator.py | 8 ---- 6 files changed, 44 insertions(+), 51 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 657ab2df4..a499324a9 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -207,19 +207,9 @@ def _group_streams( # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible, # so we need to treat them as synchronous - file_uploader = None - if isinstance(declarative_stream, DeclarativeStream): - file_uploader = ( - self._constructor.create_component( - model_type=FileUploader, - component_definition=name_to_stream_mapping[declarative_stream.name][ - "file_uploader" - ], - config=config, - ) - if "file_uploader" in name_to_stream_mapping[declarative_stream.name] - else None - ) + supports_file_transfer = ( + "file_uploader" in name_to_stream_mapping[declarative_stream.name] + ) if ( isinstance(declarative_stream, DeclarativeStream) @@ -288,7 +278,6 @@ def _group_streams( declarative_stream.get_json_schema(), retriever, self.message_repository, - file_uploader, ), stream_slicer=declarative_stream.retriever.stream_slicer, ) @@ -319,7 +308,6 @@ def _group_streams( declarative_stream.get_json_schema(), retriever, self.message_repository, - file_uploader, ), stream_slicer=cursor, ) @@ -339,7 +327,7 @@ def _group_streams( else None, logger=self.logger, cursor=cursor, - supports_file_transfer=bool(file_uploader), + supports_file_transfer=supports_file_transfer, ) ) elif ( @@ -351,7 +339,6 @@ def _group_streams( declarative_stream.get_json_schema(), declarative_stream.retriever, self.message_repository, - file_uploader, ), declarative_stream.retriever.stream_slicer, ) @@ -372,7 +359,7 @@ def _group_streams( cursor_field=None, logger=self.logger, cursor=final_state_cursor, - supports_file_transfer=bool(file_uploader), + supports_file_transfer=supports_file_transfer, ) ) elif ( @@ -412,7 +399,6 @@ def _group_streams( declarative_stream.get_json_schema(), retriever, self.message_repository, - file_uploader, ), perpartition_cursor, ) @@ -427,7 +413,7 @@ def _group_streams( cursor_field=perpartition_cursor.cursor_field.cursor_field_key, logger=self.logger, cursor=perpartition_cursor, - supports_file_transfer=bool(file_uploader), + supports_file_transfer=supports_file_transfer, ) ) else: diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index c37b8035b..73e43ff46 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -18,6 +18,7 @@ from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState from airbyte_cdk.sources.utils.transform import TypeTransformer +from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader @dataclass @@ -42,6 +43,7 @@ class RecordSelector(HttpSelector): record_filter: Optional[RecordFilter] = None transformations: List[RecordTransformation] = field(default_factory=lambda: []) transform_before_filtering: bool = False + file_uploader: Optional[FileUploader] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters @@ -117,7 +119,10 @@ def filter_and_transform( transformed_filtered_data, schema=records_schema ) for data in normalized_data: - yield Record(data=data, stream_name=self.name, associated_slice=stream_slice) + record = Record(data=data, stream_name=self.name, associated_slice=stream_slice) + if self.file_uploader: + self.file_uploader.upload(record) + yield record def _normalize_by_schema( self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]] diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index c2bd928b7..ae9ffd637 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1989,6 +1989,22 @@ class Config: parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class FileUploader(BaseModel): + type: Literal["FileUploader"] + requester: Union[CustomRequester, HttpRequester] = Field( + ..., + description="Requester component that describes how to prepare HTTP requests to send to the source API.", + ) + download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( + ..., + description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response", + ) + file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field( + None, + description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content", + ) + + class DeclarativeStream(BaseModel): class Config: extra = Extra.allow @@ -2047,6 +2063,11 @@ class Config: description="Array of state migrations to be applied on the input state", title="State Migrations", ) + file_uploader: Optional[FileUploader] = Field( + None, + description="(experimental) Describes how to fetch a file", + title="File Uploader", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2278,22 +2299,6 @@ class StateDelegatingStream(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class FileUploader(BaseModel): - type: Literal["FileUploader"] - requester: Union[CustomRequester, HttpRequester] = Field( - ..., - description="Requester component that describes how to prepare HTTP requests to send to the source API.", - ) - download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( - ..., - description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response", - ) - file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field( - None, - description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content", - ) - - class SimpleRetriever(BaseModel): type: Literal["SimpleRetriever"] record_selector: RecordSelector = Field( @@ -2324,11 +2329,6 @@ class SimpleRetriever(BaseModel): description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.", title="Partition Router", ) - file_uploader: Optional[FileUploader] = Field( - None, - description="(experimental) Describes how to fetch a file", - title="File Uploader", - ) decoder: Optional[ Union[ CustomDecoder, @@ -2485,6 +2485,7 @@ class DynamicDeclarativeStream(BaseModel): DeclarativeSource1.update_forward_refs() DeclarativeSource2.update_forward_refs() SelectiveAuthenticator.update_forward_refs() +FileUploader.update_forward_refs() DeclarativeStream.update_forward_refs() SessionTokenAuthenticator.update_forward_refs() DynamicSchemaLoader.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index e01505fdc..c521be7e8 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1755,6 +1755,11 @@ def create_declarative_stream( transformations.append( self._create_component_from_model(model=transformation_model, config=config) ) + file_uploader = None + if model.file_uploader: + file_uploader = self._create_component_from_model( + model=model.file_uploader, config=config + ) retriever = self._create_component_from_model( model=model.retriever, @@ -1766,6 +1771,7 @@ def create_declarative_stream( stop_condition_on_cursor=stop_condition_on_cursor, client_side_incremental_sync=client_side_incremental_sync, transformations=transformations, + file_uploader=file_uploader, incremental_sync=model.incremental_sync, ) cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None @@ -2607,6 +2613,7 @@ def create_record_selector( transformations: List[RecordTransformation] | None = None, decoder: Decoder | None = None, client_side_incremental_sync: Dict[str, Any] | None = None, + file_uploader: Optional[FileUploader] = None, **kwargs: Any, ) -> RecordSelector: extractor = self._create_component_from_model( @@ -2644,6 +2651,7 @@ def create_record_selector( config=config, record_filter=record_filter, transformations=transformations or [], + file_uploader=file_uploader, schema_normalization=schema_normalization, parameters=model.parameters or {}, transform_before_filtering=transform_before_filtering, @@ -2701,6 +2709,7 @@ def create_simple_retriever( stop_condition_on_cursor: bool = False, client_side_incremental_sync: Optional[Dict[str, Any]] = None, transformations: List[RecordTransformation], + file_uploader: Optional[FileUploader] = None, incremental_sync: Optional[ Union[ IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel @@ -2723,6 +2732,7 @@ def create_simple_retriever( decoder=decoder, transformations=transformations, client_side_incremental_sync=client_side_incremental_sync, + file_uploader=file_uploader, ) url_base = ( model.requester.url_base diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py index 084152e05..97e4aaead 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py @@ -24,7 +24,6 @@ def __init__( self._content_extractor = content_extractor def upload(self, record: Record) -> None: - # TODO validate record shape - is the transformation applied at this point? mocked_response = SafeResponse() mocked_response.content = json.dumps(record.data).encode("utf-8") download_target = list(self._download_target_extractor.extract_records(mocked_response))[0] diff --git a/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py b/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py index 9e784233b..94ee03a56 100644 --- a/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py +++ b/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py @@ -3,7 +3,6 @@ from typing import Any, Iterable, Mapping, Optional from airbyte_cdk.sources.declarative.retrievers import Retriever -from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator @@ -19,7 +18,6 @@ def __init__( json_schema: Mapping[str, Any], retriever: Retriever, message_repository: MessageRepository, - file_uploader: Optional[FileUploader] = None, ) -> None: """ The DeclarativePartitionFactory takes a retriever_factory and not a retriever directly. The reason is that our components are not @@ -30,7 +28,6 @@ def __init__( self._json_schema = json_schema self._retriever = retriever self._message_repository = message_repository - self._file_uploader = file_uploader def create(self, stream_slice: StreamSlice) -> Partition: return DeclarativePartition( @@ -38,7 +35,6 @@ def create(self, stream_slice: StreamSlice) -> Partition: self._json_schema, self._retriever, self._message_repository, - self._file_uploader, stream_slice, ) @@ -50,14 +46,12 @@ def __init__( json_schema: Mapping[str, Any], retriever: Retriever, message_repository: MessageRepository, - file_uploader: Optional[FileUploader], stream_slice: StreamSlice, ): self._stream_name = stream_name self._json_schema = json_schema self._retriever = retriever self._message_repository = message_repository - self._file_uploader = file_uploader self._stream_slice = stream_slice self._hash = SliceHasher.hash(self._stream_name, self._stream_slice) @@ -73,8 +67,6 @@ def read(self) -> Iterable[Record]: associated_slice=self._stream_slice, ) ) - if self._file_uploader: - self._file_uploader.upload(record) yield record else: self._message_repository.emit_message(stream_data) From 7b058c94689a3e7193da20442f7d47c3f34e2e38 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Sun, 30 Mar 2025 19:10:55 -0600 Subject: [PATCH 2/3] file-mode-api: add filename_extractor component to define relative path to store the file. --- .../declarative_component_schema.yaml | 8 +++ .../models/declarative_component_schema.py | 8 +++ .../parsers/model_to_component_factory.py | 8 ++- .../declarative/retrievers/file_uploader.py | 57 +++++++++++++------ .../file/file_stream_manifest.yaml | 1 + 5 files changed, 64 insertions(+), 18 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index c28c8f4da..ac9264102 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1449,6 +1449,14 @@ definitions: anyOf: - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" + filename_extractor: + description: Defines relative path and name to store the file + type: string + interpolation_context: + - config + - record + examples: + - "{{ record.relative_path }}/{{ record.file_name }}/" $parameters: type: object additional_properties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index ae9ffd637..351acde86 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2003,6 +2003,14 @@ class FileUploader(BaseModel): None, description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content", ) + filename_extractor: str = Field( + ..., + description="File Name extractor.", + examples=[ + "{{ record.relative_path }}/{{ record.file_name }}/", + ], + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class DeclarativeStream(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c521be7e8..d23f82b73 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3348,7 +3348,13 @@ def create_file_uploader( name=name, **kwargs, ) - return FileUploader(requester, download_target_extractor) + return FileUploader( + requester=requester, + download_target_extractor=download_target_extractor, + config=config, + parameters=model.parameters or {}, + filename_extractor=model.filename_extractor, + ) def create_moving_window_call_rate_policy( self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py index 97e4aaead..c1c0d5f0c 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py @@ -1,7 +1,16 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + import json +import logging +from dataclasses import InitVar, dataclass, field from pathlib import Path -from typing import Optional +from typing import Optional, Mapping, Union, Any +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import ( + InterpolatedString, +) from airbyte_cdk.models import AirbyteRecordMessageFileReference from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( @@ -9,42 +18,51 @@ ) from airbyte_cdk.sources.declarative.requesters import Requester from airbyte_cdk.sources.declarative.types import Record, StreamSlice +from airbyte_cdk.sources.types import Config from airbyte_cdk.sources.utils.files_directory import get_files_directory +logger = logging.getLogger("airbyte") + +@dataclass class FileUploader: - def __init__( - self, - requester: Requester, - download_target_extractor: RecordExtractor, - content_extractor: Optional[RecordExtractor] = None, - ) -> None: - self._requester = requester - self._download_target_extractor = download_target_extractor - self._content_extractor = content_extractor + requester: Requester + download_target_extractor: RecordExtractor + config: Config + parameters: InitVar[Mapping[str, Any]] + + filename_extractor: Union[InterpolatedString, str] + content_extractor: Optional[RecordExtractor] = None + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._filename_extractor = InterpolatedString.create( + self.filename_extractor, + parameters=parameters, + ) def upload(self, record: Record) -> None: mocked_response = SafeResponse() mocked_response.content = json.dumps(record.data).encode("utf-8") - download_target = list(self._download_target_extractor.extract_records(mocked_response))[0] + download_target = list(self.download_target_extractor.extract_records(mocked_response))[0] if not isinstance(download_target, str): raise ValueError( f"download_target is expected to be a str but was {type(download_target)}: {download_target}" ) - response = self._requester.send_request( + response = self.requester.send_request( stream_slice=StreamSlice( partition={}, cursor_slice={}, extra_fields={"download_target": download_target} ), ) - if self._content_extractor: + if self.content_extractor: raise NotImplementedError("TODO") else: files_directory = Path(get_files_directory()) - # TODO:: we could either interpolate record data if some relative_path is provided or - # use partition_field value in the slice {"partition_field": some_value_id} to create a path - file_relative_path = Path(record.stream_name) / record.data["file_name"] + + relative_path = self._filename_extractor.eval(self.config, record=record) + relative_path = relative_path.lstrip("/") + file_relative_path = Path(relative_path) full_path = files_directory / file_relative_path full_path.parent.mkdir(parents=True, exist_ok=True) @@ -53,8 +71,13 @@ def upload(self, record: Record) -> None: f.write(response.content) file_size_bytes = full_path.stat().st_size + logger.info("File uploaded successfully") + logger.info(f"File url: {str(full_path)} ") + logger.info(f"File size: {file_size_bytes / 1024} KB") + logger.info(f"File relative path: {str(file_relative_path)}") + record.file_reference = AirbyteRecordMessageFileReference( - file_url=download_target, + file_url=str(full_path), file_relative_path=str(file_relative_path), file_size_bytes=file_size_bytes, ) diff --git a/unit_tests/sources/declarative/file/file_stream_manifest.yaml b/unit_tests/sources/declarative/file/file_stream_manifest.yaml index 6d8ffd638..def655769 100644 --- a/unit_tests/sources/declarative/file/file_stream_manifest.yaml +++ b/unit_tests/sources/declarative/file/file_stream_manifest.yaml @@ -161,6 +161,7 @@ definitions: download_target_extractor: type: DpathExtractor field_path: [ "content_url" ] + filename_extractor: "{{ record.relative_path }}/{{ record.file_name }}/" streams: From 06733660cbe39cf39047deade41e279c0f418508 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Mon, 31 Mar 2025 10:51:20 -0600 Subject: [PATCH 3/3] file-mode-api: make filename_extractor optional and fallback to guid --- .../declarative_component_schema.yaml | 5 +- .../models/declarative_component_schema.py | 9 +- .../parsers/model_to_component_factory.py | 2 +- .../declarative/retrievers/file_uploader.py | 24 ++- .../file/file_stream_manifest.yaml | 1 - .../declarative/file/test_file_stream.py | 35 +++- ...t_file_stream_with_filename_extractor.yaml | 191 ++++++++++++++++++ 7 files changed, 248 insertions(+), 19 deletions(-) create mode 100644 unit_tests/sources/declarative/file/test_file_stream_with_filename_extractor.yaml diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index ac9264102..dd70929b9 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1450,13 +1450,14 @@ definitions: - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" filename_extractor: - description: Defines relative path and name to store the file + description: Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided. type: string interpolation_context: - config - record examples: - - "{{ record.relative_path }}/{{ record.file_name }}/" + - "{{ record.id }}/{{ record.file_name }}/" + - "{{ record.id }}_{{ record.file_name }}/" $parameters: type: object additional_properties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 351acde86..8ccf92caa 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2003,11 +2003,12 @@ class FileUploader(BaseModel): None, description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content", ) - filename_extractor: str = Field( - ..., - description="File Name extractor.", + filename_extractor: Optional[str] = Field( + None, + description="Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.", examples=[ - "{{ record.relative_path }}/{{ record.file_name }}/", + "{{ record.id }}/{{ record.file_name }}/", + "{{ record.id }}_{{ record.file_name }}/", ], ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d23f82b73..434ad7f0e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3353,7 +3353,7 @@ def create_file_uploader( download_target_extractor=download_target_extractor, config=config, parameters=model.parameters or {}, - filename_extractor=model.filename_extractor, + filename_extractor=model.filename_extractor if model.filename_extractor else None, ) def create_moving_window_call_rate_policy( diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py index c1c0d5f0c..2a4c5aace 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py @@ -4,6 +4,7 @@ import json import logging +import uuid from dataclasses import InitVar, dataclass, field from pathlib import Path from typing import Optional, Mapping, Union, Any @@ -31,14 +32,15 @@ class FileUploader: config: Config parameters: InitVar[Mapping[str, Any]] - filename_extractor: Union[InterpolatedString, str] + filename_extractor: Optional[Union[InterpolatedString, str]] = None content_extractor: Optional[RecordExtractor] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: - self._filename_extractor = InterpolatedString.create( - self.filename_extractor, - parameters=parameters, - ) + if self.filename_extractor: + self.filename_extractor = InterpolatedString.create( + self.filename_extractor, + parameters=parameters, + ) def upload(self, record: Record) -> None: mocked_response = SafeResponse() @@ -60,9 +62,13 @@ def upload(self, record: Record) -> None: else: files_directory = Path(get_files_directory()) - relative_path = self._filename_extractor.eval(self.config, record=record) - relative_path = relative_path.lstrip("/") - file_relative_path = Path(relative_path) + file_name = ( + self.filename_extractor.eval(self.config, record=record) + if self.filename_extractor + else str(uuid.uuid4()) + ) + file_name = file_name.lstrip("/") + file_relative_path = Path(record.stream_name) / Path(file_name) full_path = files_directory / file_relative_path full_path.parent.mkdir(parents=True, exist_ok=True) @@ -72,7 +78,7 @@ def upload(self, record: Record) -> None: file_size_bytes = full_path.stat().st_size logger.info("File uploaded successfully") - logger.info(f"File url: {str(full_path)} ") + logger.info(f"File url: {str(full_path)}") logger.info(f"File size: {file_size_bytes / 1024} KB") logger.info(f"File relative path: {str(file_relative_path)}") diff --git a/unit_tests/sources/declarative/file/file_stream_manifest.yaml b/unit_tests/sources/declarative/file/file_stream_manifest.yaml index def655769..6d8ffd638 100644 --- a/unit_tests/sources/declarative/file/file_stream_manifest.yaml +++ b/unit_tests/sources/declarative/file/file_stream_manifest.yaml @@ -161,7 +161,6 @@ definitions: download_target_extractor: type: DpathExtractor field_path: [ "content_url" ] - filename_extractor: "{{ record.relative_path }}/{{ record.file_name }}/" streams: diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index 1c0547830..1f3fbe26e 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -1,3 +1,5 @@ +import re + from pathlib import Path from typing import Any, Dict, List, Optional from unittest import TestCase @@ -29,9 +31,12 @@ def _source( catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[List[AirbyteStateMessage]] = None, + yaml_file: Optional[str] = None, ) -> YamlDeclarativeSource: + if not yaml_file: + yaml_file = "file_stream_manifest.yaml" return YamlDeclarativeSource( - path_to_yaml=str(Path(__file__).parent / "file_stream_manifest.yaml"), + path_to_yaml=str(Path(__file__).parent / yaml_file), catalog=catalog, config=config, state=state, @@ -43,11 +48,12 @@ def read( catalog: ConfiguredAirbyteCatalog, state_builder: Optional[StateBuilder] = None, expecting_exception: bool = False, + yaml_file: Optional[str] = None, ) -> EntrypointOutput: config = config_builder.build() state = state_builder.build() if state_builder else StateBuilder().build() return entrypoint_read( - _source(catalog, config, state), config, catalog, state, expecting_exception + _source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception ) @@ -96,7 +102,32 @@ def test_get_article_attachments(self) -> None: file_reference = output.records[0].record.file_reference assert file_reference assert file_reference.file_url + assert re.match(r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_url) assert file_reference.file_relative_path + assert re.match( + r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_relative_path + ) + assert file_reference.file_size_bytes + + def test_get_article_attachments_with_filename_extractor(self) -> None: + output = read( + self._config(), + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) + .build(), + yaml_file="test_file_stream_with_filename_extractor.yaml", + ) + + assert output.records + file_reference = output.records[0].record.file_reference + assert file_reference + assert file_reference.file_url + # todo: once we finally mock the response update to check file name + assert not re.match(r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_url) + assert file_reference.file_relative_path + assert not re.match( + r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_relative_path + ) assert file_reference.file_size_bytes def test_discover_article_attachments(self) -> None: diff --git a/unit_tests/sources/declarative/file/test_file_stream_with_filename_extractor.yaml b/unit_tests/sources/declarative/file/test_file_stream_with_filename_extractor.yaml new file mode 100644 index 000000000..d827189ab --- /dev/null +++ b/unit_tests/sources/declarative/file/test_file_stream_with_filename_extractor.yaml @@ -0,0 +1,191 @@ +version: 2.0.0 + +type: DeclarativeSource + +check: + type: CheckStream + stream_names: + - "articles" + +definitions: + bearer_authenticator: + type: BearerAuthenticator + api_token: "{{ config['credentials']['access_token'] }}" + basic_authenticator: + type: BasicHttpAuthenticator + username: "{{ config['credentials']['email'] + '/token' }}" + password: "{{ config['credentials']['api_token'] }}" + + retriever: + type: SimpleRetriever + requester: + type: HttpRequester + url_base: https://{{ config['subdomain'] }}.zendesk.com/api/v2/ + http_method: GET + authenticator: + type: SelectiveAuthenticator + authenticator_selection_path: ["credentials", "credentials"] + authenticators: + oauth2.0: "#/definitions/bearer_authenticator" + api_token: "#/definitions/basic_authenticator" + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: + ["{{ parameters.get('data_path') or parameters.get('name') }}"] + schema_normalization: Default + paginator: + type: DefaultPaginator + page_size_option: + type: RequestOption + field_name: "per_page" + inject_into: request_parameter + pagination_strategy: + type: CursorPagination + page_size: 100 + cursor_value: '{{ response.get("next_page", {}) }}' + stop_condition: "{{ last_page_size == 0 }}" + page_token_option: + type: RequestPath + + base_stream: + type: DeclarativeStream + schema_loader: + type: JsonFileSchemaLoader + retriever: + $ref: "#/definitions/retriever" + + cursor_incremental_sync: + type: DatetimeBasedCursor + cursor_datetime_formats: + - "%s" + - "%Y-%m-%dT%H:%M:%SZ" + - "%Y-%m-%dT%H:%M:%S%z" + datetime_format: "%s" + cursor_field: "{{ parameters.get('cursor_field', 'updated_at') }}" + start_datetime: + datetime: "{{ timestamp(config.get('start_date')) | int if config.get('start_date') else day_delta(-730, '%s') }}" + start_time_option: + inject_into: request_parameter + field_name: "{{ parameters['cursor_filter'] }}" + type: RequestOption + + base_incremental_stream: + $ref: "#/definitions/base_stream" + incremental_sync: + $ref: "#/definitions/cursor_incremental_sync" + + # Incremental cursor-based streams + articles_stream: + $ref: "#/definitions/base_incremental_stream" + name: "articles" + primary_key: "id" + schema_loader: + type: InlineSchemaLoader + schema: + type: object + $schema: http://json-schema.org/schema# + properties: + id: + type: integer + additionalProperties: true + incremental_sync: + $ref: "#/definitions/cursor_incremental_sync" + start_time_option: + $ref: "#/definitions/cursor_incremental_sync/start_time_option" + field_name: "start_time" + retriever: + $ref: "#/definitions/retriever" + ignore_stream_slicer_parameters_on_paginated_requests: true + requester: + $ref: "#/definitions/retriever/requester" + path: "help_center/incremental/articles" + paginator: + type: DefaultPaginator + pagination_strategy: + type: CursorPagination + cursor_value: '{{ response.get("next_page", {}) }}' + stop_condition: "{{ config.get('ignore_pagination', False) or last_page_size == 0 }}" + page_token_option: + type: RequestPath + record_selector: + extractor: + type: DpathExtractor + field_path: ["articles"] + + article_attachments_stream: + $ref: "#/definitions/base_incremental_stream" + name: "article_attachments" + primary_key: "id" + schema_loader: + type: InlineSchemaLoader + schema: + type: object + $schema: http://json-schema.org/schema# + properties: + id: + type: integer + additionalProperties: true + retriever: + $ref: "#/definitions/retriever" + ignore_stream_slicer_parameters_on_paginated_requests: true + requester: + $ref: "#/definitions/retriever/requester" + path: "help_center/articles/{{ stream_partition.article_id }}/attachments" + partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - type: ParentStreamConfig + parent_key: "id" + partition_field: "article_id" + stream: + $ref: "#/definitions/articles_stream" + incremental_dependency: true + record_selector: + extractor: + type: DpathExtractor + field_path: ["article_attachments"] + file_uploader: + type: FileUploader + requester: + type: HttpRequester + url_base: "{{download_target}}" + http_method: GET + authenticator: + type: SelectiveAuthenticator + authenticator_selection_path: [ "credentials", "credentials" ] + authenticators: + oauth2.0: "#/definitions/bearer_authenticator" + api_token: "#/definitions/basic_authenticator" + download_target_extractor: + type: DpathExtractor + field_path: [ "content_url" ] + filename_extractor: "{{ record.id }}/{{ record.file_name }}/" + + +streams: + - $ref: "#/definitions/articles_stream" + - $ref: "#/definitions/article_attachments_stream" + +spec: + type: Spec + connection_specification: + type: object + $schema: http://json-schema.org/draft-07/schema# + required: + - subdomain + - start_date + properties: + subdomain: + type: string + name: subdomain + order: 0 + title: Subdomain + start_date: + type: string + order: 1 + title: Start date + format: date-time + pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$ + additionalProperties: true