Skip to content

Commit 95d0254

Browse files
committed
add context deadline support
1 parent 54ed41e commit 95d0254

File tree

9 files changed

+363
-72
lines changed

9 files changed

+363
-72
lines changed

client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ func (c *Client) EstablishConnection() error {
3535

3636
c.SocketConnection = SocketConnection{
3737
Conn: conn,
38-
err: make(chan error),
39-
m: make(chan *Message),
38+
39+
receive: make(chan message),
4040
}
4141

4242
return nil
@@ -102,7 +102,7 @@ func NewClient(host string, port uint, passwd string, timeout int) (*Client, err
102102

103103
err = client.Authenticate()
104104
if err != nil {
105-
client.Close()
105+
_ = client.Close()
106106
return nil, err
107107
}
108108

connection.go

Lines changed: 93 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package goesl
99
import (
1010
"bufio"
1111
"bytes"
12+
"context"
1213
"fmt"
1314
"io"
1415
"net"
@@ -18,11 +19,17 @@ import (
1819
"time"
1920
)
2021

22+
type message struct {
23+
err error
24+
m *Message
25+
}
26+
2127
// Main connection against ESL - Gotta add more description here
2228
type SocketConnection struct {
2329
net.Conn
24-
err chan error
25-
m chan *Message
30+
31+
receive chan message
32+
2633
mtx sync.Mutex
2734
}
2835

@@ -32,7 +39,7 @@ func (c *SocketConnection) Dial(network string, addr string, timeout time.Durati
3239
}
3340

3441
// Send - Will send raw message to open net connection
35-
func (c *SocketConnection) Send(cmd string) error {
42+
func (c *SocketConnection) Send(ctx context.Context, cmd string) error {
3643

3744
if strings.Contains(cmd, "\r\n") {
3845
return fmt.Errorf(EInvalidCommandProvided, cmd)
@@ -42,6 +49,12 @@ func (c *SocketConnection) Send(cmd string) error {
4249
c.mtx.Lock()
4350
defer c.mtx.Unlock()
4451

52+
deadline, ok := ctx.Deadline()
53+
if ok {
54+
_ = c.SetWriteDeadline(deadline)
55+
defer func() { _ = c.SetWriteDeadline(time.Time{}) }()
56+
}
57+
4558
_, err := io.WriteString(c, cmd)
4659
if err != nil {
4760
return err
@@ -56,10 +69,10 @@ func (c *SocketConnection) Send(cmd string) error {
5669
}
5770

5871
// SendMany - Will loop against passed commands and return 1st error if error happens
59-
func (c *SocketConnection) SendMany(cmds []string) error {
72+
func (c *SocketConnection) SendMany(ctx context.Context, cmds []string) error {
6073

6174
for _, cmd := range cmds {
62-
if err := c.Send(cmd); err != nil {
75+
if err := c.Send(ctx, cmd); err != nil {
6376
return err
6477
}
6578
}
@@ -68,7 +81,7 @@ func (c *SocketConnection) SendMany(cmds []string) error {
6881
}
6982

7083
// SendEvent - Will loop against passed event headers
71-
func (c *SocketConnection) SendEvent(eventHeaders []string) error {
84+
func (c *SocketConnection) SendEvent(ctx context.Context, eventHeaders []string) error {
7285
if len(eventHeaders) <= 0 {
7386
return fmt.Errorf(ECouldNotSendEvent, len(eventHeaders))
7487
}
@@ -77,18 +90,20 @@ func (c *SocketConnection) SendEvent(eventHeaders []string) error {
7790
c.mtx.Lock()
7891
defer c.mtx.Unlock()
7992

93+
deadline, ok := ctx.Deadline()
94+
if ok {
95+
_ = c.SetWriteDeadline(deadline)
96+
defer func() { _ = c.SetWriteDeadline(time.Time{}) }()
97+
}
98+
8099
_, err := io.WriteString(c, "sendevent ")
81100
if err != nil {
82101
return err
83102
}
84103

85104
for _, eventHeader := range eventHeaders {
86-
_, err := io.WriteString(c, eventHeader)
87-
if err != nil {
88-
return err
89-
}
90105

91-
_, err = io.WriteString(c, "\r\n")
106+
_, err := io.WriteString(c, eventHeader+"\r\n")
92107
if err != nil {
93108
return err
94109
}
@@ -104,27 +119,29 @@ func (c *SocketConnection) SendEvent(eventHeaders []string) error {
104119
}
105120

106121
// Execute - Helper fuck to execute commands with its args and sync/async mode
107-
func (c *SocketConnection) Execute(command, args string, sync bool) (m *Message, err error) {
108-
return c.SendMsg(map[string]string{
109-
"call-command": "execute",
110-
"execute-app-name": command,
111-
"execute-app-arg": args,
112-
"event-lock": strconv.FormatBool(sync),
113-
}, "", "")
122+
func (c *SocketConnection) Execute(ctx context.Context, command, args string, sync bool) (m *Message, err error) {
123+
return c.SendMsg(ctx,
124+
map[string]string{
125+
"call-command": "execute",
126+
"execute-app-name": command,
127+
"execute-app-arg": args,
128+
"event-lock": strconv.FormatBool(sync),
129+
}, "", "")
114130
}
115131

116132
// ExecuteUUID - Helper fuck to execute uuid specific commands with its args and sync/async mode
117-
func (c *SocketConnection) ExecuteUUID(uuid string, command string, args string, sync bool) (m *Message, err error) {
118-
return c.SendMsg(map[string]string{
119-
"call-command": "execute",
120-
"execute-app-name": command,
121-
"execute-app-arg": args,
122-
"event-lock": strconv.FormatBool(sync),
123-
}, uuid, "")
133+
func (c *SocketConnection) ExecuteUUID(ctx context.Context, uuid string, command string, args string, sync bool) (m *Message, err error) {
134+
return c.SendMsg(ctx,
135+
map[string]string{
136+
"call-command": "execute",
137+
"execute-app-name": command,
138+
"execute-app-arg": args,
139+
"event-lock": strconv.FormatBool(sync),
140+
}, uuid, "")
124141
}
125142

126143
// SendMsg - Basically this func will send message to the opened connection
127-
func (c *SocketConnection) SendMsg(msg map[string]string, uuid, data string) (m *Message, err error) {
144+
func (c *SocketConnection) SendMsg(ctx context.Context, msg map[string]string, uuid, data string) (*Message, error) {
128145

129146
b := bytes.NewBufferString("sendmsg")
130147

@@ -160,19 +177,26 @@ func (c *SocketConnection) SendMsg(msg map[string]string, uuid, data string) (m
160177

161178
// lock mutex
162179
c.mtx.Lock()
163-
_, err = b.WriteTo(c)
180+
defer c.mtx.Unlock()
181+
182+
deadline, ok := ctx.Deadline()
183+
if ok {
184+
_ = c.SetWriteDeadline(deadline)
185+
defer func() { _ = c.SetWriteDeadline(time.Time{}) }()
186+
}
187+
188+
_, err := b.WriteTo(c)
164189
if err != nil {
165-
c.mtx.Unlock()
166190
return nil, err
167191
}
168-
c.mtx.Unlock()
169192

170-
select {
171-
case err := <-c.err:
193+
m, err := c.ReadMessage(ctx)
194+
if err != nil {
172195
return nil, err
173-
case m := <-c.m:
174-
return m, nil
175196
}
197+
198+
return m, nil
199+
176200
}
177201

178202
// OriginatorAdd - Will return originator address known as net.RemoteAddr()
@@ -183,40 +207,65 @@ func (c *SocketConnection) OriginatorAddr() net.Addr {
183207

184208
// ReadMessage - Will read message from channels and return them back accordingy.
185209
// If error is received, error will be returned. If not, message will be returned back!
186-
func (c *SocketConnection) ReadMessage() (*Message, error) {
210+
func (c *SocketConnection) ReadMessage(ctx context.Context) (*Message, error) {
187211
Debug("Waiting for connection message to be received ...")
188212

213+
var m message
189214
select {
190-
case err := <-c.err:
191-
return nil, err
192-
case msg := <-c.m:
193-
return msg, nil
215+
case m = <-c.receive:
216+
case <-ctx.Done():
217+
return nil, fmt.Errorf("context deadline exceeded")
218+
}
219+
220+
if m.m == nil {
221+
return nil, fmt.Errorf("unable to read message, channel closed")
222+
}
223+
224+
if m.err != nil {
225+
return nil, m.err
194226
}
227+
228+
return m.m, nil
195229
}
196230

231+
const (
232+
defaultHandleTimeout = time.Second
233+
)
234+
197235
// Handle - Will handle new messages and close connection when there are no messages left to process
198236
func (c *SocketConnection) Handle() {
199237

200-
done := make(chan bool)
238+
done := make(chan struct{})
201239

202240
go func() {
203241
for {
204-
msg, err := newMessage(bufio.NewReaderSize(c, ReadBufferSize), true)
205242

243+
msg, err := newMessage(bufio.NewReaderSize(c, ReadBufferSize), true)
206244
if err != nil {
207-
c.err <- err
208-
done <- true
245+
246+
select {
247+
case c.receive <- message{err: err}:
248+
case <-time.After(defaultHandleTimeout):
249+
}
250+
251+
close(done)
209252
break
210253
}
211254

212-
c.m <- msg
255+
select {
256+
case c.receive <- message{m: msg}:
257+
case <-time.After(defaultHandleTimeout):
258+
// if messages are getting dropped, receive syncronization will be messed up and unreliable
259+
}
213260
}
214261
}()
215262

216263
<-done
217264

265+
close(c.receive)
266+
218267
// Closing the connection now as there's nothing left to do ...
219-
c.Close()
268+
_ = c.Close()
220269
}
221270

222271
// Close - Will close down net connection and return error if error happen

examples/client.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
package examples
88

99
import (
10+
"context"
1011
"flag"
1112
"fmt"
12-
. "github.com/0x19/goesl"
1313
"runtime"
1414
"strings"
15+
16+
. "github.com/0x19/goesl"
1517
)
1618

1719
var (
@@ -24,6 +26,8 @@ var (
2426
// Small client that will first make sure all events are returned as JSON and second, will originate
2527
func main() {
2628

29+
ctx := context.Background()
30+
2731
// Boost it as much as it can go ...
2832
runtime.GOMAXPROCS(runtime.NumCPU())
2933

@@ -41,12 +45,12 @@ func main() {
4145
// Remember that this is crutial part in handling incoming messages :)
4246
go client.Handle()
4347

44-
client.Send("events json ALL")
48+
client.Send(ctx, "events json ALL")
4549

46-
client.BgApi(fmt.Sprintf("originate %s %s", "sofia/internal/[email protected]", "&socket(192.168.1.2:8084 async full)"))
50+
client.BgApi(ctx, fmt.Sprintf("originate %s %s", "sofia/internal/[email protected]", "&socket(192.168.1.2:8084 async full)"))
4751

4852
for {
49-
msg, err := client.ReadMessage()
53+
msg, err := client.ReadMessage(ctx)
5054

5155
if err != nil {
5256

examples/server_playback.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ package examples
88

99
import (
1010
"fmt"
11-
. "github.com/0x19/goesl"
1211
"os"
1312
"runtime"
1413
"strings"
14+
15+
. "github.com/0x19/goesl"
1516
)
1617

1718
var welcomeFile = "%s/media/welcome.wav"

examples/server_tts.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
package examples
88

99
import (
10-
. "github.com/0x19/goesl"
1110
"runtime"
1211
"strings"
12+
13+
. "github.com/0x19/goesl"
1314
)
1415

1516
var (

0 commit comments

Comments
 (0)