
Commit 1c6137d

feat(eng-1375): Support large file uploads (#143)
* feat(eng-1375): Support large file uploads
* update changelog
1 parent 3e792f4 commit 1c6137d

5 files changed: 189 additions & 26 deletions

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ## [Unreleased]
 
+### Added
+
+- Added support for large file uploads ([#143](https://github.com/cloudsmith-io/cloudsmith-cli/pull/143))
+
 ### Fixed
 
 - Removed more unused dependencies relating to python 2.7 compatibility ([#142](https://github.com/cloudsmith-io/cloudsmith-cli/pull/142))

cloudsmith_cli/cli/commands/push.py

Lines changed: 51 additions & 20 deletions
@@ -1,5 +1,5 @@
 """CLI/Commands - Push packages."""
-
+import math
 import os
 import time
 from datetime import datetime
@@ -9,6 +9,8 @@
 from ...core import utils
 from ...core.api.exceptions import ApiException
 from ...core.api.files import (
+    CHUNK_SIZE,
+    multi_part_upload_file,
     request_file_upload,
     upload_file as api_upload_file,
     validate_request_file_upload,
@@ -56,6 +58,10 @@ def upload_file(ctx, opts, owner, repo, filepath, skip_errors, md5_checksum):
     filename = click.format_filename(filepath)
     basename = os.path.basename(filename)
 
+    filesize = utils.get_file_size(filepath=filename)
+    projected_chunks = math.floor(filesize / CHUNK_SIZE) + 1
+    is_multi_part_upload = projected_chunks > 1
+
     click.echo(
         "Requesting file upload for %(filename)s ... "
         % {"filename": click.style(basename, bold=True)},
@@ -68,33 +74,58 @@ def upload_file(ctx, opts, owner, repo, filepath, skip_errors, md5_checksum):
     ):
         with maybe_spinner(opts):
             identifier, upload_url, upload_fields = request_file_upload(
-                owner=owner, repo=repo, filepath=filename, md5_checksum=md5_checksum
+                owner=owner,
+                repo=repo,
+                filepath=filename,
+                md5_checksum=md5_checksum,
+                is_multi_part_upload=is_multi_part_upload,
             )
 
     click.secho("OK", fg="green")
 
     context_msg = "Failed to upload file!"
     with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg):
-        filesize = utils.get_file_size(filepath=filename)
-
         label = f"Uploading {click.style(basename, bold=True)}:"
 
-        with click.progressbar(
-            length=filesize,
-            label=label,
-            fill_char=click.style("#", fg="green"),
-            empty_char=click.style("-", fg="red"),
-        ) as pb:
-
-            def progress_callback(monitor):
-                pb.update(monitor.bytes_read)
-
-            api_upload_file(
-                upload_url=upload_url,
-                upload_fields=upload_fields,
-                filepath=filename,
-                callback=progress_callback,
-            )
+        if not is_multi_part_upload:
+            # We can upload the whole file in one go.
+            with click.progressbar(
+                length=filesize,
+                label=label,
+                fill_char=click.style("#", fg="green"),
+                empty_char=click.style("-", fg="red"),
+            ) as pb:
+
+                def progress_callback(monitor):
+                    pb.update(monitor.bytes_read)
+
+                api_upload_file(
+                    upload_url=upload_url,
+                    upload_fields=upload_fields,
+                    filepath=filename,
+                    callback=progress_callback,
+                )
+        else:
+            # The file is sufficiently large that we need to upload in chunks.
+            with click.progressbar(
+                length=projected_chunks,
+                label=label,
+                fill_char=click.style("#", fg="green"),
+                empty_char=click.style("-", fg="red"),
+            ) as pb:
+
+                def progress_callback():
+                    pb.update(1)
+
+                multi_part_upload_file(
+                    opts=opts,
+                    upload_url=upload_url,
+                    owner=owner,
+                    repo=repo,
+                    filepath=filename,
+                    callback=progress_callback,
+                    upload_id=identifier,
+                )
 
     return identifier
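
A minimal standalone sketch of the dispatch arithmetic the push command now uses: it projects the number of chunks from the file size and only takes the multi-part path when more than one chunk would be needed. The `projected_chunk_count` helper name is illustrative; `CHUNK_SIZE` mirrors the 100 MiB constant added to `cloudsmith_cli/core/api/files.py` below.

```python
import math

CHUNK_SIZE = 1024 * 1024 * 100  # 100 MiB, mirroring core/api/files.py below


def projected_chunk_count(filesize):
    """Same projection as the push command: one chunk minimum,
    plus one per full CHUNK_SIZE of data."""
    return math.floor(filesize / CHUNK_SIZE) + 1


# A 1-byte file stays on the single-request path; a 300 MiB file goes multi-part.
for size in (1, CHUNK_SIZE * 3):
    chunks = projected_chunk_count(size)
    path = "multi-part upload" if chunks > 1 else "single-request upload"
    print(f"{size} bytes -> {chunks} projected chunk(s), {path}")
```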

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+import json
+import time
+
+import pytest
+
+from ....core.api.files import CHUNK_SIZE
+from ...commands.delete import delete
+from ...commands.list_ import list_
+from ...commands.push import push
+from ...commands.status import status
+from ..utils import random_str
+
+
+@pytest.mark.usefixtures("set_api_key_env_var", "set_api_host_env_var")
+@pytest.mark.parametrize(
+    "filesize",
+    [
+        1,  # A tiny file that will be uploaded using python bindings as a one-off post.
+        CHUNK_SIZE * 3,  # A large file that will be uploaded to S3 in multiple chunks.
+    ],
+)
+def test_push_and_delete_raw_package(
+    runner, organization, tmp_repository, tmp_path, filesize
+):
+    # List packages again - should be empty.
+    org_repo = f'{organization}/{tmp_repository["slug"]}'
+    result = runner.invoke(
+        list_, args=["pkgs", org_repo, "-F", "json"], catch_exceptions=False
+    )
+    data = json.loads(result.output)["data"]
+    assert len(data) == 0
+
+    # Create a file of the requested size.
+    pkg_file = tmp_path / f"{random_str()}.txt"
+    with open(pkg_file, "wb") as f:
+        # Fill the file with null bytes.
+        f.truncate(filesize)
+
+    # Push it to cloudsmith as a raw package using the push command.
+    runner.invoke(
+        push, args=["raw", org_repo, str(pkg_file.resolve())], catch_exceptions=False
+    )
+
+    # List packages, check that it is there.
+    result = runner.invoke(
+        list_, args=["pkgs", org_repo, "-F", "json"], catch_exceptions=False
+    )
+    data = json.loads(result.output)["data"]
+    assert len(data) == 1
+    small_file_data = data[0]
+    assert small_file_data["filename"] == pkg_file.name
+
+    # Wait for the package to sync.
+    org_repo_package = f"{org_repo}/{small_file_data['slug']}"
+    for _ in range(10):
+        time.sleep(5)
+        result = runner.invoke(status, args=[org_repo_package], catch_exceptions=False)
+        if "Fully Synchronised" in result.output:
+            break
+    else:
+        raise TimeoutError("Test timed out waiting for package sync")
+
+    # Delete the package.
+    runner.invoke(delete, args=["-y", org_repo_package], catch_exceptions=False)
+
+    # Wait for package deletion to take effect.
+    for _ in range(10):
+        time.sleep(5)
+        result = runner.invoke(status, args=[org_repo_package], catch_exceptions=False)
+        if "status: 404 - Not Found" in result.output:
+            break
+    else:
+        raise TimeoutError("Test timed out waiting for package deletion")
+
+    # List packages again - should be empty.
+    result = runner.invoke(
+        list_, args=["pkgs", org_repo, "-F", "json"], catch_exceptions=False
+    )
+    data = json.loads(result.output)["data"]
+    assert len(data) == 0
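
The test sizes its fixture file with `f.truncate(filesize)` rather than writing bytes explicitly. A small standalone illustration of that trick, assuming nothing beyond the standard library (the path here is illustrative):

```python
import os
import tempfile

# truncate() extends the file to the requested size, filled with null bytes,
# without having to write the data out explicitly.
filesize = 1024 * 1024  # 1 MiB for illustration
path = os.path.join(tempfile.mkdtemp(), "example.txt")
with open(path, "wb") as f:
    f.truncate(filesize)
print(os.path.getsize(path))  # 1048576
```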

cloudsmith_cli/cli/tests/conftest.py

Lines changed: 5 additions & 4 deletions
@@ -3,7 +3,9 @@
 import click.testing
 import pytest
 
-from cloudsmith_cli.cli.tests.utils import random_str
+from ...core.api.init import initialise_api
+from ...core.api.repos import create_repo, delete_repo
+from .utils import random_str
 
 
 def _get_env_var_or_skip(key):
@@ -47,10 +49,9 @@ def organization():
 
 
 @pytest.fixture()
-def tmp_repository(organization):
+def tmp_repository(organization, api_host, api_key):
     """Yield a temporary repository."""
-    from ...core.api.repos import create_repo, delete_repo
-
+    initialise_api(host=api_host, key=api_key)
     repo_data = create_repo(organization, {"name": random_str()})
     yield repo_data
     delete_repo(organization, repo_data["slug"])
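
The reworked `tmp_repository` fixture follows pytest's yield-fixture pattern: everything before the `yield` is set-up, the yielded value is what the test receives, and the code after the `yield` is tear-down. A generic sketch of that pattern, with `create_resource`/`delete_resource` as placeholder stand-ins for calls like `create_repo`/`delete_repo`:

```python
import pytest


def create_resource():
    """Placeholder for a call such as create_repo(organization, {...})."""
    return {"slug": "example"}


def delete_resource(resource):
    """Placeholder for a call such as delete_repo(organization, slug)."""


@pytest.fixture()
def tmp_resource():
    resource = create_resource()  # set-up, before the test runs
    yield resource  # the test receives this value
    delete_resource(resource)  # tear-down, after the test finishes


def test_uses_tmp_resource(tmp_resource):
    assert tmp_resource["slug"] == "example"
```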

cloudsmith_cli/core/api/files.py

Lines changed: 49 additions & 2 deletions
@@ -13,6 +13,8 @@
 from .exceptions import ApiException, catch_raise_api_exception
 from .init import get_api_client
 
+CHUNK_SIZE = 1024 * 1024 * 100
+
 
 def get_files_api():
     """Get the files API client."""
@@ -35,16 +37,24 @@ def validate_request_file_upload(owner, repo, filepath, md5_checksum=None):
     return md5_checksum
 
 
-def request_file_upload(owner, repo, filepath, md5_checksum=None):
+def request_file_upload(
+    owner, repo, filepath, md5_checksum=None, is_multi_part_upload=False
+):
     """Request a new package file upload (for creating packages)."""
     client = get_files_api()
     md5_checksum = md5_checksum or calculate_file_md5(filepath)
 
+    method = "put_parts" if is_multi_part_upload else "post"
+
     with catch_raise_api_exception():
         data, _, headers = client.files_create_with_http_info(
             owner=owner,
             repo=repo,
-            data={"filename": os.path.basename(filepath), "md5_checksum": md5_checksum},
+            data={
+                "filename": os.path.basename(filepath),
+                "md5_checksum": md5_checksum,
+                "method": method,
+            },
         )
 
     # pylint: disable=no-member
@@ -82,3 +92,40 @@ def upload_file(upload_url, upload_fields, filepath, callback=None):
         raise ApiException(
             resp.status_code, headers=exc.response.headers, body=exc.response.content
         )
+
+
+def multi_part_upload_file(
+    opts, upload_url, owner, repo, filepath, callback, upload_id
+):
+    with open(filepath, "rb") as f:
+        chunk_number = 1
+        session = create_requests_session()
+        headers = {"X-Api-Key": opts.api_key}
+        while chunk := f.read(CHUNK_SIZE):
+            resp = session.put(
+                upload_url,
+                headers=headers,
+                data=chunk,
+                params={
+                    "upload_id": upload_id,
+                    "part_number": chunk_number,
+                },
+            )
+            try:
+                resp.raise_for_status()
+            except requests.RequestException as exc:
+                raise ApiException(
+                    resp.status_code,
+                    headers=exc.response.headers,
+                    body=exc.response.content,
+                )
+            callback()
+            chunk_number += 1
+
+    api = get_files_api()
+    api.files_complete(
+        owner,
+        repo,
+        identifier=upload_id,
+        data={"upload_id": upload_id, "complete": True},
+    )
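
The loop in `multi_part_upload_file` reads the file in `CHUNK_SIZE` pieces and sends each as a numbered part before `files_complete` finalises the upload. A standalone sketch of just that read-and-number loop, with `send_part` as a hypothetical stand-in for the authenticated `session.put` against the signed upload URL:

```python
import os
import tempfile

CHUNK_SIZE = 1024 * 1024 * 100  # 100 MiB, matching the constant above


def upload_in_parts(filepath, send_part, chunk_size=CHUNK_SIZE):
    """Read filepath in fixed-size chunks and hand each to send_part with a
    1-based part number; return how many parts were sent."""
    part_number = 0
    with open(filepath, "rb") as f:
        while chunk := f.read(chunk_size):
            part_number += 1
            send_part(part_number, chunk)
    return part_number


# Small-scale demo: a 250-byte file split into 100-byte parts -> 3 parts.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.bin")
with open(demo_path, "wb") as f:
    f.truncate(250)
print(upload_in_parts(demo_path, lambda n, c: None, chunk_size=100))  # 3
```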
