Skip to content

Commit 806693b

Browse files
committed
Putting cube back
Signed-off-by: Vishal Rana <[email protected]>
1 parent 15d130a commit 806693b

File tree

2 files changed

+205
-0
lines changed

2 files changed

+205
-0
lines changed

client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ func (c *Client) error(r *resty.Response) bool {
4545
return r.StatusCode() < 200 || r.StatusCode() >= 300
4646
}
4747

48+
func (c *Client) Cube() *Cube {
49+
return &Cube{
50+
client: c,
51+
}
52+
}
53+
4854
func (c *Client) Download(id string, path string) (err *APIError) {
4955
_, e := c.resty.R().
5056
SetOutput(path).

cube.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package labstack
2+
3+
import (
4+
"net"
5+
"net/http"
6+
"strconv"
7+
"strings"
8+
"sync"
9+
"sync/atomic"
10+
"time"
11+
)
12+
13+
type (
14+
// Cube defines the LabStack cube service.
15+
Cube struct {
16+
client *Client
17+
requests []*Request
18+
activeRequests int64
19+
started int64
20+
mutex sync.RWMutex
21+
22+
// LabStack API key
23+
APIKey string
24+
25+
// API node
26+
Node string
27+
28+
// API group
29+
Group string
30+
31+
// Tags
32+
Tags []string
33+
34+
// Number of requests in a batch
35+
BatchSize int
36+
37+
// Interval in seconds to dispatch the batch
38+
DispatchInterval time.Duration
39+
40+
// TODO: To be implemented
41+
ClientLookup string
42+
}
43+
44+
// Request defines a request payload to be corded.
45+
Request struct {
46+
ID string `json:"id"`
47+
Time time.Time `json:"time"`
48+
Node string `json:"node"`
49+
Group string `json:"group"`
50+
Tags []string `json:"tags,omitempty"`
51+
Host string `json:"host"`
52+
Path string `json:"path"`
53+
Method string `json:"method"`
54+
Status int `json:"status"`
55+
BytesIn int64 `json:"bytes_in"`
56+
BytesOut int64 `json:"bytes_out"`
57+
Latency int64 `json:"latency"`
58+
ClientID string `json:"client_id"`
59+
RemoteIP string `json:"remote_ip"`
60+
UserAgent string `json:"user_agent"`
61+
Active int64 `json:"active"`
62+
// TODO: CPU, Uptime, Memory
63+
Error string `json:"error"`
64+
StackTrace string `json:"stack_trace"`
65+
}
66+
)
67+
68+
func (c *Cube) resetRequests() {
69+
c.mutex.Lock()
70+
defer c.mutex.Unlock()
71+
c.requests = make([]*Request, 0, c.BatchSize)
72+
}
73+
74+
func (c *Cube) appendRequest(r *Request) {
75+
c.mutex.Lock()
76+
defer c.mutex.Unlock()
77+
c.requests = append(c.requests, r)
78+
}
79+
80+
func (c *Cube) listRequests() []*Request {
81+
c.mutex.Lock()
82+
defer c.mutex.Unlock()
83+
requests := make([]*Request, len(c.requests))
84+
for i, r := range c.requests {
85+
requests[i] = r
86+
}
87+
return requests
88+
}
89+
90+
func (c *Cube) requestsLength() int {
91+
c.mutex.RLock()
92+
defer c.mutex.RUnlock()
93+
return len(c.requests)
94+
}
95+
96+
// Dispatch dispatches the requests batch.
97+
func (c *Cube) Dispatch() error {
98+
if len(c.requests) == 0 {
99+
return nil
100+
}
101+
102+
err := new(APIError)
103+
r, e := c.client.resty.R().
104+
SetBody(c.listRequests()).
105+
SetError(err).
106+
Post("/cube")
107+
if e != nil {
108+
return &APIError{
109+
Message: e.Error(),
110+
}
111+
}
112+
113+
if c.client.error(r) {
114+
return err
115+
}
116+
c.resetRequests()
117+
return nil
118+
}
119+
120+
// Start starts cording an HTTP request.
121+
func (c *Cube) Start(r *http.Request, w http.ResponseWriter) (req *Request) {
122+
if c.started == 0 {
123+
go func() {
124+
d := time.Duration(c.DispatchInterval) * time.Second
125+
for range time.Tick(d) {
126+
if err := c.Dispatch(); err != nil {
127+
c.client.logger.Error(err)
128+
}
129+
}
130+
}()
131+
atomic.AddInt64(&c.started, 1)
132+
}
133+
134+
req = &Request{
135+
ID: RequestID(r, w),
136+
Time: time.Now(),
137+
Node: c.Node,
138+
Group: c.Group,
139+
Tags: c.Tags,
140+
Host: r.Host,
141+
Path: r.URL.Path,
142+
Method: r.Method,
143+
UserAgent: r.UserAgent(),
144+
RemoteIP: RealIP(r),
145+
}
146+
req.ClientID = req.RemoteIP
147+
atomic.AddInt64(&c.activeRequests, 1)
148+
req.Active = c.activeRequests
149+
cl := r.Header.Get("Content-Length")
150+
if cl == "" {
151+
cl = "0"
152+
}
153+
l, err := strconv.ParseInt(cl, 10, 64)
154+
if err != nil {
155+
c.client.logger.Error(err)
156+
}
157+
req.BytesIn = l
158+
c.appendRequest(req)
159+
return
160+
}
161+
162+
// Stop stops cording an HTTP request.
163+
func (a *Cube) Stop(r *Request, status int, size int64) {
164+
atomic.AddInt64(&a.activeRequests, -1)
165+
r.Status = status
166+
r.BytesOut = size
167+
r.Latency = int64(time.Now().Sub(r.Time))
168+
169+
// Dispatch batch
170+
if a.requestsLength() >= a.BatchSize {
171+
go func() {
172+
if err := a.Dispatch(); err != nil {
173+
a.client.logger.Error(err)
174+
}
175+
}()
176+
}
177+
}
178+
179+
// RequestID returns the request ID from the request or response.
180+
func RequestID(r *http.Request, w http.ResponseWriter) string {
181+
id := r.Header.Get("X-Request-ID")
182+
if id == "" {
183+
id = w.Header().Get("X-Request-ID")
184+
}
185+
return id
186+
}
187+
188+
// RealIP returns the real IP from the request.
189+
func RealIP(r *http.Request) string {
190+
ra := r.RemoteAddr
191+
if ip := r.Header.Get("X-Forwarded-For"); ip != "" {
192+
ra = strings.Split(ip, ", ")[0]
193+
} else if ip := r.Header.Get("X-Real-IP"); ip != "" {
194+
ra = ip
195+
} else {
196+
ra, _, _ = net.SplitHostPort(ra)
197+
}
198+
return ra
199+
}

0 commit comments

Comments
 (0)