
Commit b122c0a

Authored by aldogonzalez8, maxi297, and octavia-squidington-iii
PoC for emit file reference record (#443)
Co-authored-by: Maxime Carbonneau-Leclerc <[email protected]>
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent b410187 commit b122c0a

18 files changed (+569, -295 lines)

airbyte_cdk/models/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@
     AirbyteMessage,
     AirbyteProtocol,
     AirbyteRecordMessage,
+    AirbyteRecordMessageFileReference,
     AirbyteStateBlob,
     AirbyteStateMessage,
     AirbyteStateStats,

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 1 addition & 0 deletions
@@ -150,6 +150,7 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
             stream_name=record.stream_name,
             data_or_message=record.data,
             is_file_transfer_message=record.is_file_transfer_message,
+            file_reference=record.file_reference,
         )
         stream = self._stream_name_to_instance[record.stream_name]


airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 6 additions & 17 deletions
@@ -207,19 +207,9 @@ def _group_streams(
             # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
             # so we need to treat them as synchronous

-            file_uploader = None
-            if isinstance(declarative_stream, DeclarativeStream):
-                file_uploader = (
-                    self._constructor.create_component(
-                        model_type=FileUploader,
-                        component_definition=name_to_stream_mapping[declarative_stream.name][
-                            "file_uploader"
-                        ],
-                        config=config,
-                    )
-                    if "file_uploader" in name_to_stream_mapping[declarative_stream.name]
-                    else None
-                )
+            supports_file_transfer = (
+                "file_uploader" in name_to_stream_mapping[declarative_stream.name]
+            )

             if (
                 isinstance(declarative_stream, DeclarativeStream)
@@ -288,7 +278,6 @@ def _group_streams(
                         declarative_stream.get_json_schema(),
                         retriever,
                         self.message_repository,
-                        file_uploader,
                     ),
                     stream_slicer=declarative_stream.retriever.stream_slicer,
                 )
@@ -319,7 +308,6 @@ def _group_streams(
                         declarative_stream.get_json_schema(),
                         retriever,
                         self.message_repository,
-                        file_uploader,
                     ),
                     stream_slicer=cursor,
                 )
@@ -339,6 +327,7 @@ def _group_streams(
                         else None,
                         logger=self.logger,
                         cursor=cursor,
+                        supports_file_transfer=supports_file_transfer,
                     )
                 )
             elif (
@@ -350,7 +339,6 @@ def _group_streams(
                         declarative_stream.get_json_schema(),
                         declarative_stream.retriever,
                         self.message_repository,
-                        file_uploader,
                     ),
                     declarative_stream.retriever.stream_slicer,
                 )
@@ -371,6 +359,7 @@ def _group_streams(
                         cursor_field=None,
                         logger=self.logger,
                         cursor=final_state_cursor,
+                        supports_file_transfer=supports_file_transfer,
                     )
                 )
             elif (
@@ -410,7 +399,6 @@ def _group_streams(
                         declarative_stream.get_json_schema(),
                         retriever,
                         self.message_repository,
-                        file_uploader,
                     ),
                     perpartition_cursor,
                 )
@@ -425,6 +413,7 @@ def _group_streams(
                         cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
                         logger=self.logger,
                         cursor=perpartition_cursor,
+                        supports_file_transfer=supports_file_transfer,
                     )
                 )
             else:
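
Since the grouping logic no longer builds the FileUploader itself, it only derives a supports_file_transfer flag and passes it to the concurrent DefaultStream. A minimal sketch of that check, assuming a hypothetical name_to_stream_mapping shaped like the manifest:

# Sketch only: name_to_stream_mapping and the stream names are illustrative.
name_to_stream_mapping = {
    "attachments": {"retriever": {}, "file_uploader": {"type": "FileUploader"}},
    "users": {"retriever": {}},
}

for stream_name, definition in name_to_stream_mapping.items():
    supports_file_transfer = "file_uploader" in definition
    print(f"{stream_name}: supports_file_transfer={supports_file_transfer}")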

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 9 additions & 0 deletions
@@ -1449,6 +1449,15 @@ definitions:
         anyOf:
           - "$ref": "#/definitions/CustomRecordExtractor"
           - "$ref": "#/definitions/DpathExtractor"
+      filename_extractor:
+        description: Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.
+        type: string
+        interpolation_context:
+          - config
+          - record
+        examples:
+          - "{{ record.id }}/{{ record.file_name }}/"
+          - "{{ record.id }}_{{ record.file_name }}/"
       $parameters:
         type: object
         additional_properties: true
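
To illustrate how the new filename_extractor fits into a FileUploader definition, here is a hypothetical manifest fragment written as the equivalent Python mapping; only the field names come from this schema, while the endpoint, extractor path, and record fields are illustrative assumptions.

# Hypothetical FileUploader definition; endpoint and field paths are assumptions.
file_uploader_definition = {
    "type": "FileUploader",
    "requester": {
        "type": "HttpRequester",
        "url_base": "https://api.example.com",
        # Assumption: the download_target extracted per record is exposed to the
        # requester through the stream slice's extra fields.
        "path": "{{ stream_slice.extra_fields['download_target'] }}",
    },
    "download_target_extractor": {
        "type": "DpathExtractor",
        "field_path": ["download_url"],
    },
    "filename_extractor": "{{ record.id }}_{{ record.file_name }}/",
}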

airbyte_cdk/sources/declarative/extractors/record_selector.py

Lines changed: 6 additions & 1 deletion
@@ -15,6 +15,7 @@
 )
 from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
 from airbyte_cdk.sources.declarative.models import SchemaNormalization
+from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
 from airbyte_cdk.sources.declarative.transformations import RecordTransformation
 from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
 from airbyte_cdk.sources.utils.transform import TypeTransformer
@@ -42,6 +43,7 @@ class RecordSelector(HttpSelector):
     record_filter: Optional[RecordFilter] = None
     transformations: List[RecordTransformation] = field(default_factory=lambda: [])
     transform_before_filtering: bool = False
+    file_uploader: Optional[FileUploader] = None

     def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self._parameters = parameters
@@ -117,7 +119,10 @@ def filter_and_transform(
             transformed_filtered_data, schema=records_schema
         )
         for data in normalized_data:
-            yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
+            record = Record(data=data, stream_name=self.name, associated_slice=stream_slice)
+            if self.file_uploader:
+                self.file_uploader.upload(record)
+            yield record

     def _normalize_by_schema(
         self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
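
The behavioral change above is easiest to see in isolation. Here is a self-contained sketch of the hook, using stand-in classes instead of the CDK's Record and FileUploader: when an uploader is configured, each record is handed to upload() before it is yielded, which is where the file reference gets attached.

# Stand-in types; the real classes live in airbyte_cdk.
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Optional


@dataclass
class StubRecord:
    data: Dict[str, Any]
    stream_name: str
    file_reference: Optional[Dict[str, Any]] = None


class StubFileUploader:
    def upload(self, record: StubRecord) -> None:
        # The CDK downloads the file and sets an AirbyteRecordMessageFileReference;
        # here we only mark the record to show the side effect.
        record.file_reference = {"file_relative_path": f"{record.stream_name}/{record.data['id']}"}


def filter_and_transform(
    rows: Iterable[Dict[str, Any]],
    stream_name: str,
    file_uploader: Optional[StubFileUploader] = None,
) -> Iterable[StubRecord]:
    for data in rows:
        record = StubRecord(data=data, stream_name=stream_name)
        if file_uploader:
            file_uploader.upload(record)
        yield record


for rec in filter_and_transform([{"id": "1"}], "attachments", StubFileUploader()):
    print(rec.file_reference)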

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 31 additions & 21 deletions
@@ -1989,6 +1989,31 @@ class Config:
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


+class FileUploader(BaseModel):
+    type: Literal["FileUploader"]
+    requester: Union[CustomRequester, HttpRequester] = Field(
+        ...,
+        description="Requester component that describes how to prepare HTTP requests to send to the source API.",
+    )
+    download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
+        ...,
+        description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response",
+    )
+    file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field(
+        None,
+        description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
+    )
+    filename_extractor: Optional[str] = Field(
+        None,
+        description="Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.",
+        examples=[
+            "{{ record.id }}/{{ record.file_name }}/",
+            "{{ record.id }}_{{ record.file_name }}/",
+        ],
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
 class DeclarativeStream(BaseModel):
     class Config:
         extra = Extra.allow
@@ -2047,6 +2072,11 @@ class Config:
         description="Array of state migrations to be applied on the input state",
         title="State Migrations",
     )
+    file_uploader: Optional[FileUploader] = Field(
+        None,
+        description="(experimental) Describes how to fetch a file",
+        title="File Uploader",
+    )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


@@ -2278,22 +2308,6 @@ class StateDelegatingStream(BaseModel):
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


-class FileUploader(BaseModel):
-    type: Literal["FileUploader"]
-    requester: Union[CustomRequester, HttpRequester] = Field(
-        ...,
-        description="Requester component that describes how to prepare HTTP requests to send to the source API.",
-    )
-    download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
-        ...,
-        description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response",
-    )
-    file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field(
-        None,
-        description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
-    )
-
-
 class SimpleRetriever(BaseModel):
     type: Literal["SimpleRetriever"]
     record_selector: RecordSelector = Field(
@@ -2324,11 +2338,6 @@ class SimpleRetriever(BaseModel):
         description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.",
         title="Partition Router",
     )
-    file_uploader: Optional[FileUploader] = Field(
-        None,
-        description="(experimental) Describes how to fetch a file",
-        title="File Uploader",
-    )
     decoder: Optional[
         Union[
             CustomDecoder,
@@ -2485,6 +2494,7 @@ class DynamicDeclarativeStream(BaseModel):
 DeclarativeSource1.update_forward_refs()
 DeclarativeSource2.update_forward_refs()
 SelectiveAuthenticator.update_forward_refs()
+FileUploader.update_forward_refs()
 DeclarativeStream.update_forward_refs()
 SessionTokenAuthenticator.update_forward_refs()
 DynamicSchemaLoader.update_forward_refs()
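
With the generated models, file_uploader now lives on DeclarativeStream rather than SimpleRetriever, so in a manifest it sits next to retriever instead of inside it. A hypothetical stream definition showing the new placement (field names from the models above, everything else illustrative):

# Hypothetical stream definition; only the placement of file_uploader matters here.
stream_definition = {
    "type": "DeclarativeStream",
    "name": "attachments",
    "retriever": {"type": "SimpleRetriever"},  # requester/record_selector omitted for brevity
    "file_uploader": {
        "type": "FileUploader",
        "requester": {"type": "HttpRequester", "url_base": "https://api.example.com"},
        "download_target_extractor": {"type": "DpathExtractor", "field_path": ["download_url"]},
    },
}
assert "file_uploader" in stream_definition  # the check concurrent_declarative_source now relies on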

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 17 additions & 1 deletion
@@ -1755,6 +1755,11 @@ def create_declarative_stream(
             transformations.append(
                 self._create_component_from_model(model=transformation_model, config=config)
             )
+        file_uploader = None
+        if model.file_uploader:
+            file_uploader = self._create_component_from_model(
+                model=model.file_uploader, config=config
+            )

         retriever = self._create_component_from_model(
             model=model.retriever,
@@ -1766,6 +1771,7 @@
             stop_condition_on_cursor=stop_condition_on_cursor,
             client_side_incremental_sync=client_side_incremental_sync,
             transformations=transformations,
+            file_uploader=file_uploader,
             incremental_sync=model.incremental_sync,
         )
         cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
@@ -2607,6 +2613,7 @@ def create_record_selector(
         transformations: List[RecordTransformation] | None = None,
         decoder: Decoder | None = None,
         client_side_incremental_sync: Dict[str, Any] | None = None,
+        file_uploader: Optional[FileUploader] = None,
         **kwargs: Any,
     ) -> RecordSelector:
         extractor = self._create_component_from_model(
@@ -2644,6 +2651,7 @@
             config=config,
             record_filter=record_filter,
             transformations=transformations or [],
+            file_uploader=file_uploader,
             schema_normalization=schema_normalization,
             parameters=model.parameters or {},
             transform_before_filtering=transform_before_filtering,
@@ -2701,6 +2709,7 @@ def create_simple_retriever(
         stop_condition_on_cursor: bool = False,
         client_side_incremental_sync: Optional[Dict[str, Any]] = None,
         transformations: List[RecordTransformation],
+        file_uploader: Optional[FileUploader] = None,
         incremental_sync: Optional[
             Union[
                 IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
@@ -2723,6 +2732,7 @@
             decoder=decoder,
             transformations=transformations,
             client_side_incremental_sync=client_side_incremental_sync,
+            file_uploader=file_uploader,
         )
         url_base = (
             model.requester.url_base
@@ -3338,7 +3348,13 @@ def create_file_uploader(
             name=name,
             **kwargs,
         )
-        return FileUploader(requester, download_target_extractor)
+        return FileUploader(
+            requester=requester,
+            download_target_extractor=download_target_extractor,
+            config=config,
+            parameters=model.parameters or {},
+            filename_extractor=model.filename_extractor if model.filename_extractor else None,
+        )

     def create_moving_window_call_rate_policy(
         self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
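
Taken together, the factory changes thread the stream-level file_uploader down into the record selector. A rough, self-contained sketch of that wiring with hypothetical stand-in functions (the real methods build CDK components and take many more arguments):

from typing import Any, Dict, Optional


def create_file_uploader(definition: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    # Stand-in for ModelToComponentFactory.create_file_uploader, which now passes
    # config, parameters, and filename_extractor into the FileUploader component.
    return {"component": "FileUploader", "filename_extractor": definition.get("filename_extractor")}


def create_declarative_stream(stream_definition: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    file_uploader: Optional[Dict[str, Any]] = None
    if stream_definition.get("file_uploader"):
        file_uploader = create_file_uploader(stream_definition["file_uploader"], config)
    # The retriever (and, inside it, the record selector) receives the uploader.
    return {"retriever": {"record_selector": {"file_uploader": file_uploader}}}


stream = create_declarative_stream(
    {"file_uploader": {"filename_extractor": "{{ record.id }}/{{ record.file_name }}/"}},
    config={},
)
print(stream["retriever"]["record_selector"]["file_uploader"])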
airbyte_cdk/sources/declarative/retrievers/file_uploader.py

Lines changed: 60 additions & 15 deletions
@@ -1,44 +1,89 @@
+#
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+#
+
 import json
+import logging
+import uuid
+from dataclasses import InitVar, dataclass, field
 from pathlib import Path
-from typing import Optional
+from typing import Any, Mapping, Optional, Union

+from airbyte_cdk.models import AirbyteRecordMessageFileReference
 from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
+from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
+    InterpolatedString,
+)
 from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
     SafeResponse,
 )
 from airbyte_cdk.sources.declarative.requesters import Requester
 from airbyte_cdk.sources.declarative.types import Record, StreamSlice
+from airbyte_cdk.sources.types import Config
+from airbyte_cdk.sources.utils.files_directory import get_files_directory

+logger = logging.getLogger("airbyte")

+
+@dataclass
 class FileUploader:
-    def __init__(
-        self,
-        requester: Requester,
-        download_target_extractor: RecordExtractor,
-        content_extractor: Optional[RecordExtractor] = None,
-    ) -> None:
-        self._requester = requester
-        self._download_target_extractor = download_target_extractor
-        self._content_extractor = content_extractor
+    requester: Requester
+    download_target_extractor: RecordExtractor
+    config: Config
+    parameters: InitVar[Mapping[str, Any]]
+
+    filename_extractor: Optional[Union[InterpolatedString, str]] = None
+    content_extractor: Optional[RecordExtractor] = None
+
+    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
+        if self.filename_extractor:
+            self.filename_extractor = InterpolatedString.create(
+                self.filename_extractor,
+                parameters=parameters,
+            )

     def upload(self, record: Record) -> None:
-        # TODO validate record shape - is the transformation applied at this point?
         mocked_response = SafeResponse()
         mocked_response.content = json.dumps(record.data).encode()
-        download_target = list(self._download_target_extractor.extract_records(mocked_response))[0]
+        download_target = list(self.download_target_extractor.extract_records(mocked_response))[0]
         if not isinstance(download_target, str):
             raise ValueError(
                 f"download_target is expected to be a str but was {type(download_target)}: {download_target}"
             )

-        response = self._requester.send_request(
+        response = self.requester.send_request(
             stream_slice=StreamSlice(
                 partition={}, cursor_slice={}, extra_fields={"download_target": download_target}
             ),
         )

-        if self._content_extractor:
+        if self.content_extractor:
             raise NotImplementedError("TODO")
         else:
-            with open(str(Path(__file__).parent / record.data["file_name"]), "ab") as f:
+            files_directory = Path(get_files_directory())
+
+            file_name = (
+                self.filename_extractor.eval(self.config, record=record)
+                if self.filename_extractor
+                else str(uuid.uuid4())
+            )
+            file_name = file_name.lstrip("/")
+            file_relative_path = Path(record.stream_name) / Path(file_name)
+
+            full_path = files_directory / file_relative_path
+            full_path.parent.mkdir(parents=True, exist_ok=True)
+
+            with open(str(full_path), "wb") as f:
                 f.write(response.content)
+            file_size_bytes = full_path.stat().st_size
+
+            logger.info("File uploaded successfully")
+            logger.info(f"File url: {str(full_path)}")
+            logger.info(f"File size: {file_size_bytes / 1024} KB")
+            logger.info(f"File relative path: {str(file_relative_path)}")
+
+            record.file_reference = AirbyteRecordMessageFileReference(
+                file_url=str(full_path),
+                file_relative_path=str(file_relative_path),
+                file_size_bytes=file_size_bytes,
+            )
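
To make the new path handling concrete, here is a runnable sketch of what upload() computes for one record, assuming a throwaway files directory in place of get_files_directory() and a hand-rendered filename template in place of InterpolatedString.eval():

import uuid
from pathlib import Path

files_directory = Path("/tmp/airbyte-files")  # assumption: stand-in for get_files_directory()
stream_name = "attachments"
record_data = {"id": "42", "file_name": "report.pdf"}

# Hand-rendered "{{ record.id }}_{{ record.file_name }}/"; a random UUID is the
# fallback when no filename_extractor is configured.
rendered_name = f"{record_data['id']}_{record_data['file_name']}/"
file_name = rendered_name.lstrip("/") if rendered_name else str(uuid.uuid4())
file_relative_path = Path(stream_name) / Path(file_name)

full_path = files_directory / file_relative_path
full_path.parent.mkdir(parents=True, exist_ok=True)
full_path.write_bytes(b"...downloaded response body...")

file_reference = {
    "file_url": str(full_path),
    "file_relative_path": str(file_relative_path),
    "file_size_bytes": full_path.stat().st_size,
}
print(file_reference)  # mirrors the AirbyteRecordMessageFileReference fields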
