Skip to content

Commit 0068ff2

Browse files
committed
refactor data buffering
1 parent 25a729b commit 0068ff2

File tree

1 file changed

+33
-31
lines changed

1 file changed

+33
-31
lines changed

src/stream.jl

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -389,19 +389,8 @@ function fillbuffer(stream::TranscodingStream)
389389
end
390390
makemargin!(buffer2, 1)
391391
readdata!(stream.stream, buffer2)
392-
makemargin!(buffer1, clamp(div(length(buffer1), 4), 1, DEFAULT_BUFFER_SIZE * 8))
393-
Δin, Δout, stream.state.code = process(
394-
stream.codec, buffermem(buffer2), marginmem(buffer1), stream.state.error)
395-
buffer2.bufferpos += Δin
396-
buffer1.marginpos += Δout
397-
buffer1.total += Δout
392+
_, Δout = call_process(stream.codec, stream.state, buffer2, buffer1)
398393
nfilled += Δout
399-
if stream.state.code == :error
400-
handle_error(stream)
401-
end
402-
if stream.state.code == :ok && Δin == Δout == 0
403-
makemargin!(buffer1, max(16, marginsize(buffer1) * 2))
404-
end
405394
end
406395
return nfilled
407396
end
@@ -445,21 +434,26 @@ function process_to_write(stream::TranscodingStream)
445434
end
446435
buffer2 = stream.state.buffer2
447436
writebuffer!(stream.stream, buffer2)
448-
makemargin!(buffer2, clamp(div(length(buffer2), 4), 1, DEFAULT_BUFFER_SIZE * 8))
449-
Δin, Δout, stream.state.code = process(
450-
stream.codec, buffermem(buffer1), marginmem(buffer2), stream.state.error)
451-
buffer1.bufferpos += Δin
452-
buffer2.marginpos += Δout
453-
buffer2.total += Δout
454-
if stream.state.code == :error
455-
handle_error(stream)
456-
elseif stream.state.code == :ok && Δin == Δout == 0
457-
makemargin!(buffer2, max(16, marginsize(buffer2) * 2))
458-
end
437+
Δin, _ = call_process(stream.codec, stream.state, buffer1, buffer2)
459438
makemargin!(buffer1, 0)
460439
return Δin
461440
end
462441

442+
function call_process(codec::Codec, state::State, inbuf::Buffer, outbuf::Buffer)
443+
makemargin!(outbuf, clamp(div(length(outbuf), 4), 1, DEFAULT_BUFFER_SIZE * 8))
444+
Δin, Δout, state.code = process(codec, buffermem(inbuf), marginmem(outbuf), state.error)
445+
inbuf.bufferpos += Δin
446+
outbuf.marginpos += Δout
447+
outbuf.total += Δout
448+
if state.code == :error
449+
handle_error(codec, state)
450+
elseif state.code == :ok && Δin == Δout == 0
451+
# When no progress, expand the output buffer.
452+
makemargin!(outbuf, max(16, marginsize(outbuf) * 2))
453+
end
454+
return Δin, Δout
455+
end
456+
463457

464458
# State Transition
465459
# ----------------
@@ -541,27 +535,35 @@ end
541535

542536
# Handle an error happened while transcoding data.
543537
function handle_error(stream::TranscodingStream)
544-
if !haserror(stream.state.error)
538+
handle_error(stream.codec, stream.state)
539+
end
540+
541+
function handle_error(codec::Codec, state::State)
542+
if !haserror(state.error)
545543
# set a generic error
546-
stream.state.error[] = ErrorException("unknown error happened while processing data")
544+
state.error[] = ErrorException("unknown error happened while processing data")
547545
end
548-
finalize_codec(stream, :panic)
549-
throw(stream.state.error[])
546+
finalize_codec(codec, state, :panic)
547+
throw(state.error[])
550548
end
551549

552550
# Call the finalize method of the codec.
553551
function finalize_codec(stream::TranscodingStream, newstate::Symbol)
552+
finalize_codec(stream.codec, stream.state, newstate)
553+
end
554+
555+
function finalize_codec(codec::Codec, state::State, newstate::Symbol)
554556
@assert newstate (:close, :panic)
555557
try
556-
finalize(stream.codec)
558+
finalize(codec)
557559
catch
558-
if stream.state.state == :error && haserror(stream.state.error)
560+
if state.state == :error && haserror(state.error)
559561
# throw an exception that happended before
560-
throw(stream.state.error[])
562+
throw(state.error[])
561563
else
562564
rethrow()
563565
end
564566
finally
565-
stream.state.state = newstate
567+
state.state = newstate
566568
end
567569
end

0 commit comments

Comments
 (0)