Skip to content

Commit 6394184

Browse files
committed
refactor(filesystem_extractor): use pool of writers when writing to disk
1 parent f06623a commit 6394184

File tree

1 file changed

+134
-64
lines changed

1 file changed

+134
-64
lines changed

src/utility/filesystem_extractor.cpp

Lines changed: 134 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ la_ssize_t write_range_data(sparse_file_mode mode, struct archive* a,
120120
template <typename LoggerPolicy>
121121
class filesystem_extractor_ final : public filesystem_extractor::impl {
122122
public:
123+
using archive_ptr = std::shared_ptr<struct ::archive>;
124+
static constexpr size_t kRegFileDiskThreads{4};
125+
123126
explicit filesystem_extractor_(logger& lgr, os_access const& os,
124127
std::shared_ptr<file_access const> fa)
125128
: LOG_PROXY_INIT(lgr)
@@ -145,16 +148,18 @@ class filesystem_extractor_ final : public filesystem_extractor::impl {
145148
#else
146149
LOG_DEBUG << "opening archive file in " << format.description();
147150

148-
a_ = ::archive_write_new();
151+
// return std::shared_ptr<::archive_entry>(e, ::archive_entry_free);
152+
a_.reset(::archive_write_new(), ::archive_write_free);
149153

150154
configure_format(format, &output);
151155

152156
if (output.empty()) {
153-
check_result(::archive_write_open_filename(a_, nullptr));
157+
check_result(a_, ::archive_write_open_filename(a_.get(), nullptr));
154158
} else {
155159
out_ = fa_->open_output_binary(output);
156-
check_result(::archive_write_open2(a_, this, nullptr, on_stream_write,
157-
on_stream_close, on_stream_free));
160+
check_result(a_, ::archive_write_open2(a_.get(), this, nullptr,
161+
on_stream_write, on_stream_close,
162+
on_stream_free));
158163
}
159164
#endif
160165
}
@@ -180,11 +185,11 @@ class filesystem_extractor_ final : public filesystem_extractor::impl {
180185

181186
LOG_DEBUG << "opening archive stream in " << format.description();
182187

183-
a_ = ::archive_write_new();
188+
a_.reset(::archive_write_new(), ::archive_write_free);
184189

185190
configure_format(format);
186191

187-
check_result(::archive_write_open_fd(a_, pipefd_[1]));
192+
check_result(a_, ::archive_write_open_fd(a_.get(), pipefd_[1]));
188193
#endif
189194
}
190195

@@ -193,24 +198,43 @@ class filesystem_extractor_ final : public filesystem_extractor::impl {
193198
std::filesystem::current_path(output);
194199
}
195200

196-
a_ = ::archive_write_disk_new();
197-
198-
check_result(::archive_write_disk_set_options(
199-
a_,
200-
ARCHIVE_EXTRACT_OWNER | ARCHIVE_EXTRACT_PERM | ARCHIVE_EXTRACT_TIME |
201-
ARCHIVE_EXTRACT_UNLINK | ARCHIVE_EXTRACT_SECURE_NOABSOLUTEPATHS |
202-
ARCHIVE_EXTRACT_SECURE_NODOTDOT | ARCHIVE_EXTRACT_SECURE_SYMLINKS));
201+
a_.reset(::archive_write_disk_new(), ::archive_write_free);
202+
203+
check_result(
204+
a_, ::archive_write_disk_set_options(
205+
a_.get(), ARCHIVE_EXTRACT_NO_AUTODIR | ARCHIVE_EXTRACT_OWNER |
206+
ARCHIVE_EXTRACT_PERM | ARCHIVE_EXTRACT_TIME |
207+
ARCHIVE_EXTRACT_UNLINK |
208+
ARCHIVE_EXTRACT_SECURE_SYMLINKS));
209+
210+
for (size_t i = 0; i < kRegFileDiskThreads; ++i) {
211+
auto ar = archive_ptr{::archive_write_disk_new(), ::archive_write_free};
212+
check_result(
213+
ar, ::archive_write_disk_set_options(
214+
ar.get(), ARCHIVE_EXTRACT_NO_AUTODIR | ARCHIVE_EXTRACT_OWNER |
215+
ARCHIVE_EXTRACT_PERM | ARCHIVE_EXTRACT_TIME |
216+
ARCHIVE_EXTRACT_UNLINK));
217+
a_reg_.push_back(std::move(ar));
218+
}
203219

204220
sparse_mode_ = sparse_file_mode::sparse_disk;
205221
}
206222

207223
void close() override {
224+
if (!a_reg_.empty()) {
225+
for (auto& ar : a_reg_) {
226+
LOG_DEBUG << "closing regular file disk archive";
227+
check_result(ar, ::archive_write_close(ar.get()));
228+
}
229+
LOG_TRACE << "freeing regular file disk archives";
230+
a_reg_.clear();
231+
}
232+
208233
if (a_) {
209234
LOG_DEBUG << "closing archive";
210-
check_result(::archive_write_close(a_));
235+
check_result(a_, ::archive_write_close(a_.get()));
211236
LOG_TRACE << "freeing archive";
212-
::archive_write_free(a_);
213-
a_ = nullptr;
237+
a_.reset();
214238
}
215239

216240
if (iot_) {
@@ -268,17 +292,21 @@ class filesystem_extractor_ final : public filesystem_extractor::impl {
268292
auto fn = output->filename().string();
269293

270294
LOG_DEBUG << "setting archive format by extension for " << fn;
271-
check_result(::archive_write_set_format_filter_by_ext(a_, fn.c_str()));
295+
check_result(
296+
a_, ::archive_write_set_format_filter_by_ext(a_.get(), fn.c_str()));
272297
} else {
273-
check_result(::archive_write_set_format_by_name(a_, format.name.c_str()));
298+
check_result(a_, ::archive_write_set_format_by_name(a_.get(),
299+
format.name.c_str()));
274300

275301
for (auto const& filter : format.filters) {
276-
check_result(::archive_write_add_filter_by_name(a_, filter.c_str()));
302+
check_result(
303+
a_, ::archive_write_add_filter_by_name(a_.get(), filter.c_str()));
277304
}
278305
}
279306

280-
check_result(::archive_write_set_options(a_, format.options.c_str()));
281-
check_result(::archive_write_set_bytes_in_last_block(a_, 1));
307+
check_result(a_,
308+
::archive_write_set_options(a_.get(), format.options.c_str()));
309+
check_result(a_, ::archive_write_set_bytes_in_last_block(a_.get(), 1));
282310
#endif
283311
}
284312

@@ -315,26 +343,31 @@ class filesystem_extractor_ final : public filesystem_extractor::impl {
315343
}
316344
}
317345

318-
void check_result(int res) {
346+
void check_result(struct archive* a, int res) {
319347
switch (res) {
320348
case ARCHIVE_OK:
321349
case ARCHIVE_EOF:
322350
default:
323351
break;
324352
case ARCHIVE_WARN:
325-
LOG_WARN << std::string(archive_error_string(a_));
353+
LOG_WARN << std::string(::archive_error_string(a));
326354
break;
327355
case ARCHIVE_RETRY:
328356
case ARCHIVE_FAILED:
329357
case ARCHIVE_FATAL:
330-
throw archive_error(std::string(archive_error_string(a_)));
358+
throw archive_error(std::string(::archive_error_string(a)));
331359
}
332360
}
333361

362+
void check_result(archive_ptr const& a, int res) {
363+
check_result(a.get(), res);
364+
}
365+
334366
LOG_PROXY_DECL(debug_logger_policy);
335367
os_access const& os_;
336368
std::shared_ptr<file_access const> fa_;
337-
struct ::archive* a_{nullptr};
369+
archive_ptr a_;
370+
std::vector<archive_ptr> a_reg_;
338371
std::unique_ptr<output_stream> out_;
339372
std::array<int, 2> pipefd_{-1, -1};
340373
std::unique_ptr<std::thread> iot_;
@@ -353,7 +386,7 @@ bool filesystem_extractor_<LoggerPolicy>::extract(
353386

354387
scope_exit free_resolver{[&] { ::archive_entry_linkresolver_free(lr); }};
355388

356-
if (auto fmt = ::archive_format(a_)) {
389+
if (auto fmt = ::archive_format(a_.get())) {
357390
::archive_entry_linkresolver_set_strategy(lr, fmt);
358391

359392
if (sparse_mode == sparse_file_mode::auto_detect) {
@@ -369,7 +402,23 @@ bool filesystem_extractor_<LoggerPolicy>::extract(
369402
counting_semaphore sem;
370403
sem.post(opts.max_queued_bytes);
371404

372-
worker_group archiver(LOG_GET_LOGGER, os_, "archiver", 1);
405+
using worker_ptr = std::shared_ptr<worker_group>;
406+
407+
worker_ptr archiver = std::make_shared<worker_group>(
408+
LOG_GET_LOGGER, os_, "archiver", 1, [this](size_t) {
409+
return std::make_unique<basic_thread_state<struct archive*>>(a_.get());
410+
});
411+
worker_ptr reg_archiver;
412+
413+
if (a_reg_.empty()) {
414+
reg_archiver = archiver;
415+
} else {
416+
reg_archiver = std::make_shared<worker_group>(
417+
LOG_GET_LOGGER, os_, "arch-reg", a_reg_.size(), [this](size_t idx) {
418+
return std::make_unique<basic_thread_state<struct archive*>>(
419+
a_reg_[idx].get());
420+
});
421+
}
373422

374423
vfs_stat vfs;
375424
fs.statvfs(&vfs);
@@ -379,31 +428,29 @@ bool filesystem_extractor_<LoggerPolicy>::extract(
379428
std::atomic<uint64_t> bytes_written{0};
380429
uint64_t const bytes_total{vfs.blocks};
381430

382-
auto do_archive =
383-
[&](std::shared_ptr<::archive_entry> ae,
384-
reader::inode_view const& entry) { // TODO: inode vs. entry
385-
bool added{false};
431+
auto do_archive = [&](worker_ptr aptr, std::shared_ptr<::archive_entry> ae,
432+
reader::inode_view const& entry) {
433+
bool added{false};
386434

387-
if (auto const size = ::archive_entry_size(ae.get());
388-
entry.is_regular_file() && size > 0) {
389-
reader::detail::file_reader fr(fs, entry);
435+
if (auto const size = ::archive_entry_size(ae.get());
436+
entry.is_regular_file() && size > 0) {
437+
reader::detail::file_reader fr(fs, entry);
390438

391-
auto extents = fr.extents();
392-
std::vector<file_range> data_ranges;
439+
auto extents = fr.extents();
440+
std::vector<file_range> data_ranges;
393441

394-
for (auto const& e : extents) {
395-
if (sparse_mode != sparse_file_mode::sparse_disk ||
396-
e.kind == dwarfs::extent_kind::data) {
397-
data_ranges.push_back(e.range);
398-
}
399-
}
442+
for (auto const& e : extents) {
443+
if (sparse_mode != sparse_file_mode::sparse_disk ||
444+
e.kind == dwarfs::extent_kind::data) {
445+
data_ranges.push_back(e.range);
446+
}
447+
}
400448

401-
archiver.add_job([this, &hard_error, &soft_error, &opts,
402-
extents = std::move(extents),
403-
ranges = fr.read_sequential(data_ranges, sem,
404-
opts.max_queued_bytes),
405-
ae = std::move(ae), size, &sparse_mode,
406-
&bytes_written, bytes_total]() mutable {
449+
aptr->add_job<struct archive*>(
450+
[this, &hard_error, &soft_error, &opts, extents = std::move(extents),
451+
ranges = fr.read_sequential(data_ranges, sem, opts.max_queued_bytes),
452+
ae = std::move(ae), size, &sparse_mode, &bytes_written,
453+
bytes_total](struct archive* a) mutable {
407454
try {
408455
assert(ae);
409456

@@ -436,7 +483,7 @@ bool filesystem_extractor_<LoggerPolicy>::extract(
436483
}
437484
}
438485

439-
check_result(::archive_write_header(a_, ae.get()));
486+
check_result(a, ::archive_write_header(a, ae.get()));
440487

441488
if (sparse_mode == sparse_file_mode::sparse_disk) {
442489
extents.erase(std::remove_if(extents.begin(), extents.end(),
@@ -456,10 +503,10 @@ bool filesystem_extractor_<LoggerPolicy>::extract(
456503
<< ext.range.offset() << " in " << ext.kind
457504
<< " extent for " << path;
458505

459-
auto const rv = write_range_data(sparse_mode, a_, r.data(),
506+
auto const rv = write_range_data(sparse_mode, a, r.data(),
460507
r.size(), ext.range.offset());
461508

462-
check_result(rv);
509+
check_result(a, rv);
463510

464511
if (std::cmp_not_equal(rv, static_cast<la_ssize_t>(r.size()))) {
465512
throw archive_error(
@@ -495,20 +542,21 @@ bool filesystem_extractor_<LoggerPolicy>::extract(
495542
}
496543
});
497544

498-
added = true;
499-
}
545+
added = true;
546+
}
500547

501-
if (!added) {
502-
archiver.add_job([this, ae = std::move(ae), &hard_error] {
548+
if (!added) {
549+
aptr->add_job<struct archive*>(
550+
[this, ae = std::move(ae), &hard_error](struct archive* a) {
503551
try {
504-
check_result(::archive_write_header(a_, ae.get()));
552+
check_result(a, ::archive_write_header(a, ae.get()));
505553
} catch (...) {
506554
LOG_ERROR << exception_str(std::current_exception());
507555
++hard_error;
508556
}
509557
});
510-
}
511-
};
558+
}
559+
};
512560

513561
// Asynchronously prepare walking all entries in data order
514562
auto ordered_entries = std::async(
@@ -536,13 +584,18 @@ bool filesystem_extractor_<LoggerPolicy>::extract(
536584
}
537585

538586
auto do_archive_entry = [&](auto const& entry) {
587+
if (entry.is_root()) {
588+
// skip root entry
589+
return;
590+
}
591+
539592
auto ae = ::archive_entry_new();
540593
auto inode = entry.inode();
541594

542595
if (matcher) {
543596
auto const unix_path = entry.unix_path();
544597
LOG_TRACE << "checking " << unix_path;
545-
if (entry.inode().is_directory()) {
598+
if (inode.is_directory()) {
546599
if (!matched_dirs.contains(unix_path)) {
547600
LOG_TRACE << "skipping directory " << unix_path;
548601
// no need to extract this directory
@@ -607,7 +660,10 @@ bool filesystem_extractor_<LoggerPolicy>::extract(
607660
};
608661

609662
if (ae) {
610-
do_archive(shared_entry_ptr(ae), inode);
663+
do_archive(inode.is_regular_file() && stat.nlink_unchecked() == 1
664+
? reg_archiver
665+
: archiver,
666+
shared_entry_ptr(ae), inode);
611667
}
612668

613669
if (sparse) {
@@ -616,21 +672,35 @@ bool filesystem_extractor_<LoggerPolicy>::extract(
616672
LOG_ERROR << "find() failed";
617673
}
618674
LOG_INFO << "archiving sparse entry " << ::archive_entry_pathname(sparse);
619-
do_archive(shared_entry_ptr(sparse), *ev);
675+
do_archive(archiver, shared_entry_ptr(sparse), *ev);
620676
}
621677
};
622678

679+
for (auto const& entry : fs.directory_entries()) {
680+
if (hard_error) {
681+
break;
682+
}
683+
684+
do_archive_entry(entry);
685+
}
686+
687+
archiver->wait();
688+
623689
for (auto const& entry : ordered_entries.get()) {
624690
if (hard_error) {
625691
break;
626692
}
627693

628-
if (!entry.is_root()) {
629-
do_archive_entry(entry);
694+
if (entry.inode().is_directory()) {
695+
// directories have already been processed above
696+
continue;
630697
}
698+
699+
do_archive_entry(entry);
631700
}
632701

633-
archiver.wait();
702+
archiver->wait();
703+
reg_archiver->wait();
634704

635705
if (hard_error) {
636706
DWARFS_THROW(runtime_error, "extraction aborted");

0 commit comments

Comments
 (0)