Skip to content

Commit 9fb3b8c

Browse files
committed
file-mode-api: limit schema on cdk side with a model
1 parent 8701c27 commit 9fb3b8c

File tree

6 files changed

+54
-23
lines changed

6 files changed

+54
-23
lines changed

airbyte_cdk/sources/file_based/file_based_stream_reader.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from enum import Enum
99
from io import IOBase
1010
from os import makedirs, path
11-
from typing import Any, Dict, Iterable, List, Optional, Set
11+
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
1212

1313
from wcmatch.glob import GLOBSTAR, globmatch
1414

@@ -19,6 +19,7 @@
1919
preserve_directory_structure,
2020
use_file_transfer,
2121
)
22+
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
2223
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
2324

2425

@@ -151,7 +152,7 @@ def include_identities_stream(self) -> bool:
151152
@abstractmethod
152153
def upload(
153154
self, file: RemoteFile, local_directory: str, logger: logging.Logger
154-
) -> AirbyteRecordMessageFileReference:
155+
) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
155156
"""
156157
This is required for connectors that will support writing to
157158
files. It will handle the logic to download,get,read,acquire or
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from datetime import datetime
6+
from typing import Optional
7+
8+
from pydantic.v1 import BaseModel
9+
10+
11+
class FileRecordData(BaseModel):
12+
"""
13+
A record in a file-based stream.
14+
"""
15+
16+
folder: str
17+
filename: str
18+
bytes: int = None
19+
20+
id: Optional[str] = None
21+
created_at: Optional[int] = None
22+
updated_at: Optional[int] = None
23+
mime_type: Optional[str] = None
24+
description: Optional[str] = None

airbyte_cdk/sources/file_based/file_types/file_transfer.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
33
#
44
import logging
5-
from typing import Any, Dict, Iterable
5+
from typing import Any, Dict, Iterable, Tuple
66

77
from airbyte_cdk.models import AirbyteRecordMessageFileReference
88
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
9+
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
910
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
1011
from airbyte_cdk.sources.utils.files_directory import get_files_directory
1112

@@ -19,7 +20,7 @@ def upload(
1920
file: RemoteFile,
2021
stream_reader: AbstractFileBasedStreamReader,
2122
logger: logging.Logger,
22-
) -> Iterable[AirbyteRecordMessageFileReference]:
23+
) -> Iterable[Tuple[FileRecordData, AirbyteRecordMessageFileReference]]:
2324
try:
2425
yield stream_reader.upload(
2526
file=file, local_directory=self._local_directory, logger=logger

airbyte_cdk/sources/file_based/schema_helpers.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919

2020
schemaless_schema = {"type": "object", "properties": {"data": {"type": "object"}}}
2121

22-
# we use the File attributes for metadata records.
2322
file_transfer_schema = {
2423
"type": "object",
2524
"properties": {
26-
"uri": {"type": "string"},
27-
"last_modified": {"type": "string"},
25+
"folder": {"type": "string"},
26+
"file_name": {"type": "string"},
27+
"bytes": {"type": "integer"},
28+
"id": {"type": ["null", "string"]},
29+
"created_at": {"type": ["null", "integer"]},
30+
"updated_at": {"type": ["null", "integer"]},
2831
"mime_type": {"type": ["null", "string"]},
32+
"description": {"type": ["null", "string"]},
2933
},
3034
}
3135

airbyte_cdk/sources/file_based/stream/default_file_based_stream.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,7 @@ def _filter_schema_invalid_properties(
9797
self, configured_catalog_json_schema: Dict[str, Any]
9898
) -> Dict[str, Any]:
9999
if self.use_file_transfer:
100-
return {
101-
"type": "object",
102-
"properties": {
103-
"file_path": {"type": "string"},
104-
"file_size": {"type": "string"},
105-
self.ab_file_name_col: {"type": "string"},
106-
},
107-
}
100+
return file_transfer_schema
108101
else:
109102
return super()._filter_schema_invalid_properties(configured_catalog_json_schema)
110103

@@ -166,12 +159,13 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte
166159

167160
try:
168161
if self.use_file_transfer:
169-
for file_reference in file_transfer.upload(
162+
for file_record_data, file_reference in file_transfer.upload(
170163
file=file, stream_reader=self.stream_reader, logger=self.logger
171164
):
172-
record = self.transform_record({}, file, file_datetime_string)
173165
yield stream_data_to_airbyte_message(
174-
self.name, record, file_reference=file_reference
166+
self.name,
167+
file_record_data.dict(exclude_none=True),
168+
file_reference=file_reference,
175169
)
176170
else:
177171
for record in parser.parse_records(

unit_tests/sources/file_based/stream/test_default_file_based_stream.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
FileBasedSourceError,
3131
)
3232
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
33+
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
3334
from airbyte_cdk.sources.file_based.file_types import FileTransfer
3435
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
3536
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
@@ -281,6 +282,11 @@ def test_yield_and_raise_collected(self) -> None:
281282

282283
class DefaultFileBasedStreamFileTransferTest(unittest.TestCase):
283284
_NOW = datetime(2022, 10, 22, tzinfo=timezone.utc)
285+
_A_FILE_RECORD_DATA = FileRecordData(
286+
folder="/absolute/path/",
287+
filename="file.csv",
288+
bytes=10,
289+
)
284290
_A_FILE_REFERENCE_MESSAGE = AirbyteRecordMessageFileReference(
285291
file_size_bytes=10,
286292
source_file_relative_path="relative/path/file.csv",
@@ -316,7 +322,9 @@ def setUp(self) -> None:
316322
def test_when_read_records_from_slice_then_return_records(self) -> None:
317323
"""Verify that we have the new file method and data is empty"""
318324
with mock.patch.object(
319-
FileTransfer, "upload", return_value=[self._A_FILE_REFERENCE_MESSAGE]
325+
FileTransfer,
326+
"upload",
327+
return_value=[(self._A_FILE_RECORD_DATA, self._A_FILE_REFERENCE_MESSAGE)],
320328
):
321329
remote_file = RemoteFile(uri="uri", last_modified=self._NOW)
322330
messages = list(self._stream.read_records_from_slice({"files": [remote_file]}))
@@ -326,10 +334,9 @@ def test_when_read_records_from_slice_then_return_records(self) -> None:
326334
]
327335
assert list(map(lambda message: message.record.data, messages)) == [
328336
{
329-
"_ab_source_file_last_modified": remote_file.last_modified.strftime(
330-
"%Y-%m-%dT%H:%M:%S.%fZ"
331-
),
332-
"_ab_source_file_url": remote_file.uri,
337+
"bytes": 10,
338+
"filename": "file.csv",
339+
"folder": "/absolute/path/",
333340
}
334341
]
335342

0 commit comments

Comments
 (0)