|
30 | 30 | FileUploadCompleteFutureResponse, |
31 | 31 | FileUploadCompleteResponse, |
32 | 32 | FileUploadCompleteState, |
33 | | - FileUploadCompletionBody, |
34 | 33 | FileUploadSchema, |
35 | 34 | LinkType, |
36 | 35 | PresignedLink, |
37 | 36 | SoftCopyBody, |
38 | | - UploadedPart, |
39 | 37 | ) |
40 | 38 | from models_library.projects import ProjectID |
41 | 39 | from models_library.projects_nodes_io import LocationID, NodeID, SimcoreS3FileID |
|
47 | 45 | from pytest_simcore.helpers.httpx_assert_checks import assert_status |
48 | 46 | from pytest_simcore.helpers.logging_tools import log_context |
49 | 47 | from pytest_simcore.helpers.parametrizations import byte_size_ids |
50 | | -from pytest_simcore.helpers.s3 import upload_file_part, upload_file_to_presigned_link |
| 48 | +from pytest_simcore.helpers.s3 import upload_file_part |
51 | 49 | from pytest_simcore.helpers.storage_utils import FileIDDict, ProjectWithFilesParams |
52 | 50 | from pytest_simcore.helpers.storage_utils_file_meta_data import ( |
53 | 51 | assert_file_meta_data_in_db, |
54 | 52 | ) |
55 | 53 | from servicelib.aiohttp import status |
56 | 54 | from simcore_service_storage.constants import S3_UNDEFINED_OR_EXTERNAL_MULTIPART_ID |
57 | 55 | from simcore_service_storage.models import FileDownloadResponse, S3BucketName, UploadID |
| 56 | +from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker |
58 | 57 | from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager |
59 | 58 | from sqlalchemy.ext.asyncio import AsyncEngine |
60 | 59 | from tenacity.asyncio import AsyncRetrying |
@@ -617,112 +616,6 @@ async def test_upload_real_file( |
617 | 616 | await upload_file(file_size, complex_file_name) |
618 | 617 |
|
619 | 618 |
|
620 | | -@pytest.mark.parametrize( |
621 | | - "location_id", |
622 | | - [SimcoreS3DataManager.get_location_id()], |
623 | | - ids=[SimcoreS3DataManager.get_location_name()], |
624 | | - indirect=True, |
625 | | -) |
626 | | -@pytest.mark.parametrize( |
627 | | - "file_size", |
628 | | - [ |
629 | | - (TypeAdapter(ByteSize).validate_python("1Mib")), |
630 | | - (TypeAdapter(ByteSize).validate_python("117Mib")), |
631 | | - ], |
632 | | - ids=byte_size_ids, |
633 | | -) |
634 | | -async def test_upload_real_file_with_emulated_storage_restart_after_completion_was_called( |
635 | | - complex_file_name: str, |
636 | | - file_size: ByteSize, |
637 | | - initialized_app: FastAPI, |
638 | | - client: httpx.AsyncClient, |
639 | | - user_id: UserID, |
640 | | - project_id: ProjectID, |
641 | | - node_id: NodeID, |
642 | | - location_id: LocationID, |
643 | | - create_simcore_file_id: Callable[[ProjectID, NodeID, str], SimcoreS3FileID], |
644 | | - create_file_of_size: Callable[[ByteSize, str | None], Path], |
645 | | - create_upload_file_link_v2: Callable[..., Awaitable[FileUploadSchema]], |
646 | | - sqlalchemy_async_engine: AsyncEngine, |
647 | | - storage_s3_client: SimcoreS3API, |
648 | | - storage_s3_bucket: S3BucketName, |
649 | | -): |
650 | | - """what does that mean? |
651 | | - storage runs the completion tasks in the background, |
652 | | - if after running the completion task, storage restarts then the task is lost. |
653 | | - Nevertheless the client still has a reference to the completion future and shall be able |
654 | | - to ask for its status""" |
655 | | - |
656 | | - file = create_file_of_size(file_size, complex_file_name) |
657 | | - file_id = create_simcore_file_id(project_id, node_id, complex_file_name) |
658 | | - file_upload_link = await create_upload_file_link_v2( |
659 | | - file_id, link_type="PRESIGNED", file_size=file_size |
660 | | - ) |
661 | | - # upload the file |
662 | | - part_to_etag: list[UploadedPart] = await upload_file_to_presigned_link( |
663 | | - file, file_upload_link |
664 | | - ) |
665 | | - # complete the upload |
666 | | - complete_url = URL(f"{file_upload_link.links.complete_upload}").relative() |
667 | | - response = await client.post( |
668 | | - f"{complete_url}", |
669 | | - json=jsonable_encoder(FileUploadCompletionBody(parts=part_to_etag)), |
670 | | - ) |
671 | | - response.raise_for_status() |
672 | | - file_upload_complete_response, error = assert_status( |
673 | | - response, status.HTTP_202_ACCEPTED, FileUploadCompleteResponse |
674 | | - ) |
675 | | - assert not error |
676 | | - assert file_upload_complete_response |
677 | | - state_url = URL(f"{file_upload_complete_response.links.state}").relative() |
678 | | - |
679 | | - # now check for the completion |
680 | | - completion_etag = None |
681 | | - async for attempt in AsyncRetrying( |
682 | | - reraise=True, |
683 | | - wait=wait_fixed(1), |
684 | | - stop=stop_after_delay(60), |
685 | | - retry=retry_if_exception_type(AssertionError), |
686 | | - ): |
687 | | - with ( |
688 | | - attempt, |
689 | | - log_context( |
690 | | - logging.INFO, |
691 | | - f"waiting for upload completion {state_url=}, {attempt.retry_state.attempt_number}", |
692 | | - ) as ctx, |
693 | | - ): |
694 | | - response = await client.post(f"{state_url}") |
695 | | - future, error = assert_status( |
696 | | - response, status.HTTP_200_OK, FileUploadCompleteFutureResponse |
697 | | - ) |
698 | | - assert not error |
699 | | - assert future |
700 | | - assert future.state == FileUploadCompleteState.OK |
701 | | - assert future.e_tag is not None |
702 | | - completion_etag = future.e_tag |
703 | | - ctx.logger.info( |
704 | | - "%s", |
705 | | - f"--> done waiting, data is completely uploaded [{attempt.retry_state.retry_object.statistics}]", |
706 | | - ) |
707 | | - # check the entry in db now has the correct file size, and the upload id is gone |
708 | | - await assert_file_meta_data_in_db( |
709 | | - sqlalchemy_async_engine, |
710 | | - file_id=file_id, |
711 | | - expected_entry_exists=True, |
712 | | - expected_file_size=file_size, |
713 | | - expected_upload_id=False, |
714 | | - expected_upload_expiration_date=False, |
715 | | - expected_sha256_checksum=None, |
716 | | - ) |
717 | | - # check the file is in S3 for real |
718 | | - s3_metadata = await storage_s3_client.get_object_metadata( |
719 | | - bucket=storage_s3_bucket, object_key=file_id |
720 | | - ) |
721 | | - assert s3_metadata.size == file_size |
722 | | - assert s3_metadata.last_modified |
723 | | - assert s3_metadata.e_tag == completion_etag |
724 | | - |
725 | | - |
726 | 619 | @pytest.mark.parametrize( |
727 | 620 | "location_id", |
728 | 621 | [SimcoreS3DataManager.get_location_id()], |
@@ -790,6 +683,7 @@ async def test_upload_real_file_with_s3_client( |
790 | 683 | node_id: NodeID, |
791 | 684 | faker: Faker, |
792 | 685 | s3_client: S3Client, |
| 686 | + with_storage_celery_worker: CeleryTaskQueueWorker, |
793 | 687 | ): |
794 | 688 | file_size = TypeAdapter(ByteSize).validate_python("500Mib") |
795 | 689 | file_name = faker.file_name() |
|
0 commit comments