Skip to content

Commit a9ba12a

Browse files
author
Oleksandr Bazarnov
committed
Merge remote-tracking branch 'origin/main' into baz/cdk/add-deprecations-module
2 parents 97c83d9 + 2f29eff commit a9ba12a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1339
-392
lines changed

airbyte_cdk/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
AirbyteMessage,
2020
AirbyteProtocol,
2121
AirbyteRecordMessage,
22+
AirbyteRecordMessageFileReference,
2223
AirbyteStateBlob,
2324
AirbyteStateMessage,
2425
AirbyteStateStats,

airbyte_cdk/models/airbyte_protocol.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
from airbyte_protocol_dataclasses.models import * # noqa: F403 # Allow '*'
99
from serpyco_rs.metadata import Alias
1010

11-
from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage
12-
1311
# ruff: noqa: F405 # ignore fuzzy import issues with 'import *'
1412

1513

@@ -84,7 +82,7 @@ class AirbyteMessage:
8482
spec: Optional[ConnectorSpecification] = None # type: ignore [name-defined]
8583
connectionStatus: Optional[AirbyteConnectionStatus] = None # type: ignore [name-defined]
8684
catalog: Optional[AirbyteCatalog] = None # type: ignore [name-defined]
87-
record: Optional[Union[AirbyteFileTransferRecordMessage, AirbyteRecordMessage]] = None # type: ignore [name-defined]
85+
record: Optional[AirbyteRecordMessage] = None # type: ignore [name-defined]
8886
state: Optional[AirbyteStateMessage] = None
8987
trace: Optional[AirbyteTraceMessage] = None # type: ignore [name-defined]
9088
control: Optional[AirbyteControlMessage] = None # type: ignore [name-defined]

airbyte_cdk/models/file_transfer_record_message.py

