Skip to content
This repository was archived by the owner on Sep 9, 2022. It is now read-only.

Commit 91eb488

Browse files
committed
Resolve conflit
2 parents e3ba077 + c55501e commit 91eb488

File tree

5 files changed

+175
-1
lines changed

5 files changed

+175
-1
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ build/
1313
pump.conf
1414
tyk-pump
1515
.DS_Store
16-
migrate.js
16+
migrate.js
17+
*.orig

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,27 @@ Create a `pump.conf` file:
9494
"org_id",
9595
"oauth_id"]
9696
}
97+
},
98+
"graylog": {
99+
"name": "graylog",
100+
"meta": {
101+
"host": "10.60.6.15",
102+
"port": 12216,
103+
"tags": [
104+
"method",
105+
"path",
106+
"response_code",
107+
"api_key",
108+
"api_version",
109+
"api_name",
110+
"api_id",
111+
"org_id",
112+
"oauth_id",
113+
"raw_request",
114+
"request_time",
115+
"raw_response"
116+
]
117+
}
97118
}
98119
},
99120
"uptime_pump_config": {

pumps/graylog.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package pumps
2+
3+
import (
4+
"encoding/base64"
5+
"encoding/json"
6+
"github.com/Sirupsen/logrus"
7+
"github.com/TykTechnologies/tyk-pump/analytics"
8+
"github.com/mitchellh/mapstructure"
9+
"github.com/robertkowalski/graylog-golang"
10+
"strconv"
11+
)
12+
13+
type GraylogPump struct {
14+
client *gelf.Gelf
15+
conf *GraylogConf
16+
}
17+
18+
type GraylogConf struct {
19+
GraylogHost string `mapstructure:"host"`
20+
GraylogPort int `mapstructure:"port"`
21+
Tags []string `mapstructure: "tags"`
22+
}
23+
24+
var graylogPrefix string = "graylog-pump"
25+
26+
func (p *GraylogPump) New() Pump {
27+
newPump := GraylogPump{}
28+
return &newPump
29+
}
30+
31+
func (p *GraylogPump) GetName() string {
32+
return "Graylog Pump"
33+
}
34+
35+
func (p *GraylogPump) Init(conf interface{}) error {
36+
p.conf = &GraylogConf{}
37+
err := mapstructure.Decode(conf, &p.conf)
38+
if err != nil {
39+
log.WithFields(logrus.Fields{
40+
"prefix": graylogPrefix,
41+
}).Fatal("Failed to decode configuration: ", err)
42+
}
43+
44+
if p.conf.GraylogHost == "" {
45+
p.conf.GraylogHost = "localhost"
46+
}
47+
48+
if p.conf.GraylogPort == 0 {
49+
p.conf.GraylogPort = 1000
50+
}
51+
log.WithFields(logrus.Fields{
52+
"prefix": graylogPrefix,
53+
}).Info("GraylogHost:", p.conf.GraylogHost)
54+
log.WithFields(logrus.Fields{
55+
"prefix": graylogPrefix,
56+
}).Info("GraylogPort:", p.conf.GraylogPort)
57+
58+
p.connect()
59+
return nil
60+
}
61+
62+
func (p *GraylogPump) connect() {
63+
p.client = gelf.New(gelf.Config{
64+
GraylogPort: p.conf.GraylogPort,
65+
GraylogHostname: p.conf.GraylogHost,
66+
})
67+
}
68+
69+
func (p *GraylogPump) WriteData(data []interface{}) error {
70+
log.WithFields(logrus.Fields{
71+
"prefix": graylogPrefix,
72+
}).Info("Writing ", len(data), " records")
73+
74+
if p.client == nil {
75+
p.connect()
76+
p.WriteData(data)
77+
}
78+
79+
for _, item := range data {
80+
record := item.(analytics.AnalyticsRecord)
81+
82+
rReq, err := base64.StdEncoding.DecodeString(record.RawRequest)
83+
if err != nil {
84+
log.WithFields(logrus.Fields{
85+
"prefix": graylogPrefix,
86+
}).Fatal(err)
87+
}
88+
89+
rResp, err := base64.StdEncoding.DecodeString(record.RawRequest)
90+
91+
if err != nil {
92+
log.WithFields(logrus.Fields{
93+
"prefix": graylogPrefix,
94+
}).Fatal(err)
95+
}
96+
97+
mapping := map[string]interface{}{
98+
"method": record.Method,
99+
"path": record.Path,
100+
"response_code": strconv.Itoa(record.ResponseCode),
101+
"api_key": record.APIKey,
102+
"api_version": record.APIVersion,
103+
"api_name": record.APIName,
104+
"api_id": record.APIID,
105+
"org_id": record.OrgID,
106+
"oauth_id": record.OauthID,
107+
"raw_request": string(rReq),
108+
"request_time": strconv.Itoa(int(record.RequestTime)),
109+
"raw_response": string(rResp),
110+
}
111+
112+
messageMap := map[string]string{}
113+
114+
for _, key := range p.conf.Tags {
115+
if value, ok := mapping[key]; ok {
116+
messageMap[key] = value.(string)
117+
}
118+
}
119+
120+
message, err := json.Marshal(messageMap)
121+
if err != nil {
122+
log.WithFields(logrus.Fields{
123+
"prefix": graylogPrefix,
124+
}).Fatal(err)
125+
}
126+
127+
gelfData := map[string]interface{}{
128+
//"version": "1.1",
129+
"host": "tyk-pumps",
130+
"timestamp": record.TimeStamp.Unix(),
131+
"message": string(message),
132+
}
133+
134+
gelfString, err := json.Marshal(gelfData)
135+
136+
if err != nil {
137+
log.WithFields(logrus.Fields{
138+
"prefix": graylogPrefix,
139+
}).Fatal(err)
140+
}
141+
142+
log.WithFields(logrus.Fields{
143+
"prefix": graylogPrefix,
144+
}).Info("Writing ", string(message))
145+
146+
p.client.Log(string(gelfString))
147+
}
148+
return nil
149+
}

pumps/init.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ func init() {
1717
AvailablePumps["influx"] = &InfluxPump{}
1818
AvailablePumps["statsd"] = &StatsdPump{}
1919
AvailablePumps["segment"] = &SegmentPump{}
20+
AvailablePumps["graylog"] = &GraylogPump{}
2021
}

pumps/pump.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ func GetPumpByName(name string) (Pump, error) {
2929
return AvailablePumps["statsd"], nil
3030
case "segment":
3131
return AvailablePumps["segment"], nil
32+
case "graylog":
33+
return AvailablePumps["graylog"], nil
3234
}
3335

3436
return nil, errors.New("Not found")

0 commit comments

Comments
 (0)