|
| 1 | +package cube |
| 2 | + |
| 3 | +import ( |
| 4 | + "os" |
| 5 | + "sync" |
| 6 | + "sync/atomic" |
| 7 | + "time" |
| 8 | + |
| 9 | + "github.com/go-resty/resty" |
| 10 | + "github.com/labstack/gommon/log" |
| 11 | + "github.com/shirou/gopsutil/process" |
| 12 | +) |
| 13 | + |
| 14 | +type ( |
| 15 | + Cube struct { |
| 16 | + Options |
| 17 | + startTime int64 |
| 18 | + uptime int64 |
| 19 | + cpu float32 |
| 20 | + memory uint64 |
| 21 | + requests []*Request |
| 22 | + activeRequests int64 |
| 23 | + mutex sync.RWMutex |
| 24 | + client *resty.Client |
| 25 | + logger *log.Logger |
| 26 | + } |
| 27 | + |
| 28 | + Options struct { |
| 29 | + // Node id |
| 30 | + Node string |
| 31 | + |
| 32 | + // Tags |
| 33 | + Tags []string |
| 34 | + |
| 35 | + // Number of requests in a batch |
| 36 | + BatchSize int |
| 37 | + |
| 38 | + // Interval in seconds to dispatch the batch |
| 39 | + DispatchInterval time.Duration |
| 40 | + } |
| 41 | + |
| 42 | + Request struct { |
| 43 | + ID string `json:"id,omitempty"` |
| 44 | + Time int64 `json:"time,omitempty"` |
| 45 | + Tags []string `json:"tags,omitempty"` |
| 46 | + Host string `json:"host,omitempty"` |
| 47 | + Path string `json:"path,omitempty"` |
| 48 | + Method string `json:"method,omitempty"` |
| 49 | + Status int `json:"status,omitempty"` |
| 50 | + BytesIn int64 `json:"bytes_in,omitempty"` |
| 51 | + BytesOut int64 `json:"bytes_out,omitempty"` |
| 52 | + Latency int64 `json:"latency,omitempty"` |
| 53 | + ClientID string `json:"client_id,omitempty"` |
| 54 | + RemoteIP string `json:"remote_ip,omitempty"` |
| 55 | + UserAgent string `json:"user_agent,omitempty"` |
| 56 | + Active int64 `json:"active,omitempty"` |
| 57 | + Error string `json:"error,omitempty"` |
| 58 | + StackTrace string `json:"stack_trace,omitempty"` |
| 59 | + Node string `json:"node,omitempty"` |
| 60 | + Uptime int64 `json:"uptime,omitempty"` |
| 61 | + CPU float32 `json:"cpu,omitempty"` |
| 62 | + Memory uint64 `json:"memory,omitempty"` |
| 63 | + } |
| 64 | + |
| 65 | + // APIError struct { |
| 66 | + // Code int `json:"code"` |
| 67 | + // Message string `json:"message"` |
| 68 | + // } |
| 69 | +) |
| 70 | + |
| 71 | +func New(apiKey string, options Options) *Cube { |
| 72 | + c := &Cube{ |
| 73 | + startTime: time.Now().Unix(), |
| 74 | + client: resty.New(). |
| 75 | + SetHostURL("https://api.labstack.com"). |
| 76 | + SetAuthToken(apiKey). |
| 77 | + SetHeader("User-Agent", "labstack/cube"), |
| 78 | + logger: log.New("cube"), |
| 79 | + } |
| 80 | + c.Options = options |
| 81 | + |
| 82 | + // Defaults |
| 83 | + if c.Node == "" { |
| 84 | + c.Node, _ = os.Hostname() |
| 85 | + } |
| 86 | + if c.BatchSize == 0 { |
| 87 | + c.BatchSize = 60 |
| 88 | + } |
| 89 | + if c.DispatchInterval == 0 { |
| 90 | + c.DispatchInterval = 60 |
| 91 | + } |
| 92 | + |
| 93 | + // Dispatch daemon |
| 94 | + go func() { |
| 95 | + d := time.Duration(c.DispatchInterval) * time.Second |
| 96 | + for range time.Tick(d) { |
| 97 | + c.Dispatch() |
| 98 | + } |
| 99 | + }() |
| 100 | + |
| 101 | + // System daemon |
| 102 | + go func() { |
| 103 | + p, _ := process.NewProcess(int32(os.Getpid())) |
| 104 | + |
| 105 | + for range time.Tick(time.Second) { |
| 106 | + c.uptime = time.Now().Unix() - c.startTime |
| 107 | + cpu, _ := p.CPUPercent() |
| 108 | + c.cpu = float32(cpu) |
| 109 | + mem, _ := p.MemoryInfo() |
| 110 | + c.memory = mem.RSS |
| 111 | + } |
| 112 | + }() |
| 113 | + |
| 114 | + return c |
| 115 | +} |
| 116 | + |
| 117 | +func (c *Cube) resetRequests() { |
| 118 | + c.mutex.Lock() |
| 119 | + defer c.mutex.Unlock() |
| 120 | + c.requests = make([]*Request, 0, c.BatchSize) |
| 121 | +} |
| 122 | + |
| 123 | +func (c *Cube) appendRequest(r *Request) { |
| 124 | + c.mutex.Lock() |
| 125 | + defer c.mutex.Unlock() |
| 126 | + c.requests = append(c.requests, r) |
| 127 | +} |
| 128 | + |
| 129 | +func (c *Cube) listRequests() []*Request { |
| 130 | + c.mutex.Lock() |
| 131 | + defer c.mutex.Unlock() |
| 132 | + requests := make([]*Request, len(c.requests)) |
| 133 | + for i, r := range c.requests { |
| 134 | + requests[i] = r |
| 135 | + } |
| 136 | + return requests |
| 137 | +} |
| 138 | + |
| 139 | +func (c *Cube) requestsLength() int { |
| 140 | + c.mutex.RLock() |
| 141 | + defer c.mutex.RUnlock() |
| 142 | + return len(c.requests) |
| 143 | +} |
| 144 | + |
| 145 | +// Dispatch dispatches the requests batch. |
| 146 | +func (c *Cube) Dispatch() { |
| 147 | + if len(c.requests) == 0 { |
| 148 | + return |
| 149 | + } |
| 150 | + |
| 151 | + // err := new(APIError) |
| 152 | + res, err := c.client.R(). |
| 153 | + SetBody(c.listRequests()). |
| 154 | + // SetError(err). |
| 155 | + Post("/cube") |
| 156 | + if err != nil { |
| 157 | + c.logger.Error(err) |
| 158 | + return |
| 159 | + } |
| 160 | + if res.StatusCode() < 200 || res.StatusCode() >= 300 { |
| 161 | + c.logger.Error(res.Body()) |
| 162 | + } |
| 163 | + |
| 164 | + c.resetRequests() |
| 165 | +} |
| 166 | + |
| 167 | +// Start starts cording an HTTP request. |
| 168 | +func (c *Cube) Start(r *Request) { |
| 169 | + atomic.AddInt64(&c.activeRequests, 1) |
| 170 | + |
| 171 | + r.Time = time.Now().UnixNano() / 1000 |
| 172 | + r.Active = c.activeRequests |
| 173 | + r.Node = c.Node |
| 174 | + r.Uptime = c.uptime |
| 175 | + r.CPU = c.cpu |
| 176 | + r.Memory = c.memory |
| 177 | + r.Tags = c.Tags |
| 178 | + |
| 179 | + c.appendRequest(r) |
| 180 | +} |
| 181 | + |
| 182 | +// Stop stops recording an HTTP request. |
| 183 | +func (c *Cube) Stop(r *Request) { |
| 184 | + atomic.AddInt64(&c.activeRequests, -1) |
| 185 | + |
| 186 | + r.Latency = time.Now().UnixNano()/1000 - r.Time |
| 187 | + |
| 188 | + // Dispatch batch |
| 189 | + if c.requestsLength() >= c.BatchSize { |
| 190 | + go func() { |
| 191 | + c.Dispatch() |
| 192 | + }() |
| 193 | + } |
| 194 | +} |
| 195 | + |
| 196 | +// RequestID returns the request ID from the request or response. |
| 197 | +// func RequestID(r *http.Request, w http.ResponseWriter) string { |
| 198 | +// id := r.Header.Get("X-Request-ID") |
| 199 | +// if id == "" { |
| 200 | +// id = w.Header().Get("X-Request-ID") |
| 201 | +// } |
| 202 | +// return id |
| 203 | +// } |
| 204 | + |
| 205 | +// RealIP returns the real IP from the request. |
| 206 | +// func RealIP(r *http.Request) string { |
| 207 | +// ra := r.RemoteAddr |
| 208 | +// if ip := r.Header.Get("X-Forwarded-For"); ip != "" { |
| 209 | +// ra = strings.Split(ip, ", ")[0] |
| 210 | +// } else if ip := r.Header.Get("X-Real-IP"); ip != "" { |
| 211 | +// ra = ip |
| 212 | +// } else { |
| 213 | +// ra, _, _ = net.SplitHostPort(ra) |
| 214 | +// } |
| 215 | +// return ra |
| 216 | +// } |
| 217 | + |
| 218 | +// func (e *APIError) Error() string { |
| 219 | +// return e.Message |
| 220 | +// } |
0 commit comments