Skip to content

Commit 41c72cf

Browse files
FIX (buffering): Simplify buffering logic for localstorage
1 parent f04a8b7 commit 41c72cf

File tree

1 file changed

+12
-95
lines changed
  • backend/internal/features/storages/models/local

1 file changed

+12
-95
lines changed

backend/internal/features/storages/models/local/model.go

Lines changed: 12 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -191,117 +191,34 @@ func (l *LocalStorage) EncryptSensitiveData(encryptor encryption.FieldEncryptor)
191191
func (l *LocalStorage) Update(incoming *LocalStorage) {
192192
}
193193

194-
type writeResult struct {
195-
bytesWritten int
196-
writeErr error
197-
}
198-
199-
type writeJob struct {
200-
data []byte
201-
n int
202-
}
203-
204194
func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
205-
bufA := make([]byte, localChunkSize)
206-
bufB := make([]byte, localChunkSize)
207-
195+
buf := make([]byte, localChunkSize)
208196
var written int64
209197

210-
writeCh := make(chan writeJob, 1)
211-
resultCh := make(chan writeResult, 1)
212-
doneCh := make(chan struct{})
213-
214-
go func() {
215-
defer close(doneCh)
216-
for job := range writeCh {
217-
nw, err := dst.Write(job.data[:job.n])
218-
resultCh <- writeResult{nw, err}
219-
}
220-
}()
221-
222-
useBufferA := true
223-
pendingWrite := false
224-
225198
for {
226199
select {
227200
case <-ctx.Done():
228-
close(writeCh)
229-
<-doneCh
230201
return written, ctx.Err()
231202
default:
232203
}
233204

234-
var currentBuf []byte
235-
if useBufferA {
236-
currentBuf = bufA
237-
} else {
238-
currentBuf = bufB
239-
}
240-
241-
nr, readErr := src.Read(currentBuf)
242-
243-
if nr == 0 && readErr == io.EOF {
244-
break
245-
}
246-
247-
if readErr != nil && readErr != io.EOF {
248-
close(writeCh)
249-
<-doneCh
250-
return written, readErr
251-
}
252-
253-
if pendingWrite {
254-
select {
255-
case <-ctx.Done():
256-
close(writeCh)
257-
<-doneCh
258-
return written, ctx.Err()
259-
case result := <-resultCh:
260-
if result.writeErr != nil {
261-
close(writeCh)
262-
<-doneCh
263-
return written, result.writeErr
264-
}
265-
written += int64(result.bytesWritten)
266-
}
267-
}
268-
205+
nr, readErr := src.Read(buf)
269206
if nr > 0 {
270-
select {
271-
case <-ctx.Done():
272-
close(writeCh)
273-
<-doneCh
274-
return written, ctx.Err()
275-
case writeCh <- writeJob{currentBuf, nr}:
276-
pendingWrite = true
207+
nw, writeErr := dst.Write(buf[:nr])
208+
written += int64(nw)
209+
if writeErr != nil {
210+
return written, writeErr
211+
}
212+
if nr != nw {
213+
return written, io.ErrShortWrite
277214
}
278-
279-
useBufferA = !useBufferA
280215
}
281216

282217
if readErr == io.EOF {
283-
break
218+
return written, nil
284219
}
285-
}
286-
287-
if pendingWrite {
288-
select {
289-
case <-ctx.Done():
290-
close(writeCh)
291-
<-doneCh
292-
return written, ctx.Err()
293-
case result := <-resultCh:
294-
if result.writeErr != nil {
295-
close(writeCh)
296-
<-doneCh
297-
return written, result.writeErr
298-
}
299-
written += int64(result.bytesWritten)
220+
if readErr != nil {
221+
return written, readErr
300222
}
301223
}
302-
303-
close(writeCh)
304-
<-doneCh
305-
306-
return written, nil
307224
}

0 commit comments

Comments
 (0)