Skip to content

Commit af6d147

Browse files
committed
mismi-s3: Smarter uploadRecursive
Uploads of multiple small files is now concurrent.
1 parent 4792344 commit af6d147

File tree

4 files changed

+40
-33
lines changed

4 files changed

+40
-33
lines changed

mismi-s3/mismi-s3.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ library
3535
, filepath >= 1.3 && < 1.5
3636
, http-client >= 0.4.18 && < 0.5
3737
, http-types == 0.8.*
38+
, lifted-async == 0.9.*
3839
, mtl >= 2.1 && < 2.3
3940
, process >= 1.2 && < 1.5
4041
, resourcet == 1.1.*

mismi-s3/src/Mismi/S3/Commands.hs

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ module Mismi.S3.Commands (
7070
) where
7171

7272
import Control.Arrow ((***))
73-
73+
import Control.Concurrent.Async.Lifted (mapConcurrently_)
7474
import Control.Exception (ioError)
7575
import qualified Control.Exception as CE
7676
import Control.Lens ((.~), (^.), to, view)
@@ -124,7 +124,6 @@ import System.Directory (createDirectoryIfMissing, doesFileExist, getD
124124
import System.FilePath (FilePath, (</>), takeDirectory)
125125
import System.Posix.IO (OpenMode(..), openFd, closeFd, fdSeek, defaultFileFlags)
126126
import System.Posix.Files (fileSize, getFileStatus, isDirectory, isRegularFile)
127-
import System.Posix.Types (FileOffset)
128127
import qualified "unix-bytestring" System.Posix.IO.ByteString as UBS
129128

130129
import System.Timeout.Lifted (timeout)
@@ -372,13 +371,13 @@ upload :: FilePath -> Address -> EitherT UploadError AWS ()
372371
upload =
373372
uploadWithMode Fail
374373

375-
uploadRecursive :: FilePath -> Address -> EitherT UploadError AWS ()
374+
uploadRecursive :: FilePath -> Address -> Int -> EitherT UploadError AWS ()
376375
uploadRecursive =
377376
uploadRecursiveWithMode Fail
378377

379-
uploadRecursiveOrFail :: FilePath -> Address -> AWS ()
380-
uploadRecursiveOrFail f a =
381-
eitherT hoistUploadError pure $ uploadRecursive f a
378+
uploadRecursiveOrFail :: FilePath -> Address -> Int -> AWS ()
379+
uploadRecursiveOrFail f a i =
380+
eitherT hoistUploadError pure $ uploadRecursive f a i
382381

383382
uploadOrFail :: FilePath -> Address -> AWS ()
384383
uploadOrFail f a =
@@ -388,9 +387,9 @@ uploadWithModeOrFail :: WriteMode -> FilePath -> Address -> AWS ()
388387
uploadWithModeOrFail w f a =
389388
eitherT hoistUploadError pure $ uploadWithMode w f a
390389

391-
uploadRecursiveWithModeOrFail :: WriteMode -> FilePath -> Address -> AWS ()
392-
uploadRecursiveWithModeOrFail w f a =
393-
eitherT hoistUploadError pure $ uploadRecursiveWithMode w f a
390+
uploadRecursiveWithModeOrFail :: WriteMode -> FilePath -> Address -> Int -> AWS ()
391+
uploadRecursiveWithModeOrFail w f a i =
392+
eitherT hoistUploadError pure $ uploadRecursiveWithMode w f a i
394393

395394
hoistUploadError :: UploadError -> AWS ()
396395
hoistUploadError e =
@@ -439,11 +438,11 @@ uploadSingle file a = do
439438
void . send $ f' A.putObject a rq & A.poServerSideEncryption .~ pure sse
440439

441440
multipartUpload :: FilePath -> Address -> Integer -> Integer -> Int -> EitherT UploadError AWS ()
442-
multipartUpload file a fileSize chunk fork = do
441+
multipartUpload file a fSize chunk fork = do
443442
e <- ask
444443
mpu <- lift $ createMultipartUpload a
445444

446-
let chunks = calculateChunksCapped (fromInteger fileSize) (fromInteger chunk) 4096 -- max 4096 prts returned
445+
let chunks = calculateChunksCapped (fromInteger fSize) (fromInteger chunk) 4096 -- max 4096 prts returned
447446

448447
r <- liftIO $
449448
consume (forM_ chunks . writeQueue) fork $ multipartUploadWorker e mpu file a
@@ -481,40 +480,46 @@ multipartUploadWorker e mpu file a (o, c, i) =
481480
pure $! Right $! PartResponse i m
482481

483482

484-
uploadRecursiveWithMode :: WriteMode -> FilePath -> Address -> EitherT UploadError AWS ()
485-
uploadRecursiveWithMode m src (Address buck ky) = do
483+
uploadRecursiveWithMode :: WriteMode -> FilePath -> Address -> Int -> EitherT UploadError AWS ()
484+
uploadRecursiveWithMode mode src (Address buck ky) fork = do
486485
es <- tryIO $ getFileStatus src
487486
case es of
488487
Left _ -> left $ UploadSourceMissing src
489488
Right st -> unless (isDirectory st) . left $ UploadSourceNotDirectory src
490-
files <- fmap fst <$> liftIO (listRecursivelyLocal src)
491-
let outputAddrs = fmap (\fp -> Address buck (ky // Key (T.pack $ L.drop prefixLen fp))) files
492-
mapM_ (uncurry (uploadWithMode m)) $ L.zip files outputAddrs
489+
files <- liftIO (listRecursivelyLocal src)
490+
mapM_ uploadFiles $ chunkFilesBySize fork (fromIntegral bigChunkSize) files
493491
where
492+
uploadFiles :: [(FilePath, Int64)] -> EitherT UploadError AWS ()
493+
uploadFiles [] = pure ()
494+
uploadFiles [(f,s)]
495+
| fromIntegral s < bigChunkSize = lift . uploadSingle f $ uploadAddress f
496+
| otherwise = uploadWithMode mode f $ uploadAddress f
497+
uploadFiles xs =
498+
mapConcurrently_ (\ (f, _) -> lift . uploadSingle f $ uploadAddress f) xs
499+
500+
494501
prefixLen = L.length (src </> "a") - 1
495502

496-
-- uploadAddress :: FilePath -> Address
497-
-- uploadAddress fp = Address buck (ky // Key (T.pack $ L.drop prefixLen fp))
503+
uploadAddress :: FilePath -> Address
504+
uploadAddress fp = Address buck (ky // Key (T.pack $ L.drop prefixLen fp))
498505

499-
-- Take a list of files and their sizes, and return a list of list of files
500-
-- where the total size of of the files in the sub list is less than `bigChunkSize`
506+
-- Take a list of files and their sizes, and convert it to a list of tests
507+
-- where the total size of of the files in the sub list is less than `maxSize`
501508
-- and the length of the sub lists is <= `maxCount`.
502-
chunkFilesBySize :: Int -> Int64 -> [(FilePath, Int64)] -> [[FilePath]]
509+
chunkFilesBySize :: Int -> Int64 -> [(FilePath, Int64)] -> [[(FilePath, Int64)]]
503510
chunkFilesBySize maxCount maxSize =
504511
takeFiles 0 [] . L.sortOn snd
505512
where
506-
takeFiles :: Int64 -> [FilePath] -> [(FilePath, Int64)] -> [[FilePath]]
513+
takeFiles :: Int64 -> [(FilePath, Int64)] -> [(FilePath, Int64)] -> [[(FilePath, Int64)]]
507514
takeFiles _ acc [] = [acc]
508515
takeFiles current acc ((x, s):xs) =
509516
if current + s < maxSize && L.length acc < maxCount
510-
then takeFiles (current + s) (x:acc) xs
511-
else (x:acc) : takeFiles 0 [] xs
512-
513-
-- bigChunkSize
517+
then takeFiles (current + s) ((x, s):acc) xs
518+
else ((x, s):acc) : takeFiles 0 [] xs
514519

515520
-- | Like `listRecursively` but for the local filesystem.
516521
-- Also returns
517-
listRecursivelyLocal :: MonadIO m => FilePath -> m [(FilePath, FileOffset)]
522+
listRecursivelyLocal :: MonadIO m => FilePath -> m [(FilePath, Int64)]
518523
listRecursivelyLocal topdir = do
519524
entries <- liftIO $ listDirectory topdir
520525
(dirs, files) <- liftIO . partitionDirsFilesWithSizes $ fmap (topdir </>) entries
@@ -530,16 +535,17 @@ listDirectory path =
530535
f filename =
531536
filename /= "." && filename /= ".."
532537

533-
partitionDirsFilesWithSizes :: MonadIO m => [FilePath] -> m ([FilePath], [(FilePath, FileOffset)])
538+
partitionDirsFilesWithSizes :: MonadIO m => [FilePath] -> m ([FilePath], [(FilePath, Int64)])
534539
partitionDirsFilesWithSizes =
535540
pworker ([], [])
536541
where
537542
pworker (dirs, files) [] = pure (dirs, files)
538543
pworker (dirs, files) (x:xs) = do
539544
xstat <- liftIO $ getFileStatus x
540-
pworker
541-
(if isDirectory xstat then x : dirs else dirs, if isRegularFile xstat then (x, fileSize xstat) : files else files)
542-
xs
545+
let xsize = fromIntegral $ fileSize xstat
546+
newDirs = if isDirectory xstat then x : dirs else dirs
547+
newFiles = if isRegularFile xstat then (x, xsize) : files else files
548+
pworker (newDirs, newFiles) xs
543549

544550
write :: Address -> Text -> AWS WriteResult
545551
write =

mismi-s3/test/Test/IO/Mismi/S3/Commands.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ prop_upload_recursive = once . testAWS $ do
532532

533533
addr <- withKey (// Key "top") <$> newAddress
534534

535-
eitherT (fail . show) pure $ uploadRecursive tmpdir addr
535+
eitherT (fail . show) pure $ uploadRecursive tmpdir addr 2
536536

537537
a <- read (withKey (// Key "a") addr)
538538
c <- read (withKey (// Key "b/c") addr)

mismi-s3/test/Test/Mismi/S3/Commands.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ prop_chunk_files_by_size =
4747
QC.forAll (QC.choose (1000, 10000)) $ \ maxChunkSize ->
4848
QC.forAll (fileNameSizePairs fileCount) $ \ pairs ->
4949
let chunks = chunkFilesBySize maxFilesPerChunk maxChunkSize pairs
50-
chunkSizes = DL.map (multiChunkSum (DM.fromList pairs)) chunks
50+
chunkSizes = DL.map (multiChunkSum (DM.fromList pairs) . DL.map fst) chunks
5151
in
5252
DL.filter (> maxChunkSize) chunkSizes === []
5353

0 commit comments

Comments
 (0)