Skip to content

Commit 5d26717

Browse files
committed
add a stream test
1 parent 6d066fd commit 5d26717

File tree

3 files changed

+91
-24
lines changed

3 files changed

+91
-24
lines changed

cpp/src/arrow/util/compression_test.cc

Lines changed: 91 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -513,48 +513,119 @@ TEST(TestCodecMisc, SpecifyCodecOptionsZstd) {
513513
return option;
514514
};
515515
constexpr int ZSTD_c_windowLog = 101;
516-
constexpr int ZSTD_d_windowLogMax = 100;
517516
const std::pair<arrow::util::ZstdCodecOptions, bool> options[]{
518517
{make_option(2, {}, {}), true},
519518
{make_option(9, {}, {}), true},
520519
{make_option(15, {}, {}), true},
521520
{make_option(-992, {}, {}), true},
522521
{make_option(3, {{ZSTD_c_windowLog, 23}}, {}), true},
523-
{make_option(3, {{ZSTD_c_windowLog, 28}}, {{ZSTD_d_windowLogMax, 28}}), true}};
522+
{make_option(3, {{ZSTD_c_windowLog, 28}}, {}), true}};
524523
CheckSpecifyCodecOptions<arrow::util::ZstdCodecOptions>(Compression::ZSTD, options);
525524
}
526525

527526
TEST(TestCodecMisc, ZstdLargerWindowLog) {
528527
constexpr int ZSTD_c_windowLog = 101;
529-
constexpr int ZSTD_d_windowLogMax = 100;
530528

531529
arrow::util::ZstdCodecOptions option1;
532-
option1.compression_level = 3;
530+
arrow::util::ZstdCodecOptions option2;
531+
option2.compression_context_params = {{ZSTD_c_windowLog, 28}};
532+
533+
std::vector<uint8_t> data = MakeRandomData(4 * 1024 * 1024);
534+
data.reserve(data.size() * 2);
535+
data.insert(data.end(), data.begin(), data.end());
536+
537+
auto compress = [&data](const arrow::util::ZstdCodecOptions& codecOption)
538+
-> Result<std::vector<uint8_t>> {
539+
ARROW_ASSIGN_OR_RAISE(auto codec, Codec::Create(Compression::ZSTD, codecOption));
540+
auto max_compressed_len = codec->MaxCompressedLen(data.size(), data.data());
541+
std::vector<uint8_t> compressed(max_compressed_len);
542+
543+
ARROW_ASSIGN_OR_RAISE(
544+
auto actual_size,
545+
codec->Compress(data.size(), data.data(), max_compressed_len, compressed.data()));
546+
compressed.resize(actual_size);
547+
return compressed;
548+
};
549+
550+
ASSERT_OK_AND_ASSIGN(auto compressed1, compress(option1));
551+
ASSERT_OK_AND_ASSIGN(auto compressed2, compress(option2));
552+
ASSERT_GT(compressed1.size(), compressed2.size());
553+
}
554+
555+
TEST(TestCodecMisc, ZstdStreamLargerWindowLog) {
556+
constexpr int ZSTD_c_windowLog = 101;
557+
constexpr int ZSTD_d_windowLogMax = 100;
533558

559+
arrow::util::ZstdCodecOptions option1;
534560
arrow::util::ZstdCodecOptions option2;
535-
option2.compression_level = 3;
536-
// 1 << 23 = 8MB window size, default 2MB under level 3.
537-
option2.compression_context_params = {{ZSTD_c_windowLog, 23}};
538-
option2.decompression_context_params = {{ZSTD_d_windowLogMax, 23}};
561+
option2.compression_context_params = {{ZSTD_c_windowLog, 28}};
562+
option2.decompression_context_params = {{ZSTD_d_windowLogMax, 28}};
539563

540564
std::vector<uint8_t> data = MakeRandomData(4 * 1024 * 1024);
541565
data.reserve(data.size() * 2);
542566
data.insert(data.end(), data.begin(), data.end());
543567

544-
ASSERT_OK_AND_ASSIGN(auto result1, Codec::Create(Compression::ZSTD, option1));
545-
ASSERT_OK_AND_ASSIGN(auto result2, Codec::Create(Compression::ZSTD, option2));
568+
ASSERT_OK_AND_ASSIGN(auto codec1, Codec::Create(Compression::ZSTD, option1));
569+
ASSERT_OK_AND_ASSIGN(auto codec2, Codec::Create(Compression::ZSTD, option2));
570+
571+
auto compress = [&data](Codec& codec) -> Result<std::vector<uint8_t>> {
572+
auto max_compressed_len = codec.MaxCompressedLen(data.size(), data.data());
573+
std::vector<uint8_t> compressed(max_compressed_len);
574+
575+
int64_t bytes_written = 0;
576+
int64_t bytes_read = 0;
577+
ARROW_ASSIGN_OR_RAISE(auto compressor, codec.MakeCompressor());
578+
while (bytes_read < static_cast<int64_t>(data.size())) {
579+
ARROW_ASSIGN_OR_RAISE(
580+
auto result,
581+
compressor->Compress(data.size() - bytes_read, data.data() + bytes_read,
582+
max_compressed_len - bytes_written,
583+
compressed.data() + bytes_written));
584+
bytes_written += result.bytes_written;
585+
bytes_read += result.bytes_read;
586+
}
587+
while (true) {
588+
ARROW_ASSIGN_OR_RAISE(auto result,
589+
compressor->End(max_compressed_len - bytes_written,
590+
compressed.data() + bytes_written));
591+
bytes_written += result.bytes_written;
592+
if (!result.should_retry) {
593+
break;
594+
}
595+
}
596+
compressed.resize(bytes_written);
597+
return compressed;
598+
};
546599

547-
int max_compressed_len =
548-
static_cast<int>(result1->MaxCompressedLen(data.size(), data.data()));
549-
std::vector<uint8_t> compressed(max_compressed_len);
600+
ASSERT_OK_AND_ASSIGN(auto compressed1, compress(*codec1));
601+
ASSERT_OK_AND_ASSIGN(auto compressed2, compress(*codec2));
602+
ASSERT_GT(compressed1.size(), compressed2.size());
550603

551-
ASSERT_OK_AND_ASSIGN(
552-
int64_t actual_size1,
553-
result1->Compress(data.size(), data.data(), max_compressed_len, compressed.data()));
554-
ASSERT_OK_AND_ASSIGN(
555-
int64_t actual_size2,
556-
result2->Compress(data.size(), data.data(), max_compressed_len, compressed.data()));
557-
ASSERT_GT(actual_size1, actual_size2);
604+
ASSERT_OK_AND_ASSIGN(auto decompressor1, codec1->MakeDecompressor());
605+
ASSERT_OK_AND_ASSIGN(auto decompressor2, codec2->MakeDecompressor());
606+
607+
std::vector<uint8_t> decompressed(data.size());
608+
// Using a windowLog greater than ZSTD_WINDOWLOG_LIMIT_DEFAULT(1 << 27) requires
609+
// explicitly allowing such size at decompression stage.
610+
auto ret = decompressor1->Decompress(compressed2.size(), compressed2.data(),
611+
decompressed.size(), decompressed.data());
612+
ASSERT_NOT_OK(ret);
613+
ASSERT_EQ(ret.status().message(),
614+
"ZSTD decompress failed: Frame requires too much memory for decoding");
615+
616+
int64_t bytes_written = 0;
617+
int64_t bytes_read = 0;
618+
while (bytes_read < static_cast<int64_t>(compressed2.size())) {
619+
ASSERT_OK_AND_ASSIGN(auto result,
620+
decompressor2->Decompress(compressed2.size() - bytes_read,
621+
compressed2.data() + bytes_read,
622+
decompressed.size() - bytes_written,
623+
decompressed.data() + bytes_written));
624+
bytes_read += result.bytes_read;
625+
bytes_written += result.bytes_written;
626+
}
627+
ASSERT_TRUE(decompressor2->IsFinished());
628+
ASSERT_EQ(data, decompressed);
558629
}
559630

