Skip to content

Commit 0673366

Browse files
committed
file-mode-api: make filename_extractor optional and fallback to guid
1 parent 7b058c9 commit 0673366

File tree

7 files changed

+248
-19
lines changed

7 files changed

+248
-19
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,13 +1450,14 @@ definitions:
14501450
- "$ref": "#/definitions/CustomRecordExtractor"
14511451
- "$ref": "#/definitions/DpathExtractor"
14521452
filename_extractor:
1453-
description: Defines relative path and name to store the file
1453+
description: Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.
14541454
type: string
14551455
interpolation_context:
14561456
- config
14571457
- record
14581458
examples:
1459-
- "{{ record.relative_path }}/{{ record.file_name }}/"
1459+
- "{{ record.id }}/{{ record.file_name }}/"
1460+
- "{{ record.id }}_{{ record.file_name }}/"
14601461
$parameters:
14611462
type: object
14621463
additional_properties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2003,11 +2003,12 @@ class FileUploader(BaseModel):
20032003
None,
20042004
description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
20052005
)
2006-
filename_extractor: str = Field(
2007-
...,
2008-
description="File Name extractor.",
2006+
filename_extractor: Optional[str] = Field(
2007+
None,
2008+
description="Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.",
20092009
examples=[
2010-
"{{ record.relative_path }}/{{ record.file_name }}/",
2010+
"{{ record.id }}/{{ record.file_name }}/",
2011+
"{{ record.id }}_{{ record.file_name }}/",
20112012
],
20122013
)
20132014
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3353,7 +3353,7 @@ def create_file_uploader(
33533353
download_target_extractor=download_target_extractor,
33543354
config=config,
33553355
parameters=model.parameters or {},
3356-
filename_extractor=model.filename_extractor,
3356+
filename_extractor=model.filename_extractor if model.filename_extractor else None,
33573357
)
33583358

33593359
def create_moving_window_call_rate_policy(

airbyte_cdk/sources/declarative/retrievers/file_uploader.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import json
66
import logging
7+
import uuid
78
from dataclasses import InitVar, dataclass, field
89
from pathlib import Path
910
from typing import Optional, Mapping, Union, Any
@@ -31,14 +32,15 @@ class FileUploader:
3132
config: Config
3233
parameters: InitVar[Mapping[str, Any]]
3334

34-
filename_extractor: Union[InterpolatedString, str]
35+
filename_extractor: Optional[Union[InterpolatedString, str]] = None
3536
content_extractor: Optional[RecordExtractor] = None
3637

3738
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
38-
self._filename_extractor = InterpolatedString.create(
39-
self.filename_extractor,
40-
parameters=parameters,
41-
)
39+
if self.filename_extractor:
40+
self.filename_extractor = InterpolatedString.create(
41+
self.filename_extractor,
42+
parameters=parameters,
43+
)
4244

4345
def upload(self, record: Record) -> None:
4446
mocked_response = SafeResponse()
@@ -60,9 +62,13 @@ def upload(self, record: Record) -> None:
6062
else:
6163
files_directory = Path(get_files_directory())
6264

63-
relative_path = self._filename_extractor.eval(self.config, record=record)
64-
relative_path = relative_path.lstrip("/")
65-
file_relative_path = Path(relative_path)
65+
file_name = (
66+
self.filename_extractor.eval(self.config, record=record)
67+
if self.filename_extractor
68+
else str(uuid.uuid4())
69+
)
70+
file_name = file_name.lstrip("/")
71+
file_relative_path = Path(record.stream_name) / Path(file_name)
6672

6773
full_path = files_directory / file_relative_path
6874
full_path.parent.mkdir(parents=True, exist_ok=True)
@@ -72,7 +78,7 @@ def upload(self, record: Record) -> None:
7278
file_size_bytes = full_path.stat().st_size
7379

7480
logger.info("File uploaded successfully")
75-
logger.info(f"File url: {str(full_path)} ")
81+
logger.info(f"File url: {str(full_path)}")
7682
logger.info(f"File size: {file_size_bytes / 1024} KB")
7783
logger.info(f"File relative path: {str(file_relative_path)}")
7884

unit_tests/sources/declarative/file/file_stream_manifest.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ definitions:
161161
download_target_extractor:
162162
type: DpathExtractor
163163
field_path: [ "content_url" ]
164-
filename_extractor: "{{ record.relative_path }}/{{ record.file_name }}/"
165164

166165

167166
streams:

unit_tests/sources/declarative/file/test_file_stream.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import re
2+
13
from pathlib import Path
24
from typing import Any, Dict, List, Optional
35
from unittest import TestCase
@@ -29,9 +31,12 @@ def _source(
2931
catalog: ConfiguredAirbyteCatalog,
3032
config: Dict[str, Any],
3133
state: Optional[List[AirbyteStateMessage]] = None,
34+
yaml_file: Optional[str] = None,
3235
) -> YamlDeclarativeSource:
36+
if not yaml_file:
37+
yaml_file = "file_stream_manifest.yaml"
3338
return YamlDeclarativeSource(
34-
path_to_yaml=str(Path(__file__).parent / "file_stream_manifest.yaml"),
39+
path_to_yaml=str(Path(__file__).parent / yaml_file),
3540
catalog=catalog,
3641
config=config,
3742
state=state,
@@ -43,11 +48,12 @@ def read(
4348
catalog: ConfiguredAirbyteCatalog,
4449
state_builder: Optional[StateBuilder] = None,
4550
expecting_exception: bool = False,
51+
yaml_file: Optional[str] = None,
4652
) -> EntrypointOutput:
4753
config = config_builder.build()
4854
state = state_builder.build() if state_builder else StateBuilder().build()
4955
return entrypoint_read(
50-
_source(catalog, config, state), config, catalog, state, expecting_exception
56+
_source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception
5157
)
5258

5359

@@ -96,7 +102,32 @@ def test_get_article_attachments(self) -> None:
96102
file_reference = output.records[0].record.file_reference
97103
assert file_reference
98104
assert file_reference.file_url
105+
assert re.match(r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_url)
99106
assert file_reference.file_relative_path
107+
assert re.match(
108+
r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_relative_path
109+
)
110+
assert file_reference.file_size_bytes
111+
112+
def test_get_article_attachments_with_filename_extractor(self) -> None:
113+
output = read(
114+
self._config(),
115+
CatalogBuilder()
116+
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
117+
.build(),
118+
yaml_file="test_file_stream_with_filename_extractor.yaml",
119+
)
120+
121+
assert output.records
122+
file_reference = output.records[0].record.file_reference
123+
assert file_reference
124+
assert file_reference.file_url
125+
# TODO: once the response is finally mocked, update this test to check the file name
126+
assert not re.match(r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_url)
127+
assert file_reference.file_relative_path
128+
assert not re.match(
129+
r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_relative_path
130+
)
100131
assert file_reference.file_size_bytes
101132

102133
def test_discover_article_attachments(self) -> None:
unit_tests/sources/declarative/file/test_file_stream_with_filename_extractor.yaml

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
version: 2.0.0
2+
3+
type: DeclarativeSource
4+
5+
check:
6+
type: CheckStream
7+
stream_names:
8+
- "articles"
9+
10+
definitions:
11+
bearer_authenticator:
12+
type: BearerAuthenticator
13+
api_token: "{{ config['credentials']['access_token'] }}"
14+
basic_authenticator:
15+
type: BasicHttpAuthenticator
16+
username: "{{ config['credentials']['email'] + '/token' }}"
17+
password: "{{ config['credentials']['api_token'] }}"
18+
19+
retriever:
20+
type: SimpleRetriever
21+
requester:
22+
type: HttpRequester
23+
url_base: https://{{ config['subdomain'] }}.zendesk.com/api/v2/
24+
http_method: GET
25+
authenticator:
26+
type: SelectiveAuthenticator
27+
authenticator_selection_path: ["credentials", "credentials"]
28+
authenticators:
29+
oauth2.0: "#/definitions/bearer_authenticator"
30+
api_token: "#/definitions/basic_authenticator"
31+
record_selector:
32+
type: RecordSelector
33+
extractor:
34+
type: DpathExtractor
35+
field_path:
36+
["{{ parameters.get('data_path') or parameters.get('name') }}"]
37+
schema_normalization: Default
38+
paginator:
39+
type: DefaultPaginator
40+
page_size_option:
41+
type: RequestOption
42+
field_name: "per_page"
43+
inject_into: request_parameter
44+
pagination_strategy:
45+
type: CursorPagination
46+
page_size: 100
47+
cursor_value: '{{ response.get("next_page", {}) }}'
48+
stop_condition: "{{ last_page_size == 0 }}"
49+
page_token_option:
50+
type: RequestPath
51+
52+
base_stream:
53+
type: DeclarativeStream
54+
schema_loader:
55+
type: JsonFileSchemaLoader
56+
retriever:
57+
$ref: "#/definitions/retriever"
58+
59+
cursor_incremental_sync:
60+
type: DatetimeBasedCursor
61+
cursor_datetime_formats:
62+
- "%s"
63+
- "%Y-%m-%dT%H:%M:%SZ"
64+
- "%Y-%m-%dT%H:%M:%S%z"
65+
datetime_format: "%s"
66+
cursor_field: "{{ parameters.get('cursor_field', 'updated_at') }}"
67+
start_datetime:
68+
datetime: "{{ timestamp(config.get('start_date')) | int if config.get('start_date') else day_delta(-730, '%s') }}"
69+
start_time_option:
70+
inject_into: request_parameter
71+
field_name: "{{ parameters['cursor_filter'] }}"
72+
type: RequestOption
73+
74+
base_incremental_stream:
75+
$ref: "#/definitions/base_stream"
76+
incremental_sync:
77+
$ref: "#/definitions/cursor_incremental_sync"
78+
79+
# Incremental cursor-based streams
80+
articles_stream:
81+
$ref: "#/definitions/base_incremental_stream"
82+
name: "articles"
83+
primary_key: "id"
84+
schema_loader:
85+
type: InlineSchemaLoader
86+
schema:
87+
type: object
88+
$schema: http://json-schema.org/schema#
89+
properties:
90+
id:
91+
type: integer
92+
additionalProperties: true
93+
incremental_sync:
94+
$ref: "#/definitions/cursor_incremental_sync"
95+
start_time_option:
96+
$ref: "#/definitions/cursor_incremental_sync/start_time_option"
97+
field_name: "start_time"
98+
retriever:
99+
$ref: "#/definitions/retriever"
100+
ignore_stream_slicer_parameters_on_paginated_requests: true
101+
requester:
102+
$ref: "#/definitions/retriever/requester"
103+
path: "help_center/incremental/articles"
104+
paginator:
105+
type: DefaultPaginator
106+
pagination_strategy:
107+
type: CursorPagination
108+
cursor_value: '{{ response.get("next_page", {}) }}'
109+
stop_condition: "{{ config.get('ignore_pagination', False) or last_page_size == 0 }}"
110+
page_token_option:
111+
type: RequestPath
112+
record_selector:
113+
extractor:
114+
type: DpathExtractor
115+
field_path: ["articles"]
116+
117+
article_attachments_stream:
118+
$ref: "#/definitions/base_incremental_stream"
119+
name: "article_attachments"
120+
primary_key: "id"
121+
schema_loader:
122+
type: InlineSchemaLoader
123+
schema:
124+
type: object
125+
$schema: http://json-schema.org/schema#
126+
properties:
127+
id:
128+
type: integer
129+
additionalProperties: true
130+
retriever:
131+
$ref: "#/definitions/retriever"
132+
ignore_stream_slicer_parameters_on_paginated_requests: true
133+
requester:
134+
$ref: "#/definitions/retriever/requester"
135+
path: "help_center/articles/{{ stream_partition.article_id }}/attachments"
136+
partition_router:
137+
type: SubstreamPartitionRouter
138+
parent_stream_configs:
139+
- type: ParentStreamConfig
140+
parent_key: "id"
141+
partition_field: "article_id"
142+
stream:
143+
$ref: "#/definitions/articles_stream"
144+
incremental_dependency: true
145+
record_selector:
146+
extractor:
147+
type: DpathExtractor
148+
field_path: ["article_attachments"]
149+
file_uploader:
150+
type: FileUploader
151+
requester:
152+
type: HttpRequester
153+
url_base: "{{download_target}}"
154+
http_method: GET
155+
authenticator:
156+
type: SelectiveAuthenticator
157+
authenticator_selection_path: [ "credentials", "credentials" ]
158+
authenticators:
159+
oauth2.0: "#/definitions/bearer_authenticator"
160+
api_token: "#/definitions/basic_authenticator"
161+
download_target_extractor:
162+
type: DpathExtractor
163+
field_path: [ "content_url" ]
164+
filename_extractor: "{{ record.id }}/{{ record.file_name }}/"
165+
166+
167+
streams:
168+
- $ref: "#/definitions/articles_stream"
169+
- $ref: "#/definitions/article_attachments_stream"
170+
171+
spec:
172+
type: Spec
173+
connection_specification:
174+
type: object
175+
$schema: http://json-schema.org/draft-07/schema#
176+
required:
177+
- subdomain
178+
- start_date
179+
properties:
180+
subdomain:
181+
type: string
182+
name: subdomain
183+
order: 0
184+
title: Subdomain
185+
start_date:
186+
type: string
187+
order: 1
188+
title: Start date
189+
format: date-time
190+
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$
191+
additionalProperties: true

0 commit comments

Comments
 (0)