diff --git a/src/paimon/core/operation/merge_file_split_read.cpp b/src/paimon/core/operation/merge_file_split_read.cpp index a180332b..14227c8b 100644 --- a/src/paimon/core/operation/merge_file_split_read.cpp +++ b/src/paimon/core/operation/merge_file_split_read.cpp @@ -247,15 +247,11 @@ Status MergeFileSplitRead::GenerateKeyValueReadSchema( std::shared_ptr* key_comparator, std::shared_ptr* interval_partition_comparator, std::shared_ptr* sequence_fields_comparator) { - std::vector key_fields; - std::vector non_key_fields; - // 1. split user raw read schema to key and non-key fields - PAIMON_ASSIGN_OR_RAISE(std::vector trimmed_key_fields, - table_schema.TrimmedPrimaryKeys()); - PAIMON_RETURN_NOT_OK( - SplitKeyAndNonKeyField(trimmed_key_fields, raw_read_schema, &key_fields, &non_key_fields)); - // 2. add user defined sequence field in non-key fields - PAIMON_RETURN_NOT_OK(CompleteSequenceField(table_schema, options, &non_key_fields)); + // 1. add user raw read schema to need_fields + PAIMON_ASSIGN_OR_RAISE(std::vector need_fields, + DataField::ConvertArrowSchemaToDataFields(raw_read_schema)); + // 2. add user defined sequence field to need_fields + PAIMON_RETURN_NOT_OK(CompleteSequenceField(table_schema, options, &need_fields)); if (options.GetMergeEngine() == MergeEngine::PARTIAL_UPDATE) { // add sequence group fields for partial update std::map> value_field_to_seq_group_field; @@ -263,18 +259,26 @@ Status MergeFileSplitRead::GenerateKeyValueReadSchema( PAIMON_RETURN_NOT_OK(PartialUpdateMergeFunction::ParseSequenceGroupFields( options, &value_field_to_seq_group_field, &seq_group_key_set)); PAIMON_RETURN_NOT_OK(PartialUpdateMergeFunction::CompleteSequenceGroupFields( - table_schema, value_field_to_seq_group_field, &non_key_fields)); + table_schema, value_field_to_seq_group_field, &need_fields)); } - // 3. construct value fields: key fields are put before non-key fields + // 3. split need_fields to key and non-key fields + std::vector key_fields; + std::vector non_key_fields; + PAIMON_ASSIGN_OR_RAISE(std::vector trimmed_key_fields, + table_schema.TrimmedPrimaryKeys()); + PAIMON_RETURN_NOT_OK( + SplitKeyAndNonKeyField(trimmed_key_fields, need_fields, &key_fields, &non_key_fields)); + + // 4. construct value fields: key fields are put before non-key fields std::vector value_fields; value_fields.insert(value_fields.end(), key_fields.begin(), key_fields.end()); value_fields.insert(value_fields.end(), non_key_fields.begin(), non_key_fields.end()); *value_schema = DataField::ConvertDataFieldsToArrowSchema(value_fields); - // 4. create sequence field comparator + // 5. create sequence field comparator PAIMON_ASSIGN_OR_RAISE( *sequence_fields_comparator, PrimaryKeyTableUtils::CreateSequenceFieldsComparator(value_fields, options)); - // 5. complete key fields to all trimmed primary key + // 6. complete key fields to all trimmed primary key key_fields.clear(); PAIMON_ASSIGN_OR_RAISE(key_fields, table_schema.GetFields(trimmed_key_fields)); PAIMON_ASSIGN_OR_RAISE( @@ -285,7 +289,7 @@ Status MergeFileSplitRead::GenerateKeyValueReadSchema( *interval_partition_comparator, FieldsComparator::Create(key_fields, /*is_ascending_order=*/true, /*use_view=*/false)); - // 6. construct actual read fields: special + key + non-key value + // 7. construct actual read fields: special + key + non-key value std::vector read_fields; std::vector special_fields( {SpecialFields::SequenceNumber(), SpecialFields::ValueKind()}); @@ -297,11 +301,8 @@ Status MergeFileSplitRead::GenerateKeyValueReadSchema( } Status MergeFileSplitRead::SplitKeyAndNonKeyField( - const std::vector& trimmed_key_fields, - const std::shared_ptr& raw_read_schema, std::vector* key_fields, - std::vector* non_key_fields) { - PAIMON_ASSIGN_OR_RAISE(std::vector read_fields, - DataField::ConvertArrowSchemaToDataFields(raw_read_schema)); + const std::vector& trimmed_key_fields, const std::vector& read_fields, + std::vector* key_fields, std::vector* non_key_fields) { for (const auto& field : read_fields) { auto iter = std::find(trimmed_key_fields.begin(), trimmed_key_fields.end(), field.Name()); if (iter == trimmed_key_fields.end()) { diff --git a/src/paimon/core/operation/merge_file_split_read.h b/src/paimon/core/operation/merge_file_split_read.h index 7c077b7d..2dec4843 100644 --- a/src/paimon/core/operation/merge_file_split_read.h +++ b/src/paimon/core/operation/merge_file_split_read.h @@ -140,7 +140,7 @@ class MergeFileSplitRead : public AbstractSplitRead { std::shared_ptr* sequence_fields_comparator); static Status SplitKeyAndNonKeyField(const std::vector& trimmed_key_fields, - const std::shared_ptr& raw_read_schema, + const std::vector& read_fields, std::vector* key_fields, std::vector* non_key_fields); diff --git a/test/inte/write_and_read_inte_test.cpp b/test/inte/write_and_read_inte_test.cpp index ed95081c..a0d81739 100644 --- a/test/inte/write_and_read_inte_test.cpp +++ b/test/inte/write_and_read_inte_test.cpp @@ -533,6 +533,136 @@ TEST_P(WriteAndReadInteTest, TestPkTimestampType) { ASSERT_TRUE(success); } +TEST_P(WriteAndReadInteTest, TestPKWithSequenceFieldInPKField) { + arrow::FieldVector fields = { + arrow::field("p1", arrow::utf8()), + arrow::field("p2", arrow::int32()), + arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::float64()), + }; + auto schema = arrow::schema(fields); + auto [file_format, file_system] = GetParam(); + std::map options = { + {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, + {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "1"}, + {Options::FILE_SYSTEM, file_system}, {Options::SEQUENCE_FIELD, "p2"}, + {"orc.read.enable-lazy-decoding", "true"}, {"orc.dictionary-key-size-threshold", "1"}, + }; + if (file_system == "jindo") { + options = AddOptionsForJindo(options); + } + ASSERT_OK_AND_ASSIGN(auto helper, TestHelper::Create(test_dir_, schema, /*partition_keys=*/{}, + /*primary_keys=*/{"p1", "p2"}, options, + /*is_streaming_mode=*/true)); + int64_t commit_identifier = 0; + std::string data_1 = R"([ + ["banana", 2, 12, 13.0], + ["lucy", 0, 14, 5.2], + ["dog", 1, 1, 4.1], + ["banana", 2, 2, 3.0], + ["mouse", 3, 100, 10.3] + ])"; + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch_1, + TestHelper::MakeRecordBatch(arrow::struct_(fields), data_1, + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK_AND_ASSIGN(auto commit_msgs, + helper->WriteAndCommit(std::move(batch_1), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + std::string data_2 = R"([ + ["apple", 0, 20, 23.0], + ["banana", 1, 200, 20.3], + ["dog", 1, 21, 24.1], + ["mouse", 3, 200, 20.3] + ])"; + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch_2, + TestHelper::MakeRecordBatch(arrow::struct_(fields), data_2, + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK_AND_ASSIGN(commit_msgs, + helper->WriteAndCommit(std::move(batch_2), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + arrow::FieldVector fields_with_row_kind = fields; + fields_with_row_kind.insert(fields_with_row_kind.begin(), + arrow::field("_VALUE_KIND", arrow::int8())); + auto data_type = arrow::struct_(fields_with_row_kind); + ASSERT_OK_AND_ASSIGN(std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + std::string data = R"([ + [0, "apple", 0, 20, 23.0], + [0, "banana", 1, 200, 20.3], + [0, "banana", 2, 2, 3.0], + [0, "dog", 1, 21, 24.1], + [0, "lucy", 0, 14, 5.2], + [0, "mouse", 3, 200, 20.3] + ])"; + ASSERT_OK_AND_ASSIGN(bool success, helper->ReadAndCheckResult(data_type, data_splits, data)); + ASSERT_TRUE(success); +} + +TEST_P(WriteAndReadInteTest, TestPKWithSequenceFieldPartialInPKField) { + arrow::FieldVector fields = { + arrow::field("p1", arrow::utf8()), + arrow::field("p2", arrow::int32()), + arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::float64()), + }; + auto schema = arrow::schema(fields); + auto [file_format, file_system] = GetParam(); + std::map options = { + {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, + {Options::TARGET_FILE_SIZE, "1024"}, {Options::BUCKET, "1"}, + {Options::FILE_SYSTEM, file_system}, {Options::SEQUENCE_FIELD, "p2,f1"}, + {"orc.read.enable-lazy-decoding", "true"}, {"orc.dictionary-key-size-threshold", "1"}, + }; + if (file_system == "jindo") { + options = AddOptionsForJindo(options); + } + ASSERT_OK_AND_ASSIGN(auto helper, TestHelper::Create(test_dir_, schema, /*partition_keys=*/{}, + /*primary_keys=*/{"p1", "p2"}, options, + /*is_streaming_mode=*/true)); + int64_t commit_identifier = 0; + std::string data_1 = R"([ + ["banana", 2, 12, 13.0], + ["lucy", 0, 14, 5.2], + ["dog", 1, 1, 4.1], + ["banana", 2, 2, 3.0], + ["mouse", 3, 100, 10.3] + ])"; + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch_1, + TestHelper::MakeRecordBatch(arrow::struct_(fields), data_1, + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK_AND_ASSIGN(auto commit_msgs, + helper->WriteAndCommit(std::move(batch_1), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + std::string data_2 = R"([ + ["apple", 0, 20, 23.0], + ["banana", 1, 200, 20.3], + ["dog", 1, 21, 24.1], + ["mouse", 3, 10, 20.3] + ])"; + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch_2, + TestHelper::MakeRecordBatch(arrow::struct_(fields), data_2, + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK_AND_ASSIGN(commit_msgs, + helper->WriteAndCommit(std::move(batch_2), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + arrow::FieldVector fields_with_row_kind = fields; + fields_with_row_kind.insert(fields_with_row_kind.begin(), + arrow::field("_VALUE_KIND", arrow::int8())); + auto data_type = arrow::struct_(fields_with_row_kind); + ASSERT_OK_AND_ASSIGN(std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + std::string data = R"([ + [0, "apple", 0, 20, 23.0], + [0, "banana", 1, 200, 20.3], + [0, "banana", 2, 12, 13.0], + [0, "dog", 1, 21, 24.1], + [0, "lucy", 0, 14, 5.2], + [0, "mouse", 3, 100, 10.3] + ])"; + ASSERT_OK_AND_ASSIGN(bool success, helper->ReadAndCheckResult(data_type, data_splits, data)); + ASSERT_TRUE(success); +} + std::vector> GetTestValuesForWriteAndReadInteTest() { std::vector> values = {{"parquet", "local"}}; #ifdef PAIMON_ENABLE_ORC