Skip to content

Commit 3452003

Browse files
feat: add Raw Files parser option for File CDK
Co-Authored-By: Aaron <AJ> Steers <[email protected]>
1 parent d4fdd4f commit 3452003

File tree

5 files changed

+121
-1
lines changed

5 files changed

+121
-1
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from .avro_format import AvroFormat
2+
from .csv_format import CsvFormat
3+
from .excel_format import ExcelFormat
4+
from .file_based_stream_config import FileBasedStreamConfig
5+
from .jsonl_format import JsonlFormat
6+
from .parquet_format import ParquetFormat
7+
from .raw_format import RawFormat
8+
from .unstructured_format import UnstructuredFormat

airbyte_cdk/sources/file_based/config/file_based_stream_config.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
1313
from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
1414
from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
15+
from airbyte_cdk.sources.file_based.config.raw_format import RawFormat
1516
from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
1617
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
1718
from airbyte_cdk.sources.file_based.schema_helpers import type_mapping_to_jsonschema
@@ -58,7 +59,13 @@ class FileBasedStreamConfig(BaseModel):
5859
default=3,
5960
)
6061
format: Union[
61-
AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
62+
AvroFormat,
63+
CsvFormat,
64+
JsonlFormat,
65+
ParquetFormat,
66+
RawFormat,
67+
UnstructuredFormat,
68+
ExcelFormat,
6269
] = Field(
6370
title="Format",
6471
description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from pydantic.v1 import BaseModel, Field
6+
7+
from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
8+
9+
10+
class RawFormat(BaseModel):
11+
class Config(OneOfOptionConfig):
12+
title = "Raw Files Format"
13+
description = "Use this format when you want to copy files without parsing them. Must be used with the 'Copy Raw Files' delivery method."
14+
discriminator = "filetype"
15+
16+
filetype: str = Field(
17+
"raw",
18+
const=True,
19+
)

airbyte_cdk/sources/file_based/file_types/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
66
from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
77
from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
8+
from airbyte_cdk.sources.file_based.config.raw_format import RawFormat
89
from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
910

1011
from .avro_parser import AvroParser
@@ -14,6 +15,7 @@
1415
from .file_type_parser import FileTypeParser
1516
from .jsonl_parser import JsonlParser
1617
from .parquet_parser import ParquetParser
18+
from .raw_parser import RawParser
1719
from .unstructured_parser import UnstructuredParser
1820

1921
default_parsers: Mapping[Type[Any], FileTypeParser] = {
@@ -22,6 +24,7 @@
2224
ExcelFormat: ExcelParser(),
2325
JsonlFormat: JsonlParser(),
2426
ParquetFormat: ParquetParser(),
27+
RawFormat: RawParser(),
2528
UnstructuredFormat: UnstructuredParser(),
2629
}
2730

@@ -31,6 +34,7 @@
3134
"ExcelParser",
3235
"JsonlParser",
3336
"ParquetParser",
37+
"RawParser",
3438
"UnstructuredParser",
3539
"FileTransfer",
3640
"default_parsers",
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import logging
6+
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple
7+
8+
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
9+
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
10+
from airbyte_cdk.sources.file_based.file_based_stream_reader import (
11+
AbstractFileBasedStreamReader,
12+
FileReadMode,
13+
)
14+
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
15+
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
16+
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
17+
18+
19+
class RawParser(FileTypeParser):
20+
"""
21+
A parser that doesn't actually parse files. It's designed to be used with the "Copy Raw Files" delivery method.
22+
"""
23+
24+
@property
25+
def parser_max_n_files_for_schema_inference(self) -> Optional[int]:
26+
"""
27+
Just check one file as the schema is static
28+
"""
29+
return 1
30+
31+
@property
32+
def parser_max_n_files_for_parsability(self) -> Optional[int]:
33+
"""
34+
Do not check any files for parsability since we're not actually parsing them
35+
"""
36+
return 0
37+
38+
def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
39+
"""
40+
Verify that this parser is only used with the "Raw Files" format.
41+
The validation that this format is only used with "Copy Raw Files" delivery method
42+
will be handled at a higher level in the availability strategy.
43+
"""
44+
return True, None
45+
46+
async def infer_schema(
47+
self,
48+
config: FileBasedStreamConfig,
49+
file: RemoteFile,
50+
stream_reader: AbstractFileBasedStreamReader,
51+
logger: logging.Logger,
52+
) -> SchemaType:
53+
"""
54+
Return a minimal schema since we're not actually parsing the files.
55+
"""
56+
return {}
57+
58+
def parse_records(
59+
self,
60+
config: FileBasedStreamConfig,
61+
file: RemoteFile,
62+
stream_reader: AbstractFileBasedStreamReader,
63+
logger: logging.Logger,
64+
discovered_schema: Optional[Mapping[str, SchemaType]],
65+
) -> Iterable[Dict[str, Any]]:
66+
"""
67+
This method should never be called since we're using the "Copy Raw Files" delivery method.
68+
"""
69+
70+
# This is a safeguard in case this method is called
71+
# Since we're not actually parsing files, just return an empty iterator
72+
# The validation that this format is only used with "Copy Raw Files" delivery method
73+
# will be handled at a higher level in the availability strategy
74+
# Return an empty iterable
75+
return iter([])
76+
77+
@property
78+
def file_read_mode(self) -> FileReadMode:
79+
"""
80+
We don't actually read the files, but if we did, we'd use binary mode.
81+
"""
82+
return FileReadMode.READ_BINARY

0 commit comments

Comments
 (0)