Skip to content

Commit 68480b7

Browse files
feat(file-mode-api): add filename extractor component (#453)
1 parent 188f9a5 commit 68480b7

File tree

6 files changed

+288
-20
lines changed

6 files changed

+288
-20
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1449,6 +1449,15 @@ definitions:
14491449
anyOf:
14501450
- "$ref": "#/definitions/CustomRecordExtractor"
14511451
- "$ref": "#/definitions/DpathExtractor"
1452+
filename_extractor:
1453+
description: Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.
1454+
type: string
1455+
interpolation_context:
1456+
- config
1457+
- record
1458+
examples:
1459+
- "{{ record.id }}/{{ record.file_name }}/"
1460+
- "{{ record.id }}_{{ record.file_name }}/"
14521461
$parameters:
14531462
type: object
14541463
additional_properties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2003,6 +2003,15 @@ class FileUploader(BaseModel):
20032003
None,
20042004
description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
20052005
)
2006+
filename_extractor: Optional[str] = Field(
2007+
None,
2008+
description="Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.",
2009+
examples=[
2010+
"{{ record.id }}/{{ record.file_name }}/",
2011+
"{{ record.id }}_{{ record.file_name }}/",
2012+
],
2013+
)
2014+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20062015

20072016

20082017
class DeclarativeStream(BaseModel):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3348,7 +3348,13 @@ def create_file_uploader(
33483348
name=name,
33493349
**kwargs,
33503350
)
3351-
return FileUploader(requester, download_target_extractor)
3351+
return FileUploader(
3352+
requester=requester,
3353+
download_target_extractor=download_target_extractor,
3354+
config=config,
3355+
parameters=model.parameters or {},
3356+
filename_extractor=model.filename_extractor if model.filename_extractor else None,
3357+
)
33523358

33533359
def create_moving_window_call_rate_policy(
33543360
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any

airbyte_cdk/sources/declarative/retrievers/file_uploader.py

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,74 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
15
import json
26
import logging
7+
import uuid
8+
from dataclasses import InitVar, dataclass, field
39
from pathlib import Path
4-
from typing import Optional
10+
from typing import Optional, Mapping, Union, Any
511

12+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
13+
InterpolatedString,
14+
)
615
from airbyte_cdk.models import AirbyteRecordMessageFileReference
716
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
817
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
918
SafeResponse,
1019
)
1120
from airbyte_cdk.sources.declarative.requesters import Requester
1221
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
22+
from airbyte_cdk.sources.types import Config
1323
from airbyte_cdk.sources.utils.files_directory import get_files_directory
1424

1525
logger = logging.getLogger("airbyte")
1626

27+
28+
@dataclass
1729
class FileUploader:
18-
def __init__(
19-
self,
20-
requester: Requester,
21-
download_target_extractor: RecordExtractor,
22-
content_extractor: Optional[RecordExtractor] = None,
23-
) -> None:
24-
self._requester = requester
25-
self._download_target_extractor = download_target_extractor
26-
self._content_extractor = content_extractor
30+
requester: Requester
31+
download_target_extractor: RecordExtractor
32+
config: Config
33+
parameters: InitVar[Mapping[str, Any]]
34+
35+
filename_extractor: Optional[Union[InterpolatedString, str]] = None
36+
content_extractor: Optional[RecordExtractor] = None
37+
38+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
39+
if self.filename_extractor:
40+
self.filename_extractor = InterpolatedString.create(
41+
self.filename_extractor,
42+
parameters=parameters,
43+
)
2744

2845
def upload(self, record: Record) -> None:
2946
mocked_response = SafeResponse()
3047
mocked_response.content = json.dumps(record.data).encode("utf-8")
31-
download_target = list(self._download_target_extractor.extract_records(mocked_response))[0]
48+
download_target = list(self.download_target_extractor.extract_records(mocked_response))[0]
3249
if not isinstance(download_target, str):
3350
raise ValueError(
3451
f"download_target is expected to be a str but was {type(download_target)}: {download_target}"
3552
)
3653

