Skip to content

Commit cf75079

Browse files
xokdviumEricson2314
authored andcommitted
libstore: Make uploads with filetransfer.cc consume a RestartableSource
Make uploads run in constant memory. Also change the callbacks to be noexcept, since we really don't want to be unwinding the stack in the curl thread. That will definitely corrupt that stack and make nix/curl crash in very bad ways.
1 parent b8d7f55 commit cf75079

File tree

9 files changed

+87
-42
lines changed

9 files changed

+87
-42
lines changed

src/libfetchers/git-lfs-fetch.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,9 @@ std::vector<nlohmann::json> Fetch::fetchUrls(const std::vector<Pointer> & pointe
219219
nlohmann::json oidList = pointerToPayload(pointers);
220220
nlohmann::json data = {{"operation", "download"}};
221221
data["objects"] = oidList;
222-
request.data = data.dump();
222+
auto payload = data.dump();
223+
StringSource source{payload};
224+
request.data = {source};
223225

224226
FileTransferResult result = getFileTransfer()->upload(request);
225227
auto responseString = result.data;

src/libstore/binary-cache-store.cc

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ std::optional<std::string> BinaryCacheStore::getNixCacheInfo()
7979
void BinaryCacheStore::upsertFile(
8080
const std::string & path, std::string && data, const std::string & mimeType, uint64_t sizeHint)
8181
{
82-
StringSource source{data};
83-
upsertFile(path, source, mimeType, sizeHint);
82+
auto source = restartableSourceFromFactory([data = std::move(data)]() { return make_unique<StringSource>(data); });
83+
upsertFile(path, *source, mimeType, sizeHint);
8484
}
8585

8686
void BinaryCacheStore::getFile(const std::string & path, Callback<std::optional<std::string>> callback) noexcept
@@ -272,10 +272,19 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
272272

273273
/* Atomically write the NAR file. */
274274
if (repair || !fileExists(narInfo->url)) {
275-
AutoCloseFD fd = toDescriptor(open(fnTemp.c_str(), O_RDONLY));
276-
FdSource source{fd.get()};
275+
auto source = restartableSourceFromFactory([fnTemp]() {
276+
struct AutoCloseFDSource : AutoCloseFD, FdSource
277+
{
278+
AutoCloseFDSource(AutoCloseFD fd)
279+
: AutoCloseFD(std::move(fd))
280+
, FdSource(get())
281+
{
282+
}
283+
};
284+
return std::make_unique<AutoCloseFDSource>(toDescriptor(open(fnTemp.c_str(), O_RDONLY)));
285+
});
277286
stats.narWrite++;
278-
upsertFile(narInfo->url, source, "application/x-nix-nar", narInfo->fileSize);
287+
upsertFile(narInfo->url, *source, "application/x-nix-nar", narInfo->fileSize);
279288
} else
280289
stats.narWriteAverted++;
281290

src/libstore/filetransfer.cc

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -295,20 +295,17 @@ struct curlFileTransfer : public FileTransfer
295295
return 0;
296296
}
297297

298-
size_t readOffset = 0;
299-
300-
size_t readCallback(char * buffer, size_t size, size_t nitems)
301-
{
302-
if (readOffset == request.data->length())
303-
return 0;
304-
auto count = std::min(size * nitems, request.data->length() - readOffset);
305-
assert(count);
306-
memcpy(buffer, request.data->data() + readOffset, count);
307-
readOffset += count;
308-
return count;
298+
size_t readCallback(char * buffer, size_t size, size_t nitems) noexcept
299+
try {
300+
auto data = request.data;
301+
return data->source->read(buffer, nitems * size);
302+
} catch (EndOfFile &) {
303+
return 0;
304+
} catch (...) {
305+
return CURL_READFUNC_ABORT;
309306
}
310307

311-
static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp)
308+
static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) noexcept
312309
{
313310
return ((TransferItem *) userp)->readCallback(buffer, size, nitems);
314311
}
@@ -322,19 +319,24 @@ struct curlFileTransfer : public FileTransfer
322319
}
323320
#endif
324321

325-
size_t seekCallback(curl_off_t offset, int origin)
326-
{
322+
size_t seekCallback(curl_off_t offset, int origin) noexcept
323+
try {
324+
auto source = request.data->source;
327325
if (origin == SEEK_SET) {
328-
readOffset = offset;
326+
source->restart();
327+
source->skip(offset);
329328
} else if (origin == SEEK_CUR) {
330-
readOffset += offset;
329+
source->skip(offset);
331330
} else if (origin == SEEK_END) {
332-
readOffset = request.data->length() + offset;
331+
NullSink sink{};
332+
source->drainInto(sink);
333333
}
334334
return CURL_SEEKFUNC_OK;
335+
} catch (...) {
336+
return CURL_SEEKFUNC_FAIL;
335337
}
336338

337-
static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin)
339+
static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) noexcept
338340
{
339341
return ((TransferItem *) clientp)->seekCallback(offset, origin);
340342
}
@@ -393,10 +395,10 @@ struct curlFileTransfer : public FileTransfer
393395
if (request.data) {
394396
if (request.method == HttpMethod::POST) {
395397
curl_easy_setopt(req, CURLOPT_POST, 1L);
396-
curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->length());
398+
curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->sizeHint);
397399
} else if (request.method == HttpMethod::PUT) {
398400
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
399-
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
401+
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->sizeHint);
400402
} else {
401403
unreachable();
402404
}

