diff --git a/airbyte_cdk/models/airbyte_protocol.py b/airbyte_cdk/models/airbyte_protocol.py index 2528f7d7e..5c5624428 100644 --- a/airbyte_cdk/models/airbyte_protocol.py +++ b/airbyte_cdk/models/airbyte_protocol.py @@ -8,8 +8,6 @@ from airbyte_protocol_dataclasses.models import * # noqa: F403 # Allow '*' from serpyco_rs.metadata import Alias -from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage - # ruff: noqa: F405 # ignore fuzzy import issues with 'import *' @@ -84,7 +82,7 @@ class AirbyteMessage: spec: Optional[ConnectorSpecification] = None # type: ignore [name-defined] connectionStatus: Optional[AirbyteConnectionStatus] = None # type: ignore [name-defined] catalog: Optional[AirbyteCatalog] = None # type: ignore [name-defined] - record: Optional[Union[AirbyteFileTransferRecordMessage, AirbyteRecordMessage]] = None # type: ignore [name-defined] + record: Optional[AirbyteRecordMessage] = None # type: ignore [name-defined] state: Optional[AirbyteStateMessage] = None trace: Optional[AirbyteTraceMessage] = None # type: ignore [name-defined] control: Optional[AirbyteControlMessage] = None # type: ignore [name-defined] diff --git a/airbyte_cdk/models/file_transfer_record_message.py b/airbyte_cdk/models/file_transfer_record_message.py deleted file mode 100644 index dcc1b7a92..000000000 --- a/airbyte_cdk/models/file_transfer_record_message.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2024 Airbyte, Inc., all rights reserved. - -from dataclasses import dataclass -from typing import Any, Dict, Optional - - -@dataclass -class AirbyteFileTransferRecordMessage: - stream: str - file: Dict[str, Any] - emitted_at: int - namespace: Optional[str] = None - data: Optional[Dict[str, Any]] = None diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index bb8b5ebfb..09bd921e1 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -149,7 +149,6 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]: message = stream_data_to_airbyte_message( stream_name=record.stream_name, data_or_message=record.data, - is_file_transfer_message=record.is_file_transfer_message, file_reference=record.file_reference, ) stream = self._stream_name_to_instance[record.stream_name] diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py index 77b1f05bf..48b8e2641 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py @@ -83,7 +83,7 @@ def upload(self, record: Record) -> None: logger.info(f"File relative path: {str(file_relative_path)}") record.file_reference = AirbyteRecordMessageFileReference( - file_url=str(full_path), - file_relative_path=str(file_relative_path), + staging_file_url=str(full_path), + source_file_relative_path=str(file_relative_path), file_size_bytes=file_size_bytes, ) diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index cbf3d119b..a5fe44d42 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -8,16 +8,18 @@ from enum import Enum from io import IOBase from os import makedirs, path -from typing import Any, Dict, Iterable, List, Optional, Set +from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple from wcmatch.glob import GLOBSTAR, globmatch +from airbyte_cdk.models import AirbyteRecordMessageFileReference from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import ( include_identities_stream, preserve_directory_structure, use_file_transfer, ) +from airbyte_cdk.sources.file_based.file_record_data import FileRecordData from airbyte_cdk.sources.file_based.remote_file import RemoteFile @@ -28,6 +30,10 @@ class FileReadMode(Enum): class AbstractFileBasedStreamReader(ABC): DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" + FILE_RELATIVE_PATH = "file_relative_path" + FILE_NAME = "file_name" + LOCAL_FILE_PATH = "local_file_path" + FILE_FOLDER = "file_folder" def __init__(self) -> None: self._config = None @@ -148,9 +154,9 @@ def include_identities_stream(self) -> bool: return False @abstractmethod - def get_file( + def upload( self, file: RemoteFile, local_directory: str, logger: logging.Logger - ) -> Dict[str, Any]: + ) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]: """ This is required for connectors that will support writing to files. It will handle the logic to download,get,read,acquire or @@ -162,25 +168,41 @@ def get_file( logger (logging.Logger): Logger for logging information and errors. Returns: - dict: A dictionary containing the following: - - "file_url" (str): The absolute path of the downloaded file. - - "bytes" (int): The file size in bytes. - - "file_relative_path" (str): The relative path of the file for local storage. Is relative to local_directory as - this a mounted volume in the pod container. - + AirbyteRecordMessageFileReference: A file reference object containing: + - staging_file_url (str): The absolute path to the referenced file in the staging area. + - file_size_bytes (int): The size of the referenced file in bytes. + - source_file_relative_path (str): The relative path to the referenced file in source. """ ... - def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> List[str]: + def _get_file_transfer_paths( + self, source_file_relative_path: str, staging_directory: str + ) -> MutableMapping[str, Any]: + """ + This method is used to get the file transfer paths for a given source file relative path and local directory. + It returns a dictionary with the following keys: + - FILE_RELATIVE_PATH: The relative path to file in reference to the staging directory. + - LOCAL_FILE_PATH: The absolute path to the file. + - FILE_NAME: The name of the referenced file. + - FILE_FOLDER: The folder of the referenced file. + """ preserve_directory_structure = self.preserve_directory_structure() + + file_name = path.basename(source_file_relative_path) + file_folder = path.dirname(source_file_relative_path) if preserve_directory_structure: # Remove left slashes from source path format to make relative path for writing locally - file_relative_path = file.uri.lstrip("/") + file_relative_path = source_file_relative_path.lstrip("/") else: - file_relative_path = path.basename(file.uri) - local_file_path = path.join(local_directory, file_relative_path) - + file_relative_path = file_name + local_file_path = path.join(staging_directory, file_relative_path) # Ensure the local directory exists makedirs(path.dirname(local_file_path), exist_ok=True) - absolute_file_path = path.abspath(local_file_path) - return [file_relative_path, local_file_path, absolute_file_path] + + file_paths = { + self.FILE_RELATIVE_PATH: file_relative_path, + self.LOCAL_FILE_PATH: local_file_path, + self.FILE_NAME: file_name, + self.FILE_FOLDER: file_folder, + } + return file_paths diff --git a/airbyte_cdk/sources/file_based/file_record_data.py b/airbyte_cdk/sources/file_based/file_record_data.py new file mode 100644 index 000000000..7051cf057 --- /dev/null +++ b/airbyte_cdk/sources/file_based/file_record_data.py @@ -0,0 +1,23 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from datetime import datetime +from typing import Optional + +from pydantic.v1 import BaseModel + + +class FileRecordData(BaseModel): + """ + A record in a file-based stream. + """ + + folder: str + filename: str + bytes: int + source_uri: str + id: Optional[str] = None + created_at: Optional[str] = None + updated_at: Optional[str] = None + mime_type: Optional[str] = None diff --git a/airbyte_cdk/sources/file_based/file_types/file_transfer.py b/airbyte_cdk/sources/file_based/file_types/file_transfer.py index 0c2855d41..ddc70e4b9 100644 --- a/airbyte_cdk/sources/file_based/file_types/file_transfer.py +++ b/airbyte_cdk/sources/file_based/file_types/file_transfer.py @@ -2,11 +2,11 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. # import logging -import os -from typing import Any, Dict, Iterable +from typing import Iterable, Tuple -from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig +from airbyte_cdk.models import AirbyteRecordMessageFileReference from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader +from airbyte_cdk.sources.file_based.file_record_data import FileRecordData from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.utils.files_directory import get_files_directory @@ -15,15 +15,14 @@ class FileTransfer: def __init__(self) -> None: self._local_directory = get_files_directory() - def get_file( + def upload( self, - config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader, logger: logging.Logger, - ) -> Iterable[Dict[str, Any]]: + ) -> Iterable[Tuple[FileRecordData, AirbyteRecordMessageFileReference]]: try: - yield stream_reader.get_file( + yield stream_reader.upload( file=file, local_directory=self._local_directory, logger=logger ) except Exception as ex: diff --git a/airbyte_cdk/sources/file_based/schema_helpers.py b/airbyte_cdk/sources/file_based/schema_helpers.py index 1b653db67..e1ce68062 100644 --- a/airbyte_cdk/sources/file_based/schema_helpers.py +++ b/airbyte_cdk/sources/file_based/schema_helpers.py @@ -18,9 +18,19 @@ SchemaType = Mapping[str, Mapping[str, JsonSchemaSupportedType]] schemaless_schema = {"type": "object", "properties": {"data": {"type": "object"}}} + file_transfer_schema = { "type": "object", - "properties": {"data": {"type": "object"}, "file": {"type": "object"}}, + "properties": { + "folder": {"type": "string"}, + "file_name": {"type": "string"}, + "source_uri": {"type": "string"}, + "bytes": {"type": "integer"}, + "id": {"type": ["null", "string"]}, + "created_at": {"type": ["null", "string"]}, + "updated_at": {"type": ["null", "string"]}, + "mime_type": {"type": ["null", "string"]}, + }, } diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py index f02602d58..c36e5179d 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py @@ -4,7 +4,7 @@ import copy import logging -from functools import cache, lru_cache +from functools import lru_cache from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union from typing_extensions import deprecated @@ -258,19 +258,14 @@ def read(self) -> Iterable[Record]: and record_data.record is not None ): # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued - # If stream is flagged for file_transfer the record should data in file key - record_message_data = ( - record_data.record.file - if self._use_file_transfer() - else record_data.record.data - ) + record_message_data = record_data.record.data if not record_message_data: raise ExceptionWithDisplayMessage("A record without data was found") else: yield Record( data=record_message_data, stream_name=self.stream_name(), - is_file_transfer_message=self._use_file_transfer(), + file_reference=record_data.record.file_reference, ) else: self._message_repository.emit_message(record_data) @@ -306,10 +301,6 @@ def __hash__(self) -> int: def stream_name(self) -> str: return self._stream.name - @cache - def _use_file_transfer(self) -> bool: - return hasattr(self._stream, "use_file_transfer") and self._stream.use_file_transfer - def __repr__(self) -> str: return f"FileBasedStreamPartition({self._stream.name}, {self._slice})" diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index 42d01577c..0e7121325 100644 --- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -11,7 +11,7 @@ from os import path from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union -from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, FailureType, Level +from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, FailureType, Level from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType from airbyte_cdk.sources.file_based.exceptions import ( @@ -56,6 +56,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): airbyte_columns = [ab_last_mod_col, ab_file_name_col] use_file_transfer = False preserve_directory_structure = True + _file_transfer = FileTransfer() def __init__(self, **kwargs: Any): if self.FILE_TRANSFER_KW in kwargs: @@ -93,21 +94,6 @@ def primary_key(self) -> PrimaryKeyType: self.config ) - def _filter_schema_invalid_properties( - self, configured_catalog_json_schema: Dict[str, Any] - ) -> Dict[str, Any]: - if self.use_file_transfer: - return { - "type": "object", - "properties": { - "file_path": {"type": "string"}, - "file_size": {"type": "string"}, - self.ab_file_name_col: {"type": "string"}, - }, - } - else: - return super()._filter_schema_invalid_properties(configured_catalog_json_schema) - def _duplicated_files_names( self, slices: List[dict[str, List[RemoteFile]]] ) -> List[dict[str, List[str]]]: @@ -145,14 +131,6 @@ def transform_record( record[self.ab_file_name_col] = file.uri return record - def transform_record_for_file_transfer( - self, record: dict[str, Any], file: RemoteFile - ) -> dict[str, Any]: - # timstamp() returns a float representing the number of seconds since the unix epoch - record[self.modified] = int(file.last_modified.timestamp()) * 1000 - record[self.source_file_url] = file.uri - return record - def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]: """ Yield all records from all remote files in `list_files_for_this_sync`. @@ -173,19 +151,13 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte try: if self.use_file_transfer: - self.logger.info(f"{self.name}: {file} file-based syncing") - # todo: complete here the code to not rely on local parser - file_transfer = FileTransfer() - for record in file_transfer.get_file( - self.config, file, self.stream_reader, self.logger + for file_record_data, file_reference in self._file_transfer.upload( + file=file, stream_reader=self.stream_reader, logger=self.logger ): - line_no += 1 - if not self.record_passes_validation_policy(record): - n_skipped += 1 - continue - record = self.transform_record_for_file_transfer(record, file) yield stream_data_to_airbyte_message( - self.name, record, is_file_transfer_message=True + self.name, + file_record_data.dict(exclude_none=True), + file_reference=file_reference, ) else: for record in parser.parse_records( @@ -259,6 +231,8 @@ def cursor_field(self) -> Union[str, List[str]]: @cache def get_json_schema(self) -> JsonSchema: + if self.use_file_transfer: + return file_transfer_schema extra_fields = { self.ab_last_mod_col: {"type": "string"}, self.ab_file_name_col: {"type": "string"}, @@ -282,9 +256,7 @@ def get_json_schema(self) -> JsonSchema: return {"type": "object", "properties": {**extra_fields, **schema["properties"]}} def _get_raw_json_schema(self) -> JsonSchema: - if self.use_file_transfer: - return file_transfer_schema - elif self.config.input_schema: + if self.config.input_schema: return self.config.get_input_schema() # type: ignore elif self.config.schemaless: return schemaless_schema @@ -341,6 +313,11 @@ def get_files(self) -> Iterable[RemoteFile]: self.config.globs or [], self.config.legacy_prefix, self.logger ) + def as_airbyte_stream(self) -> AirbyteStream: + file_stream = super().as_airbyte_stream() + file_stream.is_file_based = self.use_file_transfer + return file_stream + def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]: loop = asyncio.get_event_loop() schema = loop.run_until_complete(self._infer_schema(files)) diff --git a/airbyte_cdk/sources/file_based/stream/permissions_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/permissions_file_based_stream.py index 52003c7ae..3136d9056 100644 --- a/airbyte_cdk/sources/file_based/stream/permissions_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/permissions_file_based_stream.py @@ -61,9 +61,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte permissions_record = self.transform_record( permissions_record, file, file_datetime_string ) - yield stream_data_to_airbyte_message( - self.name, permissions_record, is_file_transfer_message=False - ) + yield stream_data_to_airbyte_message(self.name, permissions_record) except Exception as e: self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}") yield AirbyteMessage( diff --git a/airbyte_cdk/sources/types.py b/airbyte_cdk/sources/types.py index af40f2560..8feba835e 100644 --- a/airbyte_cdk/sources/types.py +++ b/airbyte_cdk/sources/types.py @@ -24,13 +24,11 @@ def __init__( data: Mapping[str, Any], stream_name: str, associated_slice: Optional[StreamSlice] = None, - is_file_transfer_message: bool = False, file_reference: Optional[AirbyteRecordMessageFileReference] = None, ): self._data = data self._associated_slice = associated_slice self.stream_name = stream_name - self.is_file_transfer_message = is_file_transfer_message self._file_reference = file_reference @property @@ -46,7 +44,7 @@ def file_reference(self) -> AirbyteRecordMessageFileReference: return self._file_reference @file_reference.setter - def file_reference(self, value: AirbyteRecordMessageFileReference): + def file_reference(self, value: AirbyteRecordMessageFileReference) -> None: self._file_reference = value def __repr__(self) -> str: diff --git a/airbyte_cdk/sources/utils/record_helper.py b/airbyte_cdk/sources/utils/record_helper.py index d41907cf1..d05321f4a 100644 --- a/airbyte_cdk/sources/utils/record_helper.py +++ b/airbyte_cdk/sources/utils/record_helper.py @@ -13,7 +13,6 @@ AirbyteTraceMessage, ) from airbyte_cdk.models import Type as MessageType -from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer @@ -23,7 +22,6 @@ def stream_data_to_airbyte_message( data_or_message: StreamData, transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform), schema: Optional[Mapping[str, Any]] = None, - is_file_transfer_message: bool = False, file_reference: Optional[AirbyteRecordMessageFileReference] = None, ) -> AirbyteMessage: if schema is None: @@ -38,17 +36,12 @@ def stream_data_to_airbyte_message( # taken unless configured. See # docs/connector-development/cdk-python/schemas.md for details. transformer.transform(data, schema) - if is_file_transfer_message: - message = AirbyteFileTransferRecordMessage( - stream=stream_name, file=data, emitted_at=now_millis, data={} - ) - else: - message = AirbyteRecordMessage( - stream=stream_name, - data=data, - emitted_at=now_millis, - file_reference=file_reference, - ) + message = AirbyteRecordMessage( + stream=stream_name, + data=data, + emitted_at=now_millis, + file_reference=file_reference, + ) return AirbyteMessage(type=MessageType.RECORD, record=message) case AirbyteTraceMessage(): return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message) diff --git a/airbyte_cdk/test/entrypoint_wrapper.py b/airbyte_cdk/test/entrypoint_wrapper.py index f8e85bfb0..43c84204a 100644 --- a/airbyte_cdk/test/entrypoint_wrapper.py +++ b/airbyte_cdk/test/entrypoint_wrapper.py @@ -82,6 +82,10 @@ def records(self) -> List[AirbyteMessage]: def state_messages(self) -> List[AirbyteMessage]: return self._get_message_by_types([Type.STATE]) + @property + def connection_status_messages(self) -> List[AirbyteMessage]: + return self._get_message_by_types([Type.CONNECTION_STATUS]) + @property def most_recent_state(self) -> Any: state_messages = self._get_message_by_types([Type.STATE]) diff --git a/airbyte_cdk/test/mock_http/response_builder.py b/airbyte_cdk/test/mock_http/response_builder.py index 41766af1b..fd67461da 100644 --- a/airbyte_cdk/test/mock_http/response_builder.py +++ b/airbyte_cdk/test/mock_http/response_builder.py @@ -198,6 +198,14 @@ def find_template(resource: str, execution_folder: str) -> Dict[str, Any]: return json.load(template_file) # type: ignore # we assume the dev correctly set up the resource file +def find_binary_response(resource: str, execution_folder: str) -> bytes: + response_filepath = str( + get_unit_test_folder(execution_folder) / "resource" / "http" / "response" / f"{resource}" + ) + with open(response_filepath, "rb") as response_file: + return response_file.read() # type: ignore # we assume the dev correctly set up the resource file + + def create_record_builder( response_template: Dict[str, Any], records_path: Union[FieldPath, NestedPath], diff --git a/airbyte_cdk/test/standard_tests/__init__.py b/airbyte_cdk/test/standard_tests/__init__.py new file mode 100644 index 000000000..c6aeaaf1c --- /dev/null +++ b/airbyte_cdk/test/standard_tests/__init__.py @@ -0,0 +1,46 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +'''FAST Airbyte Standard Tests + +This module provides a set of base classes for declarative connector test suites. +The goal of this module is to provide a robust and extensible framework for testing Airbyte +connectors. + +Example usage: + +```python +# `test_airbyte_standards.py` +from airbyte_cdk.test import standard_tests + +pytest_plugins = [ + "airbyte_cdk.test.standard_tests.pytest_hooks", +] + + +class TestSuiteSourcePokeAPI(standard_tests.DeclarativeSourceTestSuite): + """Test suite for the source.""" +``` + +Available test suites base classes: +- `DeclarativeSourceTestSuite`: A test suite for declarative sources. +- `SourceTestSuiteBase`: A test suite for sources. +- `DestinationTestSuiteBase`: A test suite for destinations. + +''' + +from airbyte_cdk.test.standard_tests.connector_base import ( + ConnectorTestScenario, + ConnectorTestSuiteBase, +) +from airbyte_cdk.test.standard_tests.declarative_sources import ( + DeclarativeSourceTestSuite, +) +from airbyte_cdk.test.standard_tests.destination_base import DestinationTestSuiteBase +from airbyte_cdk.test.standard_tests.source_base import SourceTestSuiteBase + +__all__ = [ + "ConnectorTestScenario", + "ConnectorTestSuiteBase", + "DeclarativeSourceTestSuite", + "DestinationTestSuiteBase", + "SourceTestSuiteBase", +] diff --git a/airbyte_cdk/test/standard_tests/_job_runner.py b/airbyte_cdk/test/standard_tests/_job_runner.py new file mode 100644 index 000000000..bab170361 --- /dev/null +++ b/airbyte_cdk/test/standard_tests/_job_runner.py @@ -0,0 +1,159 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""Job runner for Airbyte Standard Tests.""" + +import logging +import tempfile +import uuid +from dataclasses import asdict +from pathlib import Path +from typing import Any, Callable, Literal + +import orjson +from typing_extensions import Protocol, runtime_checkable + +from airbyte_cdk.models import ( + ConfiguredAirbyteCatalog, + Status, +) +from airbyte_cdk.test import entrypoint_wrapper +from airbyte_cdk.test.standard_tests.models import ( + ConnectorTestScenario, +) + + +def _errors_to_str( + entrypoint_output: entrypoint_wrapper.EntrypointOutput, +) -> str: + """Convert errors from entrypoint output to a string.""" + if not entrypoint_output.errors: + # If there are no errors, return an empty string. + return "" + + return "\n" + "\n".join( + [ + str(error.trace.error).replace( + "\\n", + "\n", + ) + for error in entrypoint_output.errors + if error.trace + ], + ) + + +@runtime_checkable +class IConnector(Protocol): + """A connector that can be run in a test scenario. + + Note: We currently use 'spec' to determine if we have a connector object. + In the future, it would be preferred to leverage a 'launch' method instead, + directly on the connector (which doesn't yet exist). + """ + + def spec(self, logger: logging.Logger) -> Any: + """Connectors should have a `spec` method.""" + + +def run_test_job( + connector: IConnector | type[IConnector] | Callable[[], IConnector], + verb: Literal["read", "check", "discover"], + test_scenario: ConnectorTestScenario, + *, + catalog: ConfiguredAirbyteCatalog | dict[str, Any] | None = None, +) -> entrypoint_wrapper.EntrypointOutput: + """Run a test scenario from provided CLI args and return the result.""" + if not connector: + raise ValueError("Connector is required") + + if catalog and isinstance(catalog, ConfiguredAirbyteCatalog): + # Convert the catalog to a dict if it's already a ConfiguredAirbyteCatalog. + catalog = asdict(catalog) + + connector_obj: IConnector + if isinstance(connector, type) or callable(connector): + # If the connector is a class or a factory lambda, instantiate it. + connector_obj = connector() + elif isinstance(connector, IConnector): + connector_obj = connector + else: + raise ValueError( + f"Invalid connector input: {type(connector)}", + ) + + args: list[str] = [verb] + if test_scenario.config_path: + args += ["--config", str(test_scenario.config_path)] + elif test_scenario.config_dict: + config_path = ( + Path(tempfile.gettempdir()) / "airbyte-test" / f"temp_config_{uuid.uuid4().hex}.json" + ) + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text(orjson.dumps(test_scenario.config_dict).decode()) + args += ["--config", str(config_path)] + + catalog_path: Path | None = None + if verb not in ["discover", "check"]: + # We need a catalog for read. + if catalog: + # Write the catalog to a temp json file and pass the path to the file as an argument. + catalog_path = ( + Path(tempfile.gettempdir()) + / "airbyte-test" + / f"temp_catalog_{uuid.uuid4().hex}.json" + ) + catalog_path.parent.mkdir(parents=True, exist_ok=True) + catalog_path.write_text(orjson.dumps(catalog).decode()) + elif test_scenario.configured_catalog_path: + catalog_path = Path(test_scenario.configured_catalog_path) + + if catalog_path: + args += ["--catalog", str(catalog_path)] + + # This is a bit of a hack because the source needs the catalog early. + # Because it *also* can fail, we have to redundantly wrap it in a try/except block. + + result: entrypoint_wrapper.EntrypointOutput = entrypoint_wrapper._run_command( # noqa: SLF001 # Non-public API + source=connector_obj, # type: ignore [arg-type] + args=args, + expecting_exception=test_scenario.expect_exception, + ) + if result.errors and not test_scenario.expect_exception: + raise AssertionError( + f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result) + ) + + if verb == "check": + # Check is expected to fail gracefully without an exception. + # Instead, we assert that we have a CONNECTION_STATUS message with + # a failure status. + assert len(result.connection_status_messages) == 1, ( + "Expected exactly one CONNECTION_STATUS message. Got " + f"{len(result.connection_status_messages)}:\n" + + "\n".join([str(msg) for msg in result.connection_status_messages]) + + _errors_to_str(result) + ) + if test_scenario.expect_exception: + conn_status = result.connection_status_messages[0].connectionStatus + assert conn_status, ( + "Expected CONNECTION_STATUS message to be present. Got: \n" + + "\n".join([str(msg) for msg in result.connection_status_messages]) + ) + assert conn_status.status == Status.FAILED, ( + "Expected CONNECTION_STATUS message to be FAILED. Got: \n" + + "\n".join([str(msg) for msg in result.connection_status_messages]) + ) + + return result + + # For all other verbs, we assert check that an exception is raised (or not). + if test_scenario.expect_exception: + if not result.errors: + raise AssertionError("Expected exception but got none.") + + return result + + assert not result.errors, ( + f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result) + ) + + return result diff --git a/airbyte_cdk/test/standard_tests/connector_base.py b/airbyte_cdk/test/standard_tests/connector_base.py new file mode 100644 index 000000000..964d0230d --- /dev/null +++ b/airbyte_cdk/test/standard_tests/connector_base.py @@ -0,0 +1,148 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Base class for connector test suites.""" + +from __future__ import annotations + +import abc +import inspect +import sys +from collections.abc import Callable +from pathlib import Path +from typing import cast + +import yaml +from boltons.typeutils import classproperty + +from airbyte_cdk.models import ( + AirbyteMessage, + Type, +) +from airbyte_cdk.test import entrypoint_wrapper +from airbyte_cdk.test.standard_tests._job_runner import IConnector, run_test_job +from airbyte_cdk.test.standard_tests.models import ( + ConnectorTestScenario, +) + +ACCEPTANCE_TEST_CONFIG = "acceptance-test-config.yml" +MANIFEST_YAML = "manifest.yaml" + + +class ConnectorTestSuiteBase(abc.ABC): + """Base class for connector test suites.""" + + connector: type[IConnector] | Callable[[], IConnector] | None = None + """The connector class or a factory function that returns an scenario of IConnector.""" + + @classmethod + def get_test_class_dir(cls) -> Path: + """Get the file path that contains the class.""" + module = sys.modules[cls.__module__] + # Get the directory containing the test file + return Path(inspect.getfile(module)).parent + + @classmethod + def create_connector( + cls, + scenario: ConnectorTestScenario, + ) -> IConnector: + """Instantiate the connector class.""" + connector = cls.connector # type: ignore + if connector: + if callable(connector) or isinstance(connector, type): + # If the connector is a class or factory function, instantiate it: + return cast(IConnector, connector()) # type: ignore [redundant-cast] + + # Otherwise, we can't instantiate the connector. Fail with a clear error message. + raise NotImplementedError( + "No connector class or connector factory function provided. " + "Please provide a class or factory function in `cls.connector`, or " + "override `cls.create_connector()` to define a custom initialization process." + ) + + # Test Definitions + + def test_check( + self, + scenario: ConnectorTestScenario, + ) -> None: + """Run `connection` acceptance tests.""" + result: entrypoint_wrapper.EntrypointOutput = run_test_job( + self.create_connector(scenario), + "check", + test_scenario=scenario, + ) + conn_status_messages: list[AirbyteMessage] = [ + msg for msg in result._messages if msg.type == Type.CONNECTION_STATUS + ] # noqa: SLF001 # Non-public API + assert len(conn_status_messages) == 1, ( + f"Expected exactly one CONNECTION_STATUS message. Got: {result._messages}" + ) + + @classmethod + def get_connector_root_dir(cls) -> Path: + """Get the root directory of the connector.""" + for parent in cls.get_test_class_dir().parents: + if (parent / MANIFEST_YAML).exists(): + return parent + if (parent / ACCEPTANCE_TEST_CONFIG).exists(): + return parent + if parent.name == "airbyte_cdk": + break + # If we reach here, we didn't find the manifest file in any parent directory + # Check if the manifest file exists in the current directory + for parent in Path.cwd().parents: + if (parent / MANIFEST_YAML).exists(): + return parent + if (parent / ACCEPTANCE_TEST_CONFIG).exists(): + return parent + if parent.name == "airbyte_cdk": + break + + raise FileNotFoundError( + "Could not find connector root directory relative to " + f"'{str(cls.get_test_class_dir())}' or '{str(Path.cwd())}'." + ) + + @classproperty + def acceptance_test_config_path(cls) -> Path: + """Get the path to the acceptance test config file.""" + result = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG + if result.exists(): + return result + + raise FileNotFoundError(f"Acceptance test config file not found at: {str(result)}") + + @classmethod + def get_scenarios( + cls, + ) -> list[ConnectorTestScenario]: + """Get acceptance tests for a given category. + + This has to be a separate function because pytest does not allow + parametrization of fixtures with arguments from the test class itself. + """ + category = "connection" + all_tests_config = yaml.safe_load(cls.acceptance_test_config_path.read_text()) + if "acceptance_tests" not in all_tests_config: + raise ValueError( + f"Acceptance tests config not found in {cls.acceptance_test_config_path}." + f" Found only: {str(all_tests_config)}." + ) + if category not in all_tests_config["acceptance_tests"]: + return [] + if "tests" not in all_tests_config["acceptance_tests"][category]: + raise ValueError(f"No tests found for category {category}") + + tests_scenarios = [ + ConnectorTestScenario.model_validate(test) + for test in all_tests_config["acceptance_tests"][category]["tests"] + if "iam_role" not in test["config_path"] + ] + connector_root = cls.get_connector_root_dir().absolute() + for test in tests_scenarios: + if test.config_path: + test.config_path = connector_root / test.config_path + if test.configured_catalog_path: + test.configured_catalog_path = connector_root / test.configured_catalog_path + + return tests_scenarios diff --git a/airbyte_cdk/test/standard_tests/declarative_sources.py b/airbyte_cdk/test/standard_tests/declarative_sources.py new file mode 100644 index 000000000..18ac084fc --- /dev/null +++ b/airbyte_cdk/test/standard_tests/declarative_sources.py @@ -0,0 +1,92 @@ +import os +from hashlib import md5 +from pathlib import Path +from typing import Any, cast + +import yaml +from boltons.typeutils import classproperty + +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) +from airbyte_cdk.test.standard_tests._job_runner import IConnector +from airbyte_cdk.test.standard_tests.connector_base import MANIFEST_YAML +from airbyte_cdk.test.standard_tests.models import ConnectorTestScenario +from airbyte_cdk.test.standard_tests.source_base import SourceTestSuiteBase + + +def md5_checksum(file_path: Path) -> str: + """Helper function to calculate the MD5 checksum of a file. + + This is used to calculate the checksum of the `components.py` file, if it exists. + """ + with open(file_path, "rb") as file: + return md5(file.read()).hexdigest() + + +class DeclarativeSourceTestSuite(SourceTestSuiteBase): + """Declarative source test suite. + + This inherits from the Python-based source test suite and implements the + `create_connector` method to create a declarative source object instead of + requiring a custom Python source object. + + The class also automatically locates the `manifest.yaml` file and the + `components.py` file (if it exists) in the connector's directory. + """ + + @classproperty + def manifest_yaml_path(cls) -> Path: + """Get the path to the manifest.yaml file.""" + result = cls.get_connector_root_dir() / MANIFEST_YAML + if result.exists(): + return result + + raise FileNotFoundError( + f"Manifest YAML file not found at {result}. " + "Please ensure that the test suite is run in the correct directory.", + ) + + @classproperty + def components_py_path(cls) -> Path | None: + """Get the path to the `components.py` file, if one exists. + + If not `components.py` file exists, return None. + """ + result = cls.get_connector_root_dir() / "components.py" + if result.exists(): + return result + + return None + + @classmethod + def create_connector( + cls, + scenario: ConnectorTestScenario, + ) -> IConnector: + """Create a connector scenario for the test suite. + + This overrides `create_connector` from the create a declarative source object + instead of requiring a custom python source object. + + Subclasses should not need to override this method. + """ + config: dict[str, Any] = scenario.get_config_dict() + + manifest_dict = yaml.safe_load(cls.manifest_yaml_path.read_text()) + if cls.components_py_path and cls.components_py_path.exists(): + os.environ["AIRBYTE_ENABLE_UNSAFE_CODE"] = "true" + config["__injected_components_py"] = cls.components_py_path.read_text() + config["__injected_components_py_checksums"] = { + "md5": md5_checksum(cls.components_py_path), + } + + return cast( + IConnector, + ConcurrentDeclarativeSource( + config=config, + catalog=None, + state=None, + source_config=manifest_dict, + ), + ) diff --git a/airbyte_cdk/test/standard_tests/destination_base.py b/airbyte_cdk/test/standard_tests/destination_base.py new file mode 100644 index 000000000..985fc92a3 --- /dev/null +++ b/airbyte_cdk/test/standard_tests/destination_base.py @@ -0,0 +1,16 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Base class for destination test suites.""" + +from airbyte_cdk.test.standard_tests.connector_base import ConnectorTestSuiteBase + + +class DestinationTestSuiteBase(ConnectorTestSuiteBase): + """Base class for destination test suites. + + This class provides a base set of functionality for testing destination connectors, and it + inherits all generic connector tests from the `ConnectorTestSuiteBase` class. + + TODO: As of now, this class does not add any additional functionality or tests specific to + destination connectors. However, it serves as a placeholder for future enhancements and + customizations that may be needed for destination connectors. + """ diff --git a/airbyte_cdk/test/standard_tests/models/__init__.py b/airbyte_cdk/test/standard_tests/models/__init__.py new file mode 100644 index 000000000..13d67e16a --- /dev/null +++ b/airbyte_cdk/test/standard_tests/models/__init__.py @@ -0,0 +1,7 @@ +from airbyte_cdk.test.standard_tests.models.scenario import ( + ConnectorTestScenario, +) + +__all__ = [ + "ConnectorTestScenario", +] diff --git a/airbyte_cdk/test/standard_tests/models/scenario.py b/airbyte_cdk/test/standard_tests/models/scenario.py new file mode 100644 index 000000000..944b60921 --- /dev/null +++ b/airbyte_cdk/test/standard_tests/models/scenario.py @@ -0,0 +1,74 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Run acceptance tests in PyTest. + +These tests leverage the same `acceptance-test-config.yml` configuration files as the +acceptance tests in CAT, but they run in PyTest instead of CAT. This allows us to run +the acceptance tests in the same local environment as we are developing in, speeding +up iteration cycles. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Literal, cast + +import yaml +from pydantic import BaseModel + + +class ConnectorTestScenario(BaseModel): + """Acceptance test scenario, as a Pydantic model. + + This class represents an acceptance test scenario, which is a single test case + that can be run against a connector. It is used to deserialize and validate the + acceptance test configuration file. + """ + + class AcceptanceTestExpectRecords(BaseModel): + path: Path + exact_order: bool = False + + class AcceptanceTestFileTypes(BaseModel): + skip_test: bool + bypass_reason: str + + config_path: Path | None = None + config_dict: dict[str, Any] | None = None + + id: str | None = None + + configured_catalog_path: Path | None = None + timeout_seconds: int | None = None + expect_records: AcceptanceTestExpectRecords | None = None + file_types: AcceptanceTestFileTypes | None = None + status: Literal["succeed", "failed"] | None = None + + def get_config_dict(self) -> dict[str, Any]: + """Return the config dictionary. + + If a config dictionary has already been loaded, return it. Otherwise, load + the config file and return the dictionary. + """ + if self.config_dict: + return self.config_dict + + if self.config_path: + return cast(dict[str, Any], yaml.safe_load(self.config_path.read_text())) + + raise ValueError("No config dictionary or path provided.") + + @property + def expect_exception(self) -> bool: + return self.status and self.status == "failed" or False + + @property + def instance_name(self) -> str: + return self.config_path.stem if self.config_path else "Unnamed Scenario" + + def __str__(self) -> str: + if self.id: + return f"'{self.id}' Test Scenario" + if self.config_path: + return f"'{self.config_path.name}' Test Scenario" + + return f"'{hash(self)}' Test Scenario" diff --git a/airbyte_cdk/test/standard_tests/pytest_hooks.py b/airbyte_cdk/test/standard_tests/pytest_hooks.py new file mode 100644 index 000000000..b6197a0c3 --- /dev/null +++ b/airbyte_cdk/test/standard_tests/pytest_hooks.py @@ -0,0 +1,61 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""Pytest hooks for Airbyte CDK tests. + +These hooks are used to customize the behavior of pytest during test discovery and execution. + +To use these hooks within a connector, add the following lines to the connector's `conftest.py` +file, or to another file that is imported during test discovery: + +```python +pytest_plugins = [ + "airbyte_cdk.test.standard_tests.pytest_hooks", +] +``` +""" + +import pytest + + +def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: + """ + A helper for pytest_generate_tests hook. + + If a test method (in a class subclassed from our base class) + declares an argument 'scenario', this function retrieves the + 'scenarios' attribute from the test class and parametrizes that + test with the values from 'scenarios'. + + ## Usage + + ```python + from airbyte_cdk.test.standard_tests.connector_base import ( + generate_tests, + ConnectorTestSuiteBase, + ) + + def pytest_generate_tests(metafunc): + generate_tests(metafunc) + + class TestMyConnector(ConnectorTestSuiteBase): + ... + + ``` + """ + # Check if the test function requires an 'scenario' argument + if "scenario" in metafunc.fixturenames: + # Retrieve the test class + test_class = metafunc.cls + if test_class is None: + return + + # Get the 'scenarios' attribute from the class + scenarios_attr = getattr(test_class, "get_scenarios", None) + if scenarios_attr is None: + raise ValueError( + f"Test class {test_class} does not have a 'scenarios' attribute. " + "Please define the 'scenarios' attribute in the test class." + ) + + scenarios = test_class.get_scenarios() + ids = [str(scenario) for scenario in scenarios] + metafunc.parametrize("scenario", scenarios, ids=ids) diff --git a/airbyte_cdk/test/standard_tests/source_base.py b/airbyte_cdk/test/standard_tests/source_base.py new file mode 100644 index 000000000..83cc7326f --- /dev/null +++ b/airbyte_cdk/test/standard_tests/source_base.py @@ -0,0 +1,140 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Base class for source test suites.""" + +from dataclasses import asdict + +from airbyte_cdk.models import ( + AirbyteMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, + Type, +) +from airbyte_cdk.test import entrypoint_wrapper +from airbyte_cdk.test.standard_tests._job_runner import run_test_job +from airbyte_cdk.test.standard_tests.connector_base import ( + ConnectorTestSuiteBase, +) +from airbyte_cdk.test.standard_tests.models import ( + ConnectorTestScenario, +) + + +class SourceTestSuiteBase(ConnectorTestSuiteBase): + """Base class for source test suites. + + This class provides a base set of functionality for testing source connectors, and it + inherits all generic connector tests from the `ConnectorTestSuiteBase` class. + """ + + def test_check( + self, + scenario: ConnectorTestScenario, + ) -> None: + """Run standard `check` tests on the connector. + + Assert that the connector returns a single CONNECTION_STATUS message. + This test is designed to validate the connector's ability to establish a connection + and return its status with the expected message type. + """ + result: entrypoint_wrapper.EntrypointOutput = run_test_job( + self.create_connector(scenario), + "check", + test_scenario=scenario, + ) + conn_status_messages: list[AirbyteMessage] = [ + msg for msg in result._messages if msg.type == Type.CONNECTION_STATUS + ] # noqa: SLF001 # Non-public API + num_status_messages = len(conn_status_messages) + assert num_status_messages == 1, ( + f"Expected exactly one CONNECTION_STATUS message. Got {num_status_messages}: \n" + + "\n".join([str(m) for m in result._messages]) + ) + + def test_discover( + self, + scenario: ConnectorTestScenario, + ) -> None: + """Standard test for `discover`.""" + run_test_job( + self.create_connector(scenario), + "discover", + test_scenario=scenario, + ) + + def test_basic_read( + self, + scenario: ConnectorTestScenario, + ) -> None: + """Run standard `read` test on the connector. + + This test is designed to validate the connector's ability to read data + from the source and return records. It first runs a `discover` job to + obtain the catalog of streams, and then it runs a `read` job to fetch + records from those streams. + """ + discover_result = run_test_job( + self.create_connector(scenario), + "discover", + test_scenario=scenario, + ) + if scenario.expect_exception: + assert discover_result.errors, "Expected exception but got none." + return + + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=stream, + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append_dedup, + ) + for stream in discover_result.catalog.catalog.streams # type: ignore [reportOptionalMemberAccess, union-attr] + ] + ) + result = run_test_job( + self.create_connector(scenario), + "read", + test_scenario=scenario, + catalog=configured_catalog, + ) + + if not result.records: + raise AssertionError("Expected records but got none.") # noqa: TRY003 + + def test_fail_read_with_bad_catalog( + self, + scenario: ConnectorTestScenario, + ) -> None: + """Standard test for `read` when passed a bad catalog file.""" + invalid_configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + # Create ConfiguredAirbyteStream which is deliberately invalid + # with regard to the Airbyte Protocol. + # This should cause the connector to fail. + ConfiguredAirbyteStream( + stream=AirbyteStream( + name="__AIRBYTE__stream_that_does_not_exist", + json_schema={ + "type": "object", + "properties": {"f1": {"type": "string"}}, + }, + supported_sync_modes=[SyncMode.full_refresh], + ), + sync_mode="INVALID", # type: ignore [reportArgumentType] + destination_sync_mode="INVALID", # type: ignore [reportArgumentType] + ) + ] + ) + # Set expected status to "failed" to ensure the test fails if the connector. + scenario.status = "failed" + result: entrypoint_wrapper.EntrypointOutput = run_test_job( + self.create_connector(scenario), + "read", + test_scenario=scenario, + catalog=asdict(invalid_configured_catalog), + ) + assert result.errors, "Expected errors but got none." + assert result.trace_messages, "Expected trace messages but got none." diff --git a/poetry.lock b/poetry.lock index 936c060f7..40f6a035e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -131,21 +131,16 @@ frozenlist = ">=1.1.0" [[package]] name = "airbyte-protocol-models-dataclasses" -version = "0.14.1337.dev1742858109" +version = "0.15.0" description = "Declares the Airbyte Protocol using Python Dataclasses. Dataclasses in Python have less performance overhead compared to Pydantic models, making them a more efficient choice for scenarios where speed and memory usage are critical" optional = false python-versions = ">=3.9" groups = ["main"] files = [ - {file = "airbyte_protocol_models_dataclasses-0.14.1337.dev1742858109-py3-none-any.whl", hash = "sha256:e9937574976a4bbe20339074e65f99654ab317b9fb1275806247aaa698d6793c"}, - {file = "airbyte_protocol_models_dataclasses-0.14.1337.dev1742858109.tar.gz", hash = "sha256:69fc693fe1e3545c38e0bf6b27aa3a0ead3ae00e57d75fdc94eb0366189f56dc"}, + {file = "airbyte_protocol_models_dataclasses-0.15.0-py3-none-any.whl", hash = "sha256:0fe8d7c2863c348b350efcf5f1af5872dc9071060408285e4708d97a9be5e2fb"}, + {file = "airbyte_protocol_models_dataclasses-0.15.0.tar.gz", hash = "sha256:a5bad4ee7ae0a04f1436967b7afd3306d28e1cd2e5acedf0cce588f0c80ed001"}, ] -[package.source] -type = "legacy" -url = "https://test.pypi.org/simple" -reference = "testpypi" - [[package]] name = "annotated-types" version = "0.7.0" @@ -304,6 +299,18 @@ charset-normalizer = ["charset-normalizer"] html5lib = ["html5lib"] lxml = ["lxml"] +[[package]] +name = "boltons" +version = "25.0.0" +description = "When they're not builtins, they're boltons." +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "boltons-25.0.0-py3-none-any.whl", hash = "sha256:dc9fb38bf28985715497d1b54d00b62ea866eca3938938ea9043e254a3a6ca62"}, + {file = "boltons-25.0.0.tar.gz", hash = "sha256:e110fbdc30b7b9868cb604e3f71d4722dd8f4dcb4a5ddd06028ba8f1ab0b5ace"}, +] + [[package]] name = "bracex" version = "2.5.post1" @@ -4341,30 +4348,30 @@ jupyter = ["ipywidgets (>=7.5.1,<9)"] [[package]] name = "ruff" -version = "0.7.4" +version = "0.11.5" description = "An extremely fast Python linter and code formatter, written in Rust." optional = false python-versions = ">=3.7" groups = ["dev"] files = [ - {file = "ruff-0.7.4-py3-none-linux_armv6l.whl", hash = "sha256:a4919925e7684a3f18e18243cd6bea7cfb8e968a6eaa8437971f681b7ec51478"}, - {file = "ruff-0.7.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:cfb365c135b830778dda8c04fb7d4280ed0b984e1aec27f574445231e20d6c63"}, - {file = "ruff-0.7.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:63a569b36bc66fbadec5beaa539dd81e0527cb258b94e29e0531ce41bacc1f20"}, - {file = "ruff-0.7.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0d06218747d361d06fd2fdac734e7fa92df36df93035db3dc2ad7aa9852cb109"}, - {file = "ruff-0.7.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:e0cea28d0944f74ebc33e9f934238f15c758841f9f5edd180b5315c203293452"}, - {file = "ruff-0.7.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:80094ecd4793c68b2571b128f91754d60f692d64bc0d7272ec9197fdd09bf9ea"}, - {file = "ruff-0.7.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:997512325c6620d1c4c2b15db49ef59543ef9cd0f4aa8065ec2ae5103cedc7e7"}, - {file = "ruff-0.7.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:00b4cf3a6b5fad6d1a66e7574d78956bbd09abfd6c8a997798f01f5da3d46a05"}, - {file = "ruff-0.7.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7dbdc7d8274e1422722933d1edddfdc65b4336abf0b16dfcb9dedd6e6a517d06"}, - {file = "ruff-0.7.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e92dfb5f00eaedb1501b2f906ccabfd67b2355bdf117fea9719fc99ac2145bc"}, - {file = "ruff-0.7.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:3bd726099f277d735dc38900b6a8d6cf070f80828877941983a57bca1cd92172"}, - {file = "ruff-0.7.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:2e32829c429dd081ee5ba39aef436603e5b22335c3d3fff013cd585806a6486a"}, - {file = "ruff-0.7.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:662a63b4971807623f6f90c1fb664613f67cc182dc4d991471c23c541fee62dd"}, - {file = "ruff-0.7.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:876f5e09eaae3eb76814c1d3b68879891d6fde4824c015d48e7a7da4cf066a3a"}, - {file = "ruff-0.7.4-py3-none-win32.whl", hash = "sha256:75c53f54904be42dd52a548728a5b572344b50d9b2873d13a3f8c5e3b91f5cac"}, - {file = "ruff-0.7.4-py3-none-win_amd64.whl", hash = "sha256:745775c7b39f914238ed1f1b0bebed0b9155a17cd8bc0b08d3c87e4703b990d6"}, - {file = "ruff-0.7.4-py3-none-win_arm64.whl", hash = "sha256:11bff065102c3ae9d3ea4dc9ecdfe5a5171349cdd0787c1fc64761212fc9cf1f"}, - {file = "ruff-0.7.4.tar.gz", hash = "sha256:cd12e35031f5af6b9b93715d8c4f40360070b2041f81273d0527683d5708fce2"}, + {file = "ruff-0.11.5-py3-none-linux_armv6l.whl", hash = "sha256:2561294e108eb648e50f210671cc56aee590fb6167b594144401532138c66c7b"}, + {file = "ruff-0.11.5-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:ac12884b9e005c12d0bd121f56ccf8033e1614f736f766c118ad60780882a077"}, + {file = "ruff-0.11.5-py3-none-macosx_11_0_arm64.whl", hash = "sha256:4bfd80a6ec559a5eeb96c33f832418bf0fb96752de0539905cf7b0cc1d31d779"}, + {file = "ruff-0.11.5-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0947c0a1afa75dcb5db4b34b070ec2bccee869d40e6cc8ab25aca11a7d527794"}, + {file = "ruff-0.11.5-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ad871ff74b5ec9caa66cb725b85d4ef89b53f8170f47c3406e32ef040400b038"}, + {file = "ruff-0.11.5-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e6cf918390cfe46d240732d4d72fa6e18e528ca1f60e318a10835cf2fa3dc19f"}, + {file = "ruff-0.11.5-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:56145ee1478582f61c08f21076dc59153310d606ad663acc00ea3ab5b2125f82"}, + {file = "ruff-0.11.5-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e5f66f8f1e8c9fc594cbd66fbc5f246a8d91f916cb9667e80208663ec3728304"}, + {file = "ruff-0.11.5-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:80b4df4d335a80315ab9afc81ed1cff62be112bd165e162b5eed8ac55bfc8470"}, + {file = "ruff-0.11.5-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3068befab73620b8a0cc2431bd46b3cd619bc17d6f7695a3e1bb166b652c382a"}, + {file = "ruff-0.11.5-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:f5da2e710a9641828e09aa98b92c9ebbc60518fdf3921241326ca3e8f8e55b8b"}, + {file = "ruff-0.11.5-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:ef39f19cb8ec98cbc762344921e216f3857a06c47412030374fffd413fb8fd3a"}, + {file = "ruff-0.11.5-py3-none-musllinux_1_2_i686.whl", hash = "sha256:b2a7cedf47244f431fd11aa5a7e2806dda2e0c365873bda7834e8f7d785ae159"}, + {file = "ruff-0.11.5-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:81be52e7519f3d1a0beadcf8e974715b2dfc808ae8ec729ecfc79bddf8dbb783"}, + {file = "ruff-0.11.5-py3-none-win32.whl", hash = "sha256:e268da7b40f56e3eca571508a7e567e794f9bfcc0f412c4b607931d3af9c4afe"}, + {file = "ruff-0.11.5-py3-none-win_amd64.whl", hash = "sha256:6c6dc38af3cfe2863213ea25b6dc616d679205732dc0fb673356c2d69608f800"}, + {file = "ruff-0.11.5-py3-none-win_arm64.whl", hash = "sha256:67e241b4314f4eacf14a601d586026a962f4002a475aa702c69980a38087aa4e"}, + {file = "ruff-0.11.5.tar.gz", hash = "sha256:cae2e2439cb88853e421901ec040a758960b576126dab520fa08e9de431d1bef"}, ] [[package]] @@ -5407,4 +5414,4 @@ vector-db-based = ["cohere", "langchain", "openai", "tiktoken"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.13" -content-hash = "b581ab987c1608518a0bb26fcda2bdbda9b245940d7cbe397f2092e7c3a73efb" +content-hash = "7027fd1921446b4781a115f2c18e4d4baa22687c5d94f1ff59267b30c6141aad" diff --git a/pyproject.toml b/pyproject.toml index b1d10b8a1..630a83086 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,9 +30,10 @@ enable = true [tool.poetry.dependencies] python = ">=3.10,<3.13" -airbyte-protocol-models-dataclasses = { version = "0.14.1337.dev1742858109", source = "testpypi" } +airbyte-protocol-models-dataclasses = "^0.15" backoff = "*" +boltons = "^25.0.0" cachetools = "*" dpath = "^2.1.6" dunamai = "^1.22.0" @@ -85,11 +86,6 @@ xmltodict = ">=0.13,<0.15" anyascii = "^0.3.2" whenever = "^0.6.16" -[[tool.poetry.source]] -name = "testpypi" -url = "https://test.pypi.org/simple/" -priority = "supplemental" - [tool.poetry.group.dev.dependencies] freezegun = "*" mypy = "*" @@ -158,6 +154,9 @@ check-lockfile = {cmd = "poetry check", help = "Check the poetry lock file."} lint-fix = { cmd = "poetry run ruff check --fix .", help = "Auto-fix any lint issues that Ruff can automatically resolve (excluding 'unsafe' fixes)." } lint-fix-unsafe = { cmd = "poetry run ruff check --fix --unsafe-fixes .", help = "Lint-fix modified files, including 'unsafe' fixes. It is recommended to first commit any pending changes and then always manually review any unsafe changes applied." } +# ruff fix everything (ignoring non-Python fixes) +ruff-fix = { sequence = ["lint-fix", "_format-fix-ruff"] , help = "Lint-fix and format-fix all code." } + # Combined Check and Fix tasks check-all = {sequence = ["lint", "format-check", "type-check", "check-lockfile"], help = "Lint, format, and type-check modified files.", ignore_fail = "return_non_zero"} diff --git a/unit_tests/resource/http/response/file_api/article_attachment_content.png b/unit_tests/resource/http/response/file_api/article_attachment_content.png new file mode 100644 index 000000000..fdcd10361 Binary files /dev/null and b/unit_tests/resource/http/response/file_api/article_attachment_content.png differ diff --git a/unit_tests/resource/http/response/file_api/article_attachments.json b/unit_tests/resource/http/response/file_api/article_attachments.json new file mode 100644 index 000000000..832499618 --- /dev/null +++ b/unit_tests/resource/http/response/file_api/article_attachments.json @@ -0,0 +1,19 @@ +{ + "article_attachments": [ + { + "id": 12138758717583, + "url": "https://d3v-airbyte.zendesk.com/api/v2/help_center/articles/attachments/12138758717583", + "article_id": 12138789487375, + "display_file_name": "some_image_name.webp", + "file_name": "some_image_name.png", + "locale": "en-us", + "content_url": "https://d3v-airbyte.zendesk.com/hc/article_attachments/12138758717583", + "relative_path": "/hc/article_attachments/12138758717583", + "content_type": "image/webp", + "size": 109284, + "inline": true, + "created_at": "2025-03-11T23:33:57Z", + "updated_at": "2025-03-11T23:33:57Z" + } + ] +} \ No newline at end of file diff --git a/unit_tests/resource/http/response/file_api/articles.json b/unit_tests/resource/http/response/file_api/articles.json new file mode 100644 index 000000000..6b97dea63 --- /dev/null +++ b/unit_tests/resource/http/response/file_api/articles.json @@ -0,0 +1,37 @@ +{ + "count": 7, + "next_page": null, + "end_time": 1741736037, + "articles": [ + { + "id": 12138789487375, + "url": "https://d3v-airbyte.zendesk.com/api/v2/help_center/en-us/articles/12138789487375.json", + "html_url": "https://d3v-airbyte.zendesk.com/hc/en-us/articles/12138789487375-This-is-an-article-with-an-attachment", + "author_id": 360786799676, + "comments_disabled": false, + "draft": true, + "promoted": false, + "position": 0, + "vote_sum": 0, + "vote_count": 0, + "section_id": 7253394947215, + "created_at": "2025-03-11T23:33:57Z", + "updated_at": "2025-03-11T23:33:57Z", + "name": "This is an article with an attachment!", + "title": "This is an article with an attachment!", + "source_locale": "en-us", + "locale": "en-us", + "outdated": false, + "outdated_locales": [], + "edited_at": "2025-03-11T23:33:57Z", + "user_segment_id": 7253375826191, + "permission_group_id": 7253379449487, + "content_tag_ids": [], + "label_names": [], + "body": "

