Skip to content

Commit 58dbd3c

Browse files
committed
simplify error handling
1 parent 4e1bf26 commit 58dbd3c

File tree

6 files changed

+36
-73
lines changed

6 files changed

+36
-73
lines changed

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ function(resolve_arrow_dependency)
6565
set(ARROW_BUILD_STATIC
6666
ON
6767
CACHE BOOL "" FORCE)
68-
# Workaround undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*)
68+
# Work around undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*)
6969
set(ARROW_IPC
7070
ON
7171
CACHE BOOL "" FORCE)

src/iceberg/arrow/arrow_error_transform_internal.h

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,18 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) {
4343
} \
4444
lhs = std::move(result_name).ValueOrDie();
4545

46-
#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \
47-
ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \
48-
ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, ToErrorKind)
49-
50-
#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \
51-
do { \
52-
auto&& _status = (expr); \
53-
if (!_status.ok()) { \
54-
return std::unexpected<Error>{ \
55-
{.kind = ToErrorKind(_status), .message = _status.ToString()}}; \
56-
} \
46+
#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \
47+
ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \
48+
ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \
49+
::iceberg::arrow::ToErrorKind)
50+
51+
#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \
52+
do { \
53+
auto&& _status = (expr); \
54+
if (!_status.ok()) { \
55+
return std::unexpected<Error>{{.kind = ::iceberg::arrow::ToErrorKind(_status), \
56+
.message = _status.ToString()}}; \
57+
} \
5758
} while (0)
5859

5960
} // namespace iceberg::arrow

src/iceberg/parquet/parquet_data_util.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
namespace iceberg::parquet {
2323

24-
Result<std::shared_ptr<::arrow::RecordBatch>> ConvertRecordBatch(
24+
Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
2525
std::shared_ptr<::arrow::RecordBatch> record_batch,
2626
const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
2727
const Schema& projected_schema, const SchemaProjection& projection) {

src/iceberg/parquet/parquet_data_util_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ namespace iceberg::parquet {
3535
/// \param projected_schema The projected Iceberg schema.
3636
/// \param projection The projection from projected Iceberg schema to the record batch.
3737
/// \return The converted record batch.
38-
Result<std::shared_ptr<::arrow::RecordBatch>> ConvertRecordBatch(
38+
Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
3939
std::shared_ptr<::arrow::RecordBatch> record_batch,
4040
const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
4141
const Schema& projected_schema, const SchemaProjection& projection);

src/iceberg/parquet/parquet_reader.cc

Lines changed: 21 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,18 @@
2626
#include <arrow/record_batch.h>
2727
#include <arrow/result.h>
2828
#include <arrow/type.h>
29-
#include <iceberg/result.h>
30-
#include <iceberg/schema_util.h>
3129
#include <parquet/arrow/reader.h>
3230
#include <parquet/arrow/schema.h>
3331
#include <parquet/file_reader.h>
3432
#include <parquet/properties.h>
3533

36-
#include "iceberg/arrow/arrow_fs_file_io.h"
34+
#include "iceberg/arrow/arrow_error_transform_internal.h"
35+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
3736
#include "iceberg/parquet/parquet_data_util_internal.h"
3837
#include "iceberg/parquet/parquet_schema_util_internal.h"
38+
#include "iceberg/result.h"
3939
#include "iceberg/schema_internal.h"
40+
#include "iceberg/schema_util.h"
4041
#include "iceberg/util/checked_cast.h"
4142
#include "iceberg/util/macros.h"
4243

@@ -52,13 +53,8 @@ Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenInputStream(
5253
}
5354

5455
auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
55-
auto result = io->fs()->OpenInputFile(file_info);
56-
if (!result.ok()) {
57-
return IOError("Failed to open file {} for reading: {}", options.path,
58-
result.status().message());
59-
}
60-
61-
return result.MoveValueUnsafe();
56+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, io->fs()->OpenInputFile(file_info));
57+
return input;
6258
}
6359

6460
Result<SchemaProjection> BuildProjection(::parquet::arrow::FileReader* reader,
@@ -73,17 +69,12 @@ Result<SchemaProjection> BuildProjection(::parquet::arrow::FileReader* reader,
7369
}
7470

7571
::parquet::arrow::SchemaManifest schema_manifest;
76-
auto schema_manifest_result = ::parquet::arrow::SchemaManifest::Make(
72+
ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::SchemaManifest::Make(
7773
metadata->schema(), metadata->key_value_metadata(), reader->properties(),
78-
&schema_manifest);
79-
if (!schema_manifest_result.ok()) {
80-
return ParquetError("Failed to make schema manifest: {}",
81-
schema_manifest_result.message());
82-
}
74+
&schema_manifest));
8375

8476
// Leverage SchemaManifest to project the schema
8577
ICEBERG_ASSIGN_OR_RAISE(auto projection, Project(read_schema, schema_manifest));
86-
8778
return projection;
8879
}
8980

