Skip to content

Commit ccd570d

Browse files
committed
use nats.Msg in ntportSocket
1 parent 58e4fc9 commit ccd570d

File tree

1 file changed

+16
-7
lines changed

1 file changed

+16
-7
lines changed

transport/nats/nats.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -218,28 +218,37 @@ func (n *ntportSocket) Recv(m *transport.Message) error {
218218
}
219219
n.Unlock()
220220

221-
if err := n.opts.Codec.Unmarshal(r.Data, m); err != nil {
222-
return err
221+
m.Header = make(map[string]string)
222+
for k, v := range r.Header {
223+
m.Header[k] = v[0]
223224
}
225+
m.Body = r.Data
226+
224227
return nil
225228
}
226229

227230
func (n *ntportSocket) Send(m *transport.Message) error {
228-
b, err := n.opts.Codec.Marshal(m)
229-
if err != nil {
230-
return err
231+
header := nats.Header{}
232+
for k, v := range m.Header {
233+
header.Add(k, v)
234+
}
235+
236+
msg := &nats.Msg{
237+
Reply: n.m.Reply,
238+
Header: header,
239+
Data: m.Body,
231240
}
232241

233242
// no deadline
234243
if n.opts.Timeout == time.Duration(0) {
235-
return n.conn.Publish(n.m.Reply, b)
244+
return n.conn.PublishMsg(msg)
236245
}
237246

238247
// use the deadline
239248
ch := make(chan error, 1)
240249

241250
go func() {
242-
ch <- n.conn.Publish(n.m.Reply, b)
251+
ch <- n.conn.PublishMsg(msg)
243252
}()
244253

245254
select {

0 commit comments

Comments
 (0)