Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/paimon/read_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class PAIMON_EXPORT ReadContext {
uint32_t GetRowToBatchThreadNumber() const {
return row_to_batch_thread_number_;
}
const std::optional<std::string>& GetSpecificTableSchema() {
const std::optional<std::string>& GetSpecificTableSchema() const {
return table_schema_;
}
std::shared_ptr<MemoryPool> GetMemoryPool() const {
Expand Down
18 changes: 18 additions & 0 deletions include/paimon/scan_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class PAIMON_EXPORT ScanContext {
const std::shared_ptr<MemoryPool>& memory_pool,
const std::shared_ptr<Executor>& executor,
const std::shared_ptr<FileSystem>& specific_file_system,
const std::optional<std::string>& table_schema,
const std::map<std::string, std::string>& options,
const std::shared_ptr<Cache>& cache);

Expand Down Expand Up @@ -88,6 +89,10 @@ class PAIMON_EXPORT ScanContext {
return specific_file_system_;
}

const std::optional<std::string>& GetSpecificTableSchema() const {
return table_schema_;
}
Comment thread
gripleaf marked this conversation as resolved.

std::shared_ptr<Cache> GetCache() const {
return cache_;
}
Expand All @@ -101,6 +106,7 @@ class PAIMON_EXPORT ScanContext {
std::shared_ptr<MemoryPool> memory_pool_;
std::shared_ptr<Executor> executor_;
std::shared_ptr<FileSystem> specific_file_system_;
std::optional<std::string> table_schema_;
std::map<std::string, std::string> options_;
std::shared_ptr<Cache> cache_;
};
Expand Down Expand Up @@ -185,6 +191,18 @@ class PAIMON_EXPORT ScanContextBuilder {
/// @note If not set, use default file system (configured in `Options::FILE_SYSTEM`)
ScanContextBuilder& WithFileSystem(const std::shared_ptr<FileSystem>& file_system);

/// Set the table schema as a string to avoid schema loading I/O operations.
///
/// This optimization allows the scanner to use a pre-loaded schema instead of
/// reading it from the table metadata, which can improve performance especially
/// in scenarios with many small scan operations.
///
/// @param table_schema String representation of the table schema.
/// @return Reference to this builder for method chaining.
/// @note The user must ensure that the schema string is valid and matches the table.
/// @note If not set, the schema will be loaded from the table path.
ScanContextBuilder& SetTableSchema(const std::string& table_schema);

/// Inject a cache for scan operations. Passing nullptr disables cache.
/// @return Reference to this builder for method chaining.
ScanContextBuilder& WithCache(const std::shared_ptr<Cache>& cache);
Expand Down
8 changes: 7 additions & 1 deletion src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ PrefetchFileBatchReaderImpl::~PrefetchFileBatchReaderImpl() {
Status PrefetchFileBatchReaderImpl::SetReadSchema(
::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
const std::optional<RoaringBitmap32>& selection_bitmap) {
PAIMON_RETURN_NOT_OK(CleanUp());
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> schema,
arrow::ImportSchema(read_schema));
for (const auto& reader : readers_) {
Expand All @@ -162,11 +163,15 @@ Status PrefetchFileBatchReaderImpl::SetReadSchema(
}
selection_bitmap_ = selection_bitmap;
predicate_ = predicate;
return RefreshReadRanges();
return RefreshReadRangesAfterCleanUp();
}

Status PrefetchFileBatchReaderImpl::RefreshReadRanges() {
PAIMON_RETURN_NOT_OK(CleanUp());
return RefreshReadRangesAfterCleanUp();
}

Status PrefetchFileBatchReaderImpl::RefreshReadRangesAfterCleanUp() {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why split out the RefreshReadRangesAfterCleanUp() function?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I split out RefreshReadRangesAfterCleanUp() because SetReadSchema() needs a different ordering now:

CleanUp();
reader->SetReadSchema(...);
RefreshReadRangesAfterCleanUp();

The original RefreshReadRanges() always calls CleanUp() internally. If SetReadSchema() called it directly after stopping the prefetch thread, we would either reset schemas before cleanup, which caused the ORC crash, or call CleanUp() twice.

So the split keeps the behavior clear:

RefreshReadRanges()
-> CleanUp()
-> RefreshReadRangesAfterCleanUp()

SetReadSchema()
-> CleanUp()
-> reset reader schemas
-> RefreshReadRangesAfterCleanUp()

This lets us reuse the range-refresh logic while guaranteeing that schema reset happens only after the background prefetch thread has fully stopped.

bool need_prefetch;
PAIMON_ASSIGN_OR_RAISE(auto read_ranges, readers_[0]->GenReadRanges(&need_prefetch));

Expand Down Expand Up @@ -279,6 +284,7 @@ Status PrefetchFileBatchReaderImpl::CleanUp() {
read_ranges_.clear();
read_ranges_in_group_.clear();
current_batch_global_row_ids_.clear();
read_ranges_freshed_ = false;
clean_prefetch_queue();
for (size_t i = 0; i < readers_pos_.size(); i++) {
readers_pos_[i]->store(0);
Expand Down
1 change: 1 addition & 0 deletions src/paimon/common/reader/prefetch_file_batch_reader_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
static std::vector<std::vector<std::pair<uint64_t, uint64_t>>> DispatchReadRanges(
const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t reader_count);

Status RefreshReadRangesAfterCleanUp();
Result<std::pair<uint64_t, uint64_t>> EofRange() const;
std::optional<std::pair<uint64_t, uint64_t>> GetCurrentReadRange(size_t reader_idx) const;
Status EnsureReaderPosition(size_t reader_idx,
Expand Down
11 changes: 10 additions & 1 deletion src/paimon/core/operation/scan_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ScanContext::ScanContext(const std::string& path, bool is_streaming_mode,
const std::shared_ptr<MemoryPool>& memory_pool,
const std::shared_ptr<Executor>& executor,
const std::shared_ptr<FileSystem>& specific_file_system,
const std::optional<std::string>& table_schema,
const std::map<std::string, std::string>& options,
const std::shared_ptr<Cache>& cache)
: path_(path),
Expand All @@ -43,6 +44,7 @@ ScanContext::ScanContext(const std::string& path, bool is_streaming_mode,
memory_pool_(memory_pool),
executor_(executor),
specific_file_system_(specific_file_system),
table_schema_(table_schema),
options_(options),
cache_(cache) {}

Expand All @@ -62,6 +64,7 @@ class ScanContextBuilder::Impl {
memory_pool_ = GetDefaultPool();
executor_ = CreateDefaultExecutor();
specific_file_system_.reset();
table_schema_ = std::nullopt;
options_.clear();
cache_.reset();
}
Expand All @@ -77,6 +80,7 @@ class ScanContextBuilder::Impl {
std::shared_ptr<MemoryPool> memory_pool_ = GetDefaultPool();
std::shared_ptr<Executor> executor_ = CreateDefaultExecutor();
std::shared_ptr<FileSystem> specific_file_system_;
std::optional<std::string> table_schema_;
std::map<std::string, std::string> options_;
std::shared_ptr<Cache> cache_;
};
Expand Down Expand Up @@ -147,6 +151,11 @@ ScanContextBuilder& ScanContextBuilder::WithFileSystem(
return *this;
}

ScanContextBuilder& ScanContextBuilder::SetTableSchema(const std::string& table_schema) {
impl_->table_schema_ = table_schema;
return *this;
}

ScanContextBuilder& ScanContextBuilder::WithCache(const std::shared_ptr<Cache>& cache) {
impl_->cache_ = cache;
return *this;
Expand All @@ -162,7 +171,7 @@ Result<std::unique_ptr<ScanContext>> ScanContextBuilder::Finish() {
std::make_shared<ScanFilter>(impl_->predicates_, impl_->partition_filters_,
impl_->bucket_filter_),
impl_->global_index_result_, impl_->memory_pool_, impl_->executor_,
impl_->specific_file_system_, impl_->options_, impl_->cache_);
impl_->specific_file_system_, impl_->table_schema_, impl_->options_, impl_->cache_);
impl_->Reset();
return ctx;
}
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/core/operation/scan_context_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ TEST(ScanContextTest, TestSetContent) {
builder.WithExecutor(executor);
auto fs = std::make_shared<MockFileSystem>();
builder.WithFileSystem(fs);
builder.SetTableSchema("table-schema-json");
auto manifest_cache = std::make_shared<LruCache>(1024);
builder.WithCache(manifest_cache);
ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish());
Expand All @@ -79,6 +80,8 @@ TEST(ScanContextTest, TestSetContent) {
ASSERT_EQ("{1,2,4,5}", ctx->GetGlobalIndexResult()->ToString());
ASSERT_EQ(memory_pool, ctx->GetMemoryPool());
ASSERT_EQ(executor, ctx->GetExecutor());
ASSERT_TRUE(ctx->GetSpecificTableSchema().has_value());
ASSERT_EQ("table-schema-json", ctx->GetSpecificTableSchema().value());
std::map<std::string, std::string> expected_options = {{"key", "value"}};
ASSERT_EQ(expected_options, ctx->GetOptions());
ASSERT_EQ(fs, ctx->GetSpecificFileSystem());
Expand Down
19 changes: 13 additions & 6 deletions src/paimon/core/table/source/table_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,20 @@ Result<std::unique_ptr<TableScan>> NewDataTableScan(const std::shared_ptr<ScanCo
CoreOptions tmp_options,
CoreOptions::FromMap(context->GetOptions(), context->GetSpecificFileSystem(), {}));
std::string branch = BranchManager::NormalizeBranch(tmp_options.GetBranch());
SchemaManager schema_manager(tmp_options.GetFileSystem(), context->GetPath(), branch);
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_table_schema,
schema_manager.Latest());
if (latest_table_schema == std::nullopt) {
return Status::Invalid("not found latest schema");
std::shared_ptr<TableSchema> table_schema;
const auto& specific_table_schema = context->GetSpecificTableSchema();
if (branch == BranchManager::DEFAULT_MAIN_BRANCH && specific_table_schema) {
PAIMON_ASSIGN_OR_RAISE(table_schema,
TableSchema::CreateFromJson(specific_table_schema.value()));
} else {
SchemaManager schema_manager(tmp_options.GetFileSystem(), context->GetPath(), branch);
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_table_schema,
schema_manager.Latest());
if (latest_table_schema == std::nullopt) {
return Status::Invalid("not found latest schema");
}
table_schema = latest_table_schema.value();
Comment thread
gripleaf marked this conversation as resolved.
}
const auto& table_schema = latest_table_schema.value();
// merge options
auto options = table_schema->Options();
for (const auto& [key, value] : context->GetOptions()) {
Expand Down
71 changes: 71 additions & 0 deletions test/inte/scan_inte_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <cstdint>
#include <cstring>
#include <filesystem>
#include <limits>
#include <map>
#include <memory>
Expand Down Expand Up @@ -396,6 +397,76 @@ TEST_P(ScanInteTest, TestScanAppendWithSnapshot3) {
CheckResult(expected_data_splits, result_data_splits);
}

TEST_P(ScanInteTest, TestScanAppendWithSpecificTableSchema) {
std::string data_dir = paimon::test::GetDataDir();
if (!std::filesystem::exists(data_dir + "orc/append_09.db/append_09/schema/schema-0")) {
data_dir = "../" + data_dir;
}
std::string table_path = data_dir + "orc/append_09.db/append_09";

auto check_result = [&](const std::optional<std::string>& specific_table_schema) {
ScanContextBuilder context_builder(table_path);
context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3");
if (specific_table_schema) {
context_builder.SetTableSchema(specific_table_schema.value());
}
ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(context_builder));
ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context)));
ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan());

ASSERT_EQ(result_plan->SnapshotId().value(), 3);

auto result_data_splits = CollectDataSplits(result_plan);
DataSplitImpl::Builder builder1(BinaryRowGenerator::GenerateRow({10}, pool_.get()),
/*bucket=*/0, /*bucket_path=*/
data_dir + "orc/append_09.db/append_09/f1=10/bucket-0",
{meta_snapshot1_partition10_bucket0_});
auto expected_data_split1 =
std::dynamic_pointer_cast<DataSplitImpl>(builder1.WithTotalBuckets(2)
.WithSnapshot(3)
.IsStreaming(false)
.RawConvertible(true)
.Build()
.value());

DataSplitImpl::Builder builder2(
BinaryRowGenerator::GenerateRow({10}, pool_.get()), /*bucket=*/1, /*bucket_path=*/
data_dir + "orc/append_09.db/append_09/f1=10/bucket-1",
{meta_snapshot1_partition10_bucket1_, meta_snapshot2_partition10_bucket1_,
meta_snapshot3_partition10_bucket1_});
auto expected_data_split2 =
std::dynamic_pointer_cast<DataSplitImpl>(builder2.WithTotalBuckets(2)
.WithSnapshot(3)
.IsStreaming(false)
.RawConvertible(true)
.Build()
.value());

DataSplitImpl::Builder builder3(
BinaryRowGenerator::GenerateRow({20}, pool_.get()), /*bucket=*/0, /*bucket_path=*/
data_dir + "orc/append_09.db/append_09/f1=20/bucket-0",
{meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_});
auto expected_data_split3 =
std::dynamic_pointer_cast<DataSplitImpl>(builder3.WithTotalBuckets(2)
.WithSnapshot(3)
.IsStreaming(false)
.RawConvertible(true)
.Build()
.value());

std::vector<std::shared_ptr<DataSplitImpl>> expected_data_splits = {
expected_data_split1, expected_data_split2, expected_data_split3};
CheckResult(expected_data_splits, result_data_splits);
};

check_result(std::nullopt);

auto fs = std::make_shared<LocalFileSystem>();
std::string schema_str;
ASSERT_OK(fs->ReadFile(table_path + "/schema/schema-0", &schema_str));
check_result(std::optional<std::string>(schema_str));
}

TEST_P(ScanInteTest, TestScanInvalidSnapshot) {
std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09";
ScanContextBuilder context_builder(table_path);
Expand Down
Loading