Skip to content

Commit e688141

Browse files
sahil1105IsaacWarren
authored andcommitted
Add proper error handling to CastingGenerator
1 parent 504d52b commit e688141

File tree

1 file changed

+44
-42
lines changed

1 file changed

+44
-42
lines changed

recipe/patches/0004-Bodo-Changes.patch

Lines changed: 44 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
2-
index 1f8b6cc488..5ad7a5f78b 100644
2+
index 1f8b6cc488..322d50e598 100644
33
--- a/cpp/src/arrow/dataset/file_parquet.cc
44
+++ b/cpp/src/arrow/dataset/file_parquet.cc
55
@@ -26,16 +26,23 @@
@@ -26,7 +26,7 @@ index 1f8b6cc488..5ad7a5f78b 100644
2626
#include "arrow/util/tracing_internal.h"
2727
#include "parquet/arrow/reader.h"
2828
#include "parquet/arrow/schema.h"
29-
@@ -555,6 +562,60 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
29+
@@ -555,6 +562,59 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
3030
});
3131
}
3232

@@ -38,45 +38,44 @@ index 1f8b6cc488..5ad7a5f78b 100644
3838
+ exec_ctx(std::make_shared<compute::ExecContext>(pool)) {}
3939
+
4040
+ Future<std::shared_ptr<RecordBatch>> operator()() {
41-
+ return this->source_().Then(
42-
+ [this](const std::shared_ptr<RecordBatch>& next) -> std::shared_ptr<RecordBatch> {
43-
+ if (IsIterationEnd(next)) {
44-
+ return next;
45-
+ }
46-
+ std::vector<std::shared_ptr<::arrow::Array>> out_cols;
47-
+ std::vector<std::shared_ptr<arrow::Field>> out_schema_fields;
41+
+ return this->source_().Then([this](const std::shared_ptr<RecordBatch>& next)
42+
+ -> Result<std::shared_ptr<RecordBatch>> {
43+
+ if (IsIterationEnd(next) || this->final_schema_.get() == nullptr) {
44+
+ return next;
45+
+ }
46+
+ std::vector<std::shared_ptr<::arrow::Array>> out_cols;
47+
+ std::vector<std::shared_ptr<arrow::Field>> out_schema_fields;
4848
+
49-
+ bool changed = false;
50-
+ for (const auto& field : this->final_schema_->fields()) {
51-
+ FieldRef field_ref = FieldRef(field->name());
52-
+ auto column_st = field_ref.GetOneOrNone(*next);
53-
+ std::shared_ptr<Array> column = column_st.ValueUnsafe();
54-
+ if (column) {
55-
+ if (!column->type()->Equals(field->type())) {
56-
+ // Referenced field was present but didn't have the expected type.
57-
+ auto converted_st =
58-
+ compute::Cast(column, field->type(), compute::CastOptions::Safe(),
59-
+ this->exec_ctx.get());
60-
+ auto converted = std::move(converted_st.ValueUnsafe());
61-
+ column = converted.make_array();
62-
+ changed = true;
63-
+ }
64-
+ out_cols.emplace_back(std::move(column));
65-
+ out_schema_fields.emplace_back(field->Copy());
66-
+ // XXX Do we need to handle the else case? What happens when the column
67-
+ // doesn't exist, e.g. all null or all the same value?
68-
+ }
49+
+ bool changed = false;
50+
+ for (const auto& field : this->final_schema_->fields()) {
51+
+ FieldRef field_ref = FieldRef(field->name());
52+
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> column,
53+
+ field_ref.GetOneOrNone(*next));
54+
+ if (column) {
55+
+ if (!column->type()->Equals(field->type())) {
56+
+ // Referenced field was present but didn't have the expected type.
57+
+ ARROW_ASSIGN_OR_RAISE(
58+
+ auto converted,
59+
+ compute::Cast(column, field->type(), compute::CastOptions::Safe(),
60+
+ this->exec_ctx.get()));
61+
+ column = converted.make_array();
62+
+ changed = true;
6963
+ }
64+
+ out_cols.emplace_back(std::move(column));
65+
+ out_schema_fields.emplace_back(field->Copy());
66+
+ // XXX Do we need to handle the else case? What happens when the column
67+
+ // doesn't exist, e.g. all null or all the same value?
68+
+ }
69+
+ }
7070
+
71-
+ if (changed) {
72-
+ return RecordBatch::Make(
73-
+ std::make_shared<Schema>(std::move(out_schema_fields),
74-
+ next->schema()->metadata()),
75-
+ next->num_rows(), std::move(out_cols));
76-
+ } else {
77-
+ return next;
78-
+ }
79-
+ });
71+
+ if (changed) {
72+
+ return RecordBatch::Make(std::make_shared<Schema>(std::move(out_schema_fields),
73+
+ next->schema()->metadata()),
74+
+ next->num_rows(), std::move(out_cols));
75+
+ } else {
76+
+ return next;
77+
+ }
78+
+ });
8079
+ }
8180
+
8281
+ RecordBatchGenerator source_;
@@ -87,7 +86,7 @@ index 1f8b6cc488..5ad7a5f78b 100644
8786
struct SlicingGenerator {
8887
SlicingGenerator(RecordBatchGenerator source, int64_t batch_size)
8988
: state(std::make_shared<State>(source, batch_size)) {}
90-
@@ -617,6 +678,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
89+
@@ -617,6 +677,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
9190
[this, options, parquet_fragment, pre_filtered,
9291
row_groups](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
9392
-> Result<RecordBatchGenerator> {
@@ -97,7 +96,7 @@ index 1f8b6cc488..5ad7a5f78b 100644
9796
// Ensure that parquet_fragment has FileMetaData
9897
RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get()));
9998
if (!pre_filtered) {
100-
@@ -633,10 +697,17 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
99+
@@ -633,12 +696,19 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
101100
kParquetTypeName, options.get(), default_fragment_scan_options));
102101
int batch_readahead = options->batch_readahead;
103102
int64_t rows_to_readahead = batch_readahead * options->batch_size;
@@ -114,11 +113,14 @@ index 1f8b6cc488..5ad7a5f78b 100644
114113
+ ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
115114
+ reader, row_groups, column_projection,
116115
+ cpu_executor, rows_to_readahead));
117-
+ generator =
116+
+ RecordBatchGenerator casted =
118117
+ CastingGenerator(std::move(generator), options->dataset_schema, options->pool);
119118
RecordBatchGenerator sliced =
120-
SlicingGenerator(std::move(generator), options->batch_size);
119+
- SlicingGenerator(std::move(generator), options->batch_size);
120+
+ SlicingGenerator(std::move(casted), options->batch_size);
121121
if (batch_readahead == 0) {
122+
return sliced;
123+
}
122124
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
123125
index a856a792a2..5c10dfc6ac 100644
124126
--- a/cpp/src/arrow/dataset/scanner.cc

0 commit comments

Comments
 (0)