Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/paimon/global_index/global_index_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ class PAIMON_EXPORT GlobalIndexScan {
const std::string& field_name,
const std::optional<RowRangeIndex>& row_range_index) const = 0;

/// Creates a `GlobalIndexReader` for a specific field with a specific index type.
/// @param field_name Name of the indexed column.
/// @param index_type Name of index type.
/// @param row_range_index Optional row range that limits the scan to a sub-range of row ids.
/// If not provided, the entire row range is considered.
/// @return A `Result` that is:
/// - Successful with a reader(with global row id) if the index exists and loads
/// correctly;
/// - Successful with nullptr if no index was built for the given field with the given
/// index type;
/// - Error returns when loading fails (e.g., file corruption, I/O error,
/// unsupported format).
virtual Result<std::shared_ptr<GlobalIndexReader>> CreateReader(
const std::string& field_name, const std::string& index_type,
const std::optional<RowRangeIndex>& row_range_index) const = 0;

/// Creates several `GlobalIndexReader`s for a specific field (looked up by id),
/// @param field_id Field id of the indexed column.
/// @param row_range_index Optional row range that limits the scan to a sub-range of row ids.
Expand Down
57 changes: 57 additions & 0 deletions src/paimon/common/executor/default_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,61 @@ TEST(DefaultExecutorTest, TestAddTaskAfterShutdownNowIgnored) {
ASSERT_EQ(executed_count.load(), 0);
}

TEST(DefaultExecutorTest, TestAddTaskFromMultipleThreads) {
auto executor = CreateDefaultExecutor(/*thread_count=*/4);

constexpr int32_t kSubmitterCount = 8;
constexpr int32_t kTaskCountPerSubmitter = 64;
constexpr int32_t kTotalTaskCount = kSubmitterCount * kTaskCountPerSubmitter;

std::vector<std::atomic<int32_t>> executed_slots(kTotalTaskCount);
for (auto& executed_slot : executed_slots) {
executed_slot.store(0);
}
std::vector<std::promise<void>> task_promises(kTotalTaskCount);
std::vector<std::future<void>> task_futures;
task_futures.reserve(kTotalTaskCount);
for (auto& task_promise : task_promises) {
task_futures.push_back(task_promise.get_future());
}
std::atomic<int32_t> ready_submitter_count = 0;
std::atomic<int32_t> executed_count = 0;
std::promise<void> start_signal;
std::shared_future<void> start_future = start_signal.get_future().share();
std::vector<std::thread> submitters;
submitters.reserve(kSubmitterCount);

for (int32_t submitter_index = 0; submitter_index < kSubmitterCount; ++submitter_index) {
submitters.emplace_back([&, submitter_index]() {
++ready_submitter_count;
start_future.wait();
for (int32_t task_index = 0; task_index < kTaskCountPerSubmitter; ++task_index) {
const int32_t slot_index = submitter_index * kTaskCountPerSubmitter + task_index;
executor->Add([&, slot_index]() {
++executed_slots[slot_index];
++executed_count;
task_promises[slot_index].set_value();
});
}
});
}

for (int32_t retry = 0; retry < 100 && ready_submitter_count.load() < kSubmitterCount;
++retry) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
const int32_t ready_submitter_count_before_start = ready_submitter_count.load();
start_signal.set_value();
for (auto& submitter : submitters) {
submitter.join();
}
ASSERT_EQ(kSubmitterCount, ready_submitter_count_before_start);
Wait(task_futures);

ASSERT_EQ(kTotalTaskCount, executed_count.load());
for (const auto& executed_slot : executed_slots) {
ASSERT_EQ(1, executed_slot.load());
}
}

} // namespace paimon::test
11 changes: 10 additions & 1 deletion src/paimon/common/global_index/union_global_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ Result<std::shared_ptr<GlobalIndexResult>> UnionGlobalIndexReader::Union(ReaderA
return merged_result;
}

bool UnionGlobalIndexReader::IsThreadSafe() const {
for (const auto& reader : readers_) {
if (!reader->IsThreadSafe()) {
return false;
}
}
return true;
}

