Skip to content

Commit 0504ddd

Browse files
committed
Use SCQ
1 parent 93cfdca commit 0504ddd

File tree

10 files changed

+629
-176
lines changed

10 files changed

+629
-176
lines changed

include/fpgalign/contrib/slotted_cart_queue.hpp

Lines changed: 515 additions & 0 deletions
Large diffs are not rendered by default.

include/fpgalign/search/search.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ struct hit
2020
};
2121

2222
void search(config const & config);
23-
std::vector<hit> ibf(config const & config);
24-
void fmindex(config const & config, std::vector<hit> hits);
23+
std::vector<hit> ibf(config const & config, size_t & todo_bin_count);
24+
void fmindex(config const & config, std::vector<hit> hits, size_t const todo_bin_count);
2525

2626
} // namespace search

src/build/fmindex.cpp

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@
22
// SPDX-FileCopyrightText: 2016-2025 Knut Reinert & MPI für molekulare Genetik
33
// SPDX-License-Identifier: BSD-3-Clause
44

5+
#include <fmt/format.h>
6+
57
#include <seqan3/io/sequence_file/input.hpp>
68

9+
#include <hibf/contrib/std/enumerate_view.hpp>
10+
711
#include <fmindex-collection/fmindex/BiFMIndex.h>
812

913
#include <fpgalign/build/build.hpp>
@@ -16,44 +20,41 @@ struct dna4_traits : seqan3::sequence_file_input_default_traits_dna
1620
using sequence_alphabet = seqan3::dna4;
1721
};
1822

19-
void fmindex(config const & config)
23+
std::vector<std::vector<uint8_t>> read_reference(std::vector<std::string> const & bin_paths)
2024
{
21-
std::vector<std::vector<uint8_t>> const reference = [&config]()
22-
{
23-
using sequence_file_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::seq>>;
25+
std::vector<std::vector<uint8_t>> reference{};
2426

25-
auto const bin_pathss = parse_input(config);
26-
27-
std::vector<std::vector<uint8_t>> result{};
28-
29-
for (auto && bin_paths : bin_pathss)
27+
for (auto const & bin_path : bin_paths)
28+
{
29+
seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::seq>> fin{bin_path};
30+
for (auto && record : fin)
3031
{
31-
for (auto && bin_path : bin_paths)
32-
{
33-
sequence_file_t fin{bin_path};
34-
for (auto && record : fin)
35-
{
36-
result.push_back({});
37-
std::ranges::copy(record.sequence()
38-
| std::views::transform(
39-
[](auto const & in)
40-
{
41-
return seqan3::to_rank(in);
42-
}),
43-
std::back_inserter(result.back()));
44-
}
45-
}
32+
reference.push_back({});
33+
std::ranges::copy(record.sequence()
34+
| std::views::transform(
35+
[](auto const & in)
36+
{
37+
return seqan3::to_rank(in);
38+
}),
39+
std::back_inserter(reference.back()));
4640
}
41+
}
4742

48-
return result;
49-
}();
50-
51-
fmc::BiFMIndex<4> index{reference, /*samplingRate*/ 16, config.threads};
43+
return reference;
44+
}
5245

46+
void fmindex(config const & config)
47+
{
48+
for (auto && [id, bin_paths] : seqan::stl::views::enumerate(parse_input(config)))
5349
{
54-
std::ofstream os{config.output_path.string() + ".fmindex", std::ios::binary};
55-
cereal::BinaryOutputArchive oarchive{os};
56-
oarchive(index);
50+
auto reference = read_reference(bin_paths);
51+
fmc::BiFMIndex<4> index{reference, /*samplingRate*/ 16, config.threads};
52+
53+
{
54+
std::ofstream os{fmt::format("{}.{}.fmindex", config.output_path.c_str(), id), std::ios::binary};
55+
cereal::BinaryOutputArchive oarchive{os};
56+
oarchive(index);
57+
}
5758
}
5859
}
5960

src/search/fmindex.cpp

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,45 +2,90 @@
22
// SPDX-FileCopyrightText: 2016-2025 Knut Reinert & MPI für molekulare Genetik
33
// SPDX-License-Identifier: BSD-3-Clause
44

5+
#include <fmt/format.h>
6+
57
#include <seqan3/io/sequence_file/input.hpp>
68

9+
#include <hibf/contrib/std/enumerate_view.hpp>
10+
711
#include <fmindex-collection/fmindex/BiFMIndex.h>
812
#include <fmindex-collection/search/search.h>
913

14+
#include <fpgalign/contrib/slotted_cart_queue.hpp>
1015
#include <fpgalign/search/search.hpp>
1116