src/libstore/http-binary-cache-store.cc

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,19 +135,28 @@ bool HttpBinaryCacheStore::fileExists(const std::string & path)
135135
}
136136

137137
void HttpBinaryCacheStore::upsertFile(
138-
const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint)
138+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
139139
{
140140
auto req = makeRequest(path);
141141
req.method = HttpMethod::PUT;
142-
auto data = source.drain();
143142
auto compressionMethod = getCompressionMethod(path);
144143

144+
std::string data;
145+
std::optional<StringSource> stringSource{};
146+
145147
if (compressionMethod) {
146-
data = compress(*compressionMethod, data);
148+
StringSink sink{};
149+
auto compressionSink = makeCompressionSink(*compressionMethod, sink);
150+
source.drainInto(*compressionSink);
151+
compressionSink->finish();
152+
data = std::move(sink.s);
147153
req.headers.emplace_back("Content-Encoding", *compressionMethod);
154+
stringSource = StringSource{data};
155+
req.data = {*stringSource};
156+
} else {
157+
req.data = {sizeHint, source};
148158
}
149159

150-
req.data = std::move(data);
151160
req.mimeType = mimeType;
152161

153162
try {

src/libstore/include/nix/store/binary-cache-store.hh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ public:
100100

101101
virtual bool fileExists(const std::string & path) = 0;
102102

103-
virtual void
104-
upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) = 0;
103+
virtual void upsertFile(
104+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) = 0;
105105

106106
void upsertFile(
107107
const std::string & path,

src/libstore/include/nix/store/filetransfer.hh

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,26 @@ struct FileTransferRequest
115115
unsigned int baseRetryTimeMs = RETRY_TIME_MS_DEFAULT;
116116
ActivityId parentAct;
117117
bool decompress = true;
118-
std::optional<std::string> data;
118+
119+
struct UploadData
120+
{
121+
UploadData(StringSource & s)
122+
: sizeHint(s.s.length())
123+
, source(&s)
124+
{
125+
}
126+
127+
UploadData(std::size_t sizeHint, RestartableSource & source)
128+
: sizeHint(sizeHint)
129+
, source(&source)
130+
{
131+
}
132+
133+
std::size_t sizeHint = 0;
134+
RestartableSource * source = nullptr;
135+
};
136+
137+
std::optional<UploadData> data;
119138
std::string mimeType;
120139
std::function<void(std::string_view data)> dataCallback;
121140
/**

src/libstore/include/nix/store/http-binary-cache-store.hh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ protected:
8080

8181
bool fileExists(const std::string & path) override;
8282

83-
void
84-
upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override;
83+
void upsertFile(
84+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override;
8585

8686
FileTransferRequest makeRequest(std::string_view path);
8787

src/libstore/local-binary-cache-store.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ struct LocalBinaryCacheStore : virtual BinaryCacheStore
5353

5454
bool fileExists(const std::string & path) override;
5555

56-
void upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override
56+
void upsertFile(
57+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override
5758
{
5859
auto path2 = config->binaryCacheDir + "/" + path;
5960
static std::atomic<int> counter{0};

src/libstore/s3-binary-cache-store.cc

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
2020
{
2121
}
2222

23-
void
24-
upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override;
23+
void upsertFile(
24+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override;
2525

2626
private:
2727
ref<S3BinaryCacheStoreConfig> s3Config;
@@ -67,7 +67,7 @@ class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
6767
};
6868

6969
void S3BinaryCacheStore::upsertFile(
70-
const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint)
70+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
7171
{
7272
HttpBinaryCacheStore::upsertFile(path, source, mimeType, sizeHint);
7373
}
@@ -86,7 +86,8 @@ std::string S3BinaryCacheStore::createMultipartUpload(
8686
req.uri = VerbatimURL(url);
8787

8888
req.method = HttpMethod::POST;
89-
req.data = "";
89+
StringSource payload{std::string_view("")};
90+
req.data = {payload};
9091
req.mimeType = mimeType;
9192

9293
if (contentEncoding) {
@@ -116,7 +117,8 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId,
116117
url.query["partNumber"] = std::to_string(partNumber);
117118
url.query["uploadId"] = uploadId;
118119
req.uri = VerbatimURL(url);
119-
req.data = std::move(data);
120+
StringSource payload{data};
121+
req.data = {payload};
120122
req.mimeType = "application/octet-stream";
121123

122124
auto result = getFileTransfer()->enqueueFileTransfer(req).get();
@@ -163,7 +165,8 @@ void S3BinaryCacheStore::completeMultipartUpload(
163165

164166
debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml);
165167

166-
req.data = xml;
168+
StringSource payload{xml};
169+
req.data = {payload};
167170
req.mimeType = "text/xml";
168171

169172
getFileTransfer()->enqueueFileTransfer(req).get();

0 commit comments

Comments
 (0)