Skip to content

Commit 9502c68

Browse files
committed
optimize NATS message sending and receiving logic by using the nats.Msg struct instead of the original
1 parent 0e45edf commit 9502c68

File tree

1 file changed

+17
-11
lines changed

1 file changed

+17
-11
lines changed

transport/nats/nats.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@ var (
6161
DefaultTimeout = time.Minute
6262
)
6363

64-
65-
6664
func configure(n *ntport, opts ...transport.Option) {
6765
for _, o := range opts {
6866
o(&n.opts)
@@ -121,21 +119,28 @@ func (n *ntportClient) Remote() string {
121119
}
122120

123121
func (n *ntportClient) Send(m *transport.Message) error {
124-
b, err := n.opts.Codec.Marshal(m)
125-
if err != nil {
126-
return err
122+
header := nats.Header{}
123+
for k, v := range m.Header {
124+
header.Add(k, v)
125+
}
126+
127+
msg := &nats.Msg{
128+
Subject: n.addr,
129+
Reply: n.id,
130+
Header: header,
131+
Data: m.Body,
127132
}
128133

129134
// no deadline
130135
if n.opts.Timeout == time.Duration(0) {
131-
return n.conn.PublishRequest(n.addr, n.id, b)
136+
return n.conn.PublishMsg(msg)
132137
}
133138

134139
// use the deadline
135140
ch := make(chan error, 1)
136141

137142
go func() {
138-
ch <- n.conn.PublishRequest(n.addr, n.id, b)
143+
ch <- n.conn.PublishMsg(msg)
139144
}()
140145

141146
select {
@@ -157,12 +162,13 @@ func (n *ntportClient) Recv(m *transport.Message) error {
157162
return err
158163
}
159164

160-
var mr transport.Message
161-
if err := n.opts.Codec.Unmarshal(rsp.Data, &mr); err != nil {
162-
return err
165+
m.Header = make(map[string]string)
166+
for k, v := range rsp.Header {
167+
m.Header[k] = v[0]
163168
}
164169

165-
*m = mr
170+
m.Body = rsp.Data
171+
166172
return nil
167173
}
168174

0 commit comments

Comments
 (0)