|
18 | 18 | AirbyteRecordMessageFileReference, |
19 | 19 | Level, |
20 | 20 | ) |
21 | | -from airbyte_cdk.models import Type as MessageType |
| 21 | +from airbyte_cdk.models import AirbyteStream, Type as MessageType |
| 22 | +from airbyte_cdk.sources.file_based import FileBasedStreamConfig |
22 | 23 | from airbyte_cdk.sources.file_based.availability_strategy import ( |
23 | 24 | AbstractFileBasedAvailabilityStrategy, |
24 | 25 | ) |
@@ -312,20 +313,6 @@ def setUp(self) -> None: |
312 | 313 | use_file_transfer=True, |
313 | 314 | ) |
314 | 315 |
|
315 | | - self._stream_not_mirroring = DefaultFileBasedStream( |
316 | | - config=self._stream_config, |
317 | | - catalog_schema=self._catalog_schema, |
318 | | - stream_reader=self._stream_reader, |
319 | | - availability_strategy=self._availability_strategy, |
320 | | - discovery_policy=self._discovery_policy, |
321 | | - parsers={MockFormat: self._parser}, |
322 | | - validation_policy=self._validation_policy, |
323 | | - cursor=self._cursor, |
324 | | - errors_collector=FileBasedErrorsCollector(), |
325 | | - use_file_transfer=True, |
326 | | - preserve_directory_structure=False, |
327 | | - ) |
328 | | - |
329 | 316 | def test_when_read_records_from_slice_then_return_records(self) -> None: |
330 | 317 | """Verify that we have the new file method and data is empty""" |
331 | 318 | with mock.patch.object( |
@@ -474,3 +461,86 @@ def test_when_compute_slices_with_duplicates(self) -> None: |
474 | 461 | assert "2 duplicates found for file name monthly-kickoff-202402.mpeg" in str(exc_info.value) |
475 | 462 | assert "2 duplicates found for file name monthly-kickoff-202401.mpeg" in str(exc_info.value) |
476 | 463 | assert "3 duplicates found for file name monthly-kickoff-202403.mpeg" in str(exc_info.value) |
| 464 | + |
| 465 | + |
| 466 | +class DefaultFileBasedStreamSchemaTest(unittest.TestCase): |
| 467 | + _NOW = datetime(2022, 10, 22, tzinfo=timezone.utc) |
| 468 | + _A_FILE_REFERENCE_MESSAGE = AirbyteRecordMessageFileReference( |
| 469 | + file_size_bytes=10, |
| 470 | + source_file_relative_path="relative/path/file.csv", |
| 471 | + staging_file_url="/absolute/path/file.csv", |
| 472 | + ) |
| 473 | + |
| 474 | + def setUp(self) -> None: |
| 475 | + self._stream_config = Mock(spec=FileBasedStreamConfig) |
| 476 | + self._stream_config.format = MockFormat() |
| 477 | + self._stream_config.name = "a stream name" |
| 478 | + self._stream_config.input_schema = "" |
| 479 | + self._stream_config.schemaless = False |
| 480 | + self._stream_config.primary_key = [] |
| 481 | + self._catalog_schema = Mock() |
| 482 | + self._stream_reader = Mock(spec=AbstractFileBasedStreamReader) |
| 483 | + self._availability_strategy = Mock(spec=AbstractFileBasedAvailabilityStrategy) |
| 484 | + self._discovery_policy = Mock(spec=AbstractDiscoveryPolicy) |
| 485 | + self._parser = Mock(spec=FileTypeParser) |
| 486 | + self._validation_policy = Mock(spec=AbstractSchemaValidationPolicy) |
| 487 | + self._validation_policy.name = "validation policy name" |
| 488 | + self._cursor = Mock(spec=AbstractFileBasedCursor) |
| 489 | + |
| 490 | + def test_non_file_based_stream(self) -> None: |
| 491 | + """ |
| 492 | + Test that the stream is correct when file transfer is not used. |
| 493 | + """ |
| 494 | + non_file_based_stream = DefaultFileBasedStream( |
| 495 | + config=self._stream_config, |
| 496 | + catalog_schema=self._catalog_schema, |
| 497 | + stream_reader=self._stream_reader, |
| 498 | + availability_strategy=self._availability_strategy, |
| 499 | + discovery_policy=self._discovery_policy, |
| 500 | + parsers={MockFormat: self._parser}, |
| 501 | + validation_policy=self._validation_policy, |
| 502 | + cursor=self._cursor, |
| 503 | + errors_collector=FileBasedErrorsCollector(), |
| 504 | + use_file_transfer=False, |
| 505 | + ) |
| 506 | + with ( |
| 507 | + mock.patch.object(non_file_based_stream, "get_json_schema", return_value={}), |
| 508 | + mock.patch.object( |
| 509 | + DefaultFileBasedStream, |
| 510 | + "primary_key", |
| 511 | + new_callable=mock.PropertyMock, |
| 512 | + return_value=["id"], |
| 513 | + ), |
| 514 | + ): |
| 515 | + airbyte_stream = non_file_based_stream.as_airbyte_stream() |
| 516 | + assert isinstance(airbyte_stream, AirbyteStream) |
| 517 | + assert not airbyte_stream.is_file_based |
| 518 | + |
| 519 | + def test_file_based_stream(self) -> None: |
| 520 | + """ |
| 521 | + Test that the stream is correct when file transfer used. |
| 522 | + """ |
| 523 | + non_file_based_stream = DefaultFileBasedStream( |
| 524 | + config=self._stream_config, |
| 525 | + catalog_schema=self._catalog_schema, |
| 526 | + stream_reader=self._stream_reader, |
| 527 | + availability_strategy=self._availability_strategy, |
| 528 | + discovery_policy=self._discovery_policy, |
| 529 | + parsers={MockFormat: self._parser}, |
| 530 | + validation_policy=self._validation_policy, |
| 531 | + cursor=self._cursor, |
| 532 | + errors_collector=FileBasedErrorsCollector(), |
| 533 | + use_file_transfer=True, |
| 534 | + ) |
| 535 | + with ( |
| 536 | + mock.patch.object(non_file_based_stream, "get_json_schema", return_value={}), |
| 537 | + mock.patch.object( |
| 538 | + DefaultFileBasedStream, |
| 539 | + "primary_key", |
| 540 | + new_callable=mock.PropertyMock, |
| 541 | + return_value=["id"], |
| 542 | + ), |
| 543 | + ): |
| 544 | + airbyte_stream = non_file_based_stream.as_airbyte_stream() |
| 545 | + assert isinstance(airbyte_stream, AirbyteStream) |
| 546 | + assert airbyte_stream.is_file_based |
0 commit comments