Skip to content

Commit 23cfe80

Browse files
authored
pref(net): improve concurrent read and write buffer (#416)
* pref(net): improve concurrent read and write buffer * chore
1 parent d89d0a0 commit 23cfe80

File tree

1 file changed

+51
-21
lines changed

1 file changed

+51
-21
lines changed

internal/net/request.go

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,9 @@ type Buf struct {
619619
ctx context.Context
620620
off int
621621
rw sync.Mutex
622+
623+
readSignal chan struct{}
624+
readPending bool
622625
}
623626

624627
// NewBuf is a buffer that can have 1 read & 1 write at the same time.
@@ -628,9 +631,16 @@ func NewBuf(ctx context.Context, maxSize int) *Buf {
628631
ctx: ctx,
629632
buffer: bytes.NewBuffer(make([]byte, 0, maxSize)),
630633
size: maxSize,
634+
635+
readSignal: make(chan struct{}, 1),
631636
}
632637
}
633638
func (br *Buf) Reset(size int) {
639+
br.rw.Lock()
640+
defer br.rw.Unlock()
641+
if br.buffer == nil {
642+
return
643+
}
634644
br.buffer.Reset()
635645
br.size = size
636646
br.off = 0
@@ -646,27 +656,34 @@ func (br *Buf) Read(p []byte) (n int, err error) {
646656
if br.off >= br.size {
647657
return 0, io.EOF
648658
}
649-
br.rw.Lock()
650-
n, err = br.buffer.Read(p)
651-
br.rw.Unlock()
652-
if err == nil {
653-
br.off += n
654-
return n, err
655-
}
656-
if err != io.EOF {
657-
return n, err
658-
}
659-
if n != 0 {
660-
br.off += n
661-
return n, nil
662-
}
663-
// n==0, err==io.EOF
664-
// wait for new write for 200ms
665-
select {
666-
case <-br.ctx.Done():
667-
return 0, br.ctx.Err()
668-
case <-time.After(time.Millisecond * 200):
669-
return 0, nil
659+
for {
660+
br.rw.Lock()
661+
if br.buffer != nil {
662+
n, err = br.buffer.Read(p)
663+
} else {
664+
err = io.ErrClosedPipe
665+
}
666+
br.rw.Unlock()
667+
if err != nil && err != io.EOF {
668+
return
669+
}
670+
if n > 0 {
671+
br.off += n
672+
return n, nil
673+
}
674+
br.rw.Lock()
675+
br.readPending = true
676+
br.rw.Unlock()
677+
// n==0, err==io.EOF
678+
select {
679+
case <-br.ctx.Done():
680+
return 0, br.ctx.Err()
681+
case _, ok := <-br.readSignal:
682+
if !ok {
683+
return 0, io.ErrClosedPipe
684+
}
685+
continue
686+
}
670687
}
671688
}
672689

@@ -676,10 +693,23 @@ func (br *Buf) Write(p []byte) (n int, err error) {
676693
}
677694
br.rw.Lock()
678695
defer br.rw.Unlock()
696+
if br.buffer == nil {
697+
return 0, io.ErrClosedPipe
698+
}
679699
n, err = br.buffer.Write(p)
700+
if br.readPending {
701+
br.readPending = false
702+
select {
703+
case br.readSignal <- struct{}{}:
704+
default:
705+
}
706+
}
680707
return
681708
}
682709

683710
func (br *Buf) Close() {
711+
br.rw.Lock()
712+
defer br.rw.Unlock()
684713
br.buffer = nil
714+
close(br.readSignal)
685715
}

0 commit comments

Comments
 (0)