Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion include/fpgalign/search/search.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once

#include <mutex>
#include <string>
#include <vector>

Expand All @@ -19,8 +20,44 @@ struct hit
std::vector<uint64_t> bins;
};

struct wip_alignment
{
size_t bin;
size_t sequence_number;
size_t position;
std::vector<uint8_t> seq;
std::vector<uint8_t> ref;
std::string id;
};

class alignment_vector
{
private:
std::vector<wip_alignment> data;
std::mutex mtx;

public:
std::vector<wip_alignment> & get() noexcept
{
return data;
}

std::vector<wip_alignment> const & get() const noexcept
{
return data;
}

void emplace_back(wip_alignment elem)
{
std::lock_guard guard{mtx};
data.emplace_back(std::move(elem));
}

void emplace_back(wip_alignment & elem) = delete;
};

void search(config const & config);
std::vector<hit> ibf(config const & config, size_t & todo_bin_count);
void fmindex(config const & config, std::vector<hit> hits, size_t const todo_bin_count);
std::vector<wip_alignment> fmindex(config const & config, std::vector<hit> hits, size_t const todo_bin_count);

} // namespace search
16 changes: 9 additions & 7 deletions src/argument_parsing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ config parse_arguments(sharg::parser & parser)
.long_id = "output",
.description = "Output path",
.required = true,
.validator = sharg::output_file_validator{}}); // .ibf and .fmindex
.validator = sharg::output_file_validator{
sharg::output_file_open_options::open_or_create}}); // .ibf and .fmindex
parser.add_option(config.threads,
sharg::config{.short_id = '\0',
.long_id = "threads",
Expand Down Expand Up @@ -86,12 +87,13 @@ config parse_arguments(sharg::parser & parser)
.description = "Query path",
.required = true,
.validator = sharg::input_file_validator{}});
parser.add_option(config.output_path,
sharg::config{.short_id = '\0',
.long_id = "output",
.description = "Output path",
.required = true,
.validator = sharg::output_file_validator{}});
parser.add_option(
config.output_path,
sharg::config{.short_id = '\0',
.long_id = "output",
.description = "Output path",
.required = true,
.validator = sharg::output_file_validator{sharg::output_file_open_options::open_or_create}});
parser.add_option(config.threads,
sharg::config{.short_id = '\0',
.long_id = "threads",
Expand Down
4 changes: 2 additions & 2 deletions src/build/fmindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ std::vector<std::vector<uint8_t>> read_reference(std::vector<std::string> const
| std::views::transform(
[](auto const & in)
{
return seqan3::to_rank(in);
return seqan3::to_rank(in) + 1u;
}),
std::back_inserter(reference.back()));
}
Expand All @@ -48,7 +48,7 @@ void fmindex(config const & config)
for (auto && [id, bin_paths] : seqan::stl::views::enumerate(parse_input(config)))
{
auto reference = read_reference(bin_paths);
fmc::BiFMIndex<4> index{reference, /*samplingRate*/ 16, config.threads};
fmc::BiFMIndex<5> index{reference, /*samplingRate*/ 16, config.threads};

{
std::ofstream os{fmt::format("{}.{}.fmindex", config.output_path.c_str(), id), std::ios::binary};
Expand Down
129 changes: 88 additions & 41 deletions src/search/fmindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: BSD-3-Clause

#include <fmt/format.h>
#include <fmt/ranges.h>

#include <seqan3/io/sequence_file/input.hpp>

Expand All @@ -17,75 +18,121 @@
namespace search
{

fmc::BiFMIndex<4> load_index(config const & config, size_t const id)
template <typename Index>
auto myReconstruct(Index const & index, size_t seqNbr) -> std::vector<uint8_t>
{
fmc::BiFMIndex<4> index{};
//TODO: möglicher weise ist das identisch zu einfach index.C[1]
auto totalNumberOfSeq = index.bwt.rank(index.size(), 0) + index.C[0];
for (size_t i{0}; i < totalNumberOfSeq; ++i)
{
auto idx = std::get<0>(std::get<0>(index.locate(i)));
if (idx == seqNbr)
{
return reconstructText(index, i);
}
}
throw std::runtime_error{"unknown sequence number"};
}

fmc::BiFMIndex<5> load_index(config const & config, size_t const id)
{
fmc::BiFMIndex<5> index{};

{
std::ifstream os{fmt::format("{}.{}.fmindex", config.input_path.c_str(), id), std::ios::binary};
cereal::BinaryInputArchive iarchive{os};
iarchive(index);
}

// {
// fmt::println(" === Reconstruct all ===");
// auto text = fmc::reconstructText(index);
// for (size_t i = 0; i < text.size(); ++i)
// fmt::print("### {} ###\n{}\n", i, fmt::join(text[i], ""));
// }
// {
// fmt::println(" === Reconstruct single ===");
// auto text = fmc::reconstructText(index, 0);
// fmt::print("### 0 ###\n{}\n", fmt::join(text, ""));
// text = fmc::reconstructText(index, 1);
// fmt::print("### 1 ###\n{}\n", fmt::join(text, ""));
// }
// {
// fmt::println(" === Reconstruct alternative ===");
// auto text = myReconstruct(index, 0);
// fmt::print("### 0 ###\n{}\n", fmt::join(text, ""));
// text = myReconstruct(index, 1);
// fmt::print("### 1 ###\n{}\n", fmt::join(text, ""));
// }

return index;
}

void fmindex(config const & config, std::vector<hit> hits, size_t const todo_bin_count)
std::vector<wip_alignment> fmindex(config const & config, std::vector<hit> hits, size_t const todo_bin_count)
{
// todo bin count
// todo capacity
// each slot = 1 bin
// a cart is full if it has 5 elements (hits)
scq::slotted_cart_queue<size_t> queue{{.slots = todo_bin_count, .carts = todo_bin_count, .capacity = 5}};
size_t thread_id{};

auto get_thread = [&]()
alignment_vector res;
{
return std::jthread(
[&, thread_id = thread_id++]()
{
while (true)
scq::slotted_cart_queue<size_t> queue{{.slots = todo_bin_count, .carts = todo_bin_count, .capacity = 5}};
size_t thread_id{};

auto get_thread = [&]()
{
return std::jthread(
[&, thread_id = thread_id++]()
{
scq::cart_future<size_t> cart = queue.dequeue();
if (!cart.valid())
return;
auto [slot, span] = cart.get();
auto index = load_index(config, slot.value);
for (auto idx : span)
while (true)
{
auto & [id, seq, bins] = hits[idx];

auto callback = [&](auto cursor, size_t)
scq::cart_future<size_t> cart = queue.dequeue();
if (!cart.valid())
return;
auto [slot, span] = cart.get();
auto index = load_index(config, slot.value);
for (auto idx : span)
{
for (auto j : cursor)
auto & [id, seq, bins] = hits[idx];

auto callback = [&](auto cursor, size_t)
{
auto [entry, offset] = index.locate(j);
auto [seqId, pos] = entry;
for (auto j : cursor)
{
fmt::print("[{}][{}] found hit in bin {} in seqNo {} at Pos {}\n",
thread_id,
id,
slot.value,
seqId,
pos + offset);
auto [entry, offset] = index.locate(j);
auto [seqId, pos] = entry;
// fmt::print("[{}][{}] found hit in bin {} in seqNo {} at Pos {}\n",
// thread_id,
// id,
// slot.value,
// seqId,
// pos + offset);
res.emplace_back(wip_alignment{.bin = slot.value,
.sequence_number = seqId,
.position = pos + offset,
.seq = seq,
.ref = myReconstruct(index, seqId),
.id = id}); // todo seq is copied
}
}
};
};

fmc::search<true>(index, seq, config.errors, callback);
fmc::search<true>(index, seq, config.errors, callback);
}
}
}
});
};
});
};

std::vector<std::jthread> worker(config.threads);
std::ranges::generate(worker, get_thread);

std::vector<std::jthread> worker(config.threads);
std::ranges::generate(worker, get_thread);
for (auto && [idx, hit] : seqan::stl::views::enumerate(hits))
for (auto bin : hit.bins)
queue.enqueue(scq::slot_id{bin}, idx);

for (auto && [idx, hit] : seqan::stl::views::enumerate(hits))
for (auto bin : hit.bins)
queue.enqueue(scq::slot_id{bin}, idx);
queue.close();
} // Wait for threads to finish

queue.close();
return res.get();
}

} // namespace search
2 changes: 1 addition & 1 deletion src/search/ibf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ std::vector<hit> ibf(config const & config, size_t & todo_bin_count)
| std::views::transform(
[](auto const & in)
{
return seqan3::to_rank(in);
return seqan3::to_rank(in) + 1u;
}),
std::back_inserter(hits[i].seq));
std::ranges::copy(result, std::back_inserter(hits[i].bins));
Expand Down
74 changes: 73 additions & 1 deletion src/search/search.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,88 @@
// SPDX-FileCopyrightText: 2016-2025 Knut Reinert & MPI für molekulare Genetik
// SPDX-License-Identifier: BSD-3-Clause

