Skip to content

Commit 697f501

Browse files
sgilmore10pitrou
andauthored
GH-41476: [Python][C++] Impossible to specify is_adjusted_to_utc for Time type when writing to Parquet (#47316)
### Rationale for this change As of today, it's not possible to write Parquet `TIME` data whose `isAdjustedToUTC` parameter is `false`. Instead, `isAdjustedToUTC` is hard-coded to `true` [here](https://github.com/apache/arrow/blob/2dd3ccda6437f79aa34641bd3197dd7392ae4aec/cpp/src/parquet/arrow/schema.cc#L431). Unfortunately, some Parquet consumers only support `TIME` data if the `isAdjustedToUTC` parameter is `false`, meaning they cannot import Parquet `TIME` data generated by our Parquet Writer. For example, the apache/spark Parquet reader only supports Parquet `TIME` columns if [`isAdjustedToUTC=false` and `units=MICROSECONDS`](https://github.com/apache/spark/blob/554f6b64f1e2b2346499f6d3340a3695244bfc84/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala#L309). Adding support for writing `TIME` data with the `isAdjustedToUTC` set to `false` would unblock users who need to write Spark-compatible Parquet data. ### What changes are included in this PR? 1. Added a `write_time_adjusted_to_utc` as a property to `parquet::ArrowWriterProperties`. If `true`, all `TIME` columns have their `isAdjustedToUTC` parameters set to `true`. Otherwise, `isAdjustedToUTC` is set to `false` for all `TIME` columns. This property is `false` by default. 2. Added `enable_write_time_adjusted_to_utc()` and `disable_write_time_adjusted_to_utc()` methods to `parquet::ArrowWriterProperties::Builder`. ### Are these changes tested? Yes. I added test case `ParquetTimeAdjustedToUTC` to test suite `TestConvertArrowSchema`. ### Are there any user-facing changes? Yes. Users can now configure the `isAdjustedToUTC` parameter for Parquet `TIME` data. NOTE: This change introduces an incompatibility. The default value for `isAdjustedToUTC` parameter is now `false` instead of `true`. ### NOTE 1. I did not update the PyArrow interface because I am not familiar with that code base. I was planning on creating a new GitHub issue to track that work separately. 2. There already exists an open PR (#43268) for addressing this issue. However, that PR was last active over a year ago and seems stale. * GitHub Issue: #41476 Lead-authored-by: Sarah Gilmore <[email protected]> Co-authored-by: Sarah Gilmore <[email protected]> Co-authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Sarah Gilmore <[email protected]>
1 parent de52048 commit 697f501

File tree

3 files changed

+89
-13
lines changed

3 files changed

+89
-13
lines changed

cpp/src/parquet/arrow/arrow_schema_test.cc

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,11 +1352,11 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
13521352
{"float16", ::arrow::float16(), LogicalType::Float16(),
13531353
ParquetType::FIXED_LEN_BYTE_ARRAY, 2},
13541354
{"time32", ::arrow::time32(::arrow::TimeUnit::MILLI),
1355-
LogicalType::Time(true, LogicalType::TimeUnit::MILLIS), ParquetType::INT32, -1},
1355+
LogicalType::Time(false, LogicalType::TimeUnit::MILLIS), ParquetType::INT32, -1},
13561356
{"time64(microsecond)", ::arrow::time64(::arrow::TimeUnit::MICRO),
1357-
LogicalType::Time(true, LogicalType::TimeUnit::MICROS), ParquetType::INT64, -1},
1357+
LogicalType::Time(false, LogicalType::TimeUnit::MICROS), ParquetType::INT64, -1},
13581358
{"time64(nanosecond)", ::arrow::time64(::arrow::TimeUnit::NANO),
1359-
LogicalType::Time(true, LogicalType::TimeUnit::NANOS), ParquetType::INT64, -1},
1359+
LogicalType::Time(false, LogicalType::TimeUnit::NANOS), ParquetType::INT64, -1},
13601360
{"timestamp(millisecond)", ::arrow::timestamp(::arrow::TimeUnit::MILLI),
13611361
LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS,
13621362
/*is_from_converted_type=*/false,
@@ -1782,6 +1782,59 @@ TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
17821782
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
17831783
}
17841784

