Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
language: go

go:
- 1.3
- 1.4
- 1.5
- 1.8
- 1.9
- tip

install:
- go get -v "github.com/smartystreets/goconvey"
- go get -v "github.com/op/go-logging"
Expand Down
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func (c *Client) EstablishConnection() error {

c.SocketConnection = SocketConnection{
Conn: conn,
err: make(chan error),
m: make(chan *Message),

receive: make(chan message),
}

return nil
Expand Down Expand Up @@ -102,7 +102,7 @@ func NewClient(host string, port uint, passwd string, timeout int) (*Client, err

err = client.Authenticate()
if err != nil {
client.Close()
_ = client.Close()
return nil, err
}

Expand Down
137 changes: 93 additions & 44 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package goesl
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net"
Expand All @@ -18,11 +19,17 @@ import (
"time"
)

type message struct {
err error
m *Message
}

// Main connection against ESL - Gotta add more description here
type SocketConnection struct {
net.Conn
err chan error
m chan *Message

receive chan message

mtx sync.Mutex
}

Expand All @@ -32,7 +39,7 @@ func (c *SocketConnection) Dial(network string, addr string, timeout time.Durati
}

// Send - Will send raw message to open net connection
func (c *SocketConnection) Send(cmd string) error {
func (c *SocketConnection) Send(ctx context.Context, cmd string) error {

if strings.Contains(cmd, "\r\n") {
return fmt.Errorf(EInvalidCommandProvided, cmd)
Expand All @@ -42,6 +49,12 @@ func (c *SocketConnection) Send(cmd string) error {
c.mtx.Lock()
defer c.mtx.Unlock()

deadline, ok := ctx.Deadline()
if ok {
_ = c.SetWriteDeadline(deadline)
defer func() { _ = c.SetWriteDeadline(time.Time{}) }()
}

_, err := io.WriteString(c, cmd)
if err != nil {
return err
Expand All @@ -56,10 +69,10 @@ func (c *SocketConnection) Send(cmd string) error {
}

// SendMany - Will loop against passed commands and return 1st error if error happens
func (c *SocketConnection) SendMany(cmds []string) error {
func (c *SocketConnection) SendMany(ctx context.Context, cmds []string) error {

for _, cmd := range cmds {
if err := c.Send(cmd); err != nil {
if err := c.Send(ctx, cmd); err != nil {
return err
}
}
Expand All @@ -68,7 +81,7 @@ func (c *SocketConnection) SendMany(cmds []string) error {
}

// SendEvent - Will loop against passed event headers
func (c *SocketConnection) SendEvent(eventHeaders []string) error {
func (c *SocketConnection) SendEvent(ctx context.Context, eventHeaders []string) error {
if len(eventHeaders) <= 0 {
return fmt.Errorf(ECouldNotSendEvent, len(eventHeaders))
}
Expand All @@ -77,18 +90,20 @@ func (c *SocketConnection) SendEvent(eventHeaders []string) error {
c.mtx.Lock()
defer c.mtx.Unlock()

deadline, ok := ctx.Deadline()
if ok {
_ = c.SetWriteDeadline(deadline)
defer func() { _ = c.SetWriteDeadline(time.Time{}) }()
}

_, err := io.WriteString(c, "sendevent ")
if err != nil {
return err
}

for _, eventHeader := range eventHeaders {
_, err := io.WriteString(c, eventHeader)
if err != nil {
return err
}

_, err = io.WriteString(c, "\r\n")
_, err := io.WriteString(c, eventHeader+"\r\n")
if err != nil {
return err
}
Expand All @@ -104,27 +119,29 @@ func (c *SocketConnection) SendEvent(eventHeaders []string) error {
}

// Execute - Helper fuck to execute commands with its args and sync/async mode
func (c *SocketConnection) Execute(command, args string, sync bool) (m *Message, err error) {
return c.SendMsg(map[string]string{
"call-command": "execute",
"execute-app-name": command,
"execute-app-arg": args,
"event-lock": strconv.FormatBool(sync),
}, "", "")
func (c *SocketConnection) Execute(ctx context.Context, command, args string, sync bool) (m *Message, err error) {
return c.SendMsg(ctx,
map[string]string{
"call-command": "execute",
"execute-app-name": command,
"execute-app-arg": args,
"event-lock": strconv.FormatBool(sync),
}, "", "")
}

// ExecuteUUID - Helper fuck to execute uuid specific commands with its args and sync/async mode
func (c *SocketConnection) ExecuteUUID(uuid string, command string, args string, sync bool) (m *Message, err error) {
return c.SendMsg(map[string]string{
"call-command": "execute",
"execute-app-name": command,
"execute-app-arg": args,
"event-lock": strconv.FormatBool(sync),
}, uuid, "")
func (c *SocketConnection) ExecuteUUID(ctx context.Context, uuid string, command string, args string, sync bool) (m *Message, err error) {
return c.SendMsg(ctx,
map[string]string{
"call-command": "execute",
"execute-app-name": command,
"execute-app-arg": args,
"event-lock": strconv.FormatBool(sync),
}, uuid, "")
}

// SendMsg - Basically this func will send message to the opened connection
func (c *SocketConnection) SendMsg(msg map[string]string, uuid, data string) (m *Message, err error) {
func (c *SocketConnection) SendMsg(ctx context.Context, msg map[string]string, uuid, data string) (*Message, error) {

b := bytes.NewBufferString("sendmsg")

Expand Down Expand Up @@ -160,19 +177,26 @@ func (c *SocketConnection) SendMsg(msg map[string]string, uuid, data string) (m

// lock mutex
c.mtx.Lock()
_, err = b.WriteTo(c)
defer c.mtx.Unlock()

deadline, ok := ctx.Deadline()
if ok {
_ = c.SetWriteDeadline(deadline)
defer func() { _ = c.SetWriteDeadline(time.Time{}) }()
}

_, err := b.WriteTo(c)
if err != nil {
c.mtx.Unlock()
return nil, err
}
c.mtx.Unlock()

select {
case err := <-c.err:
m, err := c.ReadMessage(ctx)
if err != nil {
return nil, err
case m := <-c.m:
return m, nil
}

return m, nil

}

// OriginatorAdd - Will return originator address known as net.RemoteAddr()
Expand All @@ -183,40 +207,65 @@ func (c *SocketConnection) OriginatorAddr() net.Addr {

// ReadMessage - Will read message from channels and return them back accordingy.
// If error is received, error will be returned. If not, message will be returned back!
func (c *SocketConnection) ReadMessage() (*Message, error) {
func (c *SocketConnection) ReadMessage(ctx context.Context) (*Message, error) {
Debug("Waiting for connection message to be received ...")

var m message
select {
case err := <-c.err:
return nil, err
case msg := <-c.m:
return msg, nil
case m = <-c.receive:
case <-ctx.Done():
return nil, fmt.Errorf("context deadline exceeded")
}

if m.m == nil {
return nil, fmt.Errorf("unable to read message, channel closed")
}

if m.err != nil {
return nil, m.err
}

return m.m, nil
}

const (
defaultHandleTimeout = time.Second
)

// Handle - Will handle new messages and close connection when there are no messages left to process
func (c *SocketConnection) Handle() {

done := make(chan bool)
done := make(chan struct{})

go func() {
for {
msg, err := newMessage(bufio.NewReaderSize(c, ReadBufferSize), true)

msg, err := newMessage(bufio.NewReaderSize(c, ReadBufferSize), true)
if err != nil {
c.err <- err
done <- true

select {
case c.receive <- message{err: err}:
case <-time.After(defaultHandleTimeout):
}

close(done)
break
}

c.m <- msg
select {
case c.receive <- message{m: msg}:
case <-time.After(defaultHandleTimeout):
// if messages are getting dropped, receive syncronization will be messed up and unreliable
}
}
}()

<-done

close(c.receive)

// Closing the connection now as there's nothing left to do ...
c.Close()
_ = c.Close()
}

// Close - Will close down net connection and return error if error happen
Expand Down
12 changes: 8 additions & 4 deletions examples/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
package examples

import (
"context"
"flag"
"fmt"
. "github.com/0x19/goesl"
"runtime"
"strings"

. "github.com/0x19/goesl"
)

var (
Expand All @@ -24,6 +26,8 @@ var (
// Small client that will first make sure all events are returned as JSON and second, will originate
func main() {

ctx := context.Background()

// Boost it as much as it can go ...
runtime.GOMAXPROCS(runtime.NumCPU())

Expand All @@ -41,12 +45,12 @@ func main() {
// Remember that this is crutial part in handling incoming messages :)
go client.Handle()

client.Send("events json ALL")
client.Send(ctx, "events json ALL")

client.BgApi(fmt.Sprintf("originate %s %s", "sofia/internal/[email protected]", "&socket(192.168.1.2:8084 async full)"))
client.BgApi(ctx, fmt.Sprintf("originate %s %s", "sofia/internal/[email protected]", "&socket(192.168.1.2:8084 async full)"))

for {
msg, err := client.ReadMessage()
msg, err := client.ReadMessage(ctx)

if err != nil {

Expand Down
15 changes: 9 additions & 6 deletions examples/server_playback.go → examples/serverplayback.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
package examples

import (
"context"
"fmt"
. "github.com/0x19/goesl"
"os"
"runtime"
"strings"

. "github.com/0x19/goesl"
)

var welcomeFile = "%s/media/welcome.wav"
Expand Down Expand Up @@ -47,6 +49,7 @@ func main() {

// handle - Running under goroutine here to explain how to handle playback ( play to the caller )
func handle(s *OutboundServer) {
ctx := context.Background()

for {

Expand All @@ -55,12 +58,12 @@ func handle(s *OutboundServer) {
case conn := <-s.Conns:
Notice("New incomming connection: %v", conn)

if err := conn.Connect(); err != nil {
if err := conn.Connect(ctx); err != nil {
Error("Got error while accepting connection: %s", err)
break
}

answer, err := conn.ExecuteAnswer("", false)
answer, err := conn.ExecuteAnswer(ctx, "", false)

if err != nil {
Error("Got error while executing answer: %s", err)
Expand All @@ -72,14 +75,14 @@ func handle(s *OutboundServer) {

cUUID := answer.GetCallUUID()

if sm, err := conn.Execute("playback", welcomeFile, true); err != nil {
if sm, err := conn.Execute(ctx, "playback", welcomeFile, true); err != nil {
Error("Got error while executing playback: %s", err)
break
} else {
Debug("Playback Message: %s", sm)
}

if hm, err := conn.ExecuteHangup(cUUID, "", false); err != nil {
if hm, err := conn.ExecuteHangup(ctx, cUUID, "", false); err != nil {
Error("Got error while executing hangup: %s", err)
break
} else {
Expand All @@ -88,7 +91,7 @@ func handle(s *OutboundServer) {

go func() {
for {
msg, err := conn.ReadMessage()
msg, err := conn.ReadMessage(ctx)

if err != nil {

Expand Down
Loading