Skip to content

Commit 5852134

Browse files
authored
GH-42971: [C++] Parquet stream writer: Allow writing BYTE_ARRAY with converted type NONE (#44739)
### Rationale for this change

We are trying to store binary data (in our case, a dump of captured CAN messages) in a Parquet file. The data has a variable length (from 0 to 8 bytes) and is not a UTF-8 string (or a text string at all). For this, physical type BYTE_ARRAY and logical type NONE seem appropriate. Unfortunately, the Parquet stream writer will not let us do that. We can do either fixed length and converted type NONE, or variable length and converted type UTF-8. This change relaxes the type check on byte arrays to allow use of the NONE converted type.

### What changes are included in this PR?

Allow the Parquet stream writer to store data in a BYTE_ARRAY with NONE logical type. The changes are based on similar changes made earlier to the stream reader. The reader part has already been fixed in 4d82549 and this uses a similar implementation, but with a stricter set of "exceptions" (only BYTE_ARRAY with the NONE type is allowed).

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Only a new feature.

* GitHub Issue: #42971

Authored-by: Adrien Destugues <adrien.destugues@opensource.viveris.fr>
Signed-off-by: Antoine Pitrou <antoine@python.org>
1 parent 11c97b8 commit 5852134

File tree

3 files changed

+50
-9
lines changed

3 files changed

+50
-9
lines changed

cpp/src/parquet/stream_writer.cc

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,20 +129,26 @@ StreamWriter& StreamWriter::operator<<(FixedStringView v) {
129129
}
130130

131131
StreamWriter& StreamWriter::operator<<(const char* v) {
132-
return WriteVariableLength(v, std::strlen(v));
132+
return WriteVariableLength(v, std::strlen(v), ConvertedType::UTF8);
133133
}
134134

135135
StreamWriter& StreamWriter::operator<<(const std::string& v) {
136-
return WriteVariableLength(v.data(), v.size());
136+
return WriteVariableLength(v.data(), v.size(), ConvertedType::UTF8);
137137
}
138138

139139
StreamWriter& StreamWriter::operator<<(::std::string_view v) {
140-
return WriteVariableLength(v.data(), v.size());
140+
return WriteVariableLength(v.data(), v.size(), ConvertedType::UTF8);
141+
}
142+
143+
StreamWriter& StreamWriter::operator<<(RawDataView v) {
144+
return WriteVariableLength(reinterpret_cast<const char*>(v.data()), v.size(),
145+
ConvertedType::NONE);
141146
}
142147

143148
StreamWriter& StreamWriter::WriteVariableLength(const char* data_ptr,
144-
std::size_t data_len) {
145-
CheckColumn(Type::BYTE_ARRAY, ConvertedType::UTF8);
149+
std::size_t data_len,
150+
ConvertedType::type type) {
151+
CheckColumn(Type::BYTE_ARRAY, type);
146152

147153
auto writer = static_cast<ByteArrayWriter*>(row_group_writer_->column(column_index_++));
148154

cpp/src/parquet/stream_writer.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#include <string_view>
2727
#include <vector>
2828

29+
#include "arrow/util/span.h"
30+
2931
#include "parquet/column_writer.h"
3032
#include "parquet/file_writer.h"
3133

@@ -151,6 +153,12 @@ class PARQUET_EXPORT StreamWriter {
151153
StreamWriter& operator<<(const std::string& v);
152154
StreamWriter& operator<<(::std::string_view v);
153155

156+
/// \brief View type used to write variable length raw data.
157+
using RawDataView = ::arrow::util::span<const uint8_t>;
158+
159+
/// \brief Output operators for variable length raw data.
160+
StreamWriter& operator<<(RawDataView v);
161+
154162
/// \brief Output operator for optional fields.
155163
template <typename T>
156164
StreamWriter& operator<<(const optional<T>& v) {
@@ -190,7 +198,8 @@ class PARQUET_EXPORT StreamWriter {
190198
return *this;
191199
}
192200

193-
StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len);
201+
StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len,
202+
ConvertedType::type converted_type);
194203

195204
StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len);
196205

cpp/src/parquet/stream_writer_test.cc

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ class TestStreamWriter : public ::testing::Test {
8080
fields.push_back(schema::PrimitiveNode::Make("double_field", Repetition::REQUIRED,
8181
Type::DOUBLE, ConvertedType::NONE));
8282

83+
fields.push_back(schema::PrimitiveNode::Make("bytes_field", Repetition::REQUIRED,
84+
Type::BYTE_ARRAY, ConvertedType::NONE));
85+
8386
return std::static_pointer_cast<schema::GroupNode>(
8487
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
8588
}
@@ -99,7 +102,7 @@ TEST_F(TestStreamWriter, DefaultConstructed) {
99102
EXPECT_EQ(0, os.current_column());
100103
EXPECT_EQ(0, os.current_row());
101104
EXPECT_EQ(0, os.num_columns());
102-
EXPECT_EQ(0, os.SkipColumns(10));
105+
EXPECT_EQ(0, os.SkipColumns(11));
103106
}
104107

105108
TEST_F(TestStreamWriter, TypeChecking) {
@@ -162,6 +165,17 @@ TEST_F(TestStreamWriter, TypeChecking) {
162165
EXPECT_THROW(writer_ << 5.4f, ParquetException);
163166
EXPECT_NO_THROW(writer_ << 5.4);
164167

168+
// Required type: Variable length byte array.
169+
// Strings and naked char* are rejected because they are written with the UTF8
170+
// converted type, whereas this column is declared with the NONE converted type.
171+
EXPECT_EQ(10, writer_.current_column());
172+
EXPECT_THROW(writer_ << 5, ParquetException);
173+
EXPECT_THROW(writer_ << char3_array, ParquetException);
174+
EXPECT_THROW(writer_ << char4_array, ParquetException);
175+
EXPECT_THROW(writer_ << char5_array, ParquetException);
176+
EXPECT_THROW(writer_ << std::string("not ok"), ParquetException);
177+
EXPECT_NO_THROW(writer_ << StreamWriter::RawDataView((uint8_t*)"\xff\0ok", 4));
178+
165179
EXPECT_EQ(0, writer_.current_row());
166180
EXPECT_NO_THROW(writer_ << EndRow);
167181
EXPECT_EQ(1, writer_.current_row());
@@ -210,6 +224,11 @@ TEST_F(TestStreamWriter, RequiredFieldChecking) {
210224
EXPECT_THROW(writer_ << optional<double>(), ParquetException);
211225
EXPECT_NO_THROW(writer_ << optional<double>(5.4));
212226

227+
// Required field of type: Variable length byte array.
228+
EXPECT_THROW(writer_ << optional<StreamWriter::RawDataView>(), ParquetException);
229+
EXPECT_NO_THROW(
230+
writer_ << std::make_optional<StreamWriter::RawDataView>((uint8_t*)"ok", 2));
231+
213232
EXPECT_NO_THROW(writer_ << EndRow);
214233
}
215234

@@ -234,6 +253,7 @@ TEST_F(TestStreamWriter, EndRow) {
234253
EXPECT_NO_THROW(writer_ << uint64_t((1ull << 60) + 123));
235254
EXPECT_NO_THROW(writer_ << 25.4f);
236255
EXPECT_NO_THROW(writer_ << 3.3424);
256+
EXPECT_NO_THROW(writer_ << StreamWriter::RawDataView((uint8_t*)"ok", 2));
237257
// Correct use of end row after all fields have been output.
238258
EXPECT_NO_THROW(writer_ << EndRow);
239259
EXPECT_EQ(1, writer_.current_row());
@@ -272,6 +292,10 @@ TEST_F(TestStreamWriter, EndRowGroup) {
272292
EXPECT_NO_THROW(writer_ << uint64_t((1ull << 60) - i * i)) << "index: " << i;
273293
EXPECT_NO_THROW(writer_ << 42325.4f / float(i + 1)) << "index: " << i;
274294
EXPECT_NO_THROW(writer_ << 3.2342e5 / double(i + 1)) << "index: " << i;
295+
std::string tmpString = std::to_string(i);
296+
EXPECT_NO_THROW(writer_ << StreamWriter::RawDataView((uint8_t*)tmpString.c_str(),
297+
tmpString.length()))
298+
<< "index: " << i;
275299
EXPECT_NO_THROW(writer_ << EndRow) << "index: " << i;
276300

277301
if (i % 1000 == 0) {
@@ -293,7 +317,8 @@ TEST_F(TestStreamWriter, SkipColumns) {
293317
writer_ << true << std::string("Cannot skip mandatory columns");
294318
EXPECT_THROW(writer_.SkipColumns(1), ParquetException);
295319
writer_ << 'x' << std::array<char, 4>{'A', 'B', 'C', 'D'} << int8_t(2) << uint16_t(3)
296-
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0;
320+
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0
321+
<< StreamWriter::RawDataView((uint8_t*)"ok", 2);
297322
writer_ << EndRow;
298323
}
299324

@@ -304,7 +329,8 @@ TEST_F(TestStreamWriter, AppendNotImplemented) {
304329
writer_ = StreamWriter{ParquetFileWriter::Open(outfile, GetSchema())};
305330
writer_ << false << std::string("Just one row") << 'x'
306331
<< std::array<char, 4>{'A', 'B', 'C', 'D'} << int8_t(2) << uint16_t(3)
307-
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0;
332+
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0
333+
<< StreamWriter::RawDataView((uint8_t*)"ok", 2);
308334
writer_ << EndRow;
309335
writer_ = StreamWriter{};
310336

0 commit comments

Comments
 (0)