diff --git a/include/paimon/read_context.h b/include/paimon/read_context.h index 3eb4b4551..708dd9a66 100644 --- a/include/paimon/read_context.h +++ b/include/paimon/read_context.h @@ -106,7 +106,7 @@ class PAIMON_EXPORT ReadContext { uint32_t GetRowToBatchThreadNumber() const { return row_to_batch_thread_number_; } - const std::optional& GetSpecificTableSchema() { + const std::optional& GetSpecificTableSchema() const { return table_schema_; } std::shared_ptr GetMemoryPool() const { diff --git a/include/paimon/scan_context.h b/include/paimon/scan_context.h index faa9b0170..39a364029 100644 --- a/include/paimon/scan_context.h +++ b/include/paimon/scan_context.h @@ -49,6 +49,7 @@ class PAIMON_EXPORT ScanContext { const std::shared_ptr& memory_pool, const std::shared_ptr& executor, const std::shared_ptr& specific_file_system, + const std::optional& table_schema, const std::map& options, const std::shared_ptr& cache); @@ -88,6 +89,10 @@ class PAIMON_EXPORT ScanContext { return specific_file_system_; } + const std::optional& GetSpecificTableSchema() const { + return table_schema_; + } + std::shared_ptr GetCache() const { return cache_; } @@ -101,6 +106,7 @@ class PAIMON_EXPORT ScanContext { std::shared_ptr memory_pool_; std::shared_ptr executor_; std::shared_ptr specific_file_system_; + std::optional table_schema_; std::map options_; std::shared_ptr cache_; }; @@ -185,6 +191,18 @@ class PAIMON_EXPORT ScanContextBuilder { /// @note If not set, use default file system (configured in `Options::FILE_SYSTEM`) ScanContextBuilder& WithFileSystem(const std::shared_ptr& file_system); + /// Set the table schema as a string to avoid schema loading I/O operations. + /// + /// This optimization allows the scanner to use a pre-loaded schema instead of + /// reading it from the table metadata, which can improve performance especially + /// in scenarios with many small scan operations. + /// + /// @param table_schema String representation of the table schema. 
+ /// @return Reference to this builder for method chaining. + /// @note The user must ensure that the schema string is valid JSON and matches the table. + /// @note If not set, or on a non-main branch, the schema is loaded from the table path. + ScanContextBuilder& SetTableSchema(const std::string& table_schema); + /// Inject a cache for scan operations. Passing nullptr disables cache. /// @return Reference to this builder for method chaining. ScanContextBuilder& WithCache(const std::shared_ptr& cache); diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp index c960d4563..ca9b494b7 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp @@ -153,6 +153,7 @@ PrefetchFileBatchReaderImpl::~PrefetchFileBatchReaderImpl() { Status PrefetchFileBatchReaderImpl::SetReadSchema( ::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) { + PAIMON_RETURN_NOT_OK(CleanUp()); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr schema, arrow::ImportSchema(read_schema)); for (const auto& reader : readers_) { @@ -162,11 +163,15 @@ Status PrefetchFileBatchReaderImpl::SetReadSchema( } selection_bitmap_ = selection_bitmap; predicate_ = predicate; - return RefreshReadRanges(); + return RefreshReadRangesAfterCleanUp(); } Status PrefetchFileBatchReaderImpl::RefreshReadRanges() { PAIMON_RETURN_NOT_OK(CleanUp()); + return RefreshReadRangesAfterCleanUp(); +} + +Status PrefetchFileBatchReaderImpl::RefreshReadRangesAfterCleanUp() { bool need_prefetch; PAIMON_ASSIGN_OR_RAISE(auto read_ranges, readers_[0]->GenReadRanges(&need_prefetch)); @@ -279,6 +284,7 @@ Status PrefetchFileBatchReaderImpl::CleanUp() { read_ranges_.clear(); read_ranges_in_group_.clear(); current_batch_global_row_ids_.clear(); + read_ranges_freshed_ = false; clean_prefetch_queue(); for (size_t i = 0; i < readers_pos_.size(); i++) { 
readers_pos_[i]->store(0); diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.h b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h index 19f936c8f..36a0efdd9 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_impl.h +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h @@ -129,6 +129,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader { static std::vector>> DispatchReadRanges( const std::vector>& read_ranges, size_t reader_count); + Status RefreshReadRangesAfterCleanUp(); Result> EofRange() const; std::optional> GetCurrentReadRange(size_t reader_idx) const; Status EnsureReaderPosition(size_t reader_idx, diff --git a/src/paimon/core/operation/scan_context.cpp b/src/paimon/core/operation/scan_context.cpp index b09183eed..5bea9ba36 100644 --- a/src/paimon/core/operation/scan_context.cpp +++ b/src/paimon/core/operation/scan_context.cpp @@ -33,6 +33,7 @@ ScanContext::ScanContext(const std::string& path, bool is_streaming_mode, const std::shared_ptr& memory_pool, const std::shared_ptr& executor, const std::shared_ptr& specific_file_system, + const std::optional& table_schema, const std::map& options, const std::shared_ptr& cache) : path_(path), @@ -43,6 +44,7 @@ ScanContext::ScanContext(const std::string& path, bool is_streaming_mode, memory_pool_(memory_pool), executor_(executor), specific_file_system_(specific_file_system), + table_schema_(table_schema), options_(options), cache_(cache) {} @@ -62,6 +64,7 @@ class ScanContextBuilder::Impl { memory_pool_ = GetDefaultPool(); executor_ = CreateDefaultExecutor(); specific_file_system_.reset(); + table_schema_ = std::nullopt; options_.clear(); cache_.reset(); } @@ -77,6 +80,7 @@ class ScanContextBuilder::Impl { std::shared_ptr memory_pool_ = GetDefaultPool(); std::shared_ptr executor_ = CreateDefaultExecutor(); std::shared_ptr specific_file_system_; + std::optional table_schema_; std::map options_; std::shared_ptr cache_; }; @@ -147,6 +151,11 @@ 
ScanContextBuilder& ScanContextBuilder::WithFileSystem( return *this; } +ScanContextBuilder& ScanContextBuilder::SetTableSchema(const std::string& table_schema) { + impl_->table_schema_ = table_schema; + return *this; +} + ScanContextBuilder& ScanContextBuilder::WithCache(const std::shared_ptr& cache) { impl_->cache_ = cache; return *this; @@ -162,7 +171,7 @@ Result> ScanContextBuilder::Finish() { std::make_shared(impl_->predicates_, impl_->partition_filters_, impl_->bucket_filter_), impl_->global_index_result_, impl_->memory_pool_, impl_->executor_, - impl_->specific_file_system_, impl_->options_, impl_->cache_); + impl_->specific_file_system_, impl_->table_schema_, impl_->options_, impl_->cache_); impl_->Reset(); return ctx; } diff --git a/src/paimon/core/operation/scan_context_test.cpp b/src/paimon/core/operation/scan_context_test.cpp index 4a6e544e5..f724fab65 100644 --- a/src/paimon/core/operation/scan_context_test.cpp +++ b/src/paimon/core/operation/scan_context_test.cpp @@ -66,6 +66,7 @@ TEST(ScanContextTest, TestSetContent) { builder.WithExecutor(executor); auto fs = std::make_shared(); builder.WithFileSystem(fs); + builder.SetTableSchema("table-schema-json"); auto manifest_cache = std::make_shared(1024); builder.WithCache(manifest_cache); ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); @@ -79,6 +80,8 @@ TEST(ScanContextTest, TestSetContent) { ASSERT_EQ("{1,2,4,5}", ctx->GetGlobalIndexResult()->ToString()); ASSERT_EQ(memory_pool, ctx->GetMemoryPool()); ASSERT_EQ(executor, ctx->GetExecutor()); + ASSERT_TRUE(ctx->GetSpecificTableSchema().has_value()); + ASSERT_EQ("table-schema-json", ctx->GetSpecificTableSchema().value()); std::map expected_options = {{"key", "value"}}; ASSERT_EQ(expected_options, ctx->GetOptions()); ASSERT_EQ(fs, ctx->GetSpecificFileSystem()); diff --git a/src/paimon/core/table/source/table_scan.cpp b/src/paimon/core/table/source/table_scan.cpp index 7869cf8b5..8caff10d6 100644 --- a/src/paimon/core/table/source/table_scan.cpp +++ 
b/src/paimon/core/table/source/table_scan.cpp @@ -194,13 +194,20 @@ Result> NewDataTableScan(const std::shared_ptrGetOptions(), context->GetSpecificFileSystem(), {})); std::string branch = BranchManager::NormalizeBranch(tmp_options.GetBranch()); - SchemaManager schema_manager(tmp_options.GetFileSystem(), context->GetPath(), branch); - PAIMON_ASSIGN_OR_RAISE(std::optional> latest_table_schema, - schema_manager.Latest()); - if (latest_table_schema == std::nullopt) { - return Status::Invalid("not found latest schema"); + std::shared_ptr table_schema; + const auto& specific_table_schema = context->GetSpecificTableSchema(); + if (branch == BranchManager::DEFAULT_MAIN_BRANCH && specific_table_schema) { + PAIMON_ASSIGN_OR_RAISE(table_schema, + TableSchema::CreateFromJson(specific_table_schema.value())); + } else { + SchemaManager schema_manager(tmp_options.GetFileSystem(), context->GetPath(), branch); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_table_schema, + schema_manager.Latest()); + if (latest_table_schema == std::nullopt) { + return Status::Invalid("not found latest schema"); + } + table_schema = latest_table_schema.value(); } - const auto& table_schema = latest_table_schema.value(); // merge options auto options = table_schema->Options(); for (const auto& [key, value] : context->GetOptions()) { diff --git a/test/inte/scan_inte_test.cpp b/test/inte/scan_inte_test.cpp index 75c4c0e36..51363774f 100644 --- a/test/inte/scan_inte_test.cpp +++ b/test/inte/scan_inte_test.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -396,6 +397,76 @@ TEST_P(ScanInteTest, TestScanAppendWithSnapshot3) { CheckResult(expected_data_splits, result_data_splits); } +TEST_P(ScanInteTest, TestScanAppendWithSpecificTableSchema) { + std::string data_dir = paimon::test::GetDataDir(); + if (!std::filesystem::exists(data_dir + "orc/append_09.db/append_09/schema/schema-0")) { + data_dir = "../" + data_dir; + } + std::string table_path = data_dir + 
"orc/append_09.db/append_09"; + + auto check_result = [&](const std::optional& specific_table_schema) { + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3"); + if (specific_table_schema) { + context_builder.SetTableSchema(specific_table_schema.value()); + } + ASSERT_OK_AND_ASSIGN(auto scan_context, FinishScanContext(context_builder)); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + + ASSERT_EQ(result_plan->SnapshotId().value(), 3); + + auto result_data_splits = CollectDataSplits(result_plan); + DataSplitImpl::Builder builder1(BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + data_dir + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), /*bucket=*/1, /*bucket_path=*/ + data_dir + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot1_partition10_bucket1_, meta_snapshot2_partition10_bucket1_, + meta_snapshot3_partition10_bucket1_}); + auto expected_data_split2 = + std::dynamic_pointer_cast(builder2.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder3( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), /*bucket=*/0, /*bucket_path=*/ + data_dir + "orc/append_09.db/append_09/f1=20/bucket-0", + {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); + auto expected_data_split3 = + std::dynamic_pointer_cast(builder3.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> 
expected_data_splits = { + expected_data_split1, expected_data_split2, expected_data_split3}; + CheckResult(expected_data_splits, result_data_splits); + }; + + check_result(std::nullopt); + + auto fs = std::make_shared(); + std::string schema_str; + ASSERT_OK(fs->ReadFile(table_path + "/schema/schema-0", &schema_str)); + check_result(std::optional(schema_str)); +} + TEST_P(ScanInteTest, TestScanInvalidSnapshot) { std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; ScanContextBuilder context_builder(table_path);