37-
response = self._requester.send_request(
54+
response = self.requester.send_request(
3855
stream_slice=StreamSlice(
3956
partition={}, cursor_slice={}, extra_fields={"download_target": download_target}
4057
),
4158
)
4259

43-
if self._content_extractor:
60+
if self.content_extractor:
4461
raise NotImplementedError("TODO")
4562
else:
4663
files_directory = Path(get_files_directory())
47-
# TODO:: we could either interpolate record data if some relative_path is provided or
48-
# use partition_field value in the slice {"partition_field": some_value_id} to create a path
49-
file_relative_path = Path(record.stream_name) / record.data["file_name"]
64+
65+
file_name = (
66+
self.filename_extractor.eval(self.config, record=record)
67+
if self.filename_extractor
68+
else str(uuid.uuid4())
69+
)
70+
file_name = file_name.lstrip("/")
71+
file_relative_path = Path(record.stream_name) / Path(file_name)
5072

5173
full_path = files_directory / file_relative_path
5274
full_path.parent.mkdir(parents=True, exist_ok=True)
@@ -56,7 +78,7 @@ def upload(self, record: Record) -> None:
5678
file_size_bytes = full_path.stat().st_size
5779

5880
logger.info("File uploaded successfully")
59-
logger.info(f"File url: {str(full_path)} ")
81+
logger.info(f"File url: {str(full_path)}")
6082
logger.info(f"File size: {file_size_bytes / 1024} KB")
6183
logger.info(f"File relative path: {str(file_relative_path)}")
6284

unit_tests/sources/declarative/file/test_file_stream.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import re
2+
13
from pathlib import Path
24
from typing import Any, Dict, List, Optional
35
from unittest import TestCase
@@ -29,9 +31,12 @@ def _source(
2931
catalog: ConfiguredAirbyteCatalog,
3032
config: Dict[str, Any],
3133
state: Optional[List[AirbyteStateMessage]] = None,
34+
yaml_file: Optional[str] = None,
3235
) -> YamlDeclarativeSource:
36+
if not yaml_file:
37+
yaml_file = "file_stream_manifest.yaml"
3338
return YamlDeclarativeSource(
34-
path_to_yaml=str(Path(__file__).parent / "file_stream_manifest.yaml"),
39+
path_to_yaml=str(Path(__file__).parent / yaml_file),
3540
catalog=catalog,
3641
config=config,
3742
state=state,
@@ -43,11 +48,12 @@ def read(
4348
catalog: ConfiguredAirbyteCatalog,
4449
state_builder: Optional[StateBuilder] = None,
4550
expecting_exception: bool = False,
51+
yaml_file: Optional[str] = None,
4652
) -> EntrypointOutput:
4753
config = config_builder.build()
4854
state = state_builder.build() if state_builder else StateBuilder().build()
4955
return entrypoint_read(
50-
_source(catalog, config, state), config, catalog, state, expecting_exception
56+
_source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception
5157
)
5258

5359

@@ -96,7 +102,32 @@ def test_get_article_attachments(self) -> None:
96102
file_reference = output.records[0].record.file_reference
97103
assert file_reference
98104
assert file_reference.file_url
105+
assert re.match(r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_url)
99106
assert file_reference.file_relative_path
107+
assert re.match(
108+
r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_relative_path
109+
)
110+
assert file_reference.file_size_bytes
111+
112+
def test_get_article_attachments_with_filename_extractor(self) -> None:
113+
output = read(
114+
self._config(),
115+
CatalogBuilder()
116+
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
117+
.build(),
118+
yaml_file="test_file_stream_with_filename_extractor.yaml",
119+
)
120+
121+
assert output.records
122+
file_reference = output.records[0].record.file_reference
123+
assert file_reference
124+
assert file_reference.file_url
125+
# TODO: once the response is finally mocked, update this check to assert the exact file name
126+
assert not re.match(r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_url)
127+
assert file_reference.file_relative_path
128+
assert not re.match(
129+
r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_relative_path
130+
)
100131
assert file_reference.file_size_bytes
101132

102133
def test_discover_article_attachments(self) -> None:

0 commit comments

Comments
 (0)