Lines changed: 0 additions & 13 deletions
This file was deleted.

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
149149
message = stream_data_to_airbyte_message(
150150
stream_name=record.stream_name,
151151
data_or_message=record.data,
152-
is_file_transfer_message=record.is_file_transfer_message,
152+
file_reference=record.file_reference,
153153
)
154154
stream = self._stream_name_to_instance[record.stream_name]
155155

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
PerPartitionWithGlobalCursor,
2929
)
3030
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
31+
from airbyte_cdk.sources.declarative.models import FileUploader
3132
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3233
ConcurrencyLevel as ConcurrencyLevelModel,
3334
)
@@ -209,6 +210,11 @@ def _group_streams(
209210
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
210211
# so we need to treat them as synchronous
211212

213+
supports_file_transfer = (
214+
isinstance(declarative_stream, DeclarativeStream)
215+
and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
216+
)
217+
212218
if (
213219
isinstance(declarative_stream, DeclarativeStream)
214220
and name_to_stream_mapping[declarative_stream.name]["type"]
@@ -325,6 +331,7 @@ def _group_streams(
325331
else None,
326332
logger=self.logger,
327333
cursor=cursor,
334+
supports_file_transfer=supports_file_transfer,
328335
)
329336
)
330337
elif (
@@ -356,6 +363,7 @@ def _group_streams(
356363
cursor_field=None,
357364
logger=self.logger,
358365
cursor=final_state_cursor,
366+
supports_file_transfer=supports_file_transfer,
359367
)
360368
)
361369
elif (
@@ -410,6 +418,7 @@ def _group_streams(
410418
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
411419
logger=self.logger,
412420
cursor=perpartition_cursor,
421+
supports_file_transfer=supports_file_transfer,
413422
)
414423
)
415424
else:

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1448,6 +1448,42 @@ definitions:
14481448
- "$ref": "#/definitions/LegacyToPerPartitionStateMigration"
14491449
- "$ref": "#/definitions/CustomStateMigration"
14501450
default: []
1451+
file_uploader:
1452+
title: File Uploader
1453+
description: (experimental) Describes how to fetch a file
1454+
type: object
1455+
required:
1456+
- type
1457+
- requester
1458+
- download_target_extractor
1459+
properties:
1460+
type:
1461+
type: string
1462+
enum: [ FileUploader ]
1463+
requester:
1464+
description: Requester component that describes how to prepare HTTP requests to send to the source API.
1465+
anyOf:
1466+
- "$ref": "#/definitions/CustomRequester"
1467+
- "$ref": "#/definitions/HttpRequester"
1468+
download_target_extractor:
1469+
description: Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response
1470+
anyOf:
1471+
- "$ref": "#/definitions/CustomRecordExtractor"
1472+
- "$ref": "#/definitions/DpathExtractor"
1473+
file_extractor:
1474+
description: Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content
1475+
anyOf:
1476+
- "$ref": "#/definitions/CustomRecordExtractor"
1477+
- "$ref": "#/definitions/DpathExtractor"
1478+
filename_extractor:
1479+
description: Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.
1480+
type: string
1481+
interpolation_context:
1482+
- config
1483+
- record
1484+
examples:
1485+
- "{{ record.id }}/{{ record.file_name }}/"
1486+
- "{{ record.id }}_{{ record.file_name }}/"
14511487
$parameters:
14521488
type: object
14531489
additional_properties: true

airbyte_cdk/sources/declarative/extractors/record_selector.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1717
from airbyte_cdk.sources.declarative.models import SchemaNormalization
18+
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
1819
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
1920
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
2021
from airbyte_cdk.sources.utils.transform import TypeTransformer
@@ -42,6 +43,7 @@ class RecordSelector(HttpSelector):
4243
record_filter: Optional[RecordFilter] = None
4344
transformations: List[RecordTransformation] = field(default_factory=lambda: [])
4445
transform_before_filtering: bool = False
46+
file_uploader: Optional[FileUploader] = None
4547

4648
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4749
self._parameters = parameters
@@ -117,7 +119,10 @@ def filter_and_transform(
117119
transformed_filtered_data, schema=records_schema
118120
)
119121
for data in normalized_data:
120-
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
122+
record = Record(data=data, stream_name=self.name, associated_slice=stream_slice)
123+
if self.file_uploader:
124+
self.file_uploader.upload(record)
125+
yield record
121126

122127
def _normalize_by_schema(
123128
self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2065,6 +2065,31 @@ class Config:
20652065
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20662066

20672067

2068+
# Declarative (manifest) model for the experimental ``FileUploader`` component.
# It only describes configuration; field order and all description strings
# mirror the ``file_uploader`` entry of the declarative component schema.
class FileUploader(BaseModel):
    # Discriminator: must be the literal string "FileUploader".
    type: Literal["FileUploader"]

    # Required. Requester model used to prepare the HTTP request for the file.
    requester: Union[CustomRequester, HttpRequester] = Field(
        default=...,
        description="Requester component that describes how to prepare HTTP requests to send to the source API.",
    )

    # Required. Extractor model that yields the download location; per its
    # description it is applied to each record rather than to the HTTP response.
    download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
        default=...,
        description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response",
    )

    # Optional. Extractor model for the file content inside the response.
    file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field(
        default=None,
        description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
    )

    # Optional. Template string for the stored file name (see the examples).
    filename_extractor: Optional[str] = Field(
        default=None,
        description="Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.",
        examples=[
            "{{ record.id }}/{{ record.file_name }}/",
            "{{ record.id }}_{{ record.file_name }}/",
        ],
    )

    # Optional free-form parameters, populated from the "$parameters" key.
    parameters: Optional[Dict[str, Any]] = Field(default=None, alias="$parameters")
2091+
2092+
20682093
class DeclarativeStream(BaseModel):
20692094
class Config:
20702095
extra = Extra.allow
@@ -2123,6 +2148,11 @@ class Config:
21232148
description="Array of state migrations to be applied on the input state",
21242149
title="State Migrations",
21252150
)
2151+
file_uploader: Optional[FileUploader] = Field(
2152+
None,
2153+
description="(experimental) Describes how to fetch a file",
2154+
title="File Uploader",
2155+
)
21262156
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
21272157

21282158

