91 changes: 72 additions & 19 deletions s3fs/core.py
@@ -235,6 +235,12 @@ class S3FileSystem(AsyncFileSystem):
session : aiobotocore AioSession object to be used for all connections.
This session will be used inplace of creating a new session inside S3FileSystem.
For example: aiobotocore.session.AioSession(profile='test_user')
max_concurrency : int (None)
If given, the maximum number of concurrent transfers to use for a
multipart upload. Defaults to 1 (multipart uploads will be done sequentially).
Note that when used in conjunction with ``S3FileSystem.put(batch_size=...)``
the result will be a maximum of ``max_concurrency * batch_size`` concurrent
transfers.
Review comment (Member): suggested docstring change, from:

If given, the maximum number of concurrent transfers to use for a
multipart upload. Defaults to 1 (multipart uploads will be done sequentially).
Note that when used in conjunction with ``S3FileSystem.put(batch_size=...)``
the result will be a maximum of ``max_concurrency * batch_size`` concurrent
transfers.

to:

The maximum number of concurrent transfers to use per file for
multipart upload (``put()``) operations. Defaults to 1 (sequential).
When used in conjunction with ``S3FileSystem.put(batch_size=...)``
the maximum number of simultaneous connections is ``max_concurrency * batch_size``.
We may extend this parameter to affect ``pipe()``, ``cat()`` and ``get()``.


The following parameters are passed on to fsspec:

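For context, a minimal usage sketch of how the two settings combine, assuming the ``max_concurrency`` constructor parameter added in this diff together with fsspec's ``batch_size``; the bucket name and local path are hypothetical:

import s3fs

# Hypothetical example: up to 4 concurrent part uploads per file, and up to
# 8 files in flight at once via fsspec's batch_size, giving at most
# 4 * 8 = 32 simultaneous upload_part requests.
fs = s3fs.S3FileSystem(max_concurrency=4)
fs.put("local_dir/", "s3://example-bucket/prefix/", recursive=True, batch_size=8)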
@@ -282,6 +288,7 @@ def __init__(
cache_regions=False,
asynchronous=False,
loop=None,
max_concurrency=None,
**kwargs,
):
if key and username:
@@ -319,6 +326,7 @@ def __init__(
self.cache_regions = cache_regions
self._s3 = None
self.session = session
self.max_concurrency = max_concurrency

@property
def s3(self):
@@ -1140,7 +1148,13 @@ async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs):
self.invalidate_cache(path)

async def _put_file(
self, lpath, rpath, callback=_DEFAULT_CALLBACK, chunksize=50 * 2**20, **kwargs
self,
lpath,
rpath,
callback=_DEFAULT_CALLBACK,
chunksize=50 * 2**20,
max_concurrency=None,
**kwargs,
):
bucket, key, _ = self.split_path(rpath)
if os.path.isdir(lpath):
@@ -1169,24 +1183,15 @@ async def _put_file(
mpu = await self._call_s3(
"create_multipart_upload", Bucket=bucket, Key=key, **kwargs
)

out = []
while True:
chunk = f0.read(chunksize)
if not chunk:
break
out.append(
await self._call_s3(
"upload_part",
Bucket=bucket,
PartNumber=len(out) + 1,
UploadId=mpu["UploadId"],
Body=chunk,
Key=key,
)
)
callback.relative_update(len(chunk))

out = await self._upload_part_concurrent(
bucket,
key,
mpu,
f0,
callback=callback,
chunksize=chunksize,
max_concurrency=max_concurrency,
)
parts = [
{"PartNumber": i + 1, "ETag": o["ETag"]} for i, o in enumerate(out)
]
@@ -1201,6 +1206,54 @@ async def _put_file(
self.invalidate_cache(rpath)
rpath = self._parent(rpath)

async def _upload_part_concurrent(
Review comment (Member): Can we please rename this to indicate it is uploading from a file, as opposed to bytes. Or can it be generalised to support pipe() too?

self,
bucket,
key,
mpu,
f0,
callback=_DEFAULT_CALLBACK,
chunksize=50 * 2**20,
max_concurrency=None,
):
max_concurrency = max_concurrency or self.max_concurrency
if max_concurrency is None or max_concurrency < 1:
max_concurrency = 1
Review comment (Member): Why not max_concurrency=1 as the default in __init__?


async def _upload_chunk(chunk, part_number):
result = await self._call_s3(
"upload_part",
Bucket=bucket,
PartNumber=part_number,
UploadId=mpu["UploadId"],
Body=chunk,
Key=key,
)
callback.relative_update(len(chunk))
return result

out = []
while True:
chunks = []
for i in range(max_concurrency):
chunk = f0.read(chunksize)
Review comment (Member): Somewhere we need a caveat that increasing concurrency will lead to high memory use.

if chunk:
chunks.append(chunk)
if not chunks:
break
if len(chunks) > 1:
out.extend(
await asyncio.gather(
*[
_upload_chunk(chunk, len(out) + i)
for i, chunk in enumerate(chunks, 1)
]
)
)
else:
out.append(await _upload_chunk(chunks[0], len(out) + 1))
return out
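A rough illustration of the reviewer's memory caveat above (the numbers below are illustrative assumptions, not from the PR): each round of the loop buffers up to ``max_concurrency`` chunks of ``chunksize`` bytes per file before the gathered uploads complete.

# Illustrative arithmetic only: approximate peak bytes buffered per file
# by _upload_part_concurrent before the buffered parts finish uploading.
chunksize = 50 * 2**20   # 50 MiB, the default used in _put_file
max_concurrency = 4      # hypothetical setting
peak_buffer = max_concurrency * chunksize
print(f"~{peak_buffer / 2**20:.0f} MiB buffered per file")  # ~200 MiB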

async def _get_file(
self, rpath, lpath, callback=_DEFAULT_CALLBACK, version_id=None
):