1785+
TEST_F(TestConvertArrowSchema, ParquetTimeAdjustedToUTC) {
1786+
// Verify Parquet Time types have the appropriate isAdjustedToUTC value, depending
1787+
// on the return value of ArrowWriterProperties::write_time_adjusted_to_utc()
1788+
1789+
struct FieldConstructionArguments {
1790+
std::string name;
1791+
std::shared_ptr<::arrow::DataType> datatype;
1792+
std::shared_ptr<const LogicalType> logical_type;
1793+
parquet::Type::type physical_type;
1794+
int physical_length;
1795+
};
1796+
1797+
auto run_test =
1798+
[this](const std::shared_ptr<ArrowWriterProperties>& arrow_writer_properties,
1799+
bool time_adjusted_to_utc) {
1800+
std::vector<FieldConstructionArguments> cases = {
1801+
{"time32", ::arrow::time32(::arrow::TimeUnit::MILLI),
1802+
LogicalType::Time(time_adjusted_to_utc, LogicalType::TimeUnit::MILLIS),
1803+
ParquetType::INT32, -1},
1804+
{"time64(microsecond)", ::arrow::time64(::arrow::TimeUnit::MICRO),
1805+
LogicalType::Time(time_adjusted_to_utc, LogicalType::TimeUnit::MICROS),
1806+
ParquetType::INT64, -1},
1807+
{"time64(nanosecond)", ::arrow::time64(::arrow::TimeUnit::NANO),
1808+
LogicalType::Time(time_adjusted_to_utc, LogicalType::TimeUnit::NANOS),
1809+
ParquetType::INT64, -1}};
1810+
1811+
std::vector<std::shared_ptr<Field>> arrow_fields;
1812+
std::vector<NodePtr> parquet_fields;
1813+
for (const FieldConstructionArguments& c : cases) {
1814+
arrow_fields.push_back(::arrow::field(c.name, c.datatype, false));
1815+
parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::REQUIRED,
1816+
c.logical_type, c.physical_type,
1817+
c.physical_length));
1818+
}
1819+
1820+
EXPECT_EQ(arrow_writer_properties->write_time_adjusted_to_utc(),
1821+
time_adjusted_to_utc);
1822+
ASSERT_OK(ConvertSchema(arrow_fields, arrow_writer_properties));
1823+
CheckFlatSchema(parquet_fields);
1824+
};
1825+
1826+
// Verify write_time_adjusted_to_utc is false by default.
1827+
ArrowWriterProperties::Builder builder;
1828+
auto arrow_writer_properties = builder.build();
1829+
run_test(arrow_writer_properties, false);
1830+
1831+
arrow_writer_properties = builder.set_time_adjusted_to_utc(true)->build();
1832+
run_test(arrow_writer_properties, true);
1833+
1834+
arrow_writer_properties = builder.set_time_adjusted_to_utc(false)->build();
1835+
run_test(arrow_writer_properties, false);
1836+
}
1837+
17851838
class TestConvertRoundTrip : public ::testing::Test {
17861839
public:
17871840
::arrow::Status RoundTripSchema(

cpp/src/parquet/arrow/schema.cc

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -420,18 +420,21 @@ Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
420420
break;
421421
case ArrowTypeId::TIME32:
422422
type = ParquetType::INT32;
423-
logical_type =
424-
LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::MILLIS);
423+
logical_type = LogicalType::Time(
424+
/*is_adjusted_to_utc=*/arrow_properties.write_time_adjusted_to_utc(),
425+
LogicalType::TimeUnit::MILLIS);
425426
break;
426427
case ArrowTypeId::TIME64: {
427428
type = ParquetType::INT64;
428429
auto time_type = static_cast<::arrow::Time64Type*>(field->type().get());
429430
if (time_type->unit() == ::arrow::TimeUnit::NANO) {
430-
logical_type =
431-
LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::NANOS);
431+
logical_type = LogicalType::Time(
432+
/*is_adjusted_to_utc=*/arrow_properties.write_time_adjusted_to_utc(),
433+
LogicalType::TimeUnit::NANOS);
432434
} else {
433-
logical_type =
434-
LogicalType::Time(/*is_adjusted_to_utc=*/true, LogicalType::TimeUnit::MICROS);
435+
logical_type = LogicalType::Time(
436+
/*is_adjusted_to_utc=*/arrow_properties.write_time_adjusted_to_utc(),
437+
LogicalType::TimeUnit::MICROS);
435438
}
436439
} break;
437440
case ArrowTypeId::DURATION:

