Skip to content

Commit a677ff0

Browse files
authored
Fix use of limit in multithreaded parsing (#964)
* Fix use of limit in multithreaded parsing. Fixes #963. The issue here is that although we were adjusting the number of rows to a provided limit during multithreaded parsing, we failed to resize the actual column arrays to the correct size. This bug was introduced when we converted from the old `CSV.Column` custom array type to returning "normal" arrays in the 0.7 -> 0.8 transition. With `CSV.Column`, we just passed the final row total and it adjusted the size dynamically, without physically resizing the underlying array. With regular arrays, however, we need to ensure the array gets resized appropriately. This became more apparent with the recently released pooling change, where it turned into a silenced BoundsError because of the use of `@inbounds` in the new `checkpooled!` routine. I've taken out those `@inbounds` uses for now to be more conservative. The fix is fairly straightforward: if we adjust our final row count down to a user-provided limit, we loop over the parsing tasks and "accumulate" rows until we hit the limit, and then resize or `empty!` each task's columns as appropriate. * Fix
1 parent 1061e24 commit a677ff0

File tree

2 files changed

+32
-7
lines changed

2 files changed

+32
-7
lines changed

src/file.jl

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,27 @@ function File(ctx::Context, @nospecialize(chunking::Bool=false))
243243
finalrows = sum(rows)
244244
if ctx.limit < finalrows
245245
finalrows = ctx.limit
246+
# adjust columns according to limit
247+
acc = 0
248+
for i = 1:ntasks
249+
if acc + rows[i] > finalrows
250+
# need to resize this tasks columns down
251+
if finalrows - acc > 0
252+
for col in pertaskcolumns[i]
253+
if isdefined(col, :column)
254+
resize!(col.column, finalrows - acc)
255+
end
256+
end
257+
else
258+
for col in pertaskcolumns[i]
259+
if isdefined(col, :column)
260+
empty!(col.column)
261+
end
262+
end
263+
end
264+
end
265+
acc += rows[i]
266+
end
246267
end
247268
# ok, all the parsing tasks have finished and we've promoted their types w/ the top-level columns
248269
# so now we just need to finish processing each column by making ChainedVectors of the individual columns
@@ -465,33 +486,33 @@ function checkpooled!(::Type{T}, pertaskcolumns, col, j, ntasks, nrows, ctx) whe
465486
for x in column
466487
if x isa PosLen
467488
if x.missingvalue
468-
@inbounds refs[k] = get!(pool, missing) do
489+
refs[k] = get!(pool, missing) do
469490
lastref[] += UInt32(1)
470491
end
471492
elseif x.escapedvalue
472493
val = S === PosLenString ? S(ctx.buf, x, ctx.options.e) : Parsers.getstring(ctx.buf, x, ctx.options.e)
473-
@inbounds refs[k] = get!(pool, val) do
494+
refs[k] = get!(pool, val) do
474495
lastref[] += UInt32(1)
475496
end
476497
else
477498
val = PointerString(pointer(ctx.buf, x.pos), x.len)
478499
index = Base.ht_keyindex2!(pool, val)
479500
if index > 0
480-
@inbounds found_key = pool.vals[index]
501+
found_key = pool.vals[index]
481502
ref = found_key::UInt32
482503
else
483504
new = lastref[] += UInt32(1)
484505
if S === PosLenString
485-
@inbounds Base._setindex!(pool, new, S(ctx.buf, x, ctx.options.e), -index)
506+
Base._setindex!(pool, new, S(ctx.buf, x, ctx.options.e), -index)
486507
else
487-
@inbounds Base._setindex!(pool, new, S(val), -index)
508+
Base._setindex!(pool, new, S(val), -index)
488509
end
489510
ref = new
490511
end
491-
@inbounds refs[k] = ref
512+
refs[k] = ref
492513
end
493514
else
494-
@inbounds refs[k] = get!(pool, x) do
515+
refs[k] = get!(pool, x) do
495516
lastref[] += UInt32(1)
496517
end
497518
end

test/basics.jl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,4 +769,8 @@ f = CSV.File(IOBuffer(data); delim='|', normalizenames=true, stripwhitespace=fal
769769
f = CSV.File(IOBuffer(data); delim='|', stripwhitespace=true)
770770
@test f.Name[2] == "Mary Anne"
771771

772+
# 963
773+
f = CSV.File(IOBuffer(join((rand(("a,$(rand())", "b,$(rand())")) for _ = 1:10^6), "\n")), header=false, limit=10000)
774+
@test length(f) == 10000
775+
772776
end

0 commit comments

Comments
 (0)