Skip to content

Commit ef23af2

Browse files
committed
In search of a work queue abstraction.
1 parent ff732ed commit ef23af2

File tree

1 file changed

+55
-24
lines changed

1 file changed

+55
-24
lines changed

lld/MachO/Driver.cpp

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -296,37 +296,60 @@ class DeferredFile {
296296
};
297297
using DeferredFiles = std::vector<DeferredFile>;
298298

299+
class BackgroundQueue {
300+
std::deque<std::function<void()>> queue;
301+
std::thread *running;
302+
std::mutex mutex;
303+
304+
public:
305+
void queueWork(std::function<void()> work, bool reap);
306+
};
307+
308+
#include <chrono>
309+
299310
// Most input files have been mapped but not yet paged in.
300311
// This code forces the page-ins on multiple threads so
301312
// the process is not stalled waiting on disk buffer i/o.
302313
void multiThreadedPageInBackground(DeferredFiles &deferred) {
314+
using namespace std::chrono;
303315
static const size_t pageSize = Process::getPageSizeEstimate();
316+
static const size_t largeArchive = 10 * 1024 * 1024;
317+
static std::atomic_uint64_t totalBytes = 0;
318+
std::atomic_int index = 0, included = 0;
319+
auto t0 = high_resolution_clock::now();
304320
#if 0
305321
ThreadPoolStrategy oldStrategy = llvm::parallel::strategy;
306322
(void)llvm::make_scope_exit([&]() { llvm::parallel::strategy = oldStrategy; });
307323
llvm::parallel::strategy = llvm::hardware_concurrency(config->readThreads);
308324

309-
size_t totalBytes = parallelTransformReduce(deferred, 0,
325+
parallelTransformReduce(deferred, 0,
310326
[](size_t acc, size_t size) { return acc + size; },
311327
[&](DeferredFile &file) {
312328
const StringRef &buffer = file.buffer.getBuffer();
329+
size_t size = buffer.size();
330+
totalBytes += size;
331+
if (size > largeArchive)
332+
return size;
333+
334+
included += 1;
313335
for (const char *page = buffer.data(), *end = page + buffer.size();
314336
page < end; page += pageSize)
315337
LLVM_ATTRIBUTE_UNUSED volatile char t = *page;
316338
return buffer.size();
317339
}
318340
);
319341
#else
320-
static size_t totalBytes = 0;
321-
std::atomic_int index = 0;
322-
323342
parallelFor(0, config->readThreads, [&](size_t I) {
324343
while (true) {
325344
int localIndex = index.fetch_add(1);
326345
if (localIndex >= (int)deferred.size())
327346
break;
328347
const StringRef &buff = deferred[localIndex].buffer.getBuffer();
329348
totalBytes += buff.size();
349+
if (buff.size() > largeArchive)
350+
continue;
351+
352+
included += 1;
330353

331354
// Reference all file's mmap'd pages to load them into memory.
332355
for (const char *page = buff.data(), *end = page + buff.size();
@@ -335,45 +358,57 @@ void multiThreadedPageInBackground(DeferredFiles &deferred) {
335358
}
336359
});
337360
#endif
361+
auto dt = high_resolution_clock::now() - t0;
338362
if (getenv("LLD_MULTI_THREAD_PAGE"))
339-
llvm::dbgs() << "multiThreadedPageIn " << totalBytes << "/"
340-
<< deferred.size() << "\n";
363+
llvm::dbgs() << "multiThreadedPageIn " << totalBytes << "/" << included
364+
<< "/" << deferred.size() << "/"
365+
<< duration_cast<milliseconds>(dt).count() / 1000. << "\n";
341366
}
342367

343-
static void multiThreadedPageIn(const DeferredFiles &deferred) {
344-
static std::deque<std::unique_ptr<DeferredFiles>> queue;
345-
static std::thread *running;
346-
static std::mutex mutex;
347-
368+
void BackgroundQueue::queueWork(std::function<void()> work, bool reap) {
348369
mutex.lock();
349-
if (running && (queue.empty() || deferred.empty())) {
370+
if (running && (queue.empty() || reap)) {
350371
mutex.unlock();
351372
running->join();
352373
mutex.lock();
353374
delete running;
354375
running = nullptr;
355376
}
356377

357-
if (!deferred.empty()) {
358-
queue.emplace_back(std::make_unique<DeferredFiles>(deferred));
378+
if (!reap) {
379+
queue.emplace_back(work);
359380
if (!running)
360381
running = new std::thread([&]() {
382+
bool shouldPop = false;
361383
while (true) {
362384
mutex.lock();
385+
if (shouldPop)
386+
queue.pop_front();
363387
if (queue.empty()) {
364388
mutex.unlock();
365389
break;
366390
}
367-
auto deferred = std::move(queue.front());
368-
queue.pop_front();
391+
auto work = std::move(queue.front());
392+
shouldPop = true;
369393
mutex.unlock();
370-
multiThreadedPageInBackground(*deferred);
394+
work();
371395
}
372396
});
373397
}
374398
mutex.unlock();
375399
}
376400

401+
static void multiThreadedPageIn(const DeferredFiles &deferred,
402+
bool reap = false) {
403+
static BackgroundQueue pageInQueue;
404+
pageInQueue.queueWork(
405+
[=]() {
406+
DeferredFiles files = deferred;
407+
multiThreadedPageInBackground(files);
408+
},
409+
reap);
410+
}
411+
377412
static InputFile *processFile(std::optional<MemoryBufferRef> buffer,
378413
DeferredFiles *archiveContents, StringRef path,
379414
LoadType loadType, bool isLazy = false,
@@ -1425,14 +1460,10 @@ static void createFiles(const InputArgList &args) {
14251460
archives.push_back(archive);
14261461
}
14271462

1428-
if (!archiveContents.empty()) {
1463+
if (!archiveContents.empty())
14291464
multiThreadedPageIn(archiveContents);
1430-
for (auto *archive : archives)
1431-
archive->addLazySymbols();
1432-
}
1433-
1434-
DeferredFiles reapThreads;
1435-
multiThreadedPageIn(reapThreads);
1465+
for (auto *archive : archives)
1466+
archive->addLazySymbols();
14361467
}
14371468
}
14381469

0 commit comments

Comments
 (0)