Here be some text\"some_image.webp\"

", + "user_segment_ids": [ + 7253375826191 + ] + } + ] + } \ No newline at end of file diff --git a/unit_tests/source_declarative_manifest/resources/__init__.py b/unit_tests/resources/__init__.py similarity index 100% rename from unit_tests/source_declarative_manifest/resources/__init__.py rename to unit_tests/resources/__init__.py diff --git a/unit_tests/source_declarative_manifest/resources/invalid_local_manifest.yaml b/unit_tests/resources/invalid_local_manifest.yaml similarity index 100% rename from unit_tests/source_declarative_manifest/resources/invalid_local_manifest.yaml rename to unit_tests/resources/invalid_local_manifest.yaml diff --git a/unit_tests/source_declarative_manifest/resources/invalid_local_pokeapi_config.json b/unit_tests/resources/invalid_local_pokeapi_config.json similarity index 100% rename from unit_tests/source_declarative_manifest/resources/invalid_local_pokeapi_config.json rename to unit_tests/resources/invalid_local_pokeapi_config.json diff --git a/unit_tests/source_declarative_manifest/resources/invalid_remote_config.json b/unit_tests/resources/invalid_remote_config.json similarity index 100% rename from unit_tests/source_declarative_manifest/resources/invalid_remote_config.json rename to unit_tests/resources/invalid_remote_config.json diff --git a/unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/README.md b/unit_tests/resources/source_pokeapi_w_components_py/README.md similarity index 100% rename from unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/README.md rename to unit_tests/resources/source_pokeapi_w_components_py/README.md diff --git a/unit_tests/resources/source_pokeapi_w_components_py/__init__.py b/unit_tests/resources/source_pokeapi_w_components_py/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/unit_tests/resources/source_pokeapi_w_components_py/acceptance-test-config.yml b/unit_tests/resources/source_pokeapi_w_components_py/acceptance-test-config.yml new file mode 100644 index 000000000..e707a9099 --- /dev/null +++ b/unit_tests/resources/source_pokeapi_w_components_py/acceptance-test-config.yml @@ -0,0 +1,29 @@ +# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) +# for more information about how to configure these tests +# connector_image: airbyte/source-pokeapi:dev +acceptance_tests: + spec: + tests: + - spec_path: "manifest.yaml" + backward_compatibility_tests_config: + disable_for_version: "0.1.5" + connection: + tests: + - config_path: "valid_config.yaml" + status: "succeed" + discovery: + tests: + - config_path: "valid_config.yaml" + backward_compatibility_tests_config: + disable_for_version: "0.1.5" + basic_read: + tests: + - config_path: "valid_config.yaml" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] + incremental: + bypass_reason: "This connector does not implement incremental sync" + full_refresh: + tests: + - config_path: "valid_config.yaml" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/components.py b/unit_tests/resources/source_pokeapi_w_components_py/components.py similarity index 51% rename from unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/components.py rename to unit_tests/resources/source_pokeapi_w_components_py/components.py index 5e7e16f71..214a53641 100644 --- a/unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/components.py +++ b/unit_tests/resources/source_pokeapi_w_components_py/components.py @@ -1,6 +1,7 @@ """A sample implementation of custom components that does nothing but will cause syncs to fail if missing.""" -from typing import Any, Mapping +from collections.abc import Iterable, MutableMapping +from typing import Any import requests @@ -18,3 +19,14 @@ class MyCustomExtractor(DpathExtractor): """ pass + + +class MyCustomFailingExtractor(DpathExtractor): + """Dummy class, intentionally raises an exception when extract_records is called.""" + + def extract_records( + self, + response: requests.Response, + ) -> Iterable[MutableMapping[Any, Any]]: + """Raise an exception when called.""" + raise IntentionalException("This is an intentional failure for testing purposes.") diff --git a/unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/components_failing.py b/unit_tests/resources/source_pokeapi_w_components_py/components_failing.py similarity index 68% rename from unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/components_failing.py rename to unit_tests/resources/source_pokeapi_w_components_py/components_failing.py index 5c05881e7..95a7c0662 100644 --- a/unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/components_failing.py +++ b/unit_tests/resources/source_pokeapi_w_components_py/components_failing.py @@ -1,11 +1,7 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# """A sample implementation of custom components that does nothing but will cause syncs to fail if missing.""" from collections.abc import Iterable, MutableMapping -from dataclasses import InitVar, dataclass -from typing import Any, Mapping, Optional, Union +from typing import Any import requests @@ -17,8 +13,11 @@ class IntentionalException(Exception): class MyCustomExtractor(DpathExtractor): + """Dummy class, intentionally raises an exception when extract_records is called.""" + def extract_records( self, response: requests.Response, ) -> Iterable[MutableMapping[Any, Any]]: - raise IntentionalException + """Raise an exception when called.""" + raise IntentionalException("This is an intentional failure for testing purposes.") diff --git a/unit_tests/resources/source_pokeapi_w_components_py/integration_tests/__init__.py b/unit_tests/resources/source_pokeapi_w_components_py/integration_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/unit_tests/resources/source_pokeapi_w_components_py/integration_tests/test_airbyte_standards.py b/unit_tests/resources/source_pokeapi_w_components_py/integration_tests/test_airbyte_standards.py new file mode 100644 index 000000000..7229343ae --- /dev/null +++ b/unit_tests/resources/source_pokeapi_w_components_py/integration_tests/test_airbyte_standards.py @@ -0,0 +1,18 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""FAST Airbyte Standard Tests for the source_pokeapi_w_components source.""" + +from airbyte_cdk.test.standard_tests import DeclarativeSourceTestSuite + +pytest_plugins = [ + "airbyte_cdk.test.standard_tests.pytest_hooks", +] + + +class TestSuiteSourcePokeAPI(DeclarativeSourceTestSuite): + """Test suite for the source_pokeapi_w_components source. + + This class inherits from SourceTestSuiteBase and implements all of the tests in the suite. + + As long as the class name starts with "Test", pytest will automatically discover and run the + tests in this class. + """ diff --git a/unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/manifest.yaml b/unit_tests/resources/source_pokeapi_w_components_py/manifest.yaml similarity index 100% rename from unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/manifest.yaml rename to unit_tests/resources/source_pokeapi_w_components_py/manifest.yaml diff --git a/unit_tests/resources/source_pokeapi_w_components_py/valid_config.yaml b/unit_tests/resources/source_pokeapi_w_components_py/valid_config.yaml new file mode 100644 index 000000000..209b0a787 --- /dev/null +++ b/unit_tests/resources/source_pokeapi_w_components_py/valid_config.yaml @@ -0,0 +1 @@ +{ "start_date": "2024-01-01", "pokemon_name": "pikachu" } diff --git a/unit_tests/source_declarative_manifest/resources/valid_local_manifest.yaml b/unit_tests/resources/valid_local_manifest.yaml similarity index 100% rename from unit_tests/source_declarative_manifest/resources/valid_local_manifest.yaml rename to unit_tests/resources/valid_local_manifest.yaml diff --git a/unit_tests/source_declarative_manifest/resources/valid_local_pokeapi_config.json b/unit_tests/resources/valid_local_pokeapi_config.json similarity index 100% rename from unit_tests/source_declarative_manifest/resources/valid_local_pokeapi_config.json rename to unit_tests/resources/valid_local_pokeapi_config.json diff --git a/unit_tests/source_declarative_manifest/resources/valid_remote_config.json b/unit_tests/resources/valid_remote_config.json similarity index 100% rename from unit_tests/source_declarative_manifest/resources/valid_remote_config.json rename to unit_tests/resources/valid_remote_config.json diff --git a/unit_tests/source_declarative_manifest/conftest.py b/unit_tests/source_declarative_manifest/conftest.py index 3d61e65e8..e1d135285 100644 --- a/unit_tests/source_declarative_manifest/conftest.py +++ b/unit_tests/source_declarative_manifest/conftest.py @@ -2,34 +2,34 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. # -import os +from pathlib import Path import pytest import yaml -def get_fixture_path(file_name): - return os.path.join(os.path.dirname(__file__), file_name) +def get_resource_path(file_name) -> str: + return Path(__file__).parent.parent / "resources" / file_name @pytest.fixture def valid_remote_config(): - return get_fixture_path("resources/valid_remote_config.json") + return get_resource_path("valid_remote_config.json") @pytest.fixture def invalid_remote_config(): - return get_fixture_path("resources/invalid_remote_config.json") + return get_resource_path("invalid_remote_config.json") @pytest.fixture def valid_local_manifest(): - return get_fixture_path("resources/valid_local_manifest.yaml") + return get_resource_path("valid_local_manifest.yaml") @pytest.fixture def invalid_local_manifest(): - return get_fixture_path("resources/invalid_local_manifest.yaml") + return get_resource_path("invalid_local_manifest.yaml") @pytest.fixture @@ -46,9 +46,9 @@ def invalid_local_manifest_yaml(invalid_local_manifest): @pytest.fixture def valid_local_config_file(): - return get_fixture_path("resources/valid_local_pokeapi_config.json") + return get_resource_path("valid_local_pokeapi_config.json") @pytest.fixture def invalid_local_config_file(): - return get_fixture_path("resources/invalid_local_pokeapi_config.json") + return get_resource_path("invalid_local_pokeapi_config.json") diff --git a/unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/valid_config.yaml b/unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/valid_config.yaml deleted file mode 100644 index 78af092bb..000000000 --- a/unit_tests/source_declarative_manifest/resources/source_pokeapi_w_components_py/valid_config.yaml +++ /dev/null @@ -1 +0,0 @@ -{ "start_date": "2024-01-01", "pokemon": "pikachu" } diff --git a/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py b/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py index 521572bec..12a236ad9 100644 --- a/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py +++ b/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py @@ -5,7 +5,6 @@ import datetime import json import logging -import os import sys import types from collections.abc import Callable, Mapping @@ -33,6 +32,7 @@ custom_code_execution_permitted, register_components_module_from_string, ) +from airbyte_cdk.test.standard_tests.connector_base import MANIFEST_YAML SAMPLE_COMPONENTS_PY_TEXT = """ def sample_function() -> str: @@ -44,8 +44,8 @@ def sample_method(self) -> str: """ -def get_fixture_path(file_name) -> str: - return os.path.join(os.path.dirname(__file__), file_name) +def get_resource_path(file_name) -> str: + return Path(__file__).parent.parent / "resources" / file_name def test_components_module_from_string() -> None: @@ -90,15 +90,14 @@ def get_py_components_config_dict( *, failing_components: bool = False, ) -> dict[str, Any]: - connector_dir = Path(get_fixture_path("resources/source_pokeapi_w_components_py")) - manifest_yml_path: Path = connector_dir / "manifest.yaml" + connector_dir = Path(get_resource_path("source_pokeapi_w_components_py")) + manifest_yaml_path: Path = connector_dir / MANIFEST_YAML custom_py_code_path: Path = connector_dir / ( "components.py" if not failing_components else "components_failing.py" ) config_yaml_path: Path = connector_dir / "valid_config.yaml" - secrets_yaml_path: Path = connector_dir / "secrets.yaml" - manifest_dict = yaml.safe_load(manifest_yml_path.read_text()) + manifest_dict = yaml.safe_load(manifest_yaml_path.read_text()) assert manifest_dict, "Failed to load the manifest file." assert isinstance(manifest_dict, Mapping), ( f"Manifest file is type {type(manifest_dict).__name__}, not a mapping: {manifest_dict}" @@ -266,8 +265,8 @@ def test_sync_with_injected_py_components( streams=[ ConfiguredAirbyteStream( stream=stream, - sync_mode="full_refresh", - destination_sync_mode="overwrite", + sync_mode="full_refresh", # type: ignore (intentional bad value) + destination_sync_mode="overwrite", # type: ignore (intentional bad value) ) for stream in catalog.streams ] diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index 62b05a8db..e6ee40d5b 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -1,3 +1,4 @@ +import json import re from pathlib import Path from typing import Any, Dict, List, Optional @@ -10,6 +11,8 @@ from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput from airbyte_cdk.test.entrypoint_wrapper import discover as entrypoint_discover from airbyte_cdk.test.entrypoint_wrapper import read as entrypoint_read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import find_binary_response, find_template from airbyte_cdk.test.state_builder import StateBuilder @@ -21,7 +24,7 @@ def build(self) -> Dict[str, Any]: "credentials": { "credentials": "api_token", "email": "integration-test@airbyte.io", - "api_token": "fake token", + "api_token": "fake_token", }, } @@ -63,71 +66,129 @@ def discover(config_builder: ConfigBuilder, expecting_exception: bool = False) - ) +SERVER_URL = "https://d3v-airbyte.zendesk.com" +STREAM_URL = f"{SERVER_URL}/api/v2/help_center/incremental/articles?start_time=1672531200" +STREAM_ATTACHMENTS_URL = ( + f"{SERVER_URL}/api/v2/help_center/articles/12138789487375/attachments?per_page=100&=1672531200" +) +STREAM_ATTACHMENT_CONTENT_URL = f"{SERVER_URL}/hc/article_attachments/12138758717583" + + class FileStreamTest(TestCase): def _config(self) -> ConfigBuilder: return ConfigBuilder() def test_check(self) -> None: - source = _source( - CatalogBuilder() - .with_stream(ConfiguredAirbyteStreamBuilder().with_name("articles")) - .build(), - self._config().build(), - ) + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url=STREAM_URL), + HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200), + ) - check_result = source.check(Mock(), self._config().build()) + source = _source( + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name("articles")) + .build(), + self._config().build(), + ) + + check_result = source.check(Mock(), self._config().build()) - assert check_result.status == Status.SUCCEEDED + assert check_result.status == Status.SUCCEEDED def test_get_articles(self) -> None: - output = read( - self._config(), - CatalogBuilder() - .with_stream(ConfiguredAirbyteStreamBuilder().with_name("articles")) - .build(), - ) + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url=STREAM_URL), + HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200), + ) + output = read( + self._config(), + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name("articles")) + .build(), + ) - assert output.records + assert output.records def test_get_article_attachments(self) -> None: - output = read( - self._config(), - CatalogBuilder() - .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) - .build(), - ) + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url=STREAM_URL), + HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200), + ) + http_mocker.get( + HttpRequest(url=STREAM_ATTACHMENTS_URL), + HttpResponse( + json.dumps(find_template("file_api/article_attachments", __file__)), 200 + ), + ) + http_mocker.get( + HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL), + HttpResponse( + find_binary_response("file_api/article_attachment_content.png", __file__), 200 + ), + ) - assert output.records - file_reference = output.records[0].record.file_reference - assert file_reference - assert file_reference.file_url - assert re.match(r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_url) - assert file_reference.file_relative_path - assert re.match( - r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_relative_path - ) - assert file_reference.file_size_bytes + output = read( + self._config(), + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) + .build(), + ) + + assert output.records + file_reference = output.records[0].record.file_reference + assert file_reference + assert file_reference.staging_file_url + assert re.match( + r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.staging_file_url + ) + assert file_reference.source_file_relative_path + assert re.match( + r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.source_file_relative_path + ) + assert file_reference.file_size_bytes def test_get_article_attachments_with_filename_extractor(self) -> None: - output = read( - self._config(), - CatalogBuilder() - .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) - .build(), - yaml_file="test_file_stream_with_filename_extractor.yaml", - ) + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url=STREAM_URL), + HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200), + ) + http_mocker.get( + HttpRequest(url=STREAM_ATTACHMENTS_URL), + HttpResponse( + json.dumps(find_template("file_api/article_attachments", __file__)), 200 + ), + ) + http_mocker.get( + HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL), + HttpResponse( + find_binary_response("file_api/article_attachment_content.png", __file__), 200 + ), + ) - assert output.records - file_reference = output.records[0].record.file_reference - assert file_reference - assert file_reference.file_url - # todo: once we finally mock the response update to check file name - assert not re.match(r"^.*/article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_url) - assert file_reference.file_relative_path - assert not re.match( - r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.file_relative_path - ) - assert file_reference.file_size_bytes + output = read( + self._config(), + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) + .build(), + yaml_file="test_file_stream_with_filename_extractor.yaml", + ) + + assert output.records + file_reference = output.records[0].record.file_reference + assert file_reference + assert ( + file_reference.staging_file_url + == "/tmp/airbyte-file-transfer/article_attachments/12138758717583/some_image_name.png" + ) + assert file_reference.source_file_relative_path + assert not re.match( + r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.source_file_relative_path + ) + assert file_reference.file_size_bytes def test_discover_article_attachments(self) -> None: output = discover(self._config()) diff --git a/unit_tests/sources/file_based/in_memory_files_source.py b/unit_tests/sources/file_based/in_memory_files_source.py index c8ee78f0f..4cd465895 100644 --- a/unit_tests/sources/file_based/in_memory_files_source.py +++ b/unit_tests/sources/file_based/in_memory_files_source.py @@ -140,7 +140,7 @@ def get_matching_files( def file_size(self, file: RemoteFile) -> int: return 0 - def get_file( + def upload( self, file: RemoteFile, local_directory: str, logger: logging.Logger ) -> Dict[str, Any]: return {} diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index b1d74334a..e67365ae3 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -690,6 +690,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, }, ] } @@ -1140,6 +1141,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } @@ -1227,6 +1229,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, }, { "json_schema": { @@ -1242,6 +1245,7 @@ "default_cursor_field": ["_ab_source_file_last_modified"], "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, }, ] } @@ -2104,6 +2108,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } @@ -2188,6 +2193,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, "source_defined_cursor": True, "default_cursor_field": ["_ab_source_file_last_modified"], }, @@ -2205,6 +2211,7 @@ "default_cursor_field": ["_ab_source_file_last_modified"], "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, }, ] } @@ -2623,6 +2630,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } diff --git a/unit_tests/sources/file_based/scenarios/incremental_scenarios.py b/unit_tests/sources/file_based/scenarios/incremental_scenarios.py index aea4b4846..e34b7f4de 100644 --- a/unit_tests/sources/file_based/scenarios/incremental_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/incremental_scenarios.py @@ -92,6 +92,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, "json_schema": { "type": "object", "properties": { @@ -172,6 +173,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, "json_schema": { "type": "object", "properties": { @@ -270,6 +272,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, "json_schema": { "type": "object", "properties": { @@ -330,6 +333,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, "json_schema": { "type": "object", "properties": { @@ -446,6 +450,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } @@ -546,6 +551,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, "json_schema": { "type": "object", "properties": { @@ -672,6 +678,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } @@ -811,6 +818,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } @@ -972,6 +980,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } @@ -1124,6 +1133,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } @@ -1254,6 +1264,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } @@ -1444,6 +1455,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } @@ -1627,6 +1639,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } @@ -1743,6 +1756,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } @@ -1883,6 +1897,7 @@ "source_defined_cursor": True, "supported_sync_modes": ["full_refresh", "incremental"], "is_resumable": True, + "is_file_based": False, } ] } diff --git a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py index 1b85ed8dd..55191d7f6 100644 --- a/unit_tests/sources/file_based/stream/test_default_file_based_stream.py +++ b/unit_tests/sources/file_based/stream/test_default_file_based_stream.py @@ -12,8 +12,15 @@ import pytest -from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level +from airbyte_cdk.models import ( + AirbyteLogMessage, + AirbyteMessage, + AirbyteRecordMessageFileReference, + AirbyteStream, + Level, +) from airbyte_cdk.models import Type as MessageType +from airbyte_cdk.sources.file_based import FileBasedStreamConfig from airbyte_cdk.sources.file_based.availability_strategy import ( AbstractFileBasedAvailabilityStrategy, ) @@ -24,6 +31,7 @@ FileBasedSourceError, ) from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader +from airbyte_cdk.sources.file_based.file_record_data import FileRecordData from airbyte_cdk.sources.file_based.file_types import FileTransfer from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser from airbyte_cdk.sources.file_based.remote_file import RemoteFile @@ -275,11 +283,17 @@ def test_yield_and_raise_collected(self) -> None: class DefaultFileBasedStreamFileTransferTest(unittest.TestCase): _NOW = datetime(2022, 10, 22, tzinfo=timezone.utc) - _A_RECORD = { - "bytes": 10, - "file_relative_path": "relative/path/file.csv", - "file_url": "/absolute/path/file.csv", - } + _A_FILE_RECORD_DATA = FileRecordData( + folder="/absolute/path/", + filename="file.csv", + bytes=10, + source_uri="file:///absolute/path/file.csv", + ) + _A_FILE_REFERENCE_MESSAGE = AirbyteRecordMessageFileReference( + file_size_bytes=10, + source_file_relative_path="relative/path/file.csv", + staging_file_url="/absolute/path/file.csv", + ) def setUp(self) -> None: self._stream_config = Mock() @@ -307,37 +321,27 @@ def setUp(self) -> None: use_file_transfer=True, ) - self._stream_not_mirroring = DefaultFileBasedStream( - config=self._stream_config, - catalog_schema=self._catalog_schema, - stream_reader=self._stream_reader, - availability_strategy=self._availability_strategy, - discovery_policy=self._discovery_policy, - parsers={MockFormat: self._parser}, - validation_policy=self._validation_policy, - cursor=self._cursor, - errors_collector=FileBasedErrorsCollector(), - use_file_transfer=True, - preserve_directory_structure=False, - ) - def test_when_read_records_from_slice_then_return_records(self) -> None: """Verify that we have the new file method and data is empty""" - with mock.patch.object(FileTransfer, "get_file", return_value=[self._A_RECORD]): - messages = list( - self._stream.read_records_from_slice( - {"files": [RemoteFile(uri="uri", last_modified=self._NOW)]} - ) - ) - assert list(map(lambda message: message.record.file, messages)) == [self._A_RECORD] - assert list(map(lambda message: message.record.data, messages)) == [{}] + with mock.patch.object( + FileTransfer, + "upload", + return_value=[(self._A_FILE_RECORD_DATA, self._A_FILE_REFERENCE_MESSAGE)], + ): + remote_file = RemoteFile(uri="uri", last_modified=self._NOW) + messages = list(self._stream.read_records_from_slice({"files": [remote_file]})) - def test_when_transform_record_then_return_updated_record(self) -> None: - file = RemoteFile(uri="uri", last_modified=self._NOW) - last_updated = int(self._NOW.timestamp()) * 1000 - transformed_record = self._stream.transform_record_for_file_transfer(self._A_RECORD, file) - assert transformed_record[self._stream.modified] == last_updated - assert transformed_record[self._stream.source_file_url] == file.uri + assert list(map(lambda message: message.record.file_reference, messages)) == [ + self._A_FILE_REFERENCE_MESSAGE + ] + assert list(map(lambda message: message.record.data, messages)) == [ + { + "bytes": 10, + "filename": "file.csv", + "folder": "/absolute/path/", + "source_uri": "file:///absolute/path/file.csv", + } + ] def test_when_compute_slices(self) -> None: all_files = [ @@ -467,3 +471,86 @@ def test_when_compute_slices_with_duplicates(self) -> None: assert "2 duplicates found for file name monthly-kickoff-202402.mpeg" in str(exc_info.value) assert "2 duplicates found for file name monthly-kickoff-202401.mpeg" in str(exc_info.value) assert "3 duplicates found for file name monthly-kickoff-202403.mpeg" in str(exc_info.value) + + +class DefaultFileBasedStreamSchemaTest(unittest.TestCase): + _NOW = datetime(2022, 10, 22, tzinfo=timezone.utc) + _A_FILE_REFERENCE_MESSAGE = AirbyteRecordMessageFileReference( + file_size_bytes=10, + source_file_relative_path="relative/path/file.csv", + staging_file_url="/absolute/path/file.csv", + ) + + def setUp(self) -> None: + self._stream_config = Mock(spec=FileBasedStreamConfig) + self._stream_config.format = MockFormat() + self._stream_config.name = "a stream name" + self._stream_config.input_schema = "" + self._stream_config.schemaless = False + self._stream_config.primary_key = [] + self._catalog_schema = Mock() + self._stream_reader = Mock(spec=AbstractFileBasedStreamReader) + self._availability_strategy = Mock(spec=AbstractFileBasedAvailabilityStrategy) + self._discovery_policy = Mock(spec=AbstractDiscoveryPolicy) + self._parser = Mock(spec=FileTypeParser) + self._validation_policy = Mock(spec=AbstractSchemaValidationPolicy) + self._validation_policy.name = "validation policy name" + self._cursor = Mock(spec=AbstractFileBasedCursor) + + def test_non_file_based_stream(self) -> None: + """ + Test that the stream is correct when file transfer is not used. + """ + non_file_based_stream = DefaultFileBasedStream( + config=self._stream_config, + catalog_schema=self._catalog_schema, + stream_reader=self._stream_reader, + availability_strategy=self._availability_strategy, + discovery_policy=self._discovery_policy, + parsers={MockFormat: self._parser}, + validation_policy=self._validation_policy, + cursor=self._cursor, + errors_collector=FileBasedErrorsCollector(), + use_file_transfer=False, + ) + with ( + mock.patch.object(non_file_based_stream, "get_json_schema", return_value={}), + mock.patch.object( + DefaultFileBasedStream, + "primary_key", + new_callable=mock.PropertyMock, + return_value=["id"], + ), + ): + airbyte_stream = non_file_based_stream.as_airbyte_stream() + assert isinstance(airbyte_stream, AirbyteStream) + assert not airbyte_stream.is_file_based + + def test_file_based_stream(self) -> None: + """ + Test that the stream is correct when file transfer used. + """ + non_file_based_stream = DefaultFileBasedStream( + config=self._stream_config, + catalog_schema=self._catalog_schema, + stream_reader=self._stream_reader, + availability_strategy=self._availability_strategy, + discovery_policy=self._discovery_policy, + parsers={MockFormat: self._parser}, + validation_policy=self._validation_policy, + cursor=self._cursor, + errors_collector=FileBasedErrorsCollector(), + use_file_transfer=True, + ) + with ( + mock.patch.object(non_file_based_stream, "get_json_schema", return_value={}), + mock.patch.object( + DefaultFileBasedStream, + "primary_key", + new_callable=mock.PropertyMock, + return_value=["id"], + ), + ): + airbyte_stream = non_file_based_stream.as_airbyte_stream() + assert isinstance(airbyte_stream, AirbyteStream) + assert airbyte_stream.is_file_based diff --git a/unit_tests/sources/file_based/test_file_based_stream_reader.py b/unit_tests/sources/file_based/test_file_based_stream_reader.py index ace2999e0..8d04b946f 100644 --- a/unit_tests/sources/file_based/test_file_based_stream_reader.py +++ b/unit_tests/sources/file_based/test_file_based_stream_reader.py @@ -5,6 +5,7 @@ import logging from datetime import datetime from io import IOBase +from os import path from typing import Any, ClassVar, Dict, Iterable, List, Mapping, Optional, Set import pytest @@ -81,7 +82,7 @@ def open_file(self, file: RemoteFile) -> IOBase: def file_size(self, file: RemoteFile) -> int: return 0 - def get_file( + def upload( self, file: RemoteFile, local_directory: str, logger: logging.Logger ) -> Dict[str, Any]: return {} @@ -389,7 +390,7 @@ def test_globs_and_prefixes_from_globs( @pytest.mark.parametrize( - "config, source_file, expected_file_relative_path, expected_local_file_path, expected_absolute_file_path", + "config, source_file_path, expected_file_relative_path, expected_local_file_path", [ pytest.param( { @@ -402,7 +403,6 @@ def test_globs_and_prefixes_from_globs( "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", - "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", id="preserve_directories_present_and_true", ), pytest.param( @@ -416,7 +416,6 @@ def test_globs_and_prefixes_from_globs( "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", "monthly-kickoff-202402.mpeg", "/tmp/transfer-files/monthly-kickoff-202402.mpeg", - "/tmp/transfer-files/monthly-kickoff-202402.mpeg", id="preserve_directories_present_and_false", ), pytest.param( @@ -424,7 +423,6 @@ def test_globs_and_prefixes_from_globs( "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", - "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", id="preserve_directories_not_present_defaults_true", ), pytest.param( @@ -432,29 +430,29 @@ def test_globs_and_prefixes_from_globs( "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", "mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", - "/tmp/transfer-files/mirror_paths_testing/not_duplicates/data/jan/monthly-kickoff-202402.mpeg", id="file_transfer_flag_not_present_defaults_true", ), ], ) def test_preserve_sub_directories_scenarios( config: Mapping[str, Any], - source_file: str, + source_file_path: str, expected_file_relative_path: str, expected_local_file_path: str, - expected_absolute_file_path: str, ) -> None: - remote_file = RemoteFile( - uri=source_file, - last_modified=datetime(2025, 1, 9, 11, 27, 20), - mime_type=None, - ) + """ + Test scenarios when preserve_directory_structure is True or False, the flag indicates whether we need to + use a relative path to upload the file or simply place it in the root. + """ reader = TestStreamReader() reader.config = TestSpec(**config) - file_relative_path, local_file_path, absolute_file_path = reader._get_file_transfer_paths( - remote_file, "/tmp/transfer-files/" + file_paths = reader._get_file_transfer_paths( + source_file_path, staging_directory="/tmp/transfer-files/" ) - assert file_relative_path == expected_file_relative_path - assert local_file_path == expected_local_file_path - assert absolute_file_path == expected_absolute_file_path + assert ( + file_paths[AbstractFileBasedStreamReader.FILE_RELATIVE_PATH] == expected_file_relative_path + ) + assert file_paths[AbstractFileBasedStreamReader.LOCAL_FILE_PATH] == expected_local_file_path + assert file_paths[AbstractFileBasedStreamReader.FILE_NAME] == path.basename(source_file_path) + assert file_paths[AbstractFileBasedStreamReader.FILE_FOLDER] == path.dirname(source_file_path) diff --git a/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py b/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py index 4397392ed..e3daa4249 100644 --- a/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py +++ b/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py @@ -117,6 +117,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, } ] } @@ -162,6 +163,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, } ] } @@ -194,6 +196,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, } ] } @@ -227,6 +230,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, }, { "json_schema": { @@ -238,6 +242,7 @@ "name": "stream2", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, }, ] } @@ -269,6 +274,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, } ] } @@ -302,6 +308,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, } ] } @@ -335,6 +342,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, } ] } diff --git a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py index c6197918e..185c5dceb 100644 --- a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py +++ b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py @@ -311,6 +311,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, } ] } @@ -351,6 +352,7 @@ "supported_sync_modes": ["full_refresh"], "source_defined_primary_key": [["id"]], "is_resumable": False, + "is_file_based": False, } ] } @@ -430,6 +432,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, }, { "json_schema": { @@ -442,6 +445,7 @@ "name": "stream2", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, }, ] } @@ -481,6 +485,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, } ] } @@ -522,6 +527,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, } ] } @@ -563,6 +569,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "is_resumable": False, + "is_file_based": False, } ] } diff --git a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py index e288cdc1b..d6ea64583 100644 --- a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py +++ b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py @@ -86,7 +86,7 @@ def setUp(self): self._record.partition = self._partition self._record.data = self._record_data self._record.stream_name = _STREAM_NAME - self._record.is_file_transfer_message = False + self._record.file_reference = None def test_stream_is_not_done_initially(self): stream_instances_to_read_from = [self._stream] diff --git a/unit_tests/sources/streams/concurrent/test_default_stream.py b/unit_tests/sources/streams/concurrent/test_default_stream.py index 43502f1f1..2c9afe4da 100644 --- a/unit_tests/sources/streams/concurrent/test_default_stream.py +++ b/unit_tests/sources/streams/concurrent/test_default_stream.py @@ -75,6 +75,7 @@ def test_as_airbyte_stream(self): source_defined_primary_key=None, namespace=None, is_resumable=False, + is_file_based=False, ) actual_airbyte_stream = self._stream.as_airbyte_stream() @@ -112,6 +113,7 @@ def test_as_airbyte_stream_with_primary_key(self): source_defined_primary_key=[["composite_key_1"], ["composite_key_2"]], namespace=None, is_resumable=False, + is_file_based=False, ) airbyte_stream = stream.as_airbyte_stream() @@ -149,6 +151,7 @@ def test_as_airbyte_stream_with_composite_primary_key(self): source_defined_primary_key=[["id_a"], ["id_b"]], namespace=None, is_resumable=False, + is_file_based=False, ) airbyte_stream = stream.as_airbyte_stream() @@ -186,6 +189,7 @@ def test_as_airbyte_stream_with_a_cursor(self): source_defined_primary_key=None, namespace=None, is_resumable=True, + is_file_based=False, ) airbyte_stream = stream.as_airbyte_stream() @@ -216,6 +220,39 @@ def test_as_airbyte_stream_with_namespace(self): source_defined_primary_key=None, namespace="test", is_resumable=False, + is_file_based=False, + ) + actual_airbyte_stream = stream.as_airbyte_stream() + + assert actual_airbyte_stream == expected_airbyte_stream + + def test_as_airbyte_stream_with_file_transfer_support(self): + stream = DefaultStream( + self._partition_generator, + self._name, + self._json_schema, + self._availability_strategy, + self._primary_key, + self._cursor_field, + self._logger, + FinalStateCursor( + stream_name=self._name, + stream_namespace=None, + message_repository=self._message_repository, + ), + namespace="test", + supports_file_transfer=True, + ) + expected_airbyte_stream = AirbyteStream( + name=self._name, + json_schema=self._json_schema, + supported_sync_modes=[SyncMode.full_refresh], + source_defined_cursor=None, + default_cursor_field=None, + source_defined_primary_key=None, + namespace="test", + is_resumable=False, + is_file_based=True, ) actual_airbyte_stream = stream.as_airbyte_stream() diff --git a/unit_tests/test/test_standard_tests.py b/unit_tests/test/test_standard_tests.py new file mode 100644 index 000000000..aa2d38a2b --- /dev/null +++ b/unit_tests/test/test_standard_tests.py @@ -0,0 +1,31 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Unit tests for FAST Airbyte Standard Tests.""" + +from typing import Any + +import pytest + +from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource +from airbyte_cdk.sources.source import Source +from airbyte_cdk.test.standard_tests._job_runner import IConnector + + +@pytest.mark.parametrize( + "input, expected", + [ + (DeclarativeSource, True), + (Source, True), + (None, False), + ("", False), + ([], False), + ({}, False), + (object(), False), + ], +) +def test_is_iconnector_check(input: Any, expected: bool) -> None: + """Assert whether inputs are valid as an IConnector object or class.""" + if isinstance(input, type): + assert issubclass(input, IConnector) == expected + return + + assert isinstance(input, IConnector) == expected