#include <fmt/format.h>
#include <fmt/ranges.h>

#include <seqan3/alignment/cigar_conversion/cigar_from_alignment.hpp>
#include <seqan3/alignment/pairwise/align_pairwise.hpp>
#include <seqan3/alphabet/nucleotide/dna4.hpp>
#include <seqan3/io/sam_file/output.hpp>

#include <fpgalign/search/search.hpp>

namespace search
{

auto format_as(wip_alignment wip)
{
return fmt::format("(bin = {}, seqNo = {}, pos = {}, seq = {:d}, ref = {:d})",
wip.bin,
wip.sequence_number,
wip.position,
fmt::join(wip.seq, ""),
fmt::join(wip.ref, ""));
}

void do_alignment(config const & config, std::vector<wip_alignment> const & wips)
{
seqan3::sam_file_output sam_out{config.output_path,
seqan3::fields<seqan3::field::seq,
seqan3::field::id,
seqan3::field::ref_id,
seqan3::field::ref_offset,
seqan3::field::cigar,
// seqan3::field::qual,
seqan3::field::mapq>{}};

seqan3::configuration const align_config =
seqan3::align_cfg::method_global{seqan3::align_cfg::free_end_gaps_sequence1_leading{true},
seqan3::align_cfg::free_end_gaps_sequence2_leading{false},
seqan3::align_cfg::free_end_gaps_sequence1_trailing{true},
seqan3::align_cfg::free_end_gaps_sequence2_trailing{false}}
| seqan3::align_cfg::edit_scheme | seqan3::align_cfg::output_alignment{}
| seqan3::align_cfg::output_begin_position{} | seqan3::align_cfg::output_score{};

for (auto & wip : wips)
{
size_t const start = wip.position - static_cast<size_t>(wip.position != 0u);
size_t const length = wip.seq.size();
auto it = std::ranges::next(wip.ref.begin(), start, wip.ref.end());
auto end = std::ranges::next(it, length + 1u, wip.ref.end());
std::span ref_text{it, end};

for (auto && alignment : seqan3::align_pairwise(std::tie(ref_text, wip.seq), align_config))
{
auto cigar = seqan3::cigar_from_alignment(alignment.alignment());
size_t ref_offset = alignment.sequence1_begin_position() + 2 + start;
size_t map_qual = 60u + alignment.score();

sam_out.emplace_back(std::views::transform(wip.seq,
[](auto const in)
{
return seqan3::dna4{}.assign_rank(
static_cast<uint8_t>(in - 1u));
}),
wip.id,
fmt::format("{}", wip.sequence_number), // todo ref storage
ref_offset,
cigar,
// record.base_qualities(),
map_qual);
}
}
}

void search(config const & config)
{
size_t todo_bin_count{};
std::vector<hit> hits = ibf(config, todo_bin_count);
fmindex(config, std::move(hits), todo_bin_count);
auto const res = fmindex(config, std::move(hits), todo_bin_count);
do_alignment(config, res);
// for (auto const & elem : res)
// {
// fmt::print("{}\n", elem);
// }
}

} // namespace search