diff --git a/transport/nats/nats.go b/transport/nats/nats.go index 9cb861b3c7..829166746a 100644 --- a/transport/nats/nats.go +++ b/transport/nats/nats.go @@ -10,7 +10,6 @@ import ( "time" "github.com/nats-io/nats.go" - "go-micro.dev/v5/codec/json" "go-micro.dev/v5/server" "go-micro.dev/v5/transport" ) @@ -61,8 +60,6 @@ var ( DefaultTimeout = time.Minute ) - - func configure(n *ntport, opts ...transport.Option) { for _, o := range opts { o(&n.opts) @@ -121,21 +118,28 @@ func (n *ntportClient) Remote() string { } func (n *ntportClient) Send(m *transport.Message) error { - b, err := n.opts.Codec.Marshal(m) - if err != nil { - return err + header := nats.Header{} + for k, v := range m.Header { + header.Add(k, v) + } + + msg := &nats.Msg{ + Subject: n.addr, + Reply: n.id, + Header: header, + Data: m.Body, } // no deadline if n.opts.Timeout == time.Duration(0) { - return n.conn.PublishRequest(n.addr, n.id, b) + return n.conn.PublishMsg(msg) } // use the deadline ch := make(chan error, 1) go func() { - ch <- n.conn.PublishRequest(n.addr, n.id, b) + ch <- n.conn.PublishMsg(msg) }() select { @@ -157,12 +161,13 @@ func (n *ntportClient) Recv(m *transport.Message) error { return err } - var mr transport.Message - if err := n.opts.Codec.Unmarshal(rsp.Data, &mr); err != nil { - return err + m.Header = make(map[string]string) + for k, v := range rsp.Header { + m.Header[k] = v[0] } - *m = mr + m.Body = rsp.Data + return nil } @@ -213,28 +218,37 @@ func (n *ntportSocket) Recv(m *transport.Message) error { } n.Unlock() - if err := n.opts.Codec.Unmarshal(r.Data, m); err != nil { - return err + m.Header = make(map[string]string) + for k, v := range r.Header { + m.Header[k] = v[0] } + m.Body = r.Data + return nil } func (n *ntportSocket) Send(m *transport.Message) error { - b, err := n.opts.Codec.Marshal(m) - if err != nil { - return err + header := nats.Header{} + for k, v := range m.Header { + header.Add(k, v) + } + + msg := &nats.Msg{ + Subject: n.m.Reply, + Header: header, + Data: m.Body, } // no deadline if n.opts.Timeout == time.Duration(0) { - return n.conn.Publish(n.m.Reply, b) + return n.conn.PublishMsg(msg) } // use the deadline ch := make(chan error, 1) go func() { - ch <- n.conn.Publish(n.m.Reply, b) + ch <- n.conn.PublishMsg(msg) }() select { @@ -435,8 +449,6 @@ func (n *ntport) String() string { func NewTransport(opts ...transport.Option) transport.Transport { options := transport.Options{ - // Default codec - Codec: json.Marshaler{}, Timeout: DefaultTimeout, Context: context.Background(), }