1217
namespace search
1318
{
1419

15-
void fmindex(config const & config, std::vector<hit> hits)
20+
fmc::BiFMIndex<4> load_index(config const & config, size_t const id)
1621
{
1722
fmc::BiFMIndex<4> index{};
1823

1924
{
20-
std::ifstream os{config.input_path.string() + ".fmindex", std::ios::binary};
25+
std::ifstream os{fmt::format("{}.{}.fmindex", config.input_path.c_str(), id), std::ios::binary};
2126
cereal::BinaryInputArchive iarchive{os};
2227
iarchive(index);
2328
}
2429

25-
#pragma omp parallel for num_threads(config.threads)
26-
for (size_t i = 0; i < hits.size(); ++i)
30+
return index;
31+
}
32+
33+
void fmindex(config const & config, std::vector<hit> hits, size_t const todo_bin_count)
34+
{
35+
// todo bin count
36+
// todo capacity
37+
// each slot = 1 bin
38+
// a cart is full if it has 5 elements (hits)
39+
scq::slotted_cart_queue<size_t> queue{{.slots = todo_bin_count, .carts = todo_bin_count, .capacity = 5}};
40+
size_t thread_id{};
41+
42+
auto get_thread = [&]()
2743
{
28-
auto & [id, seq, bins] = hits[i];
29-
auto callback = [&](auto cursor, size_t)
30-
{
31-
for (auto j : cursor)
44+
return std::jthread(
45+
[&, thread_id = thread_id++]()
3246
{
33-
auto [entry, offset] = index.locate(j);
34-
auto [seqId, pos] = entry;
35-
#pragma omp critical
47+
while (true)
3648
{
37-
std::cout << '[' << id << "] found hit in seqNo " << seqId << " Pos " << pos + offset << '\n';
49+
scq::cart_future<size_t> cart = queue.dequeue();
50+
if (!cart.valid())
51+
return;
52+
auto [slot, span] = cart.get();
53+
auto index = load_index(config, slot.value);
54+
for (auto idx : span)
55+
{
56+
auto & [id, seq, bins] = hits[idx];
57+
58+
auto callback = [&](auto cursor, size_t)
59+
{
60+
for (auto j : cursor)
61+
{
62+
auto [entry, offset] = index.locate(j);
63+
auto [seqId, pos] = entry;
64+
{
65+
fmt::print("[{}][{}] found hit in bin {} in seqNo {} at Pos {}\n",
66+
thread_id,
67+
id,
68+
slot.value,
69+
seqId,
70+
pos + offset);
71+
}
72+
}
73+
};
74+
75+
fmc::search<true>(index, seq, config.errors, callback);
76+
}
3877
}
39-
}
40-
};
41-
fmc::search<true>(index, seq, config.errors, callback);
42-
(void)bins;
43-
}
78+
});
79+
};
80+
81+
std::vector<std::jthread> worker(config.threads);
82+
std::ranges::generate(worker, get_thread);
83+
84+
for (auto && [idx, hit] : seqan::stl::views::enumerate(hits))
85+
for (auto bin : hit.bins)
86+
queue.enqueue(scq::slot_id{bin}, idx);
87+
88+
queue.close();
4489
}
4590

4691
} // namespace search

src/search/ibf.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ threshold::threshold get_thresholder(config const & config)
4444
using seqfile_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::id, seqan3::field::seq>>;
4545
using record_t = typename seqfile_t::record_type;
4646

47-
std::vector<hit> ibf(config const & config)
47+
std::vector<hit> ibf(config const & config, size_t & todo_bin_count)
4848
{
4949
seqan::hibf::interleaved_bloom_filter ibf{};
5050

@@ -53,6 +53,7 @@ std::vector<hit> ibf(config const & config)
5353
cereal::BinaryInputArchive iarchive{os};
5454
iarchive(ibf);
5555
}
56+
todo_bin_count = ibf.bin_count();
5657

5758
std::vector<record_t> records = [&]()
5859
{

src/search/search.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ namespace search
99

1010
void search(config const & config)
1111
{
12-
std::vector<hit> hits = ibf(config);
13-
fmindex(config, std::move(hits));
12+
size_t todo_bin_count{};
13+
std::vector<hit> hits = ibf(config, todo_bin_count);
14+
fmindex(config, std::move(hits), todo_bin_count);
1415
}
1516

1617
} // namespace search

test/CMakeLists.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ include (test/config)
1111
# This includes `test/data/datasources.cmake`, which makes test data available to the tests.
1212
include (data/datasources.cmake)
1313

14-
add_app_test (api_fastq_coversion_test.cpp)
15-
add_app_test (cli_fastq_coversion_test.cpp)
14+
add_app_test (fpgalign_test.cpp)
1615

1716
message (STATUS "You can run `make check` to build and run tests.")

test/api_fastq_coversion_test.cpp

Lines changed: 0 additions & 39 deletions
This file was deleted.

test/cli_fastq_coversion_test.cpp

Lines changed: 0 additions & 82 deletions
This file was deleted.

test/fpgalign_test.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// SPDX-FileCopyrightText: 2006-2025 Knut Reinert & Freie Universität Berlin
2+
// SPDX-FileCopyrightText: 2016-2025 Knut Reinert & MPI für molekulare Genetik
3+
// SPDX-License-Identifier: BSD-3-Clause
4+
5+
#include "app_test.hpp"
6+
7+
// To prevent issues when running multiple CLI tests in parallel, give each CLI test unique names:
8+
struct fpgalign : public app_test
9+
{};
10+
11+
TEST_F(fpgalign, no_options)
12+
{}

0 commit comments

Comments
 (0)