template <typename R>
std::vector<R> UnionGlobalIndexReader::ExecuteAllReaders(
const std::function<R(const std::shared_ptr<GlobalIndexReader>&)>& action) {
Expand All @@ -199,7 +208,7 @@ std::vector<R> UnionGlobalIndexReader::ExecuteAllReaders(
futures.reserve(readers_.size());
for (const auto& reader : readers_) {
futures.push_back(
Via(executor_.get(), [&action, &reader]() -> R { return action(reader); }));
Via(executor_.get(), [&action, reader]() -> R { return action(reader); }));
}
return CollectAll(futures);
}
Expand Down
4 changes: 1 addition & 3 deletions src/paimon/common/global_index/union_global_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ class UnionGlobalIndexReader : public GlobalIndexReader {
Result<std::shared_ptr<GlobalIndexResult>> VisitFullTextSearch(
const std::shared_ptr<FullTextSearch>& full_text_search) override;

bool IsThreadSafe() const override {
return false;
}
bool IsThreadSafe() const override;

std::string GetIndexType() const override {
return "union";
Expand Down
27 changes: 22 additions & 5 deletions src/paimon/common/global_index/union_global_index_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class FakeReader : public GlobalIndexReader {
has_scored_result_ = true;
}

void SetThreadSafe(bool thread_safe) {
thread_safe_ = thread_safe;
}

/// Counts how many times any Visit* method was invoked. Useful to assert all readers
/// are exercised by UnionGlobalIndexReader.
int InvocationCount() const {
Expand Down Expand Up @@ -147,7 +151,7 @@ class FakeReader : public GlobalIndexReader {
}

bool IsThreadSafe() const override {
return true;
return thread_safe_;
}

std::string GetIndexType() const override {
Expand Down Expand Up @@ -176,6 +180,7 @@ class FakeReader : public GlobalIndexReader {
std::vector<int64_t> scored_row_ids_;
std::vector<float> scored_scores_;
bool has_scored_result_ = false;
bool thread_safe_ = true;
std::atomic<int32_t> invocation_count_{0};
};

Expand Down Expand Up @@ -515,12 +520,24 @@ TEST_F(UnionGlobalIndexReaderTest, TestVisitVectorSearchErrorPropagation) {
ASSERT_NOK_WITH_MSG(union_reader.VisitVectorSearch(nullptr), "vector search failure");
}

TEST_F(UnionGlobalIndexReaderTest, TestIsThreadSafeAlwaysFalse) {
auto reader = std::make_shared<FakeReader>();
std::vector<std::shared_ptr<GlobalIndexReader>> readers = {reader};
TEST_F(UnionGlobalIndexReaderTest, TestIsThreadSafeReturnsTrueWhenAllReadersAreSafe) {
auto reader1 = std::make_shared<FakeReader>();
auto reader2 = std::make_shared<FakeReader>();

std::vector<std::shared_ptr<GlobalIndexReader>> readers = {reader1, reader2};
UnionGlobalIndexReader union_reader(std::move(readers), nullptr);

ASSERT_TRUE(union_reader.IsThreadSafe());
}

TEST_F(UnionGlobalIndexReaderTest, TestIsThreadSafeReturnsFalseWhenAnyReaderIsNotSafe) {
auto reader1 = std::make_shared<FakeReader>();
auto reader2 = std::make_shared<FakeReader>();
reader2->SetThreadSafe(false);

std::vector<std::shared_ptr<GlobalIndexReader>> readers = {reader1, reader2};
UnionGlobalIndexReader union_reader(std::move(readers), nullptr);

// UnionGlobalIndexReader is not thread-safe regardless of inner readers
ASSERT_FALSE(union_reader.IsThreadSafe());
}

Expand Down
33 changes: 27 additions & 6 deletions src/paimon/core/global_index/global_index_scan_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,27 +128,48 @@ Result<std::shared_ptr<GlobalIndexEvaluator>> GlobalIndexScanImpl::GetOrCreateIn
Result<std::vector<std::shared_ptr<GlobalIndexReader>>> GlobalIndexScanImpl::CreateReaders(
int32_t field_id, const std::optional<RowRangeIndex>& row_range_index) const {
PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_id));
return CreateReaders(field, row_range_index);
return CreateReaders(field, /*index_type=*/std::nullopt, row_range_index);
}

Result<std::vector<std::shared_ptr<GlobalIndexReader>>> GlobalIndexScanImpl::CreateReaders(
const std::string& field_name, const std::optional<RowRangeIndex>& row_range_index) const {
PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_name));
return CreateReaders(field, row_range_index);
return CreateReaders(field, /*index_type=*/std::nullopt, row_range_index);
}

Result<std::shared_ptr<GlobalIndexReader>> GlobalIndexScanImpl::CreateReader(
const std::string& field_name, const std::string& index_type,
const std::optional<RowRangeIndex>& row_range_index) const {
PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_name));
PAIMON_ASSIGN_OR_RAISE(
std::vector<std::shared_ptr<GlobalIndexReader>> readers,
CreateReaders(field, std::optional<std::string>(index_type), row_range_index));
if (readers.empty()) {
return std::shared_ptr<GlobalIndexReader>();
}
if (readers.size() != 1) {
return Status::Invalid(
fmt::format("invalid global index reader size, expected 1, actual {}", readers.size()));
}
return readers[0];
}