560631
TEST_P(CodecTest, MinMaxCompressionLevel) {

cpp/src/parquet/column_writer_test.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -705,10 +705,8 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndZstdCompression) {
705705
}
706706
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCodecOptions) {
707707
constexpr int ZSTD_c_windowLog = 101;
708-
constexpr int ZSTD_d_windowLogMax = 100;
709708
auto codec_options = std::make_shared<::arrow::util::ZstdCodecOptions>();
710709
codec_options->compression_context_params = {{ZSTD_c_windowLog, 23}};
711-
codec_options->decompression_context_params = {{ZSTD_d_windowLogMax, 23}};
712710
this->TestRequiredWithCodecOptions(Encoding::PLAIN, Compression::ZSTD, false, false,
713711
LARGE_SIZE, codec_options);
714712
}

cpp/src/parquet/properties_test.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,11 @@ TEST(TestWriterProperties, AdvancedHandling) {
8282

8383
TEST(TestWriterProperties, SetCodecOptions) {
8484
constexpr int ZSTD_c_windowLog = 101;
85-
constexpr int ZSTD_d_windowLogMax = 100;
8685

8786
WriterProperties::Builder builder;
8887
builder.compression("gzip", Compression::GZIP);
8988
auto zstd_codec_options = std::make_shared<::arrow::util::ZstdCodecOptions>();
9089
zstd_codec_options->compression_context_params = {{ZSTD_c_windowLog, 23}};
91-
zstd_codec_options->decompression_context_params = {{ZSTD_d_windowLogMax, 23}};
9290
builder.codec_options("zstd", zstd_codec_options);
9391
builder.compression("brotli", Compression::BROTLI);
9492
auto gzip_codec_options = std::make_shared<::arrow::util::GZipCodecOptions>();

0 commit comments

Comments
 (0)