Skip to content

Commit 282b8fb

Browse files
authored
Merge pull request #204 from h-2/query_preload
Query preload
2 parents e3f2327 + 71df54d commit 282b8fb

File tree

6 files changed

+159
-39
lines changed

6 files changed

+159
-39
lines changed

src/search.cpp

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ int searchMain(int const argc, char const ** argv)
7676
parseCommandLine(options, argc, argv);
7777

7878
#ifdef _OPENMP
79-
omp_set_num_threads(options.threads - 1);
79+
omp_set_num_threads(options.threads - options.lazyQryFile); // reserve one thread for I/O when lazy-loading
8080
#else
8181
options.threads = 1;
8282
#endif
@@ -337,7 +337,10 @@ void realMain(LambdaOptions const & options)
337337

338338
loadDbIndexFromDisk(globalHolder, options);
339339

340-
countQuery(globalHolder, options);
340+
if (options.lazyQryFile)
341+
countQuery(globalHolder, options);
342+
else
343+
loadQuery(globalHolder, options);
341344

342345
myWriteHeader(globalHolder, options);
343346

@@ -346,35 +349,55 @@ void realMain(LambdaOptions const & options)
346349
"Searching and extending hits on-line...progress:\n"
347350
"0% 10% 20% 30% 40% 50% 60% 70% 80% 90% 100%\n|");
348351

349-
double start = sysTime();
350-
352+
double start = sysTime();
351353
uint64_t lastPercent = 0;
352354

353-
bio::io::seq::record r{.id = std::string{},
354-
.seq = std::vector<typename TGlobalHolder::TOrigQryAlph>{},
355-
.qual = std::ignore};
356-
bio::io::seq::reader reader{options.queryFile, bio::io::seq::reader_options{.record = r}};
357-
358-
auto file_view = reader | views::async_input_buffer(globalHolder.records_per_batch * options.threads);
355+
/* Setup lazy-loader for query (if set in options) */
356+
decltype(createQryView(globalHolder, options)) file_view;
357+
if (options.lazyQryFile)
358+
file_view = createQryView(globalHolder, options);
359359