cpp/src/parquet/properties.h

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,7 +1161,8 @@ class PARQUET_EXPORT ArrowWriterProperties {
11611161
compliant_nested_types_(true),
11621162
engine_version_(V2),
11631163
use_threads_(kArrowDefaultUseThreads),
1164-
executor_(NULLPTR) {}
1164+
executor_(NULLPTR),
1165+
write_time_adjusted_to_utc_(false) {}
11651166
virtual ~Builder() = default;
11661167

11671168
/// \brief Disable writing legacy int96 timestamps (default disabled).
@@ -1256,12 +1257,21 @@ class PARQUET_EXPORT ArrowWriterProperties {
12561257
return this;
12571258
}
12581259

1260+
/// \brief Set the value of isAdjustedTOUTC when writing a TIME column
1261+
///
1262+
/// Default is false because Arrow TIME data is expressed in an unspecified timezone.
1263+
/// Note this setting doesn't affect TIMESTAMP data.
1264+
Builder* set_time_adjusted_to_utc(bool adjusted) {
1265+
write_time_adjusted_to_utc_ = adjusted;
1266+
return this;
1267+
}
1268+
12591269
/// Create the final properties.
12601270
std::shared_ptr<ArrowWriterProperties> build() {
12611271
return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
12621272
write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
12631273
truncated_timestamps_allowed_, store_schema_, compliant_nested_types_,
1264-
engine_version_, use_threads_, executor_));
1274+
engine_version_, use_threads_, executor_, write_time_adjusted_to_utc_));
12651275
}
12661276

12671277
private:
@@ -1277,6 +1287,8 @@ class PARQUET_EXPORT ArrowWriterProperties {
12771287

12781288
bool use_threads_;
12791289
::arrow::internal::Executor* executor_;
1290+
1291+
bool write_time_adjusted_to_utc_;
12801292
};
12811293

12821294
bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }
@@ -1310,14 +1322,20 @@ class PARQUET_EXPORT ArrowWriterProperties {
13101322
/// \brief Returns the executor used to write columns in parallel.
13111323
::arrow::internal::Executor* executor() const;
13121324

1325+
/// \brief The value of isAdjustedTOUTC when writing a TIME column
1326+
///
1327+
/// Note this setting doesn't affect TIMESTAMP data.
1328+
bool write_time_adjusted_to_utc() const { return write_time_adjusted_to_utc_; }
1329+
13131330
private:
13141331
explicit ArrowWriterProperties(bool write_nanos_as_int96,
13151332
bool coerce_timestamps_enabled,
13161333
::arrow::TimeUnit::type coerce_timestamps_unit,
13171334
bool truncated_timestamps_allowed, bool store_schema,
13181335
bool compliant_nested_types,
13191336
EngineVersion engine_version, bool use_threads,
1320-
::arrow::internal::Executor* executor)
1337+
::arrow::internal::Executor* executor,
1338+
bool write_time_adjusted_to_utc)
13211339
: write_timestamps_as_int96_(write_nanos_as_int96),
13221340
coerce_timestamps_enabled_(coerce_timestamps_enabled),
13231341
coerce_timestamps_unit_(coerce_timestamps_unit),
@@ -1326,7 +1344,8 @@ class PARQUET_EXPORT ArrowWriterProperties {
13261344
compliant_nested_types_(compliant_nested_types),
13271345
engine_version_(engine_version),
13281346
use_threads_(use_threads),
1329-
executor_(executor) {}
1347+
executor_(executor),
1348+
write_time_adjusted_to_utc_(write_time_adjusted_to_utc) {}
13301349

13311350
const bool write_timestamps_as_int96_;
13321351
const bool coerce_timestamps_enabled_;
@@ -1337,6 +1356,7 @@ class PARQUET_EXPORT ArrowWriterProperties {
13371356
const EngineVersion engine_version_;
13381357
const bool use_threads_;
13391358
::arrow::internal::Executor* executor_;
1359+
const bool write_time_adjusted_to_utc_;
13401360
};
13411361

13421362
/// \brief State object used for writing Arrow data directly to a Parquet

0 commit comments

Comments
 (0)