Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,6 @@ bench: kill-containers run-containers run-bg

bench-run:
@go test -benchmem -bench . -benchtime 5s ./bench/...

mock-lib:
@mockgen github.com/topfreegames/arkadiko/lib ArkadikoInterface | sed 's/mock_lib/mocks/' > lib/mocks/arkadiko.go
15 changes: 15 additions & 0 deletions lib/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// arkadiko
// https://github.com/topfreegames/arkadiko
//
// Licensed under the MIT license:
// http://www.opensource.org/licenses/mit-license
// Copyright © 2016 Top Free Games <backend@tfgco.com>

package lib

import "context"

// ArkadikoInterface defines the interface for the arkadiko client
type ArkadikoInterface interface {
SendMQTT(ctx context.Context, topic string, payload interface{}, retained bool) (*SendMQTTResponse, error)
}
136 changes: 136 additions & 0 deletions lib/lib.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// arkadiko
// https://github.com/topfreegames/arkadiko
//
// Licensed under the MIT license:
// http://www.opensource.org/licenses/mit-license
// Copyright © 2016 Top Free Games <backend@tfgco.com>

package lib

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"time"

"github.com/spf13/viper"

ehttp "github.com/topfreegames/extensions/http"
)

// Arkadiko represents an arkadiko API application
type Arkadiko struct {
client *http.Client

baseURL string
pass string
user string
}

// NewArkadiko returns a new arkadiko API application
func NewArkadiko(config *viper.Viper) *Arkadiko {
config.SetDefault("arkadiko.maxIdleConns", 100)
config.SetDefault("arkadiko.maxIdleConnsPerHost", http.DefaultMaxIdleConnsPerHost)
config.SetDefault("arkadiko.timeout", 500*time.Millisecond)

return &Arkadiko{
baseURL: config.GetString("arkadiko.url"),
pass: config.GetString("arkadiko.pass"),
user: config.GetString("arkadiko.user"),
client: getHTTPClient(
config.GetDuration("arkadiko.timeout"),
config.GetInt("arkadiko.maxIdleConns"),
config.GetInt("arkadiko.maxIdleConnsPerHost"),
),
}
}

func getHTTPClient(timeout time.Duration, maxIdleConns, maxIdleConnsPerHost int) *http.Client {
client := &http.Client{
Timeout: timeout,
Transport: getHTTPTransport(maxIdleConns, maxIdleConnsPerHost),
}

ehttp.Instrument(client)
return client
}

func getHTTPTransport(maxIdleConns, maxIdleConnsPerHost int) http.RoundTripper {
// Tests use a mock transport.
if _, ok := http.DefaultTransport.(*http.Transport); !ok {
return http.DefaultTransport
}

dialer := &net.Dialer{
DualStack: true,
KeepAlive: 30 * time.Second,
Timeout: 30 * time.Second,
}

// We can't get http.DefaultTransport here and update its
// fields since it's an exported variable, so other libs could
// also change it and overwrite. This hardcoded values are copied
// from http.DefaultTransport but could be configurable too.
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: dialer.DialContext,
ExpectContinueTimeout: 1 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: 90 * time.Second,
MaxIdleConns: maxIdleConns,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
}
}

// SendMQTT publishes an MQTT message on the given topic
func (a *Arkadiko) SendMQTT(ctx context.Context, topic string, payload interface{}, retained bool,
) (*SendMQTTResponse, error) {
path := fmt.Sprintf("/sendmqtt/%s?retained=%t", topic, retained)

response, err := a.sendRequest(ctx, "POST", path, payload)
if err != nil {
return nil, err
}

var result *SendMQTTResponse
err = json.Unmarshal(response, &result)

return result, err
}

func (a *Arkadiko) sendRequest(ctx context.Context, method, path string, payload interface{}) ([]byte, error) {
body, err := json.Marshal(payload)
if err != nil {
return nil, err
}

req, err := http.NewRequest(method, a.baseURL+path, bytes.NewBuffer(body))
if err != nil {
return nil, err
}

req.SetBasicAuth(a.user, a.pass)
req.Header.Set("Content-Type", "application/json")

resp, err := a.client.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer resp.Body.Close()

respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode > 399 {
return nil, NewRequestError(resp.StatusCode, string(respBody))
}

return respBody, nil
}
13 changes: 13 additions & 0 deletions lib/lib_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package lib_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"testing"
)

func TestLib(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Lib Suite")
}
86 changes: 86 additions & 0 deletions lib/lib_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package lib_test

import (
"context"

"github.com/jarcoal/httpmock"
"github.com/spf13/viper"
"github.com/topfreegames/arkadiko/lib"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Lib", func() {
var (
arkadiko *lib.Arkadiko
config *viper.Viper
)

BeforeSuite(func() {
config = viper.New()
httpmock.Activate()
})

BeforeEach(func() {
config.Set("arkadiko.url", "http://arkadiko")
config.Set("arkadiko.user", "user")
config.Set("arkadiko.pass", "pass")

arkadiko = lib.NewArkadiko(config)
httpmock.Reset()
})

Describe("NewArkadiko", func() {
It("Should start a new instance of Arkadiko Lib", func() {
arkadiko = lib.NewArkadiko(config)
Expect(arkadiko).ToNot(BeNil())
})
})

Describe("SendMQTT", func() {
It("Should call arkadiko API to send MQTT", func() {
httpmock.RegisterResponder(
"POST", "http://arkadiko/sendmqtt/topic?retained=false",
httpmock.NewStringResponder(200, `{
"payload": {"message": "message"},
"retained": false,
"topic": "topic"
}`,
),
)

payload := map[string]string{
"message": "message",
}

response, err := arkadiko.SendMQTT(context.Background(), "topic", payload, false)

Expect(err).To(BeNil())
Expect(response).ToNot(BeNil())
Expect(response.Payload).To(BeEquivalentTo(`{"message": "message"}`))
Expect(response.Retained).To(BeFalse())
Expect(response.Topic).To(Equal("topic"))
})

It("Should return meaningful error", func() {
httpmock.RegisterResponder(
"POST", "http://arkadiko/sendmqtt/topic?retained=false",
httpmock.NewStringResponder(404, "Not Found"),
)

payload := map[string]string{
"message": "message",
}

response, err := arkadiko.SendMQTT(context.Background(), "topic", payload, false)

Expect(err).To(Equal(lib.NewRequestError(404, "Not Found")))
Expect(response).To(BeNil())
})
})

AfterSuite(func() {
httpmock.DeactivateAndReset()
})
})
50 changes: 50 additions & 0 deletions lib/mocks/arkadiko.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions lib/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// arkadiko
// https://github.com/topfreegames/arkadiko
//
// Licensed under the MIT license:
// http://www.opensource.org/licenses/mit-license
// Copyright © 2016 Top Free Games <backend@tfgco.com>

package lib

import (
"encoding/json"
"fmt"
)

// RequestError contains the code and body of a failed request
type RequestError struct {
statusCode int
body string
}

// NewRequestError returns a new RequestError
func NewRequestError(statusCode int, body string) *RequestError {
return &RequestError{
statusCode: statusCode,
body: body,
}
}

func (r *RequestError) Error() string {
return fmt.Sprintf("request error: %d %s", r.statusCode, r.body)
}

// Status returns the status code of the error
func (r *RequestError) Status() int {
return r.statusCode
}

// SendMQTTResponse is the result of the SendMQTT request
type SendMQTTResponse struct {
Payload json.RawMessage
Retained bool
Topic string
}