1919#include < cstdint>
2020#include < memory>
2121#include < mutex>
22+ #include < optional>
2223#include < sstream>
2324#include < string>
2425#include < utility>
@@ -82,7 +83,7 @@ class ConcreteColumnBuilder : public ColumnBuilder {
8283 ReserveChunksUnlocked (block_index);
8384 }
8485
85- void ReserveChunksUnlocked (int64_t block_index) {
86+ virtual void ReserveChunksUnlocked (int64_t block_index) {
8687 // Create a null Array pointer at the back at the list.
8788 size_t chunk_index = static_cast <size_t >(block_index);
8889 if (chunks_.size () <= chunk_index) {
@@ -232,6 +233,7 @@ class InferringColumnBuilder : public ConcreteColumnBuilder {
232233 Status TryConvertChunk (int64_t chunk_index);
233234 // This must be called unlocked!
234235 void ScheduleConvertChunk (int64_t chunk_index);
236+ void ReserveChunksUnlocked (int64_t block_index) override ;
235237
236238 // CAUTION: ConvertOptions can grow large (if it customizes hundreds or
237239 // thousands of columns), so avoid copying it in each InferringColumnBuilder.
@@ -243,6 +245,9 @@ class InferringColumnBuilder : public ConcreteColumnBuilder {
243245
244246 // The parsers corresponding to each chunk (for reconverting)
245247 std::vector<std::shared_ptr<BlockParser>> parsers_;
248+
249+ // The inference kind for which the current chunks_ were obtained
250+ std::vector<std::optional<InferKind>> chunk_kinds_;
246251};
247252
248253Status InferringColumnBuilder::Init () { return UpdateType (); }
@@ -261,7 +266,12 @@ Status InferringColumnBuilder::TryConvertChunk(int64_t chunk_index) {
261266 std::shared_ptr<BlockParser> parser = parsers_[chunk_index];
262267 InferKind kind = infer_status_.kind ();
263268
264- DCHECK_NE (parser, nullptr );
269+ if (chunks_[chunk_index] && chunk_kinds_[chunk_index] == kind) {
270+ // Already tried, nothing to do
271+ return Status::OK ();
272+ }
273+
274+ DCHECK_NE (parser, nullptr ) << " for chunk_index " << chunk_index;
265275
266276 lock.unlock ();
267277 auto maybe_array = converter->Convert (*parser, col_index_);
@@ -280,34 +290,45 @@ Status InferringColumnBuilder::TryConvertChunk(int64_t chunk_index) {
280290 // We won't try to reconvert anymore
281291 parsers_[chunk_index].reset ();
282292 }
293+ chunk_kinds_[chunk_index] = kind;
283294 return SetChunkUnlocked (chunk_index, maybe_array);
284295 }
285296
286297 // Conversion failed, try another type
287298 infer_status_.LoosenType (maybe_array.status ());
288299 RETURN_NOT_OK (UpdateType ());
300+ kind = infer_status_.kind ();
289301
290302 // Reconvert past finished chunks
291303 // (unfinished chunks will notice by themselves if they need reconverting)
292304 const auto nchunks = static_cast <int64_t >(chunks_.size ());
305+ std::vector<int64_t > chunks_to_reconvert;
293306 for (int64_t i = 0 ; i < nchunks; ++i) {
294- if (i != chunk_index && chunks_[i]) {
295- // We're assuming the chunk was converted using the wrong type
296- // (which should be true unless the executor reorders tasks)
307+ if (i != chunk_index && chunks_[i] && chunk_kinds_[i] != kind) {
308+ // That chunk was converted using the wrong type
297309 chunks_[i].reset ();
298- lock.unlock ();
299- ScheduleConvertChunk (i);
300- lock.lock ();
310+ chunk_kinds_[i].reset ();
311+ chunks_to_reconvert.push_back (i);
301312 }
302313 }
314+ // Reconvert this chunk too
315+ chunks_to_reconvert.push_back (chunk_index);
303316
304- // Reconvert this chunk
305317 lock.unlock ();
306- ScheduleConvertChunk (chunk_index);
307-
318+ for (auto i : chunks_to_reconvert) {
319+ ScheduleConvertChunk (i);
320+ }
308321 return Status::OK ();
309322}
310323
324+ void InferringColumnBuilder::ReserveChunksUnlocked (int64_t block_index) {
325+ ConcreteColumnBuilder::ReserveChunksUnlocked (block_index);
326+ size_t chunk_index = static_cast <size_t >(block_index);
327+ if (chunk_kinds_.size () <= chunk_index) {
328+ chunk_kinds_.resize (chunk_index + 1 );
329+ }
330+ }
331+
311332void InferringColumnBuilder::Insert (int64_t block_index,
312333 const std::shared_ptr<BlockParser>& parser) {
313334 // Create a slot for the new chunk and spawn a task to convert it
0 commit comments