Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,17 +403,6 @@ struct PAIMON_EXPORT Options {
/// serialized BlobViewStruct bytes inline in data files and resolve from upstream tables at
/// read time. No default value.
static const char BLOB_VIEW_FIELD[];
/// "blob-external-storage-field" - Comma-separated BLOB field names (must be a subset of
/// blob-descriptor-field) whose raw data will be written to external storage at write time.
/// The external storage path is configured via blob-external-storage-path. Orphan file cleanup
/// is not applied to that path. No default value.
static const char BLOB_EXTERNAL_STORAGE_FIELD[];
/// "blob-external-storage-path" - The external storage path where raw BLOB data from fields
/// configured by 'blob-external-storage-field' is written at write time. Orphan file cleanup is
/// not applied to this path. No default value.
/// @note: this option differs from the Java paimon and will be deprecated once
/// RestCatalog is supported.
static const char BLOB_EXTERNAL_STORAGE_PATH[];
/// "blob-view-upstream-warehouse" - Because catalog capabilities are only partially implemented,
/// cpp paimon cannot automatically obtain the upstream table warehouse path when Blob View is
/// enabled, so the user must configure it manually. No default value.
Expand Down
2 changes: 0 additions & 2 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ set(PAIMON_CORE_SRCS
core/io/map_shared_shredding_core_utils.cpp
core/io/shredding_append_data_file_writer_factory.cpp
core/io/shredding_key_value_data_file_writer_factory.cpp
core/io/external_storage_blob_writer.cpp
core/io/multiple_blob_file_writer.cpp
core/io/rolling_blob_file_writer.cpp
core/manifest/file_kind.cpp
Expand Down Expand Up @@ -634,7 +633,6 @@ if(PAIMON_BUILD_TESTS)
core/io/file_index_evaluator_test.cpp
core/io/single_file_writer_test.cpp
core/io/rolling_blob_file_writer_test.cpp
core/io/external_storage_blob_writer_test.cpp
core/global_index/indexed_split_test.cpp
core/manifest/file_source_test.cpp
core/manifest/file_kind_test.cpp
Expand Down
2 changes: 0 additions & 2 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ const char Options::BLOB_DESCRIPTOR_FIELD[] = "blob-descriptor-field";
const char Options::FALLBACK_BLOB_DESCRIPTOR_FIELD[] = "blob.stored-descriptor-fields";
const char Options::BLOB_VIEW_FIELD[] = "blob-view-field";
const char Options::BLOB_VIEW_UPSTREAM_WAREHOUSE[] = "blob-view-upstream-warehouse";
const char Options::BLOB_EXTERNAL_STORAGE_FIELD[] = "blob-external-storage-field";
const char Options::BLOB_EXTERNAL_STORAGE_PATH[] = "blob-external-storage-path";
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
const char Options::GLOBAL_INDEX_THREAD_NUM[] = "global-index.thread-num";
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
Expand Down
60 changes: 7 additions & 53 deletions src/paimon/core/append/append_only_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "paimon/core/io/compact_increment.h"
#include "paimon/core/io/data_file_path_factory.h"
#include "paimon/core/io/data_increment.h"
#include "paimon/core/io/external_storage_blob_writer.h"
#include "paimon/core/io/multiple_blob_file_writer.h"
#include "paimon/core/io/rolling_blob_file_writer.h"
#include "paimon/core/io/rolling_file_writer.h"
Expand Down Expand Up @@ -85,22 +84,6 @@ Status AppendOnlyWriter::Write(std::unique_ptr<RecordBatch>&& batch) {
PAIMON_ASSIGN_OR_RAISE(writer_, CreateRollingRowWriter());
}

// Transform batch for external storage descriptor fields before writing.
if (external_storage_writer_) {
auto data_type = arrow::struct_(write_schema_->fields());
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array,
arrow::ImportArray(batch->GetData(), data_type));
auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(arrow_array);
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> transformed,
external_storage_writer_->TransformBatch(struct_array));
auto transformed_struct = std::dynamic_pointer_cast<arrow::StructArray>(transformed);
PAIMON_RETURN_NOT_OK(BlobUtils::ValidateBlobInlineFields(
transformed_struct, inline_descriptor_fields_, "blob-descriptor-field"));
::ArrowArray c_transformed;
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*transformed, &c_transformed));
return writer_->Write(&c_transformed);
}

