Skip to content

Commit e62c027

Browse files
authored
Make proxy filter thread safe for different segments (iresearch-toolkit#574)
1 parent b2fe116 commit e62c027

File tree

4 files changed

+48
-44
lines changed

4 files changed

+48
-44
lines changed

core/search/filter.hpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,7 @@ class filter {
9898

9999
score_t boost() const noexcept { return boost_; }
100100

101-
filter& boost(score_t boost) noexcept {
102-
boost_ = boost;
103-
return *this;
104-
}
101+
void boost(score_t boost) noexcept { boost_ = boost; }
105102

106103
virtual type_info::type_id type() const noexcept = 0;
107104

core/search/proxy_filter.cpp

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@
2525
#include <bit>
2626

2727
#include "cost.hpp"
28-
#include "score.hpp"
2928
#include "utils/bitset.hpp"
3029

30+
#include <absl/synchronization/mutex.h>
31+
3132
namespace irs {
3233

3334
// Bitset expecting doc iterator to be able only to move forward.
@@ -178,65 +179,73 @@ class lazy_filter_bitset_iterator : public doc_iterator,
178179

179180
struct proxy_query_cache {
180181
proxy_query_cache(IResourceManager& memory, filter::ptr&& ptr) noexcept
181-
: readers_{Alloc{memory}}, real_filter_(std::move(ptr)) {}
182+
: real_filter_{std::move(ptr)}, readers_{Alloc{memory}} {}
182183

183184
using Alloc = ManagedTypedAllocator<
184185
std::pair<const SubReader* const, std::unique_ptr<lazy_filter_bitset>>>;
185186

187+
filter::ptr real_filter_;
188+
filter::prepared::ptr real_filter_prepared_;
189+
absl::Mutex readers_lock_;
186190
absl::flat_hash_map<
187191
const SubReader*, std::unique_ptr<lazy_filter_bitset>,
188192
absl::container_internal::hash_default_hash<const SubReader*>,
189193
absl::container_internal::hash_default_eq<const SubReader*>, Alloc>
190194
readers_;
191-
filter::prepared::ptr prepared_real_filter_;
192-
filter::ptr real_filter_;
193195
};
194196

195197
class proxy_query : public filter::prepared {
196198
public:
197-
explicit proxy_query(proxy_filter::cache_ptr cache) : cache_(cache) {
198-
IRS_ASSERT(cache_->prepared_real_filter_);
199+
explicit proxy_query(proxy_filter::cache_ptr cache) : cache_{cache} {
200+
IRS_ASSERT(cache_->real_filter_prepared_);
199201
}
200202

201203
doc_iterator::ptr execute(const ExecutionContext& ctx) const final {
202-
// first try to find segment in cache.
203-
[[maybe_unused]] auto& [_, cached] =
204-
*cache_->readers_.emplace(&ctx.segment, nullptr).first;
205-
206-
if (!cached) {
207-
cached = std::make_unique<lazy_filter_bitset>(
208-
ctx, *cache_->prepared_real_filter_);
204+
auto* cache_bitset = [&]() -> lazy_filter_bitset* {
205+
absl::ReaderMutexLock lock{&cache_->readers_lock_};
206+
auto it = cache_->readers_.find(&ctx.segment);
207+
if (it != cache_->readers_.end()) {
208+
return it->second.get();
209+
}
210+
return nullptr;
211+
}();
212+
if (!cache_bitset) {
213+
auto bitset = std::make_unique<lazy_filter_bitset>(
214+
ctx, *cache_->real_filter_prepared_);
215+
cache_bitset = bitset.get();
216+
absl::WriterMutexLock lock{&cache_->readers_lock_};
217+
IRS_ASSERT(!cache_->readers_.contains(&ctx.segment));
218+
cache_->readers_.emplace(&ctx.segment, std::move(bitset));
209219
}
210-
211-
IRS_ASSERT(cached);
212220
return memory::make_tracked<lazy_filter_bitset_iterator>(ctx.memory,
213-
*cached);
221+
*cache_bitset);
214222
}
215223

216224
void visit(const SubReader&, PreparedStateVisitor&, score_t) const final {
217225
// No terms to visit
218226
}
219227

220228
private:
221-
mutable proxy_filter::cache_ptr cache_;
229+
proxy_filter::cache_ptr cache_;
222230
};
223231

224232
filter::prepared::ptr proxy_filter::prepare(const PrepareContext& ctx) const {
225-
if (!cache_ || !cache_->real_filter_ || !ctx.scorers.empty()) {
226-
// Currently we do not support caching scores.
227-
// Proxy filter should not be used with scorers!
228-
IRS_ASSERT(false);
233+
// Currently we do not support caching scores.
234+
// Proxy filter should not be used with scorers!
235+
IRS_ASSERT(ctx.scorers.empty());
236+
if (!cache_ || !ctx.scorers.empty()) {
229237
return filter::prepared::empty();
230238
}
231-
if (!cache_->prepared_real_filter_) {
232-
cache_->prepared_real_filter_ = cache_->real_filter_->prepare(ctx);
239+
if (!cache_->real_filter_prepared_) {
240+
cache_->real_filter_prepared_ = cache_->real_filter_->prepare(ctx);
241+
cache_->real_filter_.reset();
233242
}
234243
return memory::make_tracked<proxy_query>(ctx.memory, cache_);
235244
}
236245

237246
filter& proxy_filter::cache_filter(IResourceManager& memory,
238-
filter::ptr&& ptr) {
239-
cache_ = std::make_shared<proxy_query_cache>(memory, std::move(ptr));
247+
filter::ptr&& real) {
248+
cache_ = std::make_shared<proxy_query_cache>(memory, std::move(real));
240249
IRS_ASSERT(cache_->real_filter_);
241250
return *cache_->real_filter_;
242251
}

core/search/proxy_filter.hpp

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,17 @@ class proxy_filter final : public filter {
4444

4545
filter::prepared::ptr prepare(const PrepareContext& ctx) const final;
4646

47-
template<typename T, typename... Args>
48-
std::pair<T&, cache_ptr> set_filter(IResourceManager& memory,
49-
Args&&... args) {
50-
static_assert(std::is_base_of_v<filter, T>);
51-
auto& ptr =
52-
cache_filter(memory, std::make_unique<T>(std::forward<Args>(args)...));
53-
return {static_cast<T&>(ptr), cache_};
47+
template<typename Impl, typename Base = Impl, typename... Args>
48+
std::pair<Base&, cache_ptr> set_filter(IResourceManager& memory,
49+
Args&&... args) {
50+
static_assert(std::is_base_of_v<filter, Base>);
51+
static_assert(std::is_base_of_v<Base, Impl>);
52+
auto& real =
53+
cache_filter(memory, std::make_unique<Impl>(std::forward<Args>(args)...));
54+
return {DownCast<Base>(real), cache_};
5455
}
5556

56-
proxy_filter& set_cache(cache_ptr cache) noexcept {
57-
cache_ = std::move(cache);
58-
return *this;
59-
}
57+
void set_cache(cache_ptr cache) noexcept { cache_ = std::move(cache); }
6058

6159
irs::type_info::type_id type() const noexcept final {
6260
return irs::type<proxy_filter>::id();
@@ -67,4 +65,5 @@ class proxy_filter final : public filter {
6765

6866
mutable cache_ptr cache_;
6967
};
68+
7069
} // namespace irs

tests/search/proxy_filter_test.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,21 +125,20 @@ class doclist_test_filter : public filter {
125125

126126
filter::prepared::ptr prepare(const PrepareContext& ctx) const final {
127127
++prepares_;
128-
return memory::make_tracked<doclist_test_query>(ctx.memory, documents_,
128+
return memory::make_tracked<doclist_test_query>(ctx.memory, *documents_,
129129
ctx.boost);
130130
}
131131

132-
// intentional copy here to simplify multiple runs of same expected
133132
void set_expected(const std::vector<doc_id_t>& documents) {
134-
documents_ = documents;
133+
documents_ = &documents;
135134
}
136135

137136
irs::type_info::type_id type() const noexcept final {
138137
return irs::type<doclist_test_filter>::id();
139138
}
140139

141140
private:
142-
std::vector<doc_id_t> documents_;
141+
const std::vector<doc_id_t>* documents_;
143142
static size_t prepares_;
144143
};
145144

0 commit comments

Comments
 (0)