@@ -558,144 +558,146 @@ def update_status():
                 pool.put(self._get, url=pre_signed.url, path=path)
 
 
+MULTIPART_CHUNK_SIZE = int(15e6)  # 15MB
+PUT_TIMEOUT = 300  # 5 minutes
+
+
 class PutDatasetFilesCommand(BaseDatasetFilesCommand):
 
     # @classmethod
-    def _put(self, path, url, content_type, dataset_version_id=None, key=None):
+    def _put(self, session, path, url, content_type, dataset_version_id=None, key=None):
         size = os.path.getsize(path)
-        with requests.Session() as session:
-            headers = {'Content-Type': content_type}
+        headers = {'Content-Type': content_type}
 
-            try:
-                if size <= 0:
-                    headers.update({'Content-Size': '0'})
-                    r = session.put(url, data='', headers=headers, timeout=5)
-                # for files under half a GB
-                elif size <= (10e8) / 2:
-                    with open(path, 'rb') as f:
-                        r = session.put(
-                            url, data=f, headers=headers, timeout=5)
-                # # for chonky files, use a multipart upload
-                else:
-                    # Chunks need to be at least 5MB or AWS throws an
-                    # EntityTooSmall error; we'll arbitrarily choose a
-                    # 15MB chunksize
-                    #
-                    # Note also that AWS limits the max number of chunkc
-                    # in a multipart upload to 10000, so this setting
-                    # currently enforces a hard limit on 150GB per file.
-                    #
-                    # We can dynamically assign a larger part size if needed,
-                    # but for the majority of use cases we should be fine
-                    # as-is
-                    part_minsize = int(15e6)
-                    dataset_id, _, version = dataset_version_id.partition(":")
-                    mpu_url = f'/datasets/{dataset_id}/versions/{version}/s3/preSignedUrls'
-
-                    api_client = http_client.API(
-                        api_url=config.CONFIG_HOST,
-                        api_key=self.api_key,
-                        ps_client_name=CLI_PS_CLIENT_NAME
-                    )
-
-                    mpu_create_res = api_client.post(
-                        url=mpu_url,
-                        json={
-                            'datasetId': dataset_id,
-                            'version': version,
-                            'calls': [{
-                                'method': 'createMultipartUpload',
-                                'params': {'Key': key}
-                            }]
-                        }
-                    )
-                    mpu_data = json.loads(mpu_create_res.text)[0]['url']
-
-                    parts = []
-                    with open(path, 'rb') as f:
-                        # we +2 the number of parts since we're doing floor
-                        # division, which will cut off any trailing part
-                        # less than the part_minsize, AND we want to 1-index
-                        # our range to match what AWS expects for part
-                        # numbers
-                        for part in range(1, (size // part_minsize) + 2):
-                            presigned_url_res = api_client.post(
-                                url=mpu_url,
-                                json={
-                                    'datasetId': dataset_id,
-                                    'version': version,
-                                    'calls': [{
-                                        'method': 'uploadPart',
-                                        'params': {
-                                            'Key': key,
-                                            'UploadId': mpu_data['UploadId'],
-                                            'PartNumber': part
-                                        }
-                                    }]
-                                }
-                            )
-
-                            presigned_url = json.loads(
-                                presigned_url_res.text
-                            )[0]['url']
-
-                            chunk = f.read(part_minsize)
-                            for attempt in range(0, 5):
-                                part_res = session.put(
-                                    presigned_url,
-                                    data=chunk,
-                                    timeout=5)
-                                if part_res.status_code == 200:
-                                    break
-
-                            if part_res.status_code != 200:
-                                # Why do we silence exceptions that get
-                                # explicitly raised? Mystery for the ages, but
-                                # there you have it I guess...
-                                print(f'\nUnable to complete upload of {path}')
-                                raise ApplicationError(
-                                    f'Unable to complete upload of {path}')
-                            etag = part_res.headers['ETag'].replace('"', '')
-                            parts.append({'ETag': etag, 'PartNumber': part})
-                            # This is a pretty jank way to get about multipart
-                            # upload status updates, but we structure the Halo
-                            # spinner to report on the number of completed
-                            # tasks dispatched to the workers in the pool.
-                            # Since it's more of a PITA to properly distribute
-                            # this MPU among all workers than I really want to
-                            # deal with, that means we can't easily plug into
-                            # Halo for these updates. But we can print to
-                            # console! Which again, jank and noisy, but arguably
-                            # better than a task sitting forever, never either
-                            # completing or emitting an error message.
-                            if len(parts) % 7 == 0:  # About every 100MB
-                                print(
-                                    f'\nUploaded {len(parts) * part_minsize / 10e5}MB '
-                                    f'of {int(size / 10e5)}MB for '
-                                    f'{path}'
-                                )
-
-                    r = api_client.post(
-                        url=mpu_url,
-                        json={
-                            'datasetId': dataset_id,
-                            'version': version,
-                            'calls': [{
-                                'method': 'completeMultipartUpload',
-                                'params': {
-                                    'Key': key,
-                                    'UploadId': mpu_data['UploadId'],
-                                    'MultipartUpload': {'Parts': parts}
-                                }
-                            }]
-                        }
-                    )
-
-                self.validate_s3_response(r)
-            except requests.exceptions.ConnectionError as e:
-                return self.report_connection_error(e)
-            except Exception as e:
-                return e
+        try:
+            if size <= 0:
+                headers.update({'Content-Size': '0'})
+                r = session.put(url, data='', headers=headers, timeout=5)
+            # for files under 15MB
+            elif size <= (MULTIPART_CHUNK_SIZE):
+                with open(path, 'rb') as f:
+                    r = session.put(
+                        url, data=f, headers=headers, timeout=PUT_TIMEOUT)
+            # # for chonky files, use a multipart upload
+            else:
+                # Chunks need to be at least 5MB or AWS throws an
+                # EntityTooSmall error; we'll arbitrarily choose a
+                # 15MB chunksize
+                #
+                # Note also that AWS limits the max number of chunks
+                # in a multipart upload to 10000, so this setting
+                # currently enforces a hard limit on 150GB per file.
+                #
+                # We can dynamically assign a larger part size if needed,
+                # but for the majority of use cases we should be fine
+                # as-is
+                part_minsize = MULTIPART_CHUNK_SIZE
+                dataset_id, _, version = dataset_version_id.partition(":")
+                mpu_url = f'/datasets/{dataset_id}/versions/{version}/s3/preSignedUrls'
+                api_client = http_client.API(
+                    api_url=config.CONFIG_HOST,
+                    api_key=self.api_key,
+                    ps_client_name=CLI_PS_CLIENT_NAME
+                )
+
+                mpu_create_res = api_client.post(
+                    url=mpu_url,
+                    json={
+                        'datasetId': dataset_id,
+                        'version': version,
+                        'calls': [{
+                            'method': 'createMultipartUpload',
+                            'params': {'Key': key}
+                        }]
+                    }
+                )
+
+                mpu_data = mpu_create_res.json()[0]['url']
+
+                parts = []
+                with open(path, 'rb') as f:
+                    # we +2 the number of parts since we're doing floor
+                    # division, which will cut off any trailing part
+                    # less than the part_minsize, AND we want to 1-index
+                    # our range to match what AWS expects for part
+                    # numbers
+                    for part in range(1, (size // part_minsize) + 2):
+                        presigned_url_res = api_client.post(
+                            url=mpu_url,
+                            json={
+                                'datasetId': dataset_id,
+                                'version': version,
+                                'calls': [{
+                                    'method': 'uploadPart',
+                                    'params': {
+                                        'Key': key,
+                                        'UploadId': mpu_data['UploadId'],
+                                        'PartNumber': part
+                                    }
+                                }]
+                            }
+                        )
+
+                        presigned_url = presigned_url_res.json()[0]['url']
+
+                        chunk = f.read(part_minsize)
+
+                        for attempt in range(0, 5):
+                            part_res = session.put(
+                                presigned_url,
+                                data=chunk,
+                                headers=headers,
+                                timeout=PUT_TIMEOUT)
+
+                            if part_res.status_code == 200:
+                                break
+
+                        if part_res.status_code != 200:
+                            # Why do we silence exceptions that get
+                            # explicitly raised? Mystery for the ages, but
+                            # there you have it I guess...
+                            print(f'\nUnable to complete upload of {path}')
+                            raise ApplicationError(
+                                f'Unable to complete upload of {path}')
+                        etag = part_res.headers['ETag'].replace('"', '')
+                        parts.append({'ETag': etag, 'PartNumber': part})
+                        # This is a pretty jank way to get about multipart
+                        # upload status updates, but we structure the Halo
+                        # spinner to report on the number of completed
+                        # tasks dispatched to the workers in the pool.
+                        # Since it's more of a PITA to properly distribute
+                        # this MPU among all workers than I really want to
+                        # deal with, that means we can't easily plug into
+                        # Halo for these updates. But we can print to
+                        # console! Which again, jank and noisy, but arguably
+                        # better than a task sitting forever, never either
+                        # completing or emitting an error message.
+                        print(
+                            f'\nUploaded {len(parts) * part_minsize / 10e5}MB '
+                            f'of {int(size / 10e5)}MB for '
+                            f'{path}'
+                        )
+
+                r = api_client.post(
+                    url=mpu_url,
+                    json={
+                        'datasetId': dataset_id,
+                        'version': version,
+                        'calls': [{
+                            'method': 'completeMultipartUpload',
+                            'params': {
+                                'Key': key,
+                                'UploadId': mpu_data['UploadId'],
+                                'MultipartUpload': {'Parts': parts}
+                            }
+                        }]
+                    }
+                )
+
+        except requests.exceptions.ConnectionError as e:
+            return self.report_connection_error(e)
+        except Exception as e:
+            return e
 
     @staticmethod
     def _list_files(source_path):
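
A note on the chunk-size arithmetic above: with a fixed 15MB part size and S3's cap of 10,000 parts per multipart upload, the largest file this path can handle is roughly 150GB, and the loop needs (size // part_minsize) + 1 parts because floor division drops the trailing partial chunk. A minimal sketch of that arithmetic, assuming the same constants; the names num_parts and AWS_MAX_PARTS are illustrative only and not part of this change:

MULTIPART_CHUNK_SIZE = int(15e6)  # 15MB, same value as the constant added above
AWS_MAX_PARTS = 10000             # S3's documented limit on parts per multipart upload


def num_parts(size):
    # floor division drops any trailing partial chunk, so add one more
    # part to cover it; S3 part numbers themselves are 1-indexed
    return (size // MULTIPART_CHUNK_SIZE) + 1


assert num_parts(int(40e6)) == 3                           # 40MB -> two full parts + a 10MB tail
assert MULTIPART_CHUNK_SIZE * AWS_MAX_PARTS == int(150e9)  # the "150GB per file" ceiling
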
@@ -718,15 +720,16 @@ def _sign_and_put(self, dataset_version_id, pool, results, update_status):
                 Key=r['key'], ContentType=r['mimetype'])) for r in results],
         )
 
-        for pre_signed, result in zip(pre_signeds, results):
-            update_status()
-            pool.put(
-                self._put,
-                url=pre_signed.url,
-                path=result['path'],
-                content_type=result['mimetype'],
-                dataset_version_id=dataset_version_id,
-                key=result['key'])
+        with requests.Session() as session:
+            for pre_signed, result in zip(pre_signeds, results):
+                update_status()
+                pool.put(self._put,
+                         session,
+                         result['path'],
+                         pre_signed.url,
+                         content_type=result['mimetype'],
+                         dataset_version_id=dataset_version_id,
+                         key=result['key'])
 
     def execute(self, dataset_version_id, source_paths, target_path):
         self.assert_supported(dataset_version_id)
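
Stripped of the CLI plumbing, the first hunk is the standard three-call presigned multipart flow: createMultipartUpload, one uploadPart per 15MB chunk PUT to a presigned URL, then completeMultipartUpload with the collected ETags. The sketch below condenses that flow under the assumption that a hypothetical post_presigned_call(method, params) helper returns the parsed 'url' entry of the /s3/preSignedUrls response, i.e. the same value the diff reads via res.json()[0]['url']; retries, progress printing, and error handling are omitted.

MULTIPART_CHUNK_SIZE = int(15e6)  # 15MB, as in the diff


def multipart_upload(post_presigned_call, session, path, key):
    # 1. Ask the backend to start a multipart upload; the returned mapping
    #    carries the UploadId used by every subsequent call
    mpu = post_presigned_call('createMultipartUpload', {'Key': key})

    parts = []
    with open(path, 'rb') as f:
        part_number = 1
        while True:
            chunk = f.read(MULTIPART_CHUNK_SIZE)
            if not chunk:
                break
            # 2. Fetch a presigned URL for this part number and PUT the chunk to it
            presigned_url = post_presigned_call('uploadPart', {
                'Key': key,
                'UploadId': mpu['UploadId'],
                'PartNumber': part_number,
            })
            res = session.put(presigned_url, data=chunk, timeout=300)
            parts.append({'ETag': res.headers['ETag'].replace('"', ''),
                          'PartNumber': part_number})
            part_number += 1

    # 3. Tell S3, via the backend, which parts make up the finished object
    post_presigned_call('completeMultipartUpload', {
        'Key': key,
        'UploadId': mpu['UploadId'],
        'MultipartUpload': {'Parts': parts},
    })

The second hunk pairs with this: _sign_and_put now opens a single requests.Session and hands it to every _put task queued on the pool, which is why _put gained a session parameter instead of opening its own session per file.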