@@ -141,11 +132,8 @@ class ParquetReader::Impl {
141132
ICEBERG_ASSIGN_OR_RAISE(auto input_stream, OpenInputStream(options));
142133
auto file_reader =
143134
::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties);
144-
auto make_reader_result = ::parquet::arrow::FileReader::Make(
145-
pool, std::move(file_reader), arrow_reader_properties, &reader_);
146-
if (!make_reader_result.ok()) {
147-
return ParquetError("Failed to make file reader: {}", make_reader_result.message());
148-
}
135+
ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make(
136+
pool, std::move(file_reader), arrow_reader_properties, &reader_));
149137

150138
// Project read schema onto the Parquet file schema
151139
ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(), *read_schema_));
@@ -159,27 +147,17 @@ class ParquetReader::Impl {
159147
ICEBERG_RETURN_UNEXPECTED(InitReadContext());
160148
}
161149

162-
auto next_result = context_->record_batch_reader_->Next();
163-
if (!next_result.ok()) {
164-
return ParquetError("Failed to read next batch: {}",
165-
next_result.status().message());
166-
}
167-
168-
auto batch = next_result.MoveValueUnsafe();
150+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch, context_->record_batch_reader_->Next());
169151
if (!batch) {
170152
return std::nullopt;
171153
}
172154

173155
ICEBERG_ASSIGN_OR_RAISE(
174-
batch, ConvertRecordBatch(std::move(batch), context_->output_arrow_schema_,
156+
batch, ProjectRecordBatch(std::move(batch), context_->output_arrow_schema_,
175157
*read_schema_, projection_));
176158

177159
ArrowArray arrow_array;
178-
auto export_result = ::arrow::ExportRecordBatch(*batch, &arrow_array);
179-
if (!export_result.ok()) {
180-
return ParquetError("Failed to export the Arrow record batch: {}",
181-
export_result.message());
182-
}
160+
ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportRecordBatch(*batch, &arrow_array));
183161
return arrow_array;
184162
}
185163

@@ -190,11 +168,7 @@ class ParquetReader::Impl {
190168
}
191169

192170
if (context_ != nullptr) {
193-
auto close_result = context_->record_batch_reader_->Close();
194-
if (!close_result.ok()) {
195-
return ParquetError("Failed to close record batch reader: {}",
196-
close_result.message());
197-
}
171+
ICEBERG_ARROW_RETURN_NOT_OK(context_->record_batch_reader_->Close());
198172
context_.reset();
199173
}
200174

@@ -209,11 +183,8 @@ class ParquetReader::Impl {
209183
}
210184

211185
ArrowSchema arrow_schema;
212-
auto export_result =
213-
::arrow::ExportSchema(*context_->output_arrow_schema_, &arrow_schema);
214-
if (!export_result.ok()) {
215-
return ParquetError("Failed to export Arrow schema: {}", export_result.message());
216-
}
186+
ICEBERG_ARROW_RETURN_NOT_OK(
187+
::arrow::ExportSchema(*context_->output_arrow_schema_, &arrow_schema));
217188
return arrow_schema;
218189
}
219190

@@ -224,12 +195,8 @@ class ParquetReader::Impl {
224195
// Build the output Arrow schema
225196
ArrowSchema arrow_schema;
226197
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema));
227-
auto import_result = ::arrow::ImportSchema(&arrow_schema);
228-
if (!import_result.ok()) {
229-
return ParquetError("Failed to import Arrow schema: {}",
230-
import_result.status().message());
231-
}
232-
context_->output_arrow_schema_ = import_result.MoveValueUnsafe();
198+
ICEBERG_ARROW_ASSIGN_OR_RETURN(context_->output_arrow_schema_,
199+
::arrow::ImportSchema(&arrow_schema));
233200

234201
// Row group pruning based on the split
235202
// TODO(gangwu): add row group filtering based on zone map, bloom filter, etc.
@@ -254,12 +221,9 @@ class ParquetReader::Impl {
254221

255222
// Create the record batch reader
256223
ICEBERG_ASSIGN_OR_RAISE(auto column_indices, SelectedColumnIndices(projection_));
257-
auto reader_result = reader_->GetRecordBatchReader(row_group_indices, column_indices);
258-
if (!reader_result.ok()) {
259-
return ParquetError("Failed to get record batch reader: {}",
260-
reader_result.status().message());
261-
}
262-
context_->record_batch_reader_ = std::move(reader_result).MoveValueUnsafe();
224+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
225+
context_->record_batch_reader_,
226+
reader_->GetRecordBatchReader(row_group_indices, column_indices));
263227

264228
return {};
265229
}

src/iceberg/result.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ enum class ErrorKind {
4646
kNotFound,
4747
kNotImplemented,
4848
kNotSupported,
49-
kParquetError,
5049
kUnknownError,
5150
};
5251

@@ -94,7 +93,6 @@ DEFINE_ERROR_FUNCTION(NotAllowed)
9493
DEFINE_ERROR_FUNCTION(NotFound)
9594
DEFINE_ERROR_FUNCTION(NotImplemented)
9695
DEFINE_ERROR_FUNCTION(NotSupported)
97-
DEFINE_ERROR_FUNCTION(ParquetError)
9896
DEFINE_ERROR_FUNCTION(UnknownError)
9997

10098
#undef DEFINE_ERROR_FUNCTION

0 commit comments

Comments
 (0)