Skip to content

Commit e860e3f

Browse files
committed
file-mode-api: fix classes and names to fit our vocabulary.
1 parent 5d113ec commit e860e3f

File tree

11 files changed

+150
-149
lines changed

11 files changed

+150
-149
lines changed

airbyte_cdk/sources/declarative/extractors/record_selector.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
)
1616
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1717
from airbyte_cdk.sources.declarative.models import SchemaNormalization
18-
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
18+
from airbyte_cdk.sources.declarative.retrievers.file_uploader import DefaultFileUploader
1919
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
2020
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
2121
from airbyte_cdk.sources.utils.transform import TypeTransformer
@@ -43,7 +43,7 @@ class RecordSelector(HttpSelector):
4343
record_filter: Optional[RecordFilter] = None
4444
transformations: List[RecordTransformation] = field(default_factory=lambda: [])
4545
transform_before_filtering: bool = False
46-
file_uploader: Optional[FileUploader] = None
46+
file_uploader: Optional[DefaultFileUploader] = None
4747

4848
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4949
self._parameters = parameters

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -482,10 +482,10 @@
482482
SimpleRetrieverTestReadDecorator,
483483
)
484484
from airbyte_cdk.sources.declarative.retrievers.file_uploader import (
485-
BaseFileUploader,
486-
ConnectorBuilderFileUploader,
487485
FileUploader,
488-
FileWriter,
486+
ConnectorBuilderFileUploader,
487+
DefaultFileUploader,
488+
LocalFileSystemFileWriter,
489489
NoopFileWriter,
490490
)
491491
from airbyte_cdk.sources.declarative.schema import (
@@ -2812,7 +2812,7 @@ def create_record_selector(
28122812
transformations: List[RecordTransformation] | None = None,
28132813
decoder: Decoder | None = None,
28142814
client_side_incremental_sync: Dict[str, Any] | None = None,
2815-
file_uploader: Optional[FileUploader] = None,
2815+
file_uploader: Optional[DefaultFileUploader] = None,
28162816
**kwargs: Any,
28172817
) -> RecordSelector:
28182818
extractor = self._create_component_from_model(
@@ -2908,7 +2908,7 @@ def create_simple_retriever(
29082908
stop_condition_on_cursor: bool = False,
29092909
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
29102910
transformations: List[RecordTransformation],
2911-
file_uploader: Optional[FileUploader] = None,
2911+
file_uploader: Optional[DefaultFileUploader] = None,
29122912
incremental_sync: Optional[
29132913
Union[
29142914
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
@@ -3598,7 +3598,7 @@ def create_fixed_window_call_rate_policy(
35983598

35993599
def create_file_uploader(
36003600
self, model: FileUploaderModel, config: Config, **kwargs: Any
3601-
) -> BaseFileUploader:
3601+
) -> FileUploader:
36023602
name = "File Uploader"
36033603
requester = self._create_component_from_model(
36043604
model=model.requester,
@@ -3613,11 +3613,11 @@ def create_file_uploader(
36133613
**kwargs,
36143614
)
36153615
emit_connector_builder_messages = self._emit_connector_builder_messages
3616-
file_uploader = FileUploader(
3616+
file_uploader = DefaultFileUploader(
36173617
requester=requester,
36183618
download_target_extractor=download_target_extractor,
36193619
config=config,
3620-
file_writer=NoopFileWriter() if emit_connector_builder_messages else FileWriter(),
3620+
file_writer=NoopFileWriter() if emit_connector_builder_messages else LocalFileSystemFileWriter(),
36213621
parameters=model.parameters or {},
36223622
filename_extractor=model.filename_extractor if model.filename_extractor else None,
36233623
)
airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
from .base_file_uploader import BaseFileUploader
2-
from .base_file_writer import BaseFileWriter
3-
from .connector_builder_file_uploader import ConnectorBuilderFileUploader
41
from .file_uploader import FileUploader
52
from .file_writer import FileWriter
3+
from .connector_builder_file_uploader import ConnectorBuilderFileUploader
4+
from .default_file_uploader import DefaultFileUploader
5+
from .local_file_system_file_writer import LocalFileSystemFileWriter
66
from .noop_file_writer import NoopFileWriter
77

88
__all__ = [
9-
"FileUploader",
10-
"FileWriter",
9+
"DefaultFileUploader",
10+
"LocalFileSystemFileWriter",
1111
"NoopFileWriter",
1212
"ConnectorBuilderFileUploader",
13-
"BaseFileUploader",
14-
"BaseFileWriter",
13+
"FileUploader",
14+
"FileWriter",
1515
]

airbyte_cdk/sources/declarative/retrievers/file_uploader/base_file_uploader.py

Lines changed: 0 additions & 22 deletions
This file was deleted.

airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,18 @@
66

77
from airbyte_cdk.sources.declarative.types import Record
88

9-
from .base_file_uploader import BaseFileUploader
109
from .file_uploader import FileUploader
10+
from .default_file_uploader import DefaultFileUploader
1111

1212

1313
@dataclass
14-
class ConnectorBuilderFileUploader(BaseFileUploader):
14+
class ConnectorBuilderFileUploader(FileUploader):
1515
"""
1616
Connector builder file uploader
1717
Acts as a decorator or wrapper around a FileUploader instance, copying the attributes from record.file_reference into the record.data.
1818
"""
1919

20-
file_uploader: FileUploader
20+
file_uploader: DefaultFileUploader
2121

2222
def upload(self, record: Record) -> None:
2323
self.file_uploader.upload(record=record)
airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import json
6+
import logging
7+
import uuid
8+
from dataclasses import InitVar, dataclass, field
9+
from pathlib import Path
10+
from typing import Any, Mapping, Optional, Union
11+
12+
from airbyte_cdk.models import AirbyteRecordMessageFileReference
13+
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
14+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
15+
InterpolatedString,
16+
)
17+
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
18+
SafeResponse,
19+
)
20+
from airbyte_cdk.sources.declarative.requesters import Requester
21+
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
22+
from airbyte_cdk.sources.types import Config
23+
from airbyte_cdk.sources.utils.files_directory import get_files_directory
24+
25+
from .file_uploader import FileUploader
26+
from .file_writer import FileWriter
27+
28+
logger = logging.getLogger("airbyte")
29+
30+
31+
@dataclass
32+
class DefaultFileUploader(FileUploader):
33+
"""
34+
Default file uploader implementation
35+
Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write()
36+
Different FileWriter implementations can be injected as file_writer to handle different file writing strategies.
37+
"""
38+
39+
requester: Requester
40+
download_target_extractor: RecordExtractor
41+
config: Config
42+
file_writer: FileWriter
43+
parameters: InitVar[Mapping[str, Any]]
44+
45+
filename_extractor: Optional[Union[InterpolatedString, str]] = None
46+
content_extractor: Optional[RecordExtractor] = None
47+
48+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
49+
if self.filename_extractor:
50+
self.filename_extractor = InterpolatedString.create(
51+
self.filename_extractor,
52+
parameters=parameters,
53+
)
54+
55+
def upload(self, record: Record) -> None:
56+
mocked_response = SafeResponse()
57+
mocked_response.content = json.dumps(record.data).encode()
58+
download_targets = list(self.download_target_extractor.extract_records(mocked_response))
59+
if not download_targets:
60+
raise ValueError("No download targets found")
61+
62+
download_target = download_targets[0] # we just expect one download target
63+
if not isinstance(download_target, str):
64+
raise ValueError(
65+
f"download_target is expected to be a str but was {type(download_target)}: {download_target}"
66+
)
67+
68+
response = self.requester.send_request(
69+
stream_slice=StreamSlice(
70+
partition={}, cursor_slice={}, extra_fields={"download_target": download_target}
71+
),
72+
)
73+
74+
if self.content_extractor:
75+
raise NotImplementedError("Content extraction is not yet implemented. The content_extractor component is currently not supported.")
76+
else:
77+
files_directory = Path(get_files_directory())
78+
79+
file_name = (
80+
self.filename_extractor.eval(self.config, record=record)
81+
if self.filename_extractor
82+
else str(uuid.uuid4())
83+
)
84+
file_name = file_name.lstrip("/")
85+
file_relative_path = Path(record.stream_name) / Path(file_name)
86+
87+
full_path = files_directory / file_relative_path
88+
full_path.parent.mkdir(parents=True, exist_ok=True)
89+
90+
file_size_bytes = self.file_writer.write(full_path, content=response.content)
91+
92+
logger.info("File uploaded successfully")
93+
logger.info(f"File url: {str(full_path)}")
94+
logger.info(f"File size: {file_size_bytes / 1024} KB")
95+
logger.info(f"File relative path: {str(file_relative_path)}")
96+
97+
record.file_reference = AirbyteRecordMessageFileReference(
98+
staging_file_url=str(full_path),
99+
source_file_relative_path=str(file_relative_path),
100+
file_size_bytes=file_size_bytes,
101+
)

airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py

Lines changed: 10 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -2,100 +2,21 @@
22
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44

5-
import json
6-
import logging
7-
import uuid
8-
from dataclasses import InitVar, dataclass, field
9-
from pathlib import Path
10-
from typing import Any, Mapping, Optional, Union
5+
from abc import ABC, abstractmethod
6+
from dataclasses import dataclass
117

12-
from airbyte_cdk.models import AirbyteRecordMessageFileReference
13-
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
14-
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
15-
InterpolatedString,
16-
)
17-
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
18-
SafeResponse,
19-
)
20-
from airbyte_cdk.sources.declarative.requesters import Requester
21-
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
22-
from airbyte_cdk.sources.types import Config
23-
from airbyte_cdk.sources.utils.files_directory import get_files_directory
24-
25-
from .base_file_uploader import BaseFileUploader
26-
from .base_file_writer import BaseFileWriter
27-
28-
logger = logging.getLogger("airbyte")
8+
from airbyte_cdk.sources.declarative.types import Record
299

3010

3111
@dataclass
32-
class FileUploader(BaseFileUploader):
12+
class FileUploader(ABC):
3313
"""
34-
File uploader class
35-
Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write()
36-
Different types of file_writer:BaseFileWriter can be injected to handle different file writing strategies.
14+
Base class for file uploaders
3715
"""
3816

39-
requester: Requester
40-
download_target_extractor: RecordExtractor
41-
config: Config
42-
file_writer: BaseFileWriter
43-
parameters: InitVar[Mapping[str, Any]]
44-
45-
filename_extractor: Optional[Union[InterpolatedString, str]] = None
46-
content_extractor: Optional[RecordExtractor] = None
47-
48-
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
49-
if self.filename_extractor:
50-
self.filename_extractor = InterpolatedString.create(
51-
self.filename_extractor,
52-
parameters=parameters,
53-
)
54-
17+
@abstractmethod
5518
def upload(self, record: Record) -> None:
56-
mocked_response = SafeResponse()
57-
mocked_response.content = json.dumps(record.data).encode()
58-
download_targets = list(self.download_target_extractor.extract_records(mocked_response))
59-
if not download_targets:
60-
raise ValueError("No download targets found")
61-
62-
download_target = download_targets[0] # we just expect one download target
63-
if not isinstance(download_target, str):
64-
raise ValueError(
65-
f"download_target is expected to be a str but was {type(download_target)}: {download_target}"
66-
)
67-
68-
response = self.requester.send_request(
69-
stream_slice=StreamSlice(
70-
partition={}, cursor_slice={}, extra_fields={"download_target": download_target}
71-
),
72-
)
73-
74-
if self.content_extractor:
75-
raise NotImplementedError("TODO")
76-
else:
77-
files_directory = Path(get_files_directory())
78-
79-
file_name = (
80-
self.filename_extractor.eval(self.config, record=record)
81-
if self.filename_extractor
82-
else str(uuid.uuid4())
83-
)
84-
file_name = file_name.lstrip("/")
85-
file_relative_path = Path(record.stream_name) / Path(file_name)
86-
87-
full_path = files_directory / file_relative_path
88-
full_path.parent.mkdir(parents=True, exist_ok=True)
89-
90-
file_size_bytes = self.file_writer.write(full_path, content=response.content)
91-
92-
logger.info("File uploaded successfully")
93-
logger.info(f"File url: {str(full_path)}")
94-
logger.info(f"File size: {file_size_bytes / 1024} KB")
95-
logger.info(f"File relative path: {str(file_relative_path)}")
96-
97-
record.file_reference = AirbyteRecordMessageFileReference(
98-
staging_file_url=str(full_path),
99-
source_file_relative_path=str(file_relative_path),
100-
file_size_bytes=file_size_bytes,
101-
)
19+
"""
20+
Uploads the file associated with the given record
21+
"""
22+
...

airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@
22
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44

5+
from abc import ABC, abstractmethod
56
from pathlib import Path
67

7-
from .base_file_writer import BaseFileWriter
88

9+
class FileWriter(ABC):
10+
"""
11+
Base class for file writers
12+
"""
913

10-
class FileWriter(BaseFileWriter):
14+
@abstractmethod
1115
def write(self, file_path: Path, content: bytes) -> int:
1216
"""
1317
Writes the file to the specified location
1418
"""
15-
with open(str(file_path), "wb") as f:
16-
f.write(content)
17-
18-
return file_path.stat().st_size
19+
...

0 commit comments

Comments
 (0)