@@ -2631,6 +2661,7 @@ class DynamicDeclarativeStream(BaseModel):
26312661
DeclarativeSource1.update_forward_refs()
26322662
DeclarativeSource2.update_forward_refs()
26332663
SelectiveAuthenticator.update_forward_refs()
2664+
FileUploader.update_forward_refs()
26342665
DeclarativeStream.update_forward_refs()
26352666
SessionTokenAuthenticator.update_forward_refs()
26362667
DynamicSchemaLoader.update_forward_refs()

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@
106106
)
107107
from airbyte_cdk.sources.declarative.models import (
108108
CustomStateMigration,
109-
GzipDecoder,
110109
)
111110
from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (
112111
DEPRECATION_LOGS_TAG,
@@ -232,6 +231,9 @@
232231
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
233232
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
234233
)
234+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
235+
FileUploader as FileUploaderModel,
236+
)
235237
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
236238
FixedWindowCallRatePolicy as FixedWindowCallRatePolicyModel,
237239
)
@@ -483,6 +485,7 @@
483485
SimpleRetriever,
484486
SimpleRetrieverTestReadDecorator,
485487
)
488+
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
486489
from airbyte_cdk.sources.declarative.schema import (
487490
ComplexFieldType,
488491
DefaultSchemaLoader,
@@ -682,6 +685,7 @@ def _init_mappings(self) -> None:
682685
ComponentMappingDefinitionModel: self.create_components_mapping_definition,
683686
ZipfileDecoderModel: self.create_zipfile_decoder,
684687
HTTPAPIBudgetModel: self.create_http_api_budget,
688+
FileUploaderModel: self.create_file_uploader,
685689
FixedWindowCallRatePolicyModel: self.create_fixed_window_call_rate_policy,
686690
MovingWindowCallRatePolicyModel: self.create_moving_window_call_rate_policy,
687691
UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy,
@@ -1872,6 +1876,11 @@ def create_declarative_stream(
18721876
transformations.append(
18731877
self._create_component_from_model(model=transformation_model, config=config)
18741878
)
1879+
file_uploader = None
1880+
if model.file_uploader:
1881+
file_uploader = self._create_component_from_model(
1882+
model=model.file_uploader, config=config
1883+
)
18751884

18761885
retriever = self._create_component_from_model(
18771886
model=model.retriever,
@@ -1883,6 +1892,7 @@ def create_declarative_stream(
18831892
stop_condition_on_cursor=stop_condition_on_cursor,
18841893
client_side_incremental_sync=client_side_incremental_sync,
18851894
transformations=transformations,
1895+
file_uploader=file_uploader,
18861896
incremental_sync=model.incremental_sync,
18871897
)
18881898
cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
@@ -2829,6 +2839,7 @@ def create_record_selector(
28292839
transformations: List[RecordTransformation] | None = None,
28302840
decoder: Decoder | None = None,
28312841
client_side_incremental_sync: Dict[str, Any] | None = None,
2842+
file_uploader: Optional[FileUploader] = None,
28322843
**kwargs: Any,
28332844
) -> RecordSelector:
28342845
extractor = self._create_component_from_model(
@@ -2866,6 +2877,7 @@ def create_record_selector(
28662877
config=config,
28672878
record_filter=record_filter,
28682879
transformations=transformations or [],
2880+
file_uploader=file_uploader,
28692881
schema_normalization=schema_normalization,
28702882
parameters=model.parameters or {},
28712883
transform_before_filtering=transform_before_filtering,
@@ -2923,6 +2935,7 @@ def create_simple_retriever(
29232935
stop_condition_on_cursor: bool = False,
29242936
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
29252937
transformations: List[RecordTransformation],
2938+
file_uploader: Optional[FileUploader] = None,
29262939
incremental_sync: Optional[
29272940
Union[
29282941
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
@@ -2962,6 +2975,7 @@ def _get_url() -> str:
29622975
decoder=decoder,
29632976
transformations=transformations,
29642977
client_side_incremental_sync=client_side_incremental_sync,
2978+
file_uploader=file_uploader,
29652979
)
29662980

29672981
query_properties: Optional[QueryProperties] = None
@@ -3623,6 +3637,30 @@ def create_fixed_window_call_rate_policy(
36233637
matchers=matchers,
36243638
)
36253639

3640+
def create_file_uploader(
    self, model: FileUploaderModel, config: Config, **kwargs: Any
) -> FileUploader:
    """Build the runtime ``FileUploader`` component from its declarative model.

    The requester and the download-target extractor are each created through
    ``_create_component_from_model``. Both receive the fixed component name
    ``"File Uploader"`` together with any extra keyword arguments passed to
    this method.

    Args:
        model: Parsed ``FileUploader`` manifest model.
        config: Connector configuration, handed to both sub-components and to
            the resulting uploader.
        **kwargs: Forwarded unchanged to both sub-component factory calls.

    Returns:
        A ``FileUploader`` holding the built requester and download-target
        extractor, the config, the model parameters (an empty dict when none
        are set) and the filename template (``None`` when unset or empty).
    """
    # NOTE(review): ``model.file_extractor`` is never read in this method --
    # confirm whether leaving it out of the runtime component is intentional.
    component_name = "File Uploader"
    return FileUploader(
        requester=self._create_component_from_model(
            model=model.requester,
            config=config,
            name=component_name,
            **kwargs,
        ),
        download_target_extractor=self._create_component_from_model(
            model=model.download_target_extractor,
            config=config,
            name=component_name,
            **kwargs,
        ),
        config=config,
        parameters=model.parameters or {},
        # Any falsy template (``None`` or an empty string) is passed as ``None``.
        filename_extractor=model.filename_extractor or None,
    )
3663+
36263664
def create_moving_window_call_rate_policy(
36273665
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
36283666
) -> MovingWindowCallRatePolicy:

0 commit comments

Comments
 (0)