Skip to content

Commit d58434c

Browse files
committed
add WriteBulkFrom with bufio.Writer for
buff use io buffer write to w(io.Writer) for io.Copy r(io.Reader) to w(io.Writer) Signed-off-by: weedge <weege007@gmail.com>
1 parent 77c37f9 commit d58434c

File tree

2 files changed

+53
-1
lines changed

2 files changed

+53
-1
lines changed

redcon.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ type Conn interface {
113113
PeekPipeline() []Command
114114
// NetConn returns the base net.Conn connection
115115
NetConn() net.Conn
116+
// WriteBulkFrom write bulk from io.Reader, size n
117+
WriteBulkFrom(n int64, rb io.Reader)
116118
}
117119

118120
// NewServer returns a new Redcon server configured on "tcp" network net.
@@ -494,6 +496,9 @@ func (c *conn) PeekPipeline() []Command {
494496
func (c *conn) NetConn() net.Conn {
495497
return c.conn
496498
}
499+
func (c *conn) WriteBulkFrom(n int64, rb io.Reader) {
500+
c.wr.WriteBulkFrom(n, rb)
501+
}
497502

498503
// BaseWriter returns the underlying connection writer, if any
499504
func BaseWriter(c Conn) *Writer {
@@ -589,13 +594,27 @@ type Writer struct {
589594
w io.Writer
590595
b []byte
591596
err error
597+
598+
// buff use io buffer write to w(io.Writer)
599+
// for io.Copy r(io.Reader) to w(io.Writer)
600+
buff *bufio.Writer
592601
}
593602

594603
// NewWriter creates a new RESP writer.
595604
func NewWriter(wr io.Writer) *Writer {
596605
return &Writer{
597-
w: wr,
606+
w: wr,
607+
buff: bufio.NewWriter(wr),
608+
}
609+
}
610+
611+
func (w *Writer) WriteBulkFrom(n int64, r io.Reader) {
612+
if w != nil && w.err != nil {
613+
return
598614
}
615+
w.buff.Write(appendPrefix(w.b, '$', n))
616+
io.Copy(w.buff, r)
617+
w.buff.Write([]byte{'\r', '\n'})
599618
}
600619

601620
// WriteNull writes a null to the client
@@ -656,6 +675,10 @@ func (w *Writer) SetBuffer(raw []byte) {
656675

657676
// Flush writes all unflushed Write* calls to the underlying writer.
658677
func (w *Writer) Flush() error {
678+
if w.buff != nil {
679+
w.buff.Flush()
680+
}
681+
659682
if w.err != nil {
660683
return w.err
661684
}

redcon_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,35 @@ func testServerNetwork(t *testing.T, network, laddr string) {
365365
<-done
366366
}
367367

368+
func TestConnImpl(t *testing.T) {
369+
var i interface{} = &conn{}
370+
if _, ok := i.(Conn); !ok {
371+
t.Fatalf("conn does not implement Conn interface")
372+
}
373+
}
374+
375+
func TestWriteBulkFrom(t *testing.T) {
376+
wbuf := &bytes.Buffer{}
377+
wr := NewWriter(wbuf)
378+
rbuf := &bytes.Buffer{}
379+
testStr := "hello world"
380+
rbuf.WriteString(testStr)
381+
wr.WriteBulkFrom(int64(len(testStr)), rbuf)
382+
wr.Flush()
383+
if wbuf.String() != fmt.Sprintf("$%d\r\n%s\r\n", len(testStr), testStr) {
384+
t.Fatal("failed")
385+
}
386+
wbuf.Reset()
387+
testStr1 := "hi world"
388+
rbuf.WriteString(testStr1)
389+
wr.WriteBulkFrom(int64(len(testStr1)), rbuf)
390+
wr.Flush()
391+
if wbuf.String() != fmt.Sprintf("$%d\r\n%s\r\n", len(testStr1), testStr1) {
392+
t.Fatal("failed")
393+
}
394+
wbuf.Reset()
395+
}
396+
368397
func TestWriter(t *testing.T) {
369398
buf := &bytes.Buffer{}
370399
wr := NewWriter(buf)

0 commit comments

Comments
 (0)