Commit 040d1aa

feat(libstore/s3-binary-cache-store): implement uploadMultipart()
Implement `uploadMultipart()`, the main method that orchestrates S3 multipart uploads
1 parent: bf947bf
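For orientation, the new code drives the standard S3 multipart REST sequence described in the AWS documentation linked from the diff. A simplified summary of the requests involved (annotation, not part of the commit):

// S3 multipart upload call sequence (simplified):
//
//   POST   /<key>?uploads                         -> CreateMultipartUpload, returns an UploadId
//   PUT    /<key>?partNumber=N&uploadId=<id>      -> UploadPart, returns an ETag per part (N is 1-based)
//   POST   /<key>?uploadId=<id>  (XML part list)  -> CompleteMultipartUpload
//   DELETE /<key>?uploadId=<id>                   -> AbortMultipartUpload (cleanup on failure)
//
// createMultipartUpload(), uploadPart(), completeMultipartUpload() and
// abortMultipartUpload() in the diff below map one-to-one onto these requests.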

src/libstore/s3-binary-cache-store.cc

Lines changed: 191 additions & 35 deletions
@@ -7,6 +7,7 @@
 #include "nix/util/util.hh"
 
 #include <cassert>
+#include <cstring>
 #include <ranges>
 #include <regex>
 #include <span>
@@ -17,6 +18,7 @@ MakeError(UploadToS3, Error);
 
 static constexpr uint64_t AWS_MIN_PART_SIZE = 5 * 1024 * 1024;           // 5MiB
 static constexpr uint64_t AWS_MAX_PART_SIZE = 5ULL * 1024 * 1024 * 1024; // 5GiB
+static constexpr uint64_t AWS_MAX_PART_COUNT = 10000;
 
 class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
 {
@@ -51,9 +53,48 @@ class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
         std::optional<std::string_view> contentEncoding);
 
     /**
-     * Uploads a file to S3 (CompressedSource overload).
+     * Uploads a file to S3 using multipart upload.
+     *
+     * This method is suitable for large files that exceed the multipart threshold.
+     * It orchestrates the complete multipart upload process: creating the upload,
+     * splitting the data into parts, uploading each part, and completing the upload.
+     * If any error occurs, the multipart upload is automatically aborted.
+     *
+     * @see https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
+     */
+    void uploadMultipart(
+        std::string_view path,
+        RestartableSource & source,
+        uint64_t sizeHint,
+        std::string_view mimeType,
+        std::optional<std::string_view> contentEncoding);
+
+    /**
+     * A Sink that manages a complete S3 multipart upload lifecycle.
+     * Creates the upload on construction, buffers and uploads chunks as data arrives,
+     * and completes or aborts the upload appropriately.
      */
-    void upload(std::string_view path, CompressedSource & source, std::string_view mimeType);
+    struct MultipartSink : Sink
+    {
+        S3BinaryCacheStore & store;
+        std::string_view path;
+        std::string uploadId;
+        std::string::size_type chunkSize;
+
+        std::vector<std::string> partEtags;
+        std::string buffer;
+
+        MultipartSink(
+            S3BinaryCacheStore & store,
+            std::string_view path,
+            uint64_t sizeHint,
+            std::string_view mimeType,
+            std::optional<std::string_view> contentEncoding);
+
+        void operator()(std::string_view data) override;
+        void finish();
+        void uploadChunk(std::string chunk);
+    };
 
     /**
      * Creates a multipart upload for large objects to S3.
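MultipartSink plugs into Nix's streaming Sink interface. As a rough illustration of that protocol (the Sink declaration below is a simplified sketch, not the real nix/util header, and CountingSink is purely hypothetical):

#include <cstdint>
#include <string_view>

// Simplified sketch of the Sink protocol: a producer repeatedly calls
// operator() with consecutive byte ranges until the stream is drained.
struct Sink
{
    virtual ~Sink() = default;
    virtual void operator()(std::string_view data) = 0;
};

// Toy sink that just counts bytes. MultipartSink instead appends each
// range to its buffer and flushes a part once chunkSize bytes accumulate.
struct CountingSink : Sink
{
    uint64_t total = 0;
    void operator()(std::string_view data) override
    {
        total += data.size();
    }
};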
@@ -73,36 +114,45 @@ class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
      */
     std::string uploadPart(std::string_view key, std::string_view uploadId, uint64_t partNumber, std::string data);
 
-    struct UploadedPart
-    {
-        uint64_t partNumber;
-        std::string etag;
-    };
-
     /**
      * Completes a multipart upload by combining all uploaded parts.
      * @see
      * https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_RequestSyntax
      */
-    void completeMultipartUpload(std::string_view key, std::string_view uploadId, std::span<const UploadedPart> parts);
+    void
+    completeMultipartUpload(std::string_view key, std::string_view uploadId, std::span<const std::string> partEtags);
 
     /**
      * Abort a multipart upload
      *
      * @see
      * https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html#API_AbortMultipartUpload_RequestSyntax
      */
-    void abortMultipartUpload(std::string_view key, std::string_view uploadId);
+    void abortMultipartUpload(std::string_view key, std::string_view uploadId) noexcept;
 };
 
 void S3BinaryCacheStore::upsertFile(
     const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
 {
-    if (auto compressionMethod = getCompressionMethod(path)) {
-        CompressedSource compressed(source, *compressionMethod);
-        upload(path, compressed, mimeType);
-    } else {
-        upload(path, source, sizeHint, mimeType, std::nullopt);
+    auto doUpload = [&](RestartableSource & src, uint64_t size, std::optional<std::string_view> encoding) {
+        if (s3Config->multipartUpload && size > s3Config->multipartThreshold) {
+            uploadMultipart(path, src, size, mimeType, encoding);
+        } else {
+            upload(path, src, size, mimeType, encoding);
+        }
+    };
+
+    try {
+        if (auto compressionMethod = getCompressionMethod(path)) {
+            CompressedSource compressed(source, *compressionMethod);
+            doUpload(compressed, compressed.size(), compressed.getCompressionMethod());
+        } else {
+            doUpload(source, sizeHint, std::nullopt);
+        }
+    } catch (FileTransferError & e) {
+        UploadToS3 err(e.message());
+        err.addTrace({}, "while uploading to S3 binary cache at '%s'", config->cacheUri.to_string());
+        throw err;
     }
 }
 
@@ -120,18 +170,112 @@ void S3BinaryCacheStore::upload(
             renderSize(sizeHint),
             renderSize(AWS_MAX_PART_SIZE));
 
+    HttpBinaryCacheStore::upload(path, source, sizeHint, mimeType, contentEncoding);
+}
+
+void S3BinaryCacheStore::uploadMultipart(
+    std::string_view path,
+    RestartableSource & source,
+    uint64_t sizeHint,
+    std::string_view mimeType,
+    std::optional<std::string_view> contentEncoding)
+{
+    debug("using S3 multipart upload for '%s' (%d bytes)", path, sizeHint);
+    MultipartSink sink(*this, path, sizeHint, mimeType, contentEncoding);
+    source.drainInto(sink);
+    sink.finish();
+}
+
+S3BinaryCacheStore::MultipartSink::MultipartSink(
+    S3BinaryCacheStore & store,
+    std::string_view path,
+    uint64_t sizeHint,
+    std::string_view mimeType,
+    std::optional<std::string_view> contentEncoding)
+    : store(store)
+    , path(path)
+{
+    // Calculate chunk size and estimated parts
+    chunkSize = store.s3Config->multipartChunkSize;
+    uint64_t estimatedParts = (sizeHint + chunkSize - 1) / chunkSize; // ceil division
+
+    if (estimatedParts > AWS_MAX_PART_COUNT) {
+        // Equivalent to ceil(sizeHint / AWS_MAX_PART_COUNT)
+        uint64_t minChunkSize = (sizeHint + AWS_MAX_PART_COUNT - 1) / AWS_MAX_PART_COUNT;
+
+        if (minChunkSize > AWS_MAX_PART_SIZE) {
+            throw Error(
+                "file too large for S3 multipart upload: %s would require chunk size of %s "
+                "(max %s) to stay within %d part limit",
+                renderSize(sizeHint),
+                renderSize(minChunkSize),
+                renderSize(AWS_MAX_PART_SIZE),
+                AWS_MAX_PART_COUNT);
+        }
+
+        warn(
+            "adjusting S3 multipart chunk size from %s to %s "
+            "to stay within %d part limit for %s file",
+            renderSize(store.s3Config->multipartChunkSize.get()),
+            renderSize(minChunkSize),
+            AWS_MAX_PART_COUNT,
+            renderSize(sizeHint));
+
+        chunkSize = minChunkSize;
+        estimatedParts = AWS_MAX_PART_COUNT;
+    }
+
+    buffer.reserve(chunkSize);
+    partEtags.reserve(estimatedParts);
+    uploadId = store.createMultipartUpload(path, mimeType, contentEncoding);
+}
+
+void S3BinaryCacheStore::MultipartSink::operator()(std::string_view data)
+{
+    buffer.append(data);
+
+    while (buffer.size() >= chunkSize) {
+        // Move entire buffer, extract excess, copy back remainder
+        auto chunk = std::move(buffer);
+        auto excessSize = chunk.size() > chunkSize ? chunk.size() - chunkSize : 0;
+        if (excessSize > 0) {
+            buffer.resize(excessSize);
+            std::memcpy(buffer.data(), chunk.data() + chunkSize, excessSize);
+        }
+        chunk.resize(std::min(chunkSize, chunk.size()));
+        uploadChunk(std::move(chunk));
+    }
+}
+
+void S3BinaryCacheStore::MultipartSink::finish()
+{
+    if (!buffer.empty()) {
+        uploadChunk(std::move(buffer));
+    }
+
     try {
-        HttpBinaryCacheStore::upload(path, source, sizeHint, mimeType, contentEncoding);
-    } catch (FileTransferError & e) {
-        UploadToS3 err(e.message());
-        err.addTrace({}, "while uploading to S3 binary cache at '%s'", config->cacheUri.to_string());
-        throw err;
+        if (partEtags.empty()) {
+            throw Error("no data read from stream");
+        }
+        store.completeMultipartUpload(path, uploadId, partEtags);
+    } catch (Error & e) {
+        store.abortMultipartUpload(path, uploadId);
+        e.addTrace({}, "while finishing an S3 multipart upload");
+        throw;
     }
 }
 
-void S3BinaryCacheStore::upload(std::string_view path, CompressedSource & source, std::string_view mimeType)
+void S3BinaryCacheStore::MultipartSink::uploadChunk(std::string chunk)
 {
-    upload(path, static_cast<RestartableSource &>(source), source.size(), mimeType, source.getCompressionMethod());
+    auto partNumber = partEtags.size() + 1;
+    try {
+        std::string etag = store.uploadPart(path, uploadId, partNumber, std::move(chunk));
+        partEtags.push_back(std::move(etag));
+    } catch (Error & e) {
+        store.abortMultipartUpload(path, uploadId);
+        e.addTrace({}, "while uploading part %d of an S3 multipart upload", partNumber);
+        throw;
+    }
 }
 
 std::string S3BinaryCacheStore::createMultipartUpload(
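To make the constructor's ceil-division logic concrete, here is a worked example with hypothetical numbers (a 100 GiB upload and a 5 MiB configured chunk size), as a standalone sketch rather than actual Nix code:

#include <cstdint>
#include <cstdio>

int main()
{
    constexpr uint64_t maxPartCount = 10000;          // AWS_MAX_PART_COUNT
    uint64_t sizeHint = 100ULL * 1024 * 1024 * 1024;  // 100 GiB = 107374182400 bytes
    uint64_t chunkSize = 5 * 1024 * 1024;             // 5 MiB configured chunk

    // ceil(107374182400 / 5242880) = 20480 parts, over the 10000 limit
    uint64_t estimatedParts = (sizeHint + chunkSize - 1) / chunkSize;

    if (estimatedParts > maxPartCount) {
        // smallest chunk that fits in 10000 parts:
        // ceil(107374182400 / 10000) = 10737419 bytes, about 10.24 MiB
        chunkSize = (sizeHint + maxPartCount - 1) / maxPartCount;
    }

    std::printf("parts=%llu chunk=%llu\n",
        (unsigned long long) estimatedParts,
        (unsigned long long) chunkSize); // parts=20480 chunk=10737419
}

The hard error is reachable only for truly enormous uploads: with the 5 GiB per-part ceiling, 10,000 parts cap out at about 48.8 TiB.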
@@ -171,6 +315,10 @@ std::string S3BinaryCacheStore::createMultipartUpload(
 std::string
 S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId, uint64_t partNumber, std::string data)
 {
+    if (partNumber > AWS_MAX_PART_COUNT) {
+        throw Error("S3 multipart upload exceeded %d part limit", AWS_MAX_PART_COUNT);
+    }
+
     auto req = makeRequest(key);
     req.method = HttpMethod::PUT;
     req.setupForS3();
@@ -189,24 +337,29 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId,
         throw Error("S3 UploadPart response missing ETag for part %d", partNumber);
     }
 
+    debug("Part %d uploaded, ETag: %s", partNumber, result.etag);
     return std::move(result.etag);
 }
 
-void S3BinaryCacheStore::abortMultipartUpload(std::string_view key, std::string_view uploadId)
+void S3BinaryCacheStore::abortMultipartUpload(std::string_view key, std::string_view uploadId) noexcept
 {
-    auto req = makeRequest(key);
-    req.setupForS3();
+    try {
+        auto req = makeRequest(key);
+        req.setupForS3();
 
-    auto url = req.uri.parsed();
-    url.query["uploadId"] = uploadId;
-    req.uri = VerbatimURL(url);
-    req.method = HttpMethod::DELETE;
+        auto url = req.uri.parsed();
+        url.query["uploadId"] = uploadId;
+        req.uri = VerbatimURL(url);
+        req.method = HttpMethod::DELETE;
 
-    getFileTransfer()->enqueueFileTransfer(req).get();
+        getFileTransfer()->enqueueFileTransfer(req).get();
+    } catch (...) {
+        ignoreExceptionInDestructor();
+    }
 }
 
 void S3BinaryCacheStore::completeMultipartUpload(
-    std::string_view key, std::string_view uploadId, std::span<const UploadedPart> parts)
+    std::string_view key, std::string_view uploadId, std::span<const std::string> partEtags)
 {
     auto req = makeRequest(key);
     req.setupForS3();
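The noexcept plus catch-all added to abortMultipartUpload is deliberate: abort runs inside the catch blocks of uploadChunk() and finish(), where a second exception would replace the one being handled and mask the real failure. A minimal standalone sketch of the pattern (function names hypothetical):

#include <stdexcept>

// Best-effort cleanup that must not throw, because callers invoke it
// while another exception is already in flight.
void abortUpload() noexcept
{
    try {
        // network DELETE request; may itself fail
    } catch (...) {
        // swallow: throwing here would replace the original upload error
    }
}

void uploadSomething()
{
    try {
        throw std::runtime_error("part upload failed");
    } catch (...) {
        abortUpload(); // cannot disturb the exception being handled
        throw;         // rethrow the real error
    }
}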
@@ -217,21 +370,24 @@ void S3BinaryCacheStore::completeMultipartUpload(
     req.method = HttpMethod::POST;
 
     std::string xml = "<CompleteMultipartUpload>";
-    for (const auto & part : parts) {
+    for (const auto & [idx, etag] : enumerate(partEtags)) {
         xml += "<Part>";
-        xml += "<PartNumber>" + std::to_string(part.partNumber) + "</PartNumber>";
-        xml += "<ETag>" + part.etag + "</ETag>";
+        // S3 part numbers are 1-indexed, but vector indices are 0-indexed
+        xml += "<PartNumber>" + std::to_string(idx + 1) + "</PartNumber>";
+        xml += "<ETag>" + etag + "</ETag>";
         xml += "</Part>";
     }
     xml += "</CompleteMultipartUpload>";
 
-    debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml);
+    debug("S3 CompleteMultipartUpload XML (%d parts): %s", partEtags.size(), xml);
 
     StringSource payload{xml};
     req.data = {payload};
     req.mimeType = "text/xml";
 
     getFileTransfer()->enqueueFileTransfer(req).get();
+
+    debug("S3 multipart upload completed: %d parts uploaded for '%s'", partEtags.size(), key);
 }
 
 StringSet S3BinaryCacheStoreConfig::uriSchemes()
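For reference, a standalone sketch of the payload that loop assembles, using two hypothetical ETags (S3 returns ETags as quoted strings; the real code emits the XML without line breaks):

#include <iostream>
#include <string>
#include <vector>

int main()
{
    // Hypothetical ETags, quoted the way S3 returns them
    std::vector<std::string> partEtags{"\"etag-1\"", "\"etag-2\""};

    // Same assembly as completeMultipartUpload(): part numbers are 1-based
    std::string xml = "<CompleteMultipartUpload>";
    for (size_t idx = 0; idx < partEtags.size(); ++idx) {
        xml += "<Part>";
        xml += "<PartNumber>" + std::to_string(idx + 1) + "</PartNumber>";
        xml += "<ETag>" + partEtags[idx] + "</ETag>";
        xml += "</Part>";
    }
    xml += "</CompleteMultipartUpload>";

    // Prints (one line):
    // <CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>"etag-1"</ETag></Part>
    // <Part><PartNumber>2</PartNumber><ETag>"etag-2"</ETag></Part></CompleteMultipartUpload>
    std::cout << xml << "\n";
}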
