Skip to content

Commit fb527fc

Browse files
committed
1 parent 08b3cc9 commit fb527fc

File tree

9 files changed

+140
-42
lines changed

9 files changed

+140
-42
lines changed

cpp/src/arrow/csv/column_builder.cc

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <cstdint>
2020
#include <memory>
2121
#include <mutex>
22+
#include <optional>
2223
#include <sstream>
2324
#include <string>
2425
#include <utility>
@@ -82,7 +83,7 @@ class ConcreteColumnBuilder : public ColumnBuilder {
8283
ReserveChunksUnlocked(block_index);
8384
}
8485

85-
void ReserveChunksUnlocked(int64_t block_index) {
86+
virtual void ReserveChunksUnlocked(int64_t block_index) {
8687
// Create a null Array pointer at the back at the list.
8788
size_t chunk_index = static_cast<size_t>(block_index);
8889
if (chunks_.size() <= chunk_index) {
@@ -232,6 +233,7 @@ class InferringColumnBuilder : public ConcreteColumnBuilder {
232233
Status TryConvertChunk(int64_t chunk_index);
233234
// This must be called unlocked!
234235
void ScheduleConvertChunk(int64_t chunk_index);
236+
void ReserveChunksUnlocked(int64_t block_index) override;
235237

236238
// CAUTION: ConvertOptions can grow large (if it customizes hundreds or
237239
// thousands of columns), so avoid copying it in each InferringColumnBuilder.
@@ -243,6 +245,9 @@ class InferringColumnBuilder : public ConcreteColumnBuilder {
243245

244246
// The parsers corresponding to each chunk (for reconverting)
245247
std::vector<std::shared_ptr<BlockParser>> parsers_;
248+
249+
// The inferrence kind for which the current chunks_ were obtained
250+
std::vector<std::optional<InferKind>> chunk_kinds_;
246251
};
247252

248253
Status InferringColumnBuilder::Init() { return UpdateType(); }
@@ -261,14 +266,20 @@ Status InferringColumnBuilder::TryConvertChunk(int64_t chunk_index) {
261266
std::shared_ptr<BlockParser> parser = parsers_[chunk_index];
262267
InferKind kind = infer_status_.kind();
263268

264-
DCHECK_NE(parser, nullptr);
269+
if (chunks_[chunk_index] && chunk_kinds_[chunk_index] == kind) {
270+
// Already tried, nothing to do
271+
return Status::OK();
272+
}
273+
274+
DCHECK_NE(parser, nullptr) << " for chunk_index " << chunk_index;
265275

266276
lock.unlock();
267277
auto maybe_array = converter->Convert(*parser, col_index_);
268278
lock.lock();
269279

270280
if (kind != infer_status_.kind()) {
271281
// infer_kind_ was changed by another task, reconvert
282+
kind = infer_status_.kind();
272283
lock.unlock();
273284
ScheduleConvertChunk(chunk_index);
274285
return Status::OK();
@@ -280,34 +291,45 @@ Status InferringColumnBuilder::TryConvertChunk(int64_t chunk_index) {
280291
// We won't try to reconvert anymore
281292
parsers_[chunk_index].reset();
282293
}
294+
chunk_kinds_[chunk_index] = kind;
283295
return SetChunkUnlocked(chunk_index, maybe_array);
284296
}
285297

286298
// Conversion failed, try another type
287299
infer_status_.LoosenType(maybe_array.status());
288300
RETURN_NOT_OK(UpdateType());
301+
kind = infer_status_.kind();
289302

290303
// Reconvert past finished chunks
291304
// (unfinished chunks will notice by themselves if they need reconverting)
292305
const auto nchunks = static_cast<int64_t>(chunks_.size());
306+
std::vector<int64_t> chunks_to_reconvert;
293307
for (int64_t i = 0; i < nchunks; ++i) {
294-
if (i != chunk_index && chunks_[i]) {
295-
// We're assuming the chunk was converted using the wrong type
296-
// (which should be true unless the executor reorders tasks)
308+
if (i != chunk_index && chunks_[i] && chunk_kinds_[i] != kind) {
309+
// That chunk was converted using the wrong type
297310
chunks_[i].reset();
298-
lock.unlock();
299-
ScheduleConvertChunk(i);
300-
lock.lock();
311+
chunk_kinds_[i].reset();
312+
chunks_to_reconvert.push_back(i);
301313
}
302314
}
315+
// Reconvert this chunk too
316+
chunks_to_reconvert.push_back(chunk_index);
303317

304-
// Reconvert this chunk
305318
lock.unlock();
306-
ScheduleConvertChunk(chunk_index);
307-
319+
for (auto i : chunks_to_reconvert) {
320+
ScheduleConvertChunk(i);
321+
}
308322
return Status::OK();
309323
}
310324

325+
void InferringColumnBuilder::ReserveChunksUnlocked(int64_t block_index) {
326+
ConcreteColumnBuilder::ReserveChunksUnlocked(block_index);
327+
size_t chunk_index = static_cast<size_t>(block_index);
328+
if (chunk_kinds_.size() <= chunk_index) {
329+
chunk_kinds_.resize(chunk_index + 1);
330+
}
331+
}
332+
311333
void InferringColumnBuilder::Insert(int64_t block_index,
312334
const std::shared_ptr<BlockParser>& parser) {
313335
// Create a slot for the new chunk and spawn a task to convert it

cpp/src/arrow/csv/fuzz.cc

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include <cstdint>
1919
#include <memory>
20+
#include <optional>
2021

2122
#include "arrow/buffer.h"
2223
#include "arrow/csv/reader.h"
@@ -25,10 +26,18 @@
2526
#include "arrow/status.h"
2627
#include "arrow/table.h"
2728
#include "arrow/util/macros.h"
29+
#include "arrow/util/thread_pool.h"
2830

2931
namespace arrow::csv {
3032

3133
Status FuzzCsvReader(const uint8_t* data, int64_t size) {
34+
// Since the Fuzz-allocated data is not owned, any task that outlives the TableReader
35+
// may try to read memory that has been deallocated. Hence we wait for all pending
36+
// tasks to end before leaving.
37+
struct TaskGuard {
38+
~TaskGuard() { ::arrow::internal::GetCpuThreadPool()->WaitForIdle(); }
39+
};
40+
3241
auto io_context = arrow::io::default_io_context();
3342

3443
auto read_options = ReadOptions::Defaults();
@@ -42,11 +51,14 @@ Status FuzzCsvReader(const uint8_t* data, int64_t size) {
4251
std::make_shared<::arrow::io::BufferReader>(std::make_shared<Buffer>(data, size));
4352

4453
// TODO test other reader types
45-
ARROW_ASSIGN_OR_RAISE(auto table_reader,
46-
TableReader::Make(io_context, input_stream, read_options,
47-
parse_options, convert_options));
48-
ARROW_ASSIGN_OR_RAISE(auto table, table_reader->Read());
49-
RETURN_NOT_OK(table->ValidateFull());
54+
{
55+
ARROW_ASSIGN_OR_RAISE(auto table_reader,
56+
TableReader::Make(io_context, input_stream, read_options,
57+
parse_options, convert_options));
58+
TaskGuard task_guard;
59+
ARROW_ASSIGN_OR_RAISE(auto table, table_reader->Read());
60+
RETURN_NOT_OK(table->ValidateFull());
61+
}
5062
return Status::OK();
5163
}
5264

cpp/src/arrow/util/formatting_util_test.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
#include <cmath>
19+
#include <limits>
1920
#include <locale>
2021
#include <stdexcept>
2122
#include <string>
@@ -561,6 +562,11 @@ TEST(Formatting, Timestamp) {
561562
"2018-11-13 17:11:10.000000007");
562563
AssertFormatting(formatter, -2203932304LL * 1000000000LL + 8,
563564
"1900-02-28 12:34:56.000000008");
565+
// XXX check with NumPy?
566+
AssertFormatting(formatter, std::numeric_limits<int64_t>::min(),
567+
"1677-09-21 00:12:43.145224192");
568+
AssertFormatting(formatter, std::numeric_limits<int64_t>::max(),
569+
"2262-04-11 23:47:16.854775807");
564570
}
565571

566572
{

cpp/src/arrow/util/task_group.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,12 @@ class ThreadedTaskGroup : public TaskGroup {
117117
}
118118
self->OneTaskDone();
119119
};
120-
UpdateStatus(executor_->Spawn(std::move(callable)));
120+
auto st = executor_->Spawn(std::move(callable));
121+
bool spawn_successful = st.ok();
122+
UpdateStatus(std::move(st));
123+
if (!spawn_successful) {
124+
OneTaskDone();
125+
}
121126
}
122127
}
123128

cpp/src/arrow/util/time.h

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@
1818
#pragma once
1919

2020
#include <chrono>
21+
#include <cstdlib>
2122
#include <memory>
23+
#include <optional>
24+
#include <type_traits>
2225
#include <utility>
2326

2427
#include "arrow/type_fwd.h"
28+
#include "arrow/util/int_util_overflow.h"
29+
#include "arrow/util/macros.h"
2530
#include "arrow/util/visibility.h"
2631

2732
namespace arrow {
@@ -66,17 +71,26 @@ VisitDuration(TimeUnit::type unit, Visitor&& visitor, Args&&... args) {
6671
return visitor(std::chrono::seconds{}, std::forward<Args>(args)...);
6772
}
6873

69-
/// Convert a count of seconds to the corresponding count in a different TimeUnit
70-
struct CastSecondsToUnitImpl {
71-
template <typename Duration>
72-
int64_t operator()(Duration, int64_t seconds) {
73-
auto duration = std::chrono::duration_cast<Duration>(std::chrono::seconds{seconds});
74-
return static_cast<int64_t>(duration.count());
75-
}
76-
};
74+
inline std::optional<int64_t> CastSecondsToUnit(TimeUnit::type unit, int64_t seconds) {
75+
auto cast_seconds_to_unit = [](auto duration,
76+
int64_t seconds) -> std::optional<int64_t> {
77+
constexpr auto kMultiplier = static_cast<int64_t>(decltype(duration)::period::den);
78+
int64_t out;
79+
if (ARROW_PREDICT_FALSE(
80+
::arrow::internal::MultiplyWithOverflow(seconds, kMultiplier, &out))) {
81+
return {};
82+
}
83+
return out;
84+
};
85+
return VisitDuration(unit, cast_seconds_to_unit, seconds);
86+
}
7787

78-
inline int64_t CastSecondsToUnit(TimeUnit::type unit, int64_t seconds) {
79-
return VisitDuration(unit, CastSecondsToUnitImpl{}, seconds);
88+
inline bool CastSecondsToUnit(TimeUnit::type unit, int64_t seconds, int64_t* out) {
89+
auto maybe_value = CastSecondsToUnit(unit, seconds);
90+
if (ARROW_PREDICT_TRUE(maybe_value.has_value())) {
91+
*out = *maybe_value;
92+
}
93+
return maybe_value.has_value();
8094
}
8195

8296
} // namespace util

cpp/src/arrow/util/value_parsing.h

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "arrow/util/checked_cast.h"
3434
#include "arrow/util/config.h"
3535
#include "arrow/util/float16.h"
36+
#include "arrow/util/int_util_overflow.h"
3637
#include "arrow/util/macros.h"
3738
#include "arrow/util/time.h"
3839
#include "arrow/util/visibility.h"
@@ -696,8 +697,7 @@ static inline bool ParseTimestampISO8601(const char* s, size_t length,
696697
}
697698

698699
if (length == 10) {
699-
*out = util::CastSecondsToUnit(unit, seconds_since_epoch.count());
700-
return true;
700+
return util::CastSecondsToUnit(unit, seconds_since_epoch.count(), out);
701701
}
702702

703703
if (ARROW_PREDICT_FALSE(s[10] != ' ') && ARROW_PREDICT_FALSE(s[10] != 'T')) {
@@ -768,12 +768,16 @@ static inline bool ParseTimestampISO8601(const char* s, size_t length,
768768
return false;
769769
}
770770

771-
seconds_since_epoch += seconds_since_midnight;
772-
seconds_since_epoch += zone_offset;
771+
// Switch to plain integers to take advantage of the overflow arithmetic ops
772+
auto count = (seconds_since_midnight + zone_offset).count();
773+
774+
if (ARROW_PREDICT_FALSE(::arrow::internal::AddWithOverflow(
775+
count, seconds_since_epoch.count(), &count))) {
776+
return false;
777+
}
773778

774779
if (length <= 19) {
775-
*out = util::CastSecondsToUnit(unit, seconds_since_epoch.count());
776-
return true;
780+
return util::CastSecondsToUnit(unit, count, out);
777781
}
778782

779783
if (ARROW_PREDICT_FALSE(s[19] != '.')) {
@@ -786,7 +790,12 @@ static inline bool ParseTimestampISO8601(const char* s, size_t length,
786790
return false;
787791
}
788792

789-
*out = util::CastSecondsToUnit(unit, seconds_since_epoch.count()) + subseconds;
793+
if (ARROW_PREDICT_FALSE(!util::CastSecondsToUnit(unit, count, out))) {
794+
return false;
795+
}
796+
if (ARROW_PREDICT_FALSE(::arrow::internal::AddWithOverflow(*out, subseconds, out))) {
797+
return false;
798+
}
790799
return true;
791800
}
792801

@@ -828,8 +837,7 @@ static inline bool ParseTimestampStrptime(const char* buf, size_t length,
828837
secs -= std::chrono::seconds(result.tm_gmtoff);
829838
#endif
830839
}
831-
*out = util::CastSecondsToUnit(unit, secs.time_since_epoch().count());
832-
return true;
840+
return util::CastSecondsToUnit(unit, secs.time_since_epoch().count(), out);
833841
}
834842

835843
template <>
@@ -892,13 +900,21 @@ struct StringConverter<TIME_TYPE, enable_if_time<TIME_TYPE>> {
892900
const auto unit = type.unit();
893901
std::chrono::seconds since_midnight;
894902

903+
auto get_seconds_since_midnight = [&](value_type* out) -> bool {
904+
int64_t long_out;
905+
if (ARROW_PREDICT_FALSE(
906+
!util::CastSecondsToUnit(unit, since_midnight.count(), &long_out))) {
907+
return false;
908+
}
909+
*out = static_cast<value_type>(long_out);
910+
return *out == long_out;
911+
};
912+
895913
if (length == 5) {
896914
if (ARROW_PREDICT_FALSE(!detail::ParseHH_MM(s, &since_midnight))) {
897915
return false;
898916
}
899-
*out =
900-
static_cast<value_type>(util::CastSecondsToUnit(unit, since_midnight.count()));
901-
return true;
917+
return get_seconds_since_midnight(out);
902918
}
903919

904920
if (ARROW_PREDICT_FALSE(length < 8)) {
@@ -908,7 +924,9 @@ struct StringConverter<TIME_TYPE, enable_if_time<TIME_TYPE>> {
908924
return false;
909925
}
910926

911-
*out = static_cast<value_type>(util::CastSecondsToUnit(unit, since_midnight.count()));
927+
if (ARROW_PREDICT_FALSE(!get_seconds_since_midnight(out))) {
928+
return false;
929+
}
912930

913931
if (length == 8) {
914932
return true;

cpp/src/arrow/util/value_parsing_test.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
#include <cmath>
19+
#include <limits>
1920
#include <string>
2021
#include <type_traits>
2122
#include <vector>
@@ -814,8 +815,22 @@ TEST(StringConversion, ToTimestampDateTime_ISO8601) {
814815
AssertConversion(type, "1900-02-28 12:34:56.123456789-01:17",
815816
-2203932304000000000LL + 123456789LL + 4620000000000LL);
816817

818+
// The theoretical lower bound is "1677-09-21 00:12:43.145224192",
819+
// but supporting it would require a bit more care in the timestamp parsing
820+
// code.
821+
AssertConversion(type, "1677-09-22", -9223286400000000000);
822+
AssertConversion(type, "1677-09-22 00:00:00.000000000", -9223286400000000000);
823+
AssertConversion(type, "2262-04-11 23:47:16.854775806",
824+
std::numeric_limits<int64_t>::max() - 1);
825+
AssertConversion(type, "2262-04-11 23:47:16.854775807",
826+
std::numeric_limits<int64_t>::max());
827+
817828
// Invalid subseconds
818829
AssertConversionFails(type, "1900-02-28 12:34:56.1234567890");
830+
// Out of bounds
831+
AssertConversionFails(type, "3989-07-14T11:22:33.000777Z");
832+
AssertConversionFails(type, "1677-09-21 00:12:43.145224191");
833+
AssertConversionFails(type, "2262-04-11 23:47:16.854775808");
819834
}
820835
}
821836

cpp/src/arrow/util/windows_fixup.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
# undef min
2727
# endif
2828

29-
// The Windows API defines macros from *File resolving to either
29+
// The Windows API defines macros *File resolving to either
3030
// *FileA or *FileW. Need to undo them.
3131
# ifdef CopyFile
3232
# undef CopyFile
@@ -37,6 +37,12 @@
3737
# ifdef DeleteFile
3838
# undef DeleteFile
3939
# endif
40+
# ifdef GetObject
41+
# undef GetObject
42+
# endif
43+
# ifdef GetMessage
44+
# undef GetMessage
45+
# endif
4046

4147
// Other annoying Windows macro definitions...
4248
# ifdef IN

0 commit comments

Comments
 (0)