360360
SEQAN_OMP_PRAGMA(parallel)
361361
{
362362
TLocalHolder localHolder(options, globalHolder);
363363

364+
/* Define region of query this thread gets (eager-loading only) */
365+
size_t const chunk_begin = globalHolder.qrySeqs.size() * omp_get_thread_num() / omp_get_num_threads();
366+
size_t const chunk_end = globalHolder.qrySeqs.size() * (omp_get_thread_num() + 1) / omp_get_num_threads();
367+
/* Track (per-thread) batch count (eager-loading only) */
368+
size_t batch_i = 0;
369+
364370
while (true)
365371
{
366372
if (localHolder.iterativeSearch != IterativeSearchMode::PHASE2)
367373
{
368374
localHolder.reset();
369375

370-
// load records until batch is full or file at end
371-
for (auto & [id, seq, qual] : file_view)
376+
if (options.lazyQryFile) // load records until batch is full or file at end
372377
{
373-
localHolder.qryIds.push_back(std::move(id));
374-
localHolder.qrySeqs.push_back(std::move(seq));
378+
for (auto & [id, seq, qual] : file_view)
379+
{
380+
localHolder.qryIdsTmp.push_back(std::move(id));
381+
localHolder.qrySeqsTmp.push_back(std::move(seq));
382+
383+
if (++localHolder.queryCount == globalHolder.records_per_batch) // no more records in file
384+
break;
385+
}
386+
387+
localHolder.qryIds = localHolder.qryIdsTmp;
388+
localHolder.qrySeqs = localHolder.qrySeqsTmp;
389+
}
390+
else // select subset from stored query sequences
391+
{
392+
size_t const slice_begin =
393+
std::min(chunk_end, chunk_begin + batch_i * globalHolder.records_per_batch);
394+
size_t const slice_end = std::min(chunk_end, slice_begin + globalHolder.records_per_batch);
395+
396+
localHolder.qryIds = globalHolder.qryIds | bio::views::slice(slice_begin, slice_end);
397+
localHolder.qrySeqs = globalHolder.qrySeqs | bio::views::slice(slice_begin, slice_end);
375398

376-
if (++localHolder.queryCount == globalHolder.records_per_batch)
377-
break;
399+
++batch_i;
400+
localHolder.queryCount = localHolder.qrySeqs.size();
378401
}
379402

380403
if (localHolder.queryCount == 0) // no more records in file

src/search_algo.hpp

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
#include <bio/io/seq/reader.hpp>
3939
#include <bio/ranges/views/complement.hpp>
4040
#include <bio/ranges/views/translate_join.hpp>
41+
#if __cpp_lib_ranges <= 202106L
42+
# include <bio/ranges/views/persist.hpp>
43+
#endif
4144

4245
#include <fmindex-collection/DenseCSA.h>
4346
#include <fmindex-collection/locate.h>
@@ -260,6 +263,42 @@ void loadDbIndexFromDisk(
260263
// Function loadQuery()
261264
// --------------------------------------------------------------------------
262265

266+
template <DbIndexType c_indexType,
267+
AlphabetEnum c_origSbjAlph,
268+
AlphabetEnum c_transAlph,
269+
AlphabetEnum c_redAlph,
270+
AlphabetEnum c_origQryAlph>
271+
void loadQuery(GlobalDataHolder<c_indexType, c_origSbjAlph, c_transAlph, c_redAlph, c_origQryAlph> & globalHolder,
272+
LambdaOptions const & options)
273+
{
274+
using TGH = GlobalDataHolder<c_indexType, c_origSbjAlph, c_transAlph, c_redAlph, c_origQryAlph>;
275+
276+
double start = sysTime();
277+
278+
std::string strIdent = "Loading Query Sequences...";
279+
myPrint(options, 1, strIdent);
280+
281+
bio::io::seq::record r{.id = std::string{}, .seq = std::vector<typename TGH::TOrigQryAlph>{}, .qual = std::ignore};
282+
bio::io::seq::reader reader{options.queryFile, bio::io::seq::reader_options{.record = r}};
283+
for (auto & rec : reader)
284+
{
285+
globalHolder.qryIds.push_back(std::move(rec.id));
286+
globalHolder.qrySeqs.push_back(std::move(rec.seq));
287+
}
288+
289+
// parse the file completely and get count in one line:
290+
globalHolder.queryTotal = globalHolder.qrySeqs.size();
291+
292+
// batch-size as set in options (unless too few sequences)
293+
globalHolder.records_per_batch = std::max<size_t>(
294+
std::min<size_t>(globalHolder.queryTotal / (options.threads * 10), options.maximumQueryBlockSize),
295+
1);
296+
double finish = sysTime() - start;
297+
myPrint(options, 1, " done.\n");
298+
299+
myPrint(options, 2, "Runtime: ", finish, "s \n\n");
300+
}
301+
263302
template <DbIndexType c_indexType,
264303
AlphabetEnum c_origSbjAlph,
265304
AlphabetEnum c_transAlph,
@@ -270,7 +309,7 @@ void countQuery(GlobalDataHolder<c_indexType, c_origSbjAlph, c_transAlph, c_redA
270309
{
271310
double start = sysTime();
272311

273-
std::string strIdent = "Counting Query Sequences ...";
312+
std::string strIdent = "Counting Query Sequences...";
274313
myPrint(options, 1, strIdent);
275314

276315
// TODO potentially optimise this for fasta/fastq with simple 'grep -c'
@@ -295,6 +334,26 @@ void countQuery(GlobalDataHolder<c_indexType, c_origSbjAlph, c_transAlph, c_redA
295334
myPrint(options, 2, "Runtime: ", finish, "s \n\n");
296335
}
297336

337+
template <DbIndexType c_indexType,
338+
AlphabetEnum c_origSbjAlph,
339+
AlphabetEnum c_transAlph,
340+
AlphabetEnum c_redAlph,
341+
AlphabetEnum c_origQryAlph>
342+
auto createQryView(GlobalDataHolder<c_indexType, c_origSbjAlph, c_transAlph, c_redAlph, c_origQryAlph> & globalHolder,
343+
LambdaOptions const & options)
344+
{
345+
using TGH = GlobalDataHolder<c_indexType, c_origSbjAlph, c_transAlph, c_redAlph, c_origQryAlph>;
346+
bio::io::seq::record r{.id = std::string{}, .seq = std::vector<typename TGH::TOrigQryAlph>{}, .qual = std::ignore};
347+
bio::io::seq::reader reader{options.queryFile, bio::io::seq::reader_options{.record = r}};
348+
349+
#if __cpp_lib_ranges <= 202106L
350+
return std::move(reader) | bio::views::persist |
351+
views::async_input_buffer(globalHolder.records_per_batch * options.threads);
352+
#else
353+
return std::move(reader) | views::async_input_buffer(globalHolder.records_per_batch * options.threads);
354+
#endif
355+
}
356+
298357
/// THREAD LOCAL STUFF
299358

300359
// --------------------------------------------------------------------------
@@ -1284,8 +1343,8 @@ void iterativeSearchPost(auto & lH)
12841343
assert(subr2.size() == successfulCount);
12851344
}
12861345

1287-
lH.qryIds.resize(lH.qryIds.size() - successfulCount);
1288-
lH.qrySeqs.resize(lH.qrySeqs.size() - successfulCount);
1346+
lH.qryIds = lH.qryIds | std::views::take(lH.qryIds.size() - successfulCount);
1347+
lH.qrySeqs = lH.qrySeqs | std::views::take(lH.qrySeqs.size() - successfulCount);
12891348

12901349
/* only switch to PHASE2 if there are any left */
12911350
if (!lH.qryIds.empty())

src/search_datastructures.hpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,12 @@ class GlobalDataHolder
395395
TTransSbjSeqs transSbjSeqs = indexFile.seqs | sbjTransView<c_origSbjAlph, c_transAlph, c_redAlph>;
396396
TRedSbjSeqs redSbjSeqs = transSbjSeqs | redView<c_transAlph, c_redAlph>;
397397

398+
/* the following are not used when the query is loaded on-demand / lazily */
399+
TQryIds qryIds;
400+
TQrySeqs qrySeqs;
401+
TTransQrySeqs transQrySeqs = qrySeqs | qryTransView<c_origQryAlph, c_transAlph, c_redAlph>;
402+
TRedQrySeqs redQrySeqs = transQrySeqs | redView<c_transAlph, c_redAlph>;
403+
398404
TBlastTabFile outfileBlastTab;
399405
TBlastRepFile outfileBlastRep;
400406
TBamFile outfileBam;
@@ -406,7 +412,7 @@ class GlobalDataHolder
406412

407413
size_t queryTotal = 0;
408414
std::atomic<size_t> queryCount = 0;
409-
size_t records_per_batch = 100;
415+
size_t records_per_batch = 100; // TODO this currently has no effect
410416

411417
GlobalDataHolder() = default;
412418
GlobalDataHolder(GlobalDataHolder const &) = delete;
@@ -436,10 +442,13 @@ class LocalDataHolder
436442
TGlobalHolder /*const*/ & gH;
437443
static constexpr seqan::BlastProgram blastProgram = TGlobalHolder::blastProgram;
438444

439-
// query data
440-
typename TGlobalHolder::TQryIds qryIds;
441-
typename TGlobalHolder::TQrySeqs qrySeqs;
442-
typename TGlobalHolder::TTransQrySeqs transQrySeqs =
445+
// temporary storage used by lazy-loading
446+
typename TGlobalHolder::TQryIds qryIdsTmp;
447+
typename TGlobalHolder::TQrySeqs qrySeqsTmp;
448+
// refers to temporary storage (lazy-loading) or subset of qry in globalHolder (eager-loading)
449+
decltype(qryIdsTmp | bio::views::slice(0, 0)) qryIds;
450+
decltype(qrySeqsTmp | bio::views::slice(0, 0)) qrySeqs;
451+
typename TGlobalHolder::TTransQrySeqs transQrySeqs =
443452
qrySeqs | qryTransView<TGlobalHolder::c_origQryAlph, TGlobalHolder::c_transAlph, TGlobalHolder::c_redAlph>;
444453
typename TGlobalHolder::TRedQrySeqs redQrySeqs =
445454
transQrySeqs | redView<TGlobalHolder::c_transAlph, TGlobalHolder::c_redAlph>;
@@ -512,8 +521,8 @@ class LocalDataHolder
512521
// clear storage
513522
queryCount = 0;
514523
matches.clear();
515-
qryIds.clear();
516-
qrySeqs.clear();
524+
qryIdsTmp.clear();
525+
qrySeqsTmp.clear();
517526
blastMatches.clear();
518527

519528
// stats explicitly not cleared, because accumulated

src/search_options.hpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ struct LambdaOptions : public SharedOptions
6868
unsigned samBamSeq;
6969
bool samBamHardClip;
7070
bool versionInformationToOutputFile = true;
71-
size_t maximumQueryBlockSize = 10;
71+
size_t maximumQueryBlockSize = 10; // WTF; TODO CHANGE THIS
72+
73+
bool lazyQryFile = false;
7274

7375
bool seedHalfExact = true;
7476
bool adaptiveSeeding = true;
@@ -181,6 +183,12 @@ void parseCommandLine(LambdaOptions & options, int argc, char const ** argv)
181183
options.qryOrigAlphabet = AlphabetEnum::DNA5;
182184
}
183185

186+
parser.add_option(options.lazyQryFile,
187+
sharg::config{.long_id = "lazy-query",
188+
.description = "Load query sequences on-demand (instead of at start); "
189+
"reduces memory usage but \"wastes\" one thread on I/O.",
190+
.advanced = true});
191+
184192
parser.add_option(options.indexFilePath,
185193
sharg::config{.short_id = 'i',
186194
.long_id = "index",
@@ -346,7 +354,7 @@ void parseCommandLine(LambdaOptions & options, int argc, char const ** argv)
346354
.long_id = "threads",
347355
.description = "Number of threads to run concurrently.",
348356
.advanced = true,
349-
.validator = sharg::arithmetic_range_validator{2, 1000}
357+
.validator = sharg::arithmetic_range_validator{1, 1000}
350358
});
351359
#else
352360
parser.add_option(options.threads,
@@ -355,7 +363,7 @@ void parseCommandLine(LambdaOptions & options, int argc, char const ** argv)
355363
.long_id = "threads",
356364
.description = "LAMBDA BUILT WITHOUT OPENMP; setting this option has no effect.",
357365
.advanced = true,
358-
.validator = sharg::arithmetic_range_validator{2, 2}
366+
.validator = sharg::arithmetic_range_validator{1, 1}
359367
});
360368
#endif
361369

@@ -738,6 +746,9 @@ void parseCommandLine(LambdaOptions & options, int argc, char const ** argv)
738746

739747
// set samBamHardClip
740748
options.samBamHardClip = (samBamClip == "hard");
749+
750+
if (options.threads == 1ull && options.lazyQryFile)
751+
throw sharg::parser_error{"Lazy query loading requires at least two total threads."};
741752
}
742753

743754
// --------------------------------------------------------------------------

src/shared_definitions.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -254,13 +254,13 @@ template <AlphabetEnum c_origQryAlph, AlphabetEnum c_transAlph, AlphabetEnum c_r
254254
constexpr auto qryTransView = []()
255255
{
256256
if constexpr (c_redAlph == AlphabetEnum::DNA3BS)
257-
return bio::views::add_reverse_complement | views::duplicate;
257+
return bio::views::type_reduce | bio::views::add_reverse_complement | views::duplicate;
258258
else if constexpr (bio::alphabet::nucleotide_alphabet<_alphabetEnumToType<c_redAlph>>)
259-
return bio::views::add_reverse_complement;
259+
return bio::views::type_reduce | bio::views::add_reverse_complement;
260260
else if constexpr (c_origQryAlph == c_transAlph)
261261
return bio::views::type_reduce;
262262
else
263-
return bio::views::translate_join;
263+
return bio::views::type_reduce | bio::views::translate_join;
264264
}();
265265

266266
template <AlphabetEnum c_transAlph, AlphabetEnum c_redAlph>
@@ -269,11 +269,11 @@ constexpr auto redView = []()
269269
if constexpr (c_transAlph == c_redAlph)
270270
return bio::views::type_reduce;
271271
else if constexpr (c_transAlph == AlphabetEnum::AMINO_ACID)
272-
return bio::views::deep{bio::views::convert<_alphabetEnumToType<c_redAlph>>};
272+
return bio::views::type_reduce | bio::views::deep{bio::views::convert<_alphabetEnumToType<c_redAlph>>};
273273
else if constexpr (c_redAlph == AlphabetEnum::DNA3BS)
274-
return views::dna_n_to_random | views::reduce_to_bisulfite;
274+
return bio::views::type_reduce | views::dna_n_to_random | views::reduce_to_bisulfite;
275275
else
276-
return views::dna_n_to_random;
276+
return bio::views::type_reduce | views::dna_n_to_random;
277277
}();
278278

279279
// ==========================================================================

test/cli/search_test.cpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,25 @@ struct search_test : public cli_test
2727
std::string const & profile,
2828
std::string const & output_file,
2929
std::string const & output_type,
30-
std::string const & control_file)
30+
std::string const & control_file,
31+
auto ... more_args)
3132
{
3233
std::string _r;
3334
std::string _red;
34-
3535
if (index_command == "mkindexp")
3636
{
3737
_r = "-r";
3838
_red = reduction;
3939
}
4040

41+
std::string _t = "-t";
42+
std::string _threads = "1";
43+
if (sizeof...(more_args) > 0 && ((more_args == "-t") || ...))
44+
{
45+
_t = "";
46+
_threads = "";
47+
}
48+
4149
cli_test_result result_index = execute_app("lambda3", index_command,
4250
"-d", data(db_file),
4351
"-i", index_file,
@@ -47,10 +55,11 @@ struct search_test : public cli_test
4755
cli_test_result result_search = execute_app("lambda3", search_command,
4856
"-i", index_file,
4957
"-q", data(query_file),
50-
"-t", "2",
58+
_t, _threads,
5159
"--version-to-outputfile", "0",
5260
"-p", profile,
53-
"-o", output_file);
61+
"-o", output_file,
62+
more_args...);
5463

5564
ASSERT_EQ(result_search.exit_code, 0);
5665

@@ -647,3 +656,12 @@ TEST_F(search_test, tblastx_fm_sensitive_sam)
647656
run_search_test("mkindexp", "db_nucl.fasta.gz", "db_trans_fm.fasta.gz.lba", "fm", "li10", "searchp",
648657
"queries_nucl.fasta.gz", "sensitive", "output_tblastx_fm_sensitive_test.sam", "sam", "output_tblastx_fm_sensitive.sam");
649658
}
659+
660+
// special options
661+
662+
TEST_F(search_test, lazy_loading)
663+
{
664+
run_search_test("mkindexp", "db_prot.fasta.gz", "db_prot_fm.fasta.gz.lba", "fm", "li10", "searchp",
665+
"queries_nucl.fasta.gz", "none", "output_blastx_fm_lazy_test.m8", "m8", "output_blastx_fm.m8",
666+
"--lazy-query", "1", "-t", "2");
667+
}

0 commit comments

Comments
 (0)