if (!inline_descriptor_fields_.empty() || !inline_view_fields_.empty()) {
auto data_type = arrow::struct_(write_schema_->fields());
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array,
Expand Down Expand Up @@ -188,47 +171,24 @@ Status AppendOnlyWriter::Flush(bool wait_for_latest_compaction, bool forced_full

/// Builds the rolling writer that `writer_` uses for row data.
///
/// Steps, in order:
///  1. Create a BlobFileContext from the write schema and the options.
///  2. Remember the inline descriptor / view field sets (used for validation in Write()).
///  3. Create `external_storage_writer_` when the context requires one.
///  4. Return a rolling blob writer when the context requires separate blob files, otherwise a
///     plain RollingFileWriter over the whole write schema.
///
/// @return the result of CreateRollingBlobWriter(), or a newly created RollingFileWriter.
///
/// NOTE(review): this text appears to interleave the removed and the added lines of a diff.
/// As written, both arms of the final if/else return, so the last `return` (the only one that
/// passes `write_cols_` instead of `main_write_cols`) is unreachable. Confirm against the real
/// file which variant is intended.
AppendOnlyWriter::RollingFileWriterResult AppendOnlyWriter::CreateRollingRowWriter() {
auto blob_context = BlobFileContext::Create(write_schema_, options_);
// Local copy of the configured write columns; it may be filled in below, while the member
// `write_cols_` is left untouched.
std::optional<std::vector<std::string>> main_write_cols = write_cols_;

// Save inline descriptor and view fields for validation in Write()
if (blob_context) {
inline_descriptor_fields_ = blob_context->GetDescriptorFields();
inline_view_fields_ = blob_context->GetViewFields();
}

// Initialize ExternalStorageBlobWriter if needed
if (blob_context && blob_context->RequireExternalStorageWriter()) {
// NOTE(review): assert() is compiled out when NDEBUG is defined; the `.value()` call below is
// then the only check on the path (presumably a std::optional -- confirm).
assert(blob_context->GetExternalStoragePath());
external_storage_writer_ = std::make_unique<ExternalStorageBlobWriter>(
write_schema_, blob_context->GetExternalStorageFields(),
blob_context->GetExternalStoragePath().value(), schema_id_, seq_num_counter_,
path_factory_, options_, memory_pool_);
if (!main_write_cols) {
// To stay aligned with the Java implementation: when an external storage writer is
// required and no write columns were configured, the main writer records all schema
// field names as its write columns in DataFileMeta.
main_write_cols = write_schema_->field_names();
}
}

if (blob_context && blob_context->RequireBlobFileWriter()) {
// Use context-aware schema separation: inline BLOB fields stay in main
auto schemas =
BlobUtils::SeparateBlobSchema(write_schema_, blob_context->GetInlineFields());
return CreateRollingBlobWriter(schemas, blob_context->GetInlineFields());
}

// Both arms below build the same plain rolling writer; only the reason given differs.
if (!blob_context) {
// No BLOB fields at all -> plain rolling writer
return std::make_unique<RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>(
options_.GetTargetFileSize(/*has_primary_key=*/false),
GetDataFileWriterFactory(write_schema_, main_write_cols));
} else {
// All BLOB fields are inline, no .blob files needed -> plain rolling writer
// The main data file contains all fields including inline descriptors/views.
return std::make_unique<RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>(
options_.GetTargetFileSize(/*has_primary_key=*/false),
GetDataFileWriterFactory(write_schema_, main_write_cols));
}
// NOTE(review): unreachable as written, because every path above has already returned.
// No BLOB fields, or all BLOB fields are inline and no .blob files are needed.
return std::make_unique<RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>(
options_.GetTargetFileSize(/*has_primary_key=*/false),
GetDataFileWriterFactory(write_schema_, write_cols_));
}

AppendOnlyWriter::WriterFactory AppendOnlyWriter::GetDataFileWriterFactory(
Expand All @@ -248,10 +208,9 @@ AppendOnlyWriter::WriterFactory AppendOnlyWriter::GetBlobFileWriterFactory(
const std::shared_ptr<arrow::Schema>& single_field_schema,
const std::optional<std::vector<std::string>>& write_cols) const {
std::shared_ptr<DataFilePathFactory> path_factory = path_factory_;
return std::make_shared<BlobDataFileWriterFactory>(
options_, schema_id_, single_field_schema, write_cols, seq_num_counter_, path_factory,
[path_factory]() { return path_factory->NewBlobPath(); },
blob::BlobFormatWriter::WriteConsumer(), memory_pool_);
return std::make_shared<BlobDataFileWriterFactory>(options_, schema_id_, single_field_schema,
write_cols, seq_num_counter_, path_factory,
memory_pool_);
}

AppendOnlyWriter::RollingFileWriterResult AppendOnlyWriter::CreateRollingBlobWriter(
Expand Down Expand Up @@ -306,11 +265,6 @@ Status AppendOnlyWriter::Close() {
writer_.reset();
}

if (external_storage_writer_) {
PAIMON_RETURN_NOT_OK(external_storage_writer_->Close());
external_storage_writer_.reset();
}

if (compact_deletion_file_ != nullptr) {
compact_deletion_file_->Clean();
}
Expand Down
2 changes: 0 additions & 2 deletions src/paimon/core/append/append_only_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class Schema;
namespace paimon {

class CommitIncrement;
class ExternalStorageBlobWriter;
class MapSharedShreddingContext;
class RecordBatch;
template <typename T, typename R>
Expand Down Expand Up @@ -133,7 +132,6 @@ class AppendOnlyWriter : public BatchWriter {

std::shared_ptr<CompactDeletionFile> compact_deletion_file_;
std::unique_ptr<RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>> writer_;
std::unique_ptr<ExternalStorageBlobWriter> external_storage_writer_;
std::set<std::string> inline_descriptor_fields_;
std::set<std::string> inline_view_fields_;

Expand Down
17 changes: 0 additions & 17 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,6 @@ struct CoreOptions::Impl {
std::vector<std::string> blob_fields;
std::vector<std::string> blob_descriptor_fields;
std::vector<std::string> blob_view_fields;
std::vector<std::string> blob_external_storage_fields;

std::string partition_default_name = "__DEFAULT_PARTITION__";
StartupMode startup_mode = StartupMode::Default();
Expand All @@ -395,7 +394,6 @@ struct CoreOptions::Impl {
std::optional<std::string> field_default_func;
std::optional<std::string> scan_fallback_branch;
std::optional<std::string> data_file_external_paths;
std::optional<std::string> blob_external_storage_path;
std::optional<std::string> blob_view_upstream_warehouse;

std::map<std::string, std::string> raw_options;
Expand Down Expand Up @@ -564,13 +562,6 @@ struct CoreOptions::Impl {
// Parse blob-view-upstream-warehouse - warehouse path for configured blob view fields
PAIMON_RETURN_NOT_OK(
parser.Parse(Options::BLOB_VIEW_UPSTREAM_WAREHOUSE, &blob_view_upstream_warehouse));
// Parse blob-external-storage-field - descriptor BLOB fields written to external storage
PAIMON_RETURN_NOT_OK(parser.ParseList<std::string>(
Options::BLOB_EXTERNAL_STORAGE_FIELD, Options::FIELDS_SEPARATOR,
&blob_external_storage_fields, /*need_trim=*/true));
// Parse blob-external-storage-path - external storage path for configured BLOB fields
PAIMON_RETURN_NOT_OK(
parser.Parse(Options::BLOB_EXTERNAL_STORAGE_PATH, &blob_external_storage_path));
return Status::OK();
}

Expand Down Expand Up @@ -1492,14 +1483,6 @@ std::vector<std::string> CoreOptions::GetBlobInlineFields() const {
return blob_inline_fields;
}

/// Returns a reference to the stored list of BLOB field names configured for external storage.
/// The reference points into the options implementation object.
const std::vector<std::string>& CoreOptions::GetBlobExternalStorageFields() const {
    const std::vector<std::string>& external_storage_fields = impl_->blob_external_storage_fields;
    return external_storage_fields;
}

/// Returns a copy of the stored external storage path for BLOB data.
std::optional<std::string> CoreOptions::GetBlobExternalStoragePath() const {
    const std::optional<std::string>& external_storage_path = impl_->blob_external_storage_path;
    return external_storage_path;
}

/// Returns the stored lookup-cache file retention value (milliseconds, judging by the name).
int64_t CoreOptions::GetLookupCacheFileRetentionMs() const {
    const auto& retention_ms = impl_->lookup_cache_file_retention_ms;
    return retention_ms;
}
Expand Down
2 changes: 0 additions & 2 deletions src/paimon/core/core_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,6 @@ class PAIMON_EXPORT CoreOptions {
const std::vector<std::string>& GetBlobViewFields() const;
std::optional<std::string> GetBlobViewUpstreamWarehouse() const;
std::vector<std::string> GetBlobInlineFields() const;
const std::vector<std::string>& GetBlobExternalStorageFields() const;
std::optional<std::string> GetBlobExternalStoragePath() const;

const std::map<std::string, std::string>& ToMap() const;

Expand Down
8 changes: 0 additions & 8 deletions src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_TRUE(core_options.GetBlobDescriptorFields().empty());
ASSERT_TRUE(core_options.GetBlobViewFields().empty());
ASSERT_TRUE(core_options.GetBlobInlineFields().empty());
ASSERT_TRUE(core_options.GetBlobExternalStorageFields().empty());
ASSERT_EQ(std::nullopt, core_options.GetBlobViewUpstreamWarehouse());
ASSERT_EQ(std::nullopt, core_options.GetBlobExternalStoragePath());
ASSERT_TRUE(core_options.LegacyPartitionNameEnabled());
ASSERT_TRUE(core_options.GlobalIndexEnabled());
ASSERT_EQ(std::nullopt, core_options.GetGlobalIndexExternalPath());
Expand Down Expand Up @@ -227,8 +225,6 @@ TEST(CoreOptionsTest, TestFromMap) {
{Options::BLOB_FIELD, "blob1,blob2"},
{Options::BLOB_DESCRIPTOR_FIELD, "blob3,blob4"},
{Options::BLOB_VIEW_FIELD, "blob5"},
{Options::BLOB_EXTERNAL_STORAGE_FIELD, "blob3,blob4"},
{Options::BLOB_EXTERNAL_STORAGE_PATH, "FILE:///tmp/blob_external_storage/"},
{Options::BLOB_VIEW_UPSTREAM_WAREHOUSE, "FILE:///tmp/blob_view_upstream_warehouse/"},
{Options::PARTITION_GENERATE_LEGACY_NAME, "false"},
{Options::GLOBAL_INDEX_ENABLED, "false"},
Expand Down Expand Up @@ -367,10 +363,6 @@ TEST(CoreOptionsTest, TestFromMap) {
ASSERT_EQ(core_options.GetBlobViewFields(), std::vector<std::string>({"blob5"}));
ASSERT_EQ(core_options.GetBlobInlineFields(),
std::vector<std::string>({"blob3", "blob4", "blob5"}));
ASSERT_EQ(core_options.GetBlobExternalStorageFields(),
std::vector<std::string>({"blob3", "blob4"}));
ASSERT_EQ(core_options.GetBlobExternalStoragePath(),
std::optional<std::string>("FILE:///tmp/blob_external_storage/"));
ASSERT_EQ(core_options.GetBlobViewUpstreamWarehouse(),
std::optional<std::string>("FILE:///tmp/blob_view_upstream_warehouse/"));
ASSERT_FALSE(core_options.LegacyPartitionNameEnabled());
Expand Down
27 changes: 5 additions & 22 deletions src/paimon/core/io/blob_data_file_writer_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "paimon/core/core_options.h"
#include "paimon/core/io/data_file_path_factory.h"
#include "paimon/core/manifest/file_source.h"
#include "paimon/format/blob/blob_writer_builder.h"
#include "paimon/format/file_format.h"
#include "paimon/format/file_format_factory.h"
#include "paimon/fs/file_system.h"
Expand All @@ -34,15 +33,13 @@ BlobDataFileWriterFactory::BlobDataFileWriterFactory(
const std::shared_ptr<arrow::Schema>& file_schema,
const std::optional<std::vector<std::string>>& write_cols,
const std::shared_ptr<LongCounter>& seq_num_counter,
const std::shared_ptr<DataFilePathFactory>& path_factory, PathCreator path_creator,
blob::BlobFormatWriter::WriteConsumer write_consumer, const std::shared_ptr<MemoryPool>& pool)
const std::shared_ptr<DataFilePathFactory>& path_factory,
const std::shared_ptr<MemoryPool>& pool)
: DataFileWriterFactory(options, schema_id, pool),
file_schema_(file_schema),
write_cols_(write_cols),
seq_num_counter_(seq_num_counter),
path_factory_(path_factory),
path_creator_(std::move(path_creator)),
write_consumer_(std::move(write_consumer)) {}
path_factory_(path_factory) {}

Result<std::unique_ptr<SingleFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>>
BlobDataFileWriterFactory::CreateWriter() const {
Expand All @@ -51,26 +48,12 @@ BlobDataFileWriterFactory::CreateWriter() const {
PAIMON_ASSIGN_OR_RAISE(WriterResources resources,
CreateWriterResources(*format, file_schema_,
/*create_stats_extractor=*/true));
if (write_consumer_) {
auto blob_writer_builder =
std::dynamic_pointer_cast<blob::BlobWriterBuilder>(resources.writer_builder);
if (!blob_writer_builder) {
return Status::Invalid(
"writer_builder cannot be casted to BlobWriterBuilder "
"in BlobDataFileWriterFactory");
}
blob_writer_builder->WithWriteConsumer(write_consumer_);
}

auto writer = std::make_unique<DataFileWriter>(
/*compression=*/"none", std::function<Status(::ArrowArray*, ::ArrowArray*)>(), schema_id_,
seq_num_counter_, FileSource::Append(), resources.stats_extractor,
path_factory_->IsExternalPath(), write_cols_, pool_);
if (!path_creator_) {
return Status::Invalid("BlobDataFileWriterFactory path creator is empty.");
}
PAIMON_RETURN_NOT_OK(
writer->Init(options_.GetFileSystem(), path_creator_(), resources.writer_builder));
PAIMON_RETURN_NOT_OK(writer->Init(options_.GetFileSystem(), path_factory_->NewBlobPath(),
resources.writer_builder));
return std::unique_ptr<SingleFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>(
std::move(writer));
}
Expand Down
8 changes: 0 additions & 8 deletions src/paimon/core/io/blob_data_file_writer_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#pragma once

#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
Expand All @@ -27,7 +26,6 @@
#include "paimon/core/io/data_file_writer.h"
#include "paimon/core/io/data_file_writer_factory.h"
#include "paimon/core/io/single_file_writer_factory.h"
#include "paimon/format/blob/blob_format_writer.h"
#include "paimon/result.h"

namespace arrow {
Expand All @@ -45,15 +43,11 @@ class BlobDataFileWriterFactory
: public DataFileWriterFactory,
public SingleFileWriterFactory<::ArrowArray*, std::shared_ptr<DataFileMeta>> {
public:
using PathCreator = std::function<std::string()>;

BlobDataFileWriterFactory(const CoreOptions& options, int64_t schema_id,
const std::shared_ptr<arrow::Schema>& file_schema,
const std::optional<std::vector<std::string>>& write_cols,
const std::shared_ptr<LongCounter>& seq_num_counter,
const std::shared_ptr<DataFilePathFactory>& path_factory,
PathCreator path_creator,
blob::BlobFormatWriter::WriteConsumer write_consumer,
const std::shared_ptr<MemoryPool>& pool);

Result<std::unique_ptr<SingleFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>>
Expand All @@ -64,8 +58,6 @@ class BlobDataFileWriterFactory
std::optional<std::vector<std::string>> write_cols_;
std::shared_ptr<LongCounter> seq_num_counter_;
std::shared_ptr<DataFilePathFactory> path_factory_;
PathCreator path_creator_;
blob::BlobFormatWriter::WriteConsumer write_consumer_;
};

} // namespace paimon
6 changes: 0 additions & 6 deletions src/paimon/core/io/data_file_path_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ class DataFilePathFactory : public PathFactory {
return NewPathFromName(NewFileName(data_file_prefix_, ".blob"));
}

/// Builds a path for a new blob file for descriptor fields: a fresh file name made from the data
/// file prefix and the ".blob" extension, joined onto `external_storage_path`.
std::string NewExternalStorageBlobPath(const std::string& external_storage_path) const {
    return PathUtil::JoinPath(external_storage_path, NewFileName(data_file_prefix_, ".blob"));
}

std::string NewPathFromName(const std::string& file_name) const {
if (external_path_provider_ != nullptr) {
return external_path_provider_->GetNextExternalDataPath(file_name);
Expand Down
14 changes: 0 additions & 14 deletions src/paimon/core/io/data_file_path_factory_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,6 @@ TEST_F(DataFilePathFactoryTest, TestNewPath) {
ASSERT_EQ(factory_.NewPathFromName("index-file"), "/tmp/index-file");
}

// Checks that two external-storage blob paths requested for the same directory are distinct,
// start with that directory followed by "data-", and end with ".blob".
TEST_F(DataFilePathFactoryTest, TestNewExternalStorageBlobPath) {
    const std::string first_path = factory_.NewExternalStorageBlobPath("/tmp/external_blob");
    const std::string second_path = factory_.NewExternalStorageBlobPath("/tmp/external_blob");

    // Two consecutive requests must not produce the same path.
    ASSERT_NE(first_path, second_path);

    // Each path starts with the external storage directory followed by the "data-" prefix.
    for (const std::string& path : {first_path, second_path}) {
        ASSERT_TRUE(StringUtils::StartsWith(path, "/tmp/external_blob/data-"));
    }

    // Each path carries the .blob extension.
    for (const std::string& path : {first_path, second_path}) {
        ASSERT_TRUE(StringUtils::EndsWith(path, ".blob"));
    }
}

TEST_F(DataFilePathFactoryTest, TestNewPathWithDataFilePrefixAndExternalPath) {
DataFilePathFactory factory;
ASSERT_OK_AND_ASSIGN(
Expand Down
Loading
Loading