Skip to content

Commit f04a8b7

Browse files
FIX (backup): Add double buffering for local storange
1 parent 552167e commit f04a8b7

File tree

2 files changed

+83
-37
lines changed

2 files changed

+83
-37
lines changed

backend/internal/features/backups/backups/usecases/postgresql/create_backup_uc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
const (
3131
backupTimeout = 23 * time.Hour
3232
shutdownCheckInterval = 1 * time.Second
33-
copyBufferSize = 16 * 1024 * 1024
33+
copyBufferSize = 8 * 1024 * 1024
3434
progressReportIntervalMB = 1.0
3535
pgConnectTimeout = 30
3636
compressionLevel = 5

backend/internal/features/storages/models/local/model.go

Lines changed: 82 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package local_storage
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"io"
87
"log/slog"
@@ -16,10 +15,10 @@ import (
1615
)
1716

1817
const (
19-
// Chunk size for local storage writes - 16MB provides good balance between
20-
// memory usage and write efficiency. This creates backpressure to pg_dump
21-
// by only reading one chunk at a time and waiting for disk to confirm receipt.
22-
localChunkSize = 16 * 1024 * 1024
18+
// Chunk size for local storage writes - 8MB per buffer with double-buffering
19+
// allows overlapped I/O while keeping total memory under 32MB.
20+
// Two 8MB buffers = 16MB for local storage, plus 8MB for pg_dump buffer = ~25MB total.
21+
localChunkSize = 8 * 1024 * 1024
2322
)
2423

2524
// LocalStorage uses ./postgresus_local_backups folder as a
@@ -197,65 +196,112 @@ type writeResult struct {
197196
writeErr error
198197
}
199198

199+
type writeJob struct {
200+
data []byte
201+
n int
202+
}
203+
200204
func copyWithContext(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
201-
buf := make([]byte, localChunkSize)
205+
bufA := make([]byte, localChunkSize)
206+
bufB := make([]byte, localChunkSize)
207+
202208
var written int64
203209

210+
writeCh := make(chan writeJob, 1)
211+
resultCh := make(chan writeResult, 1)
212+
doneCh := make(chan struct{})
213+
214+
go func() {
215+
defer close(doneCh)
216+
for job := range writeCh {
217+
nw, err := dst.Write(job.data[:job.n])
218+
resultCh <- writeResult{nw, err}
219+
}
220+
}()
221+
222+
useBufferA := true
223+
pendingWrite := false
224+
204225
for {
205226
select {
206227
case <-ctx.Done():
228+
close(writeCh)
229+
<-doneCh
207230
return written, ctx.Err()
208231
default:
209232
}
210233

211-
nr, readErr := io.ReadFull(src, buf)
234+
var currentBuf []byte
235+
if useBufferA {
236+
currentBuf = bufA
237+
} else {
238+
currentBuf = bufB
239+
}
240+
241+
nr, readErr := src.Read(currentBuf)
212242

213243
if nr == 0 && readErr == io.EOF {
214244
break
215245
}
216246

217-
if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
247+
if readErr != nil && readErr != io.EOF {
248+
close(writeCh)
249+
<-doneCh
218250
return written, readErr
219251
}
220252

221-
writeResultCh := make(chan writeResult, 1)
222-
go func() {
223-
nw, writeErr := dst.Write(buf[0:nr])
224-
writeResultCh <- writeResult{nw, writeErr}
225-
}()
226-
227-
var nw int
228-
var writeErr error
229-
230-
select {
231-
case <-ctx.Done():
232-
return written, ctx.Err()
233-
case result := <-writeResultCh:
234-
nw = result.bytesWritten
235-
writeErr = result.writeErr
253+
if pendingWrite {
254+
select {
255+
case <-ctx.Done():
256+
close(writeCh)
257+
<-doneCh
258+
return written, ctx.Err()
259+
case result := <-resultCh:
260+
if result.writeErr != nil {
261+
close(writeCh)
262+
<-doneCh
263+
return written, result.writeErr
264+
}
265+
written += int64(result.bytesWritten)
266+
}
236267
}
237268

238-
if nw < 0 || nr < nw {
239-
nw = 0
240-
if writeErr == nil {
241-
writeErr = errors.New("invalid write result")
269+
if nr > 0 {
270+
select {
271+
case <-ctx.Done():
272+
close(writeCh)
273+
<-doneCh
274+
return written, ctx.Err()
275+
case writeCh <- writeJob{currentBuf, nr}:
276+
pendingWrite = true
242277
}
243-
}
244278

245-
if writeErr != nil {
246-
return written, writeErr
279+
useBufferA = !useBufferA
247280
}
248281

249-
if nr != nw {
250-
return written, io.ErrShortWrite
282+
if readErr == io.EOF {
283+
break
251284
}
285+
}
252286

253-
written += int64(nw)
254-
255-
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
256-
break
287+
if pendingWrite {
288+
select {
289+
case <-ctx.Done():
290+
close(writeCh)
291+
<-doneCh
292+
return written, ctx.Err()
293+
case result := <-resultCh:
294+
if result.writeErr != nil {
295+
close(writeCh)
296+
<-doneCh
297+
return written, result.writeErr
298+
}
299+
written += int64(result.bytesWritten)
257300
}
258301
}
259302

303+
close(writeCh)
304+
<-doneCh
305+
260306
return written, nil
261307
}

0 commit comments

Comments
 (0)