Commit 2e6d92e

file-mode-api: move file uploader to record selector level.
1 parent 962ddbe commit 2e6d92e

6 files changed: 44 additions & 51 deletions

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 6 additions & 20 deletions
@@ -207,19 +207,9 @@ def _group_streams(
             # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
             # so we need to treat them as synchronous
 
-            file_uploader = None
-            if isinstance(declarative_stream, DeclarativeStream):
-                file_uploader = (
-                    self._constructor.create_component(
-                        model_type=FileUploader,
-                        component_definition=name_to_stream_mapping[declarative_stream.name][
-                            "file_uploader"
-                        ],
-                        config=config,
-                    )
-                    if "file_uploader" in name_to_stream_mapping[declarative_stream.name]
-                    else None
-                )
+            supports_file_transfer = (
+                "file_uploader" in name_to_stream_mapping[declarative_stream.name]
+            )
 
             if (
                 isinstance(declarative_stream, DeclarativeStream)
@@ -288,7 +278,6 @@ def _group_streams(
                             declarative_stream.get_json_schema(),
                             retriever,
                             self.message_repository,
-                            file_uploader,
                         ),
                         stream_slicer=declarative_stream.retriever.stream_slicer,
                     )
@@ -319,7 +308,6 @@ def _group_streams(
                             declarative_stream.get_json_schema(),
                             retriever,
                             self.message_repository,
-                            file_uploader,
                         ),
                         stream_slicer=cursor,
                     )
@@ -339,7 +327,7 @@ def _group_streams(
                             else None,
                             logger=self.logger,
                             cursor=cursor,
-                            supports_file_transfer=bool(file_uploader),
+                            supports_file_transfer=supports_file_transfer,
                         )
                     )
             elif (
@@ -351,7 +339,6 @@ def _group_streams(
                             declarative_stream.get_json_schema(),
                             declarative_stream.retriever,
                             self.message_repository,
-                            file_uploader,
                         ),
                         declarative_stream.retriever.stream_slicer,
                     )
@@ -372,7 +359,7 @@ def _group_streams(
                             cursor_field=None,
                             logger=self.logger,
                             cursor=final_state_cursor,
-                            supports_file_transfer=bool(file_uploader),
+                            supports_file_transfer=supports_file_transfer,
                         )
                     )
             elif (
@@ -412,7 +399,6 @@ def _group_streams(
                             declarative_stream.get_json_schema(),
                             retriever,
                             self.message_repository,
-                            file_uploader,
                         ),
                         perpartition_cursor,
                     )
@@ -427,7 +413,7 @@ def _group_streams(
                             cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
                             logger=self.logger,
                             cursor=perpartition_cursor,
-                            supports_file_transfer=bool(file_uploader),
+                            supports_file_transfer=supports_file_transfer,
                         )
                     )
             else:

airbyte_cdk/sources/declarative/extractors/record_selector.py

Lines changed: 6 additions & 1 deletion
@@ -18,6 +18,7 @@
 from airbyte_cdk.sources.declarative.transformations import RecordTransformation
 from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
 from airbyte_cdk.sources.utils.transform import TypeTransformer
+from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
 
 
 @dataclass
@@ -42,6 +43,7 @@ class RecordSelector(HttpSelector):
     record_filter: Optional[RecordFilter] = None
     transformations: List[RecordTransformation] = field(default_factory=lambda: [])
     transform_before_filtering: bool = False
+    file_uploader: Optional[FileUploader] = None
 
     def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self._parameters = parameters
@@ -117,7 +119,10 @@ def filter_and_transform(
             transformed_filtered_data, schema=records_schema
         )
         for data in normalized_data:
-            yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
+            record = Record(data=data, stream_name=self.name, associated_slice=stream_slice)
+            if self.file_uploader:
+                self.file_uploader.upload(record)
+            yield record
 
     def _normalize_by_schema(
         self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
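After this change, `RecordSelector.filter_and_transform` is the single place where per-record file uploads are triggered: each yielded record is first handed to the optional `file_uploader`. Below is a minimal, self-contained sketch of that control flow; the stub classes are illustrative stand-ins (only the optional `file_uploader` field and the `upload(record)` contract come from the diff above), not CDK code.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Optional


class StubFileUploader:
    """Illustrative stand-in for the CDK's FileUploader; only upload(record) matters here."""

    def upload(self, record: Dict[str, Any]) -> None:
        # The real component resolves a download target from the record and fetches the file.
        print(f"uploading file referenced by record {record.get('id')}")


@dataclass
class StubRecordSelector:
    file_uploader: Optional[StubFileUploader] = None

    def filter_and_transform(
        self, normalized_data: Iterable[Dict[str, Any]]
    ) -> Iterable[Dict[str, Any]]:
        # Mirrors the tail of RecordSelector.filter_and_transform after this commit:
        # build the record, trigger the upload if an uploader is configured, then yield it.
        for data in normalized_data:
            record = dict(data)  # the CDK wraps this in Record(data=..., stream_name=..., associated_slice=...)
            if self.file_uploader:
                self.file_uploader.upload(record)
            yield record


selector = StubRecordSelector(file_uploader=StubFileUploader())
for record in selector.filter_and_transform([{"id": 1, "file_url": "https://example.com/a.pdf"}]):
    print(record)
```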

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 22 additions & 21 deletions
@@ -1989,6 +1989,22 @@ class Config:
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
 
 
+class FileUploader(BaseModel):
+    type: Literal["FileUploader"]
+    requester: Union[CustomRequester, HttpRequester] = Field(
+        ...,
+        description="Requester component that describes how to prepare HTTP requests to send to the source API.",
+    )
+    download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
+        ...,
+        description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response",
+    )
+    file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field(
+        None,
+        description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
+    )
+
+
 class DeclarativeStream(BaseModel):
     class Config:
         extra = Extra.allow
@@ -2047,6 +2063,11 @@ class Config:
         description="Array of state migrations to be applied on the input state",
         title="State Migrations",
     )
+    file_uploader: Optional[FileUploader] = Field(
+        None,
+        description="(experimental) Describes how to fetch a file",
+        title="File Uploader",
+    )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
 
 
@@ -2278,22 +2299,6 @@ class StateDelegatingStream(BaseModel):
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
 
 
-class FileUploader(BaseModel):
-    type: Literal["FileUploader"]
-    requester: Union[CustomRequester, HttpRequester] = Field(
-        ...,
-        description="Requester component that describes how to prepare HTTP requests to send to the source API.",
-    )
-    download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
-        ...,
-        description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response",
-    )
-    file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field(
-        None,
-        description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
-    )
-
-
 class SimpleRetriever(BaseModel):
     type: Literal["SimpleRetriever"]
     record_selector: RecordSelector = Field(
@@ -2324,11 +2329,6 @@ class SimpleRetriever(BaseModel):
         description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.",
         title="Partition Router",
     )
-    file_uploader: Optional[FileUploader] = Field(
-        None,
-        description="(experimental) Describes how to fetch a file",
-        title="File Uploader",
-    )
     decoder: Optional[
         Union[
             CustomDecoder,
@@ -2485,6 +2485,7 @@ class DynamicDeclarativeStream(BaseModel):
 DeclarativeSource1.update_forward_refs()
 DeclarativeSource2.update_forward_refs()
 SelectiveAuthenticator.update_forward_refs()
+FileUploader.update_forward_refs()
 DeclarativeStream.update_forward_refs()
 SessionTokenAuthenticator.update_forward_refs()
 DynamicSchemaLoader.update_forward_refs()
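In the generated models, `file_uploader` moves from `SimpleRetriever` to `DeclarativeStream`. The hypothetical manifest fragment below (written as a Python dict) shows where the component is declared after this commit; the requester and extractor settings are illustrative placeholders, not a tested configuration.

```python
# Hypothetical stream definition: file_uploader now sits on the DeclarativeStream,
# not under the SimpleRetriever. Field names follow the models in the diff above;
# the concrete values are made up for illustration.
stream_definition = {
    "type": "DeclarativeStream",
    "name": "attachments",
    "retriever": {
        "type": "SimpleRetriever",
        # "file_uploader" is no longer accepted here...
        "requester": {
            "type": "HttpRequester",
            "url_base": "https://api.example.com",
            "path": "/attachments",
        },
        "record_selector": {
            "type": "RecordSelector",
            "extractor": {"type": "DpathExtractor", "field_path": ["data"]},
        },
    },
    # ...it is declared at the stream level instead (marked experimental in the schema).
    "file_uploader": {
        "type": "FileUploader",
        "requester": {"type": "HttpRequester", "url_base": "https://api.example.com"},
        "download_target_extractor": {"type": "DpathExtractor", "field_path": ["download_url"]},
    },
}
```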

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 10 additions & 0 deletions
@@ -1755,6 +1755,11 @@ def create_declarative_stream(
                 transformations.append(
                     self._create_component_from_model(model=transformation_model, config=config)
                 )
+        file_uploader = None
+        if model.file_uploader:
+            file_uploader = self._create_component_from_model(
+                model=model.file_uploader, config=config
+            )
 
         retriever = self._create_component_from_model(
             model=model.retriever,
@@ -1766,6 +1771,7 @@
             stop_condition_on_cursor=stop_condition_on_cursor,
             client_side_incremental_sync=client_side_incremental_sync,
             transformations=transformations,
+            file_uploader=file_uploader,
             incremental_sync=model.incremental_sync,
         )
         cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
@@ -2607,6 +2613,7 @@ def create_record_selector(
         transformations: List[RecordTransformation] | None = None,
         decoder: Decoder | None = None,
         client_side_incremental_sync: Dict[str, Any] | None = None,
+        file_uploader: Optional[FileUploader] = None,
         **kwargs: Any,
     ) -> RecordSelector:
         extractor = self._create_component_from_model(
@@ -2644,6 +2651,7 @@
             config=config,
             record_filter=record_filter,
             transformations=transformations or [],
+            file_uploader=file_uploader,
             schema_normalization=schema_normalization,
             parameters=model.parameters or {},
             transform_before_filtering=transform_before_filtering,
@@ -2701,6 +2709,7 @@ def create_simple_retriever(
         stop_condition_on_cursor: bool = False,
         client_side_incremental_sync: Optional[Dict[str, Any]] = None,
         transformations: List[RecordTransformation],
+        file_uploader: Optional[FileUploader] = None,
         incremental_sync: Optional[
             Union[
                 IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
@@ -2723,6 +2732,7 @@
             decoder=decoder,
             transformations=transformations,
             client_side_incremental_sync=client_side_incremental_sync,
+            file_uploader=file_uploader,
         )
         url_base = (
             model.requester.url_base
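The factory changes thread the uploader from the stream model down to the record selector: `create_declarative_stream` builds it from `model.file_uploader`, passes it to `create_simple_retriever`, which forwards it to `create_record_selector`. A compressed, illustrative sketch of that hand-off follows; real factory methods take many more arguments, and nothing here is CDK code beyond the names taken from the diff above.

```python
from typing import Any, Dict, Optional


class FactorySketch:
    """Stand-in for ModelToComponentFactory; only the file_uploader hand-off is shown."""

    def _create_component_from_model(self, model: Dict[str, Any], config: Dict[str, Any]) -> str:
        return f"built<{model['type']}>"  # placeholder for real component construction

    def create_declarative_stream(self, model: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
        # Build the optional FileUploader once from the stream-level field...
        file_uploader: Optional[str] = (
            self._create_component_from_model(model=model["file_uploader"], config=config)
            if model.get("file_uploader")
            else None
        )
        # ...then pass it through the retriever factory method.
        return self.create_simple_retriever(model["retriever"], config, file_uploader=file_uploader)

    def create_simple_retriever(
        self, model: Dict[str, Any], config: Dict[str, Any], file_uploader: Optional[str] = None
    ) -> Dict[str, Any]:
        # The retriever no longer owns the uploader; it only forwards it to the selector.
        return self.create_record_selector(model["record_selector"], config, file_uploader=file_uploader)

    def create_record_selector(
        self, model: Dict[str, Any], config: Dict[str, Any], file_uploader: Optional[str] = None
    ) -> Dict[str, Any]:
        # The uploader ends up on the RecordSelector, which triggers uploads per record.
        return {"selector_for": model["type"], "file_uploader": file_uploader}


factory = FactorySketch()
stream_model = {
    "file_uploader": {"type": "FileUploader"},
    "retriever": {"record_selector": {"type": "RecordSelector"}},
}
print(factory.create_declarative_stream(stream_model, config={}))
```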

airbyte_cdk/sources/declarative/retrievers/file_uploader.py

Lines changed: 0 additions & 1 deletion
@@ -24,7 +24,6 @@ def __init__(
         self._content_extractor = content_extractor
 
     def upload(self, record: Record) -> None:
-        # TODO validate record shape - is the transformation applied at this point?
         mocked_response = SafeResponse()
         mocked_response.content = json.dumps(record.data).encode("utf-8")
         download_target = list(self._download_target_extractor.extract_records(mocked_response))[0]

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py

Lines changed: 0 additions & 8 deletions
@@ -3,7 +3,6 @@
 from typing import Any, Iterable, Mapping, Optional
 
 from airbyte_cdk.sources.declarative.retrievers import Retriever
-from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
 from airbyte_cdk.sources.message import MessageRepository
 from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
@@ -19,7 +18,6 @@ def __init__(
         json_schema: Mapping[str, Any],
         retriever: Retriever,
         message_repository: MessageRepository,
-        file_uploader: Optional[FileUploader] = None,
     ) -> None:
         """
         The DeclarativePartitionFactory takes a retriever_factory and not a retriever directly. The reason is that our components are not
@@ -30,15 +28,13 @@ def __init__(
         self._json_schema = json_schema
         self._retriever = retriever
         self._message_repository = message_repository
-        self._file_uploader = file_uploader
 
     def create(self, stream_slice: StreamSlice) -> Partition:
         return DeclarativePartition(
             self._stream_name,
             self._json_schema,
             self._retriever,
             self._message_repository,
-            self._file_uploader,
             stream_slice,
         )
 
@@ -50,14 +46,12 @@ def __init__(
         json_schema: Mapping[str, Any],
         retriever: Retriever,
         message_repository: MessageRepository,
-        file_uploader: Optional[FileUploader],
         stream_slice: StreamSlice,
     ):
         self._stream_name = stream_name
         self._json_schema = json_schema
         self._retriever = retriever
         self._message_repository = message_repository
-        self._file_uploader = file_uploader
         self._stream_slice = stream_slice
         self._hash = SliceHasher.hash(self._stream_name, self._stream_slice)
@@ -73,8 +67,6 @@ def read(self) -> Iterable[Record]:
                         associated_slice=self._stream_slice,
                     )
                 )
-                if self._file_uploader:
-                    self._file_uploader.upload(record)
                 yield record
             else:
                 self._message_repository.emit_message(stream_data)
