Skip to content

Commit eebf3de

Browse files
committed
stream: change Readand Write logic
1 parent 0974616 commit eebf3de

File tree

1 file changed

+24
-32
lines changed

1 file changed

+24
-32
lines changed

stream.go

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ import (
1212
"strconv"
1313
"strings"
1414
"sync"
15-
16-
"golang.org/x/xerrors"
1715
)
1816

1917
const (
@@ -62,9 +60,9 @@ type Stream interface {
6260
}
6361

6462
type stream struct {
65-
in *bufio.Reader
66-
out io.Writer
67-
sync.Mutex
63+
in *bufio.Reader
64+
out io.Writer
65+
outMu sync.Mutex
6866
}
6967

7068
func NewStream(in io.Reader, out io.Writer) Stream {
@@ -82,61 +80,55 @@ func (s *stream) Read(ctx context.Context) ([]byte, error) {
8280
}
8381

8482
var length int64
83+
// read the header, stop on the first empty line
8584
for {
8685
line, err := s.in.ReadString('\n')
8786
if err != nil {
88-
return nil, xerrors.Errorf("failed reading header line: %w", err)
87+
return nil, fmt.Errorf("failed reading header line %q", err)
8988
}
90-
9189
line = strings.TrimSpace(line)
92-
if line == "" { // check we have a header line
90+
// check we have a header line
91+
if line == "" {
9392
break
9493
}
95-
9694
colon := strings.IndexRune(line, ':')
9795
if colon < 0 {
98-
return nil, xerrors.Errorf("invalid header line: %q", line)
96+
return nil, fmt.Errorf("invalid header line %q", line)
9997
}
100-
10198
name, value := line[:colon], strings.TrimSpace(line[colon+1:])
102-
if name != "Content-Length" {
103-
continue
104-
}
105-
106-
if length, err = strconv.ParseInt(value, 10, 32); err != nil {
107-
return nil, xerrors.Errorf("failed parsing Content-Length: %v", value)
108-
}
109-
110-
if length <= 0 {
111-
return nil, xerrors.Errorf("invalid Content-Length: %v", length)
99+
switch name {
100+
case "Content-Length":
101+
if length, err = strconv.ParseInt(value, 10, 32); err != nil {
102+
return nil, fmt.Errorf("failed parsing Content-Length: %v", value)
103+
}
104+
if length <= 0 {
105+
return nil, fmt.Errorf("invalid Content-Length: %v", length)
106+
}
107+
default:
108+
// ignoring unknown headers
112109
}
113110
}
114-
115111
if length == 0 {
116-
return nil, xerrors.New("missing Content-Length header")
112+
return nil, fmt.Errorf("missing Content-Length header")
117113
}
118-
119114
data := make([]byte, length)
120115
if _, err := io.ReadFull(s.in, data); err != nil {
121-
return nil, xerrors.Errorf("failed reading data: %w", err)
116+
return nil, err
122117
}
123-
124118
return data, nil
125119
}
126120

127-
func (s *stream) Write(ctx context.Context, data []byte) (err error) {
121+
func (s *stream) Write(ctx context.Context, data []byte) error {
128122
select {
129123
case <-ctx.Done():
130124
return ctx.Err()
131125
default:
132126
}
133-
134-
s.Lock()
135-
_, err = fmt.Fprintf(s.out, HeaderContentLengthFmt+HeaderContentTypeFmt+HeaderContentSeparator, len(data), ContentTypeJSONRPC)
127+
s.outMu.Lock()
128+
_, err := fmt.Fprintf(s.out, "Content-Length: %v\r\n\r\n", len(data))
136129
if err == nil {
137130
_, err = s.out.Write(data)
138131
}
139-
s.Unlock()
140-
132+
s.outMu.Unlock()
141133
return err
142134
}

0 commit comments

Comments
 (0)