Result<std::vector<std::shared_ptr<GlobalIndexReader>>> GlobalIndexScanImpl::CreateReaders(
const DataField& field, const std::optional<RowRangeIndex>& row_range_index) const {
const DataField& field, const std::optional<std::string>& index_type,
const std::optional<RowRangeIndex>& row_range_index) const {
auto field_iter = index_metas_.find(field.Id());
if (field_iter == index_metas_.end()) {
return std::vector<std::shared_ptr<GlobalIndexReader>>();
}
const auto& index_type_to_metas = field_iter->second;
std::vector<std::shared_ptr<GlobalIndexReader>> readers;
readers.reserve(index_type_to_metas.size());
for (const auto& [index_type, range_to_metas] : index_type_to_metas) {
readers.reserve(index_type ? 1 : index_type_to_metas.size());
for (const auto& [current_index_type, range_to_metas] : index_type_to_metas) {
if (index_type && current_index_type != index_type.value()) {
continue;
}
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<GlobalIndexer> indexer,
GlobalIndexerFactory::Get(index_type, options_.ToMap()));
GlobalIndexerFactory::Get(current_index_type, options_.ToMap()));
if (!indexer) {
continue;
}
Expand Down
8 changes: 7 additions & 1 deletion src/paimon/core/global_index/global_index_scan_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <map>
#include <memory>
#include <optional>
#include <string>
#include <vector>

Expand Down Expand Up @@ -47,6 +48,10 @@ class GlobalIndexScanImpl : public GlobalIndexScan {
const std::string& field_name,
const std::optional<RowRangeIndex>& row_range_index) const override;

Result<std::shared_ptr<GlobalIndexReader>> CreateReader(
const std::string& field_name, const std::string& index_type,
const std::optional<RowRangeIndex>& row_range_index) const override;

Result<std::vector<std::shared_ptr<GlobalIndexReader>>> CreateReaders(
int32_t field_id, const std::optional<RowRangeIndex>& row_range_index) const override;

Expand All @@ -65,7 +70,8 @@ class GlobalIndexScanImpl : public GlobalIndexScan {
Result<std::shared_ptr<GlobalIndexEvaluator>> GetOrCreateIndexEvaluator();

Result<std::vector<std::shared_ptr<GlobalIndexReader>>> CreateReaders(
const DataField& field, const std::optional<RowRangeIndex>& row_range_index) const;
const DataField& field, const std::optional<std::string>& index_type,
const std::optional<RowRangeIndex>& row_range_index) const;

std::vector<GlobalIndexIOMeta> ToGlobalIndexIOMetas(
const std::vector<std::shared_ptr<IndexFileMeta>>& metas) const;
Expand Down
14 changes: 10 additions & 4 deletions src/paimon/core/global_index/global_index_write_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "paimon/common/table/special_fields.h"
#include "paimon/common/types/data_field.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/common/utils/scope_guard.h"
#include "paimon/core/core_options.h"
#include "paimon/core/global_index/global_index_file_manager.h"
#include "paimon/core/io/data_increment.h"
Expand Down Expand Up @@ -204,16 +205,21 @@ Result<std::shared_ptr<CommitMessage>> GlobalIndexWriteTask::WriteIndex(
std::shared_ptr<GlobalIndexFileManager> index_file_manager,
CreateGlobalIndexFileManager(table_path, table_schema, core_options, pool));

// create batch reader
PAIMON_ASSIGN_OR_RAISE(
std::unique_ptr<BatchReader> batch_reader,
CreateBatchReader(table_path, field_name, indexed_split, core_options, pool));

// create global index writer
PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema->GetField(field_name));
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<GlobalIndexWriter> global_index_writer,
CreateGlobalIndexWriter(index_type, field, index_file_manager, core_options, pool));

// create batch reader
PAIMON_ASSIGN_OR_RAISE(
std::unique_ptr<BatchReader> batch_reader,
CreateBatchReader(table_path, field_name, indexed_split, core_options, pool));
ScopeGuard guard([&]() {
global_index_writer.reset();
batch_reader.reset();
});

// read from data split and write to index writer
PAIMON_ASSIGN_OR_RAISE(
Expand Down
Loading
Loading