Skip to content

Commit 876595a

Browse files
committed
refactor: simplify code
1 parent ee43565 commit 876595a

File tree

1 file changed

+6
-46
lines changed

1 file changed

+6
-46
lines changed

transport/nats/nats.go

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ type ntportClient struct {
3232

3333
type ntportSocket struct {
3434
conn *nats.Conn
35-
m *nats.Msg
3635
r chan *nats.Msg
3736

3837
close chan bool
@@ -123,31 +122,12 @@ func (n *ntportClient) Send(m *transport.Message) error {
123122
header.Add(k, v)
124123
}
125124

126-
msg := &nats.Msg{
125+
return n.conn.PublishMsg(&nats.Msg{
127126
Subject: n.addr,
128127
Reply: n.id,
129128
Header: header,
130129
Data: m.Body,
131-
}
132-
133-
// no deadline
134-
if n.opts.Timeout == time.Duration(0) {
135-
return n.conn.PublishMsg(msg)
136-
}
137-
138-
// use the deadline
139-
ch := make(chan error, 1)
140-
141-
go func() {
142-
ch <- n.conn.PublishMsg(msg)
143-
}()
144-
145-
select {
146-
case err := <-ch:
147-
return err
148-
case <-time.After(n.opts.Timeout):
149-
return errors.New("deadline exceeded")
150-
}
130+
})
151131
}
152132

153133
func (n *ntportClient) Recv(m *transport.Message) error {
@@ -233,30 +213,11 @@ func (n *ntportSocket) Send(m *transport.Message) error {
233213
header.Add(k, v)
234214
}
235215

236-
msg := &nats.Msg{
237-
Subject: n.m.Reply,
216+
return n.conn.PublishMsg(&nats.Msg{
217+
Subject: n.remote,
238218
Header: header,
239219
Data: m.Body,
240-
}
241-
242-
// no deadline
243-
if n.opts.Timeout == time.Duration(0) {
244-
return n.conn.PublishMsg(msg)
245-
}
246-
247-
// use the deadline
248-
ch := make(chan error, 1)
249-
250-
go func() {
251-
ch <- n.conn.PublishMsg(msg)
252-
}()
253-
254-
select {
255-
case err := <-ch:
256-
return err
257-
case <-time.After(n.opts.Timeout):
258-
return errors.New("deadline exceeded")
259-
}
220+
})
260221
}
261222

262223
func (n *ntportSocket) Close() error {
@@ -305,7 +266,6 @@ func (n *ntportListener) Accept(fn func(transport.Socket)) error {
305266
if !ok {
306267
sock = &ntportSocket{
307268
conn: n.conn,
308-
m: m,
309269
r: make(chan *nats.Msg, 1),
310270
close: make(chan bool),
311271
opts: n.opts,
@@ -329,7 +289,7 @@ func (n *ntportListener) Accept(fn func(transport.Socket)) error {
329289
go func() {
330290
<-sock.close
331291
n.Lock()
332-
delete(n.so, sock.m.Reply)
292+
delete(n.so, m.Reply)
333293
n.Unlock()
334294
}()
335295
}

0 commit comments

Comments
 (0)