Skip to content

Commit da7976d

Browse files
authored
Merge branch 'main' into christo/manifest-server-errors
2 parents c5c83e3 + b28c6e3 commit da7976d

File tree

7 files changed

+355
-33
lines changed

7 files changed

+355
-33
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2208,6 +2208,14 @@ def _build_concurrent_cursor(
22082208
and stream_slicer
22092209
and not isinstance(stream_slicer, SinglePartitionRouter)
22102210
):
2211+
if isinstance(model.incremental_sync, IncrementingCountCursorModel):
2212+
# We don't currently support usage of partition routing and IncrementingCountCursor at the
2213+
# same time because we didn't solve for design questions like what the lookback window would
2214+
be as well as global cursor fallbacks. We have not seen customers that have needed both
2215+
# at the same time yet and are currently punting on this until we need to solve it.
2216+
raise ValueError(
2217+
f"The low-code framework does not currently support usage of a PartitionRouter and an IncrementingCountCursor at the same time. Please specify only one of these options for stream {stream_name}."
2218+
)
22112219
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
22122220
state_manager=self._connector_state_manager,
22132221
model_type=DatetimeBasedCursorModel,
@@ -2395,21 +2403,12 @@ def create_http_requester(
23952403

23962404
api_budget = self._api_budget
23972405

2398-
# Removes QueryProperties components from the interpolated mappings because it has been designed
2399-
# to be used by the SimpleRetriever and will be resolved from the provider from the slice directly
2400-
# instead of through jinja interpolation
2401-
request_parameters: Optional[Union[str, Mapping[str, str]]]
2402-
if isinstance(model.request_parameters, Mapping):
2403-
request_parameters = self._remove_query_properties(model.request_parameters)
2404-
else:
2405-
request_parameters = model.request_parameters
2406-
24072406
request_options_provider = InterpolatedRequestOptionsProvider(
24082407
request_body=model.request_body,
24092408
request_body_data=model.request_body_data,
24102409
request_body_json=model.request_body_json,
24112410
request_headers=model.request_headers,
2412-
request_parameters=request_parameters,
2411+
request_parameters=model.request_parameters, # type: ignore # QueryProperties have been removed in `create_simple_retriever`
24132412
query_properties_key=query_properties_key,
24142413
config=config,
24152414
parameters=model.parameters or {},
@@ -3199,7 +3198,8 @@ def _get_url(req: Requester) -> str:
31993198

32003199
query_properties: Optional[QueryProperties] = None
32013200
query_properties_key: Optional[str] = None
3202-
if self._query_properties_in_request_parameters(model.requester):
3201+
self._ensure_query_properties_to_model(model.requester)
3202+
if self._has_query_properties_in_request_parameters(model.requester):
32033203
# It is better to be explicit about an error if PropertiesFromEndpoint is defined in multiple
32043204
# places instead of default to request_parameters which isn't clearly documented
32053205
if (
@@ -3211,7 +3211,7 @@ def _get_url(req: Requester) -> str:
32113211
)
32123212

32133213
query_properties_definitions = []
3214-
for key, request_parameter in model.requester.request_parameters.items(): # type: ignore # request_parameters is already validated to be a Mapping using _query_properties_in_request_parameters()
3214+
for key, request_parameter in model.requester.request_parameters.items(): # type: ignore # request_parameters is already validated to be a Mapping using _has_query_properties_in_request_parameters()
32153215
if isinstance(request_parameter, QueryPropertiesModel):
32163216
query_properties_key = key
32173217
query_properties_definitions.append(request_parameter)
@@ -3225,6 +3225,16 @@ def _get_url(req: Requester) -> str:
32253225
query_properties = self._create_component_from_model(
32263226
model=query_properties_definitions[0], config=config
32273227
)
3228+
3229+
# Removes QueryProperties components from the interpolated mappings because it has been designed
3230+
# to be used by the SimpleRetriever and will be resolved from the provider from the slice directly
3231+
# instead of through jinja interpolation
3232+
if hasattr(model.requester, "request_parameters") and isinstance(
3233+
model.requester.request_parameters, Mapping
3234+
):
3235+
model.requester.request_parameters = self._remove_query_properties(
3236+
model.requester.request_parameters
3237+
)
32283238
elif (
32293239
hasattr(model.requester, "fetch_properties_from_endpoint")
32303240
and model.requester.fetch_properties_from_endpoint
@@ -3361,7 +3371,7 @@ def _should_limit_slices_fetched(self) -> bool:
33613371
return bool(self._limit_slices_fetched or self._emit_connector_builder_messages)
33623372

33633373
@staticmethod
3364-
def _query_properties_in_request_parameters(
3374+
def _has_query_properties_in_request_parameters(
33653375
requester: Union[HttpRequesterModel, CustomRequesterModel],
33663376
) -> bool:
33673377
if not hasattr(requester, "request_parameters"):
@@ -4175,3 +4185,26 @@ def create_grouping_partition_router(
41754185
deduplicate=model.deduplicate if model.deduplicate is not None else True,
41764186
config=config,
41774187
)
4188+
4189+
def _ensure_query_properties_to_model(
4190+
self, requester: Union[HttpRequesterModel, CustomRequesterModel]
4191+
) -> None:
4192+
"""
4193+
For some reason, it seems like CustomRequesterModel request_parameters stays as dictionaries which means that
4194+
the other conditions relying on it being QueryPropertiesModel instead of a dict fail. Here, we migrate them to
4195+
a proper model.
4196+
"""
4197+
if not hasattr(requester, "request_parameters"):
4198+
return
4199+
4200+
request_parameters = requester.request_parameters
4201+
if request_parameters and isinstance(request_parameters, Dict):
4202+
for request_parameter_key in request_parameters.keys():
4203+
request_parameter = request_parameters[request_parameter_key]
4204+
if (
4205+
isinstance(request_parameter, Dict)
4206+
and request_parameter.get("type") == "QueryProperties"
4207+
):
4208+
request_parameters[request_parameter_key] = QueryPropertiesModel.parse_obj(
4209+
request_parameter
4210+
)

airbyte_cdk/sources/file_based/file_based_stream_reader.py

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
#
44

55
import logging
6+
import time
67
from abc import ABC, abstractmethod
78
from datetime import datetime
89
from enum import Enum
910
from io import IOBase
1011
from os import makedirs, path
11-
from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple
12+
from typing import Any, Iterable, List, MutableMapping, Optional, Set, Tuple
1213

14+
from airbyte_protocol_dataclasses.models import FailureType
1315
from wcmatch.glob import GLOBSTAR, globmatch
1416

1517
from airbyte_cdk.models import AirbyteRecordMessageFileReference
@@ -19,8 +21,9 @@
1921
preserve_directory_structure,
2022
use_file_transfer,
2123
)
24+
from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError
2225
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
23-
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
26+
from airbyte_cdk.sources.file_based.remote_file import RemoteFile, UploadableRemoteFile
2427

2528

2629
class FileReadMode(Enum):
@@ -34,6 +37,7 @@ class AbstractFileBasedStreamReader(ABC):
3437
FILE_NAME = "file_name"
3538
LOCAL_FILE_PATH = "local_file_path"
3639
FILE_FOLDER = "file_folder"
40+
FILE_SIZE_LIMIT = 1_500_000_000
3741

3842
def __init__(self) -> None:
3943
self._config = None
@@ -113,16 +117,6 @@ def filter_files_by_globs_and_start_date(
113117
seen.add(file.uri)
114118
yield file
115119

116-
@abstractmethod
117-
def file_size(self, file: RemoteFile) -> int:
118-
"""Utility method to get size of the remote file.
119-
120-
This is required for connectors that will support writing to
121-
files. If the connector does not support writing files, then the
122-
subclass can simply `return 0`.
123-
"""
124-
...
125-
126120
@staticmethod
127121
def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool:
128122
# Use the GLOBSTAR flag to enable recursive ** matching
@@ -153,9 +147,8 @@ def include_identities_stream(self) -> bool:
153147
return include_identities_stream(self.config)
154148
return False
155149

156-
@abstractmethod
157150
def upload(
158-
self, file: RemoteFile, local_directory: str, logger: logging.Logger
151+
self, file: UploadableRemoteFile, local_directory: str, logger: logging.Logger
159152
) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
160153
"""
161154
This is required for connectors that will support writing to
@@ -173,7 +166,53 @@ def upload(
173166
- file_size_bytes (int): The size of the referenced file in bytes.
174167
- source_file_relative_path (str): The relative path to the referenced file in source.
175168
"""
176-
...
169+
if not isinstance(file, UploadableRemoteFile):
170+
raise TypeError(f"Expected UploadableRemoteFile, got {type(file)}")
171+
172+
file_size = file.size
173+
174+
if file_size > self.FILE_SIZE_LIMIT:
175+
message = f"File size exceeds the {self.FILE_SIZE_LIMIT / 1e9} GB limit."
176+
raise FileSizeLimitError(
177+
message=message, internal_message=message, failure_type=FailureType.config_error
178+
)
179+
180+
file_paths = self._get_file_transfer_paths(
181+
source_file_relative_path=file.source_file_relative_path,
182+
staging_directory=local_directory,
183+
)
184+
local_file_path = file_paths[self.LOCAL_FILE_PATH]
185+
file_relative_path = file_paths[self.FILE_RELATIVE_PATH]
186+
file_name = file_paths[self.FILE_NAME]
187+
188+
logger.info(
189+
f"Starting to download the file {file.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)"
190+
)
191+
start_download_time = time.time()
192+
193+
file.download_to_local_directory(local_file_path)
194+
195+
write_duration = time.time() - start_download_time
196+
logger.info(
197+
f"Finished downloading the file {file.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds."
198+
)
199+
200+
file_record_data = FileRecordData(
201+
folder=file_paths[self.FILE_FOLDER],
202+
file_name=file_name,
203+
bytes=file_size,
204+
id=file.id,
205+
mime_type=file.mime_type,
206+
created_at=file.created_at,
207+
updated_at=file.updated_at,
208+
source_uri=file.uri,
209+
)
210+
file_reference = AirbyteRecordMessageFileReference(
211+
staging_file_url=local_file_path,
212+
source_file_relative_path=file_relative_path,
213+
file_size_bytes=file_size,
214+
)
215+
return file_record_data, file_reference
177216

178217
def _get_file_transfer_paths(
179218
self, source_file_relative_path: str, staging_directory: str

airbyte_cdk/sources/file_based/file_types/file_transfer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from airbyte_cdk.models import AirbyteRecordMessageFileReference
88
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
99
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
10-
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
10+
from airbyte_cdk.sources.file_based.remote_file import UploadableRemoteFile
1111
from airbyte_cdk.sources.utils.files_directory import get_files_directory
1212

1313

@@ -17,7 +17,7 @@ def __init__(self) -> None:
1717

1818
def upload(
1919
self,
20-
file: RemoteFile,
20+
file: UploadableRemoteFile,
2121
stream_reader: AbstractFileBasedStreamReader,
2222
logger: logging.Logger,
2323
) -> Iterable[Tuple[FileRecordData, AirbyteRecordMessageFileReference]]:

airbyte_cdk/sources/file_based/remote_file.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
4+
from abc import ABC, abstractmethod
55
from datetime import datetime
66
from typing import Optional
77

@@ -16,3 +16,42 @@ class RemoteFile(BaseModel):
1616
uri: str
1717
last_modified: datetime
1818
mime_type: Optional[str] = None
19+
20+
21+
class UploadableRemoteFile(RemoteFile, ABC):
22+
"""
23+
A file in a file-based stream that supports uploading (file transfer).
24+
"""
25+
26+
id: Optional[str] = None
27+
created_at: Optional[str] = None
28+
updated_at: Optional[str] = None
29+
30+
@property
31+
@abstractmethod
32+
def size(self) -> int:
33+
"""
34+
Returns the file size in bytes.
35+
"""
36+
...
37+
38+
@abstractmethod
39+
def download_to_local_directory(self, local_file_path: str) -> None:
40+
"""
41+
Download the file from remote source to local storage.
42+
"""
43+
...
44+
45+
@property
46+
def source_file_relative_path(self) -> str:
47+
"""
48+
Returns the relative path of the source file.
49+
"""
50+
return self.uri
51+
52+
@property
53+
def file_uri_for_logging(self) -> str:
54+
"""
55+
Returns the URI for the file being logged.
56+
"""
57+
return self.uri

0 commit comments

Comments
 (0)