Description
Describe the bug, including details regarding any error messages, version, and platform.
When specifying a column as LargeStringType during a Parquet read with a small batch size (4096), we noticed very bad performance. On investigation, we discovered that the cast from StringType to LargeStringType, which requires widening the offsets from int32 to int64, was very expensive. The reason is that the cast is performed on small slices of the original table that is read from Parquet row-groups (as a single Arrow table). Each slice has an offset identifying its starting point in its buffers and a length (<=4096) identifying its end relative to that offset.
The way the cast is performed for the string offsets buffer of a slice is that it:
- Allocates a buffer of (offset + length + 1) elements (`ctx->Allocate((output->length + output->offset + 1) * sizeof(output_offset_type))`)
- Then memsets the first `offset` elements to `0` (https://github.com/apache/arrow/blob/6a2e19a852b367c72d7b12da4d104456491ed8b7/cpp/src/arrow/compute/kernels/scalar_cast_string.cc#L252-L253)
- Finally, performs the cast on the `length` many elements (https://github.com/apache/arrow/blob/6a2e19a852b367c72d7b12da4d104456491ed8b7/cpp/src/arrow/compute/kernels/scalar_cast_string.cc#L254-L256)
When the original row group is large, say a few million rows, and the batch size is small, this means that steps 1 & 2 are repeated many times, each over an increasingly large offset. For example, when the batch size is 4096, the first slice has offset 0 and there's no wasted computation. However, on every successive batch the offset grows linearly, and the work adds up when there are many batches: 4096 + 8192 + 12288 + 16384 + 20480 + ..., which means that for N total rows, we're doing O(N^2) work and allocating O(N^2) space.
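The arithmetic above can be checked with a back-of-the-envelope counter (`WastedZeroFill` is a hypothetical helper, not part of Arrow):

```cpp
#include <cstdint>

// Counts the total number of offset elements zero-filled and discarded
// across all batches: the k-th batch starts at offset k*B, so the waste is
// 0 + B + 2B + ... = B * M * (M - 1) / 2 for M = N / B batches, i.e. O(N^2).
int64_t WastedZeroFill(int64_t total_rows, int64_t batch_size) {
  int64_t wasted = 0;
  for (int64_t off = 0; off < total_rows; off += batch_size) {
    wasted += off;  // leading elements memset to 0 for this batch
  }
  return wasted;
}
```

With batch_size = 4096, five batches (20480 rows) already waste 40960 zero-filled elements, and the waste grows quadratically from there.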
We were able to resolve this by doing the following:
- In `ParquetFileFormat::ScanBatchesAsync`, set the `batch_size` for the reader to INT64_MAX. Since we already have a `SlicingGenerator` that is responsible for converting the reader's output into `batch_size`-sized batches, we don't need the reader to batch its output.
- Add a `CastingGenerator` that applies any required casts to the output of the reader before the output is passed to `SlicingGenerator`. The logic for the cast closely follows the logic in `MakeExecBatch()` (arrow/cpp/src/arrow/compute/expression.cc, line 644 at 6a2e19a: `Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial,`). There's an existing note in that function (line 672 at 6a2e19a: `// This *should* be handled by readers, and will just be an error in the future.`) stating that the readers should be the ones performing the cast, so I believe this change gets us closer to that goal.
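To illustrate the wrapping idea (this is a toy model, not the Arrow generator API: `Batch`, `Generator`, and `WrapWithCast` are invented for the sketch), a generator stage can apply the cast once per full reader batch, before any slicing stage sees it:

```cpp
#include <cstdint>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Toy model of the CastingGenerator idea: a "generator" is a callable that
// yields batches until exhausted. Wrapping the reader's generator applies
// the cast once per large batch, so downstream slicing never triggers the
// per-slice offset rewrite described earlier.
using Batch = std::vector<int32_t>;
using Generator = std::function<std::optional<Batch>()>;

Generator WrapWithCast(Generator source,
                       std::function<Batch(const Batch&)> cast) {
  return [source = std::move(source), cast = std::move(cast)]() mutable
             -> std::optional<Batch> {
    auto next = source();
    if (!next) return std::nullopt;  // source exhausted; pass it through
    return cast(*next);              // cast the whole batch, not each slice
  };
}
```

The design point is simply ordering: cast first on the large batch, slice second, so the cost of the cast is paid once instead of once per slice.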
I have an implementation here: main...bodo-ai:arrow-bodo-fork:sahil/pq-read-add-casting-gen that I will open a PR with. I have tested this on our use cases and it performs very well. Even for small files, we were able to improve E2E read performance by 8x since this step was the main bottleneck.
Component(s)
C++, Parquet