39 changes: 20 additions & 19 deletions src/paimon/core/operation/merge_file_split_read.cpp
@@ -247,34 +247,38 @@ Status MergeFileSplitRead::GenerateKeyValueReadSchema(
     std::shared_ptr<FieldsComparator>* key_comparator,
     std::shared_ptr<FieldsComparator>* interval_partition_comparator,
     std::shared_ptr<FieldsComparator>* sequence_fields_comparator) {
-    std::vector<DataField> key_fields;
-    std::vector<DataField> non_key_fields;
-    // 1. split user raw read schema to key and non-key fields
-    PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> trimmed_key_fields,
-                           table_schema.TrimmedPrimaryKeys());
-    PAIMON_RETURN_NOT_OK(
-        SplitKeyAndNonKeyField(trimmed_key_fields, raw_read_schema, &key_fields, &non_key_fields));
-    // 2. add user defined sequence field in non-key fields
-    PAIMON_RETURN_NOT_OK(CompleteSequenceField(table_schema, options, &non_key_fields));
+    // 1. add user raw read schema to need_fields
+    PAIMON_ASSIGN_OR_RAISE(std::vector<DataField> need_fields,
+                           DataField::ConvertArrowSchemaToDataFields(raw_read_schema));
+    // 2. add user defined sequence field to need_fields
+    PAIMON_RETURN_NOT_OK(CompleteSequenceField(table_schema, options, &need_fields));
     if (options.GetMergeEngine() == MergeEngine::PARTIAL_UPDATE) {
         // add sequence group fields for partial update
         std::map<std::string, std::vector<std::string>> value_field_to_seq_group_field;
         std::set<std::string> seq_group_key_set;
         PAIMON_RETURN_NOT_OK(PartialUpdateMergeFunction::ParseSequenceGroupFields(
             options, &value_field_to_seq_group_field, &seq_group_key_set));
         PAIMON_RETURN_NOT_OK(PartialUpdateMergeFunction::CompleteSequenceGroupFields(
-            table_schema, value_field_to_seq_group_field, &non_key_fields));
+            table_schema, value_field_to_seq_group_field, &need_fields));
     }
-    // 3. construct value fields: key fields are put before non-key fields
+    // 3. split need_fields to key and non-key fields
+    std::vector<DataField> key_fields;
+    std::vector<DataField> non_key_fields;
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> trimmed_key_fields,
+                           table_schema.TrimmedPrimaryKeys());
+    PAIMON_RETURN_NOT_OK(
+        SplitKeyAndNonKeyField(trimmed_key_fields, need_fields, &key_fields, &non_key_fields));
+
+    // 4. construct value fields: key fields are put before non-key fields
     std::vector<DataField> value_fields;
     value_fields.insert(value_fields.end(), key_fields.begin(), key_fields.end());
     value_fields.insert(value_fields.end(), non_key_fields.begin(), non_key_fields.end());
     *value_schema = DataField::ConvertDataFieldsToArrowSchema(value_fields);
-    // 4. create sequence field comparator
+    // 5. create sequence field comparator
     PAIMON_ASSIGN_OR_RAISE(
         *sequence_fields_comparator,
         PrimaryKeyTableUtils::CreateSequenceFieldsComparator(value_fields, options));
-    // 5. complete key fields to all trimmed primary key
+    // 6. complete key fields to all trimmed primary key
     key_fields.clear();
     PAIMON_ASSIGN_OR_RAISE(key_fields, table_schema.GetFields(trimmed_key_fields));
     PAIMON_ASSIGN_OR_RAISE(
@@ -285,7 +289,7 @@ Status MergeFileSplitRead::GenerateKeyValueReadSchema(
         *interval_partition_comparator,
         FieldsComparator::Create(key_fields,
                                  /*is_ascending_order=*/true, /*use_view=*/false));
-    // 6. construct actual read fields: special + key + non-key value
+    // 7. construct actual read fields: special + key + non-key value
     std::vector<DataField> read_fields;
     std::vector<DataField> special_fields(
         {SpecialFields::SequenceNumber(), SpecialFields::ValueKind()});
@@ -297,11 +301,8 @@ Status MergeFileSplitRead::GenerateKeyValueReadSchema(
 }
 
 Status MergeFileSplitRead::SplitKeyAndNonKeyField(
-    const std::vector<std::string>& trimmed_key_fields,
-    const std::shared_ptr<arrow::Schema>& raw_read_schema, std::vector<DataField>* key_fields,
-    std::vector<DataField>* non_key_fields) {
-    PAIMON_ASSIGN_OR_RAISE(std::vector<DataField> read_fields,
-                           DataField::ConvertArrowSchemaToDataFields(raw_read_schema));
+    const std::vector<std::string>& trimmed_key_fields, const std::vector<DataField>& read_fields,
+    std::vector<DataField>* key_fields, std::vector<DataField>* non_key_fields) {
     for (const auto& field : read_fields) {
         auto iter = std::find(trimmed_key_fields.begin(), trimmed_key_fields.end(), field.Name());
         if (iter == trimmed_key_fields.end()) {
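Taken together with the header change below, this diff moves the key/non-key split to after sequence-field completion: the raw read schema is first converted into `need_fields`, sequence fields (and sequence-group fields for partial update) are completed against that full list, and only then is the list split, with key fields placed ahead of non-key fields in the value schema. A minimal standalone sketch of the split-and-reorder step, using plain `std::string` names in place of Paimon's `DataField` (`SplitKeyAndNonKey` and the sample field names are illustrative, not the library API):

```cpp
#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

// Illustrative stand-in for the refactored SplitKeyAndNonKeyField: partition
// the already-completed field list into key and non-key fields by membership
// in the trimmed primary-key list, preserving input order within each group.
void SplitKeyAndNonKey(const std::vector<std::string>& trimmed_keys,
                       const std::vector<std::string>& read_fields,
                       std::vector<std::string>* key_fields,
                       std::vector<std::string>* non_key_fields) {
    for (const auto& field : read_fields) {
        bool is_key =
            std::find(trimmed_keys.begin(), trimmed_keys.end(), field) != trimmed_keys.end();
        (is_key ? key_fields : non_key_fields)->push_back(field);
    }
}

int main() {
    // need_fields stands in for the raw read schema after sequence fields
    // have been completed (steps 1 and 2 of the refactored function).
    std::vector<std::string> need_fields = {"f1", "p1", "f2", "p2"};
    std::vector<std::string> trimmed_keys = {"p1", "p2"};

    std::vector<std::string> key_fields, non_key_fields;
    SplitKeyAndNonKey(trimmed_keys, need_fields, &key_fields, &non_key_fields);

    // Step 4 of the refactored function: key fields go before non-key fields
    // in the value schema.
    std::vector<std::string> value_fields = key_fields;
    value_fields.insert(value_fields.end(), non_key_fields.begin(), non_key_fields.end());
    for (const auto& f : value_fields) {
        std::cout << f << " ";  // prints: p1 p2 f1 f2
    }
    std::cout << "\n";
    return 0;
}
```

Completing sequence fields before the split matters when a sequence field is also a primary-key column, which is exactly the case the new integration tests below exercise.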
2 changes: 1 addition & 1 deletion src/paimon/core/operation/merge_file_split_read.h
@@ -140,7 +140,7 @@ class MergeFileSplitRead : public AbstractSplitRead {
                                               std::shared_ptr<FieldsComparator>* sequence_fields_comparator);
 
     static Status SplitKeyAndNonKeyField(const std::vector<std::string>& trimmed_key_fields,
-                                         const std::shared_ptr<arrow::Schema>& raw_read_schema,
+                                         const std::vector<DataField>& read_fields,
                                          std::vector<DataField>* key_fields,
                                          std::vector<DataField>* non_key_fields);
 
130 changes: 130 additions & 0 deletions test/inte/write_and_read_inte_test.cpp
@@ -533,6 +533,136 @@ TEST_P(WriteAndReadInteTest, TestPkTimestampType) {
     ASSERT_TRUE(success);
 }
 
+TEST_P(WriteAndReadInteTest, TestPKWithSequenceFieldInPKField) {
+    arrow::FieldVector fields = {
+        arrow::field("p1", arrow::utf8()),
+        arrow::field("p2", arrow::int32()),
+        arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::float64()),
+    };
+    auto schema = arrow::schema(fields);
+    auto [file_format, file_system] = GetParam();
+    std::map<std::string, std::string> options = {
+        {Options::MANIFEST_FORMAT, "orc"},         {Options::FILE_FORMAT, file_format},
+        {Options::TARGET_FILE_SIZE, "1024"},       {Options::BUCKET, "1"},
+        {Options::FILE_SYSTEM, file_system},       {Options::SEQUENCE_FIELD, "p2"},
+        {"orc.read.enable-lazy-decoding", "true"}, {"orc.dictionary-key-size-threshold", "1"},
+    };
+    if (file_system == "jindo") {
+        options = AddOptionsForJindo(options);
+    }
+    ASSERT_OK_AND_ASSIGN(auto helper, TestHelper::Create(test_dir_, schema, /*partition_keys=*/{},
+                                                         /*primary_keys=*/{"p1", "p2"}, options,
+                                                         /*is_streaming_mode=*/true));
+    int64_t commit_identifier = 0;
+    std::string data_1 = R"([
+        ["banana", 2, 12, 13.0],
+        ["lucy", 0, 14, 5.2],
+        ["dog", 1, 1, 4.1],
+        ["banana", 2, 2, 3.0],
+        ["mouse", 3, 100, 10.3]
+    ])";
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_1,
+                         TestHelper::MakeRecordBatch(arrow::struct_(fields), data_1,
+                                                     /*partition_map=*/{}, /*bucket=*/0, {}));
+    ASSERT_OK_AND_ASSIGN(auto commit_msgs,
+                         helper->WriteAndCommit(std::move(batch_1), commit_identifier++,
+                                                /*expected_commit_messages=*/std::nullopt));
+    std::string data_2 = R"([
+        ["apple", 0, 20, 23.0],
+        ["banana", 1, 200, 20.3],
+        ["dog", 1, 21, 24.1],
+        ["mouse", 3, 200, 20.3]
+    ])";
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_2,
+                         TestHelper::MakeRecordBatch(arrow::struct_(fields), data_2,
+                                                     /*partition_map=*/{}, /*bucket=*/0, {}));
+    ASSERT_OK_AND_ASSIGN(commit_msgs,
+                         helper->WriteAndCommit(std::move(batch_2), commit_identifier++,
+                                                /*expected_commit_messages=*/std::nullopt));
+    arrow::FieldVector fields_with_row_kind = fields;
+    fields_with_row_kind.insert(fields_with_row_kind.begin(),
+                                arrow::field("_VALUE_KIND", arrow::int8()));
+    auto data_type = arrow::struct_(fields_with_row_kind);
+    ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits,
+                         helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt));
+    std::string data = R"([
+        [0, "apple", 0, 20, 23.0],
+        [0, "banana", 1, 200, 20.3],
+        [0, "banana", 2, 2, 3.0],
+        [0, "dog", 1, 21, 24.1],
+        [0, "lucy", 0, 14, 5.2],
+        [0, "mouse", 3, 200, 20.3]
+    ])";
+    ASSERT_OK_AND_ASSIGN(bool success, helper->ReadAndCheckResult(data_type, data_splits, data));
+    ASSERT_TRUE(success);
+}
+
+TEST_P(WriteAndReadInteTest, TestPKWithSequenceFieldPartialInPKField) {
+    arrow::FieldVector fields = {
+        arrow::field("p1", arrow::utf8()),
+        arrow::field("p2", arrow::int32()),
+        arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::float64()),
+    };
+    auto schema = arrow::schema(fields);
+    auto [file_format, file_system] = GetParam();
+    std::map<std::string, std::string> options = {
+        {Options::MANIFEST_FORMAT, "orc"},         {Options::FILE_FORMAT, file_format},
+        {Options::TARGET_FILE_SIZE, "1024"},       {Options::BUCKET, "1"},
+        {Options::FILE_SYSTEM, file_system},       {Options::SEQUENCE_FIELD, "p2,f1"},
+        {"orc.read.enable-lazy-decoding", "true"}, {"orc.dictionary-key-size-threshold", "1"},
+    };
+    if (file_system == "jindo") {
+        options = AddOptionsForJindo(options);
+    }
+    ASSERT_OK_AND_ASSIGN(auto helper, TestHelper::Create(test_dir_, schema, /*partition_keys=*/{},
+                                                         /*primary_keys=*/{"p1", "p2"}, options,
+                                                         /*is_streaming_mode=*/true));
+    int64_t commit_identifier = 0;
+    std::string data_1 = R"([
+        ["banana", 2, 12, 13.0],
+        ["lucy", 0, 14, 5.2],
+        ["dog", 1, 1, 4.1],
+        ["banana", 2, 2, 3.0],
+        ["mouse", 3, 100, 10.3]
+    ])";
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_1,
+                         TestHelper::MakeRecordBatch(arrow::struct_(fields), data_1,
+                                                     /*partition_map=*/{}, /*bucket=*/0, {}));
+    ASSERT_OK_AND_ASSIGN(auto commit_msgs,
+                         helper->WriteAndCommit(std::move(batch_1), commit_identifier++,
+                                                /*expected_commit_messages=*/std::nullopt));
+    std::string data_2 = R"([
+        ["apple", 0, 20, 23.0],
+        ["banana", 1, 200, 20.3],
+        ["dog", 1, 21, 24.1],
+        ["mouse", 3, 10, 20.3]
+    ])";
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch_2,
+                         TestHelper::MakeRecordBatch(arrow::struct_(fields), data_2,
+                                                     /*partition_map=*/{}, /*bucket=*/0, {}));
+    ASSERT_OK_AND_ASSIGN(commit_msgs,
+                         helper->WriteAndCommit(std::move(batch_2), commit_identifier++,
+                                                /*expected_commit_messages=*/std::nullopt));
+    arrow::FieldVector fields_with_row_kind = fields;
+    fields_with_row_kind.insert(fields_with_row_kind.begin(),
+                                arrow::field("_VALUE_KIND", arrow::int8()));
+    auto data_type = arrow::struct_(fields_with_row_kind);
+    ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits,
+                         helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt));
+    std::string data = R"([
+        [0, "apple", 0, 20, 23.0],
+        [0, "banana", 1, 200, 20.3],
+        [0, "banana", 2, 12, 13.0],
+        [0, "dog", 1, 21, 24.1],
+        [0, "lucy", 0, 14, 5.2],
+        [0, "mouse", 3, 100, 10.3]
+    ])";
+    ASSERT_OK_AND_ASSIGN(bool success, helper->ReadAndCheckResult(data_type, data_splits, data));
+    ASSERT_TRUE(success);
+}
+
 std::vector<std::pair<std::string, std::string>> GetTestValuesForWriteAndReadInteTest() {
     std::vector<std::pair<std::string, std::string>> values = {{"parquet", "local"}};
 #ifdef PAIMON_ENABLE_ORC
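The two new tests pin down the merge semantics when the user-defined sequence field overlaps the primary key. With `sequence.field = "p2"` (a pure PK column), every row of a given key carries the same sequence value, so the later write wins; with `sequence.field = "p2,f1"`, the row with the larger `(p2, f1)` tuple survives even if it arrived earlier. That is why `"mouse"` resolves to `f1 = 200` in the first test but `f1 = 100` in the second. A self-contained model of that expectation; `Row`, `Merge`, and the last-wins tie-break are an illustrative reading of the tests, not the engine's actual merge code:

```cpp
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Row layout mirrors the test schema: p1, p2, f1, f2.
struct Row {
    std::string p1;
    int p2 = 0;
    int f1 = 0;
    double f2 = 0.0;
};

// Simplified model of the expected behavior: per primary key (p1, p2), keep
// the row with the largest sequence tuple; on ties, the later row wins.
std::map<std::pair<std::string, int>, Row> Merge(const std::vector<Row>& rows,
                                                 bool sequence_includes_f1) {
    std::map<std::pair<std::string, int>, Row> merged;
    for (const auto& row : rows) {
        auto key = std::make_pair(row.p1, row.p2);
        auto it = merged.find(key);
        if (it == merged.end()) {
            merged.emplace(key, row);
            continue;
        }
        auto seq = [&](const Row& r) {
            // sequence.field = "p2" compares only p2; "p2,f1" compares both.
            return std::make_pair(r.p2, sequence_includes_f1 ? r.f1 : 0);
        };
        if (seq(row) >= seq(it->second)) {  // >= makes the later row win ties
            it->second = row;
        }
    }
    return merged;
}

int main() {
    // First test (sequence.field = "p2"): tie on p2, later commit wins.
    std::vector<Row> t1 = {{"mouse", 3, 100, 10.3}, {"mouse", 3, 200, 20.3}};
    std::cout << Merge(t1, false)[{"mouse", 3}].f1 << "\n";  // 200

    // Second test (sequence.field = "p2,f1"): (3, 100) > (3, 10), so the
    // earlier commit's row is kept.
    std::vector<Row> t2 = {{"mouse", 3, 100, 10.3}, {"mouse", 3, 10, 20.3}};
    std::cout << Merge(t2, true)[{"mouse", 3}].f1 << "\n";  // 100
    return 0;
}
```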