Skip to content

Commit a5f07ba

Browse files
rockb1017luckyj5
andauthored
Data monitor (#242)
* data reliability monitoring Co-authored-by: Shubham Jain <[email protected]>
1 parent 6a29e97 commit a5f07ba

File tree

12 files changed

+171
-85
lines changed

12 files changed

+171
-85
lines changed

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ testall: test vet race cov
6565
test:
6666
@go test ${PKGS}
6767

68+
testv:
69+
@go test -v ${PKGS}
6870
# Run "short" unit tests
6971
test-short:
7072
@go test -short ${PKGS}

eventsink/splunk.go

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@ import (
66
"strconv"
77
"strings"
88
"sync"
9-
"time"
10-
119
"sync/atomic"
10+
"time"
1211

1312
"code.cloudfoundry.org/lager"
1413
"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventwriter"
@@ -18,25 +17,27 @@ import (
1817
const SPLUNK_HEC_FIELDS_SUPPORT_VERSION = "6.4"
1918

2019
type SplunkConfig struct {
21-
FlushInterval time.Duration
22-
QueueSize int // consumer queue buffer size
23-
BatchSize int
24-
Retries int // No of retries to post events to HEC before dropping events
25-
Hostname string
26-
Version string
27-
SubscriptionID string
28-
ExtraFields map[string]string
29-
TraceLogging bool
30-
UUID string
31-
Logger lager.Logger
20+
FlushInterval time.Duration
21+
QueueSize int // consumer queue buffer size
22+
BatchSize int
23+
Retries int // No of retries to post events to HEC before dropping events
24+
Hostname string
25+
Version string
26+
SubscriptionID string
27+
ExtraFields map[string]string
28+
TraceLogging bool
29+
UUID string
30+
Logger lager.Logger
31+
StatusMonitorInterval time.Duration
3232
}
3333

3434
type Splunk struct {
35-
writers []eventwriter.Writer
36-
config *SplunkConfig
37-
events chan map[string]interface{}
38-
wg sync.WaitGroup
39-
eventCount uint64
35+
writers []eventwriter.Writer
36+
config *SplunkConfig
37+
events chan map[string]interface{}
38+
wg sync.WaitGroup
39+
eventCount uint64
40+
sentCountChan chan uint64
4041

4142
// cached IP
4243
ip string
@@ -47,11 +48,12 @@ func NewSplunk(writers []eventwriter.Writer, config *SplunkConfig) *Splunk {
4748
config.Hostname = hostname
4849

4950
return &Splunk{
50-
writers: writers,
51-
config: config,
52-
events: make(chan map[string]interface{}, config.QueueSize),
53-
ip: ip,
54-
eventCount: 0,
51+
writers: writers,
52+
config: config,
53+
events: make(chan map[string]interface{}, config.QueueSize),
54+
ip: ip,
55+
eventCount: 0,
56+
sentCountChan: make(chan uint64, 100),
5557
}
5658
}
5759

@@ -74,7 +76,6 @@ func (s *Splunk) Write(fields map[string]interface{}, msg string) error {
7476
if len(msg) > 0 {
7577
fields["msg"] = msg
7678
}
77-
7879
s.events <- fields
7980
return nil
8081
}
@@ -121,10 +122,14 @@ func (s *Splunk) indexEvents(writer eventwriter.Writer, batch []map[string]inter
121122
}
122123
var err error
123124
for i := 0; i < s.config.Retries; i++ {
124-
err = writer.Write(batch)
125+
err, sentCount := writer.Write(batch)
125126
if err == nil {
127+
if s.config.StatusMonitorInterval > time.Second*0 {
128+
s.sentCountChan <- sentCount
129+
}
126130
return nil
127131
}
132+
// add length of batch to counter
128133
s.config.Logger.Error("Unable to talk to Splunk", err, lager.Data{"Retry attempt": i + 1})
129134
time.Sleep(getRetryInterval(i))
130135
}
@@ -213,6 +218,35 @@ func (s *Splunk) Log(message lager.LogFormat) {
213218
s.writers[len(s.writers)-1].Write(events)
214219
}
215220

221+
func (s *Splunk) LogStatus() {
222+
timer := time.NewTimer(s.config.StatusMonitorInterval)
223+
var sent uint64 = 0
224+
for {
225+
select {
226+
case <-timer.C:
227+
percent := float64(len(s.events)) / float64(s.config.QueueSize) * 100.0
228+
status := "low"
229+
switch {
230+
case percent > 99.9:
231+
status = "too high"
232+
case percent > 90:
233+
status = "high"
234+
case percent > 50:
235+
status = "medium"
236+
}
237+
s.config.Logger.Info("Memory_Queue_Pressure", lager.Data{"events_in_consumer_queue": len(s.events), "percentage": int(percent), "status": status})
238+
s.config.Logger.Info("Event_Count", lager.Data{"event_count_sent": sent})
239+
sent = 0
240+
timer.Reset(s.config.StatusMonitorInterval)
241+
default:
242+
}
243+
select {
244+
case sentCount := <-s.sentCountChan:
245+
atomic.AddUint64(&sent, sentCount)
246+
default:
247+
}
248+
}
249+
}
216250
func getRetryInterval(attempt int) time.Duration {
217251
// algorithm taken from https://en.wikipedia.org/wiki/Exponential_backoff
218252
timeInSec := 5 + (0.5 * (math.Exp2(float64(attempt)) - 1.0))

eventsource/firehose.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@ import (
1111

1212
// FirehoseConfig struct with 4 fields of different types.
1313
type FirehoseConfig struct {
14-
KeepAlive time.Duration
15-
SkipSSL bool
16-
Endpoint string
17-
SubscriptionID string
18-
GatewayErrChanAddr *chan error
19-
GatewayLoggerAddr *log.Logger
20-
GatewayMaxRetries int
21-
Logger lager.Logger
14+
KeepAlive time.Duration
15+
SkipSSL bool
16+
Endpoint string
17+
SubscriptionID string
18+
GatewayErrChanAddr *chan error
19+
GatewayLoggerAddr *log.Logger
20+
GatewayMaxRetries int
21+
StatusMonitorInterval time.Duration
22+
Logger lager.Logger
2223
}
2324

2425
// Doer is used to make HTTP requests to the RLP Gateway.

eventsource/v2adapter.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ import (
44
"code.cloudfoundry.org/go-loggregator/v8"
55
"code.cloudfoundry.org/go-loggregator/v8/conversion"
66
"code.cloudfoundry.org/go-loggregator/v8/rpc/loggregator_v2"
7+
"code.cloudfoundry.org/lager"
78
"context"
89
"github.com/cloudfoundry/sonde-go/events"
10+
"sync/atomic"
11+
"time"
912
)
1013

1114
// Streamer implements Stream which returns a new EnvelopeStream for the given context and request.
@@ -71,11 +74,35 @@ func (a V2Adapter) Firehose(config *FirehoseConfig) chan *events.Envelope {
7174
}()
7275

7376
go func() {
74-
for ctx.Err() == nil {
75-
e := <-v2msgs
76-
//// ToV1 converts v2 envelopes down to v1 envelopes.
77-
for _, v1e := range conversion.ToV1(e) {
78-
v1msgs <- v1e
77+
var receivedCount uint64 = 0
78+
79+
if config.StatusMonitorInterval > time.Second*0 {
80+
timer := time.NewTimer(config.StatusMonitorInterval)
81+
for ctx.Err() == nil {
82+
select {
83+
case <-timer.C:
84+
config.Logger.Info("Data_Flow_Monitoring", lager.Data{"events_pre_processing": len(v2msgs), "events_in_process": len(v1msgs)})
85+
config.Logger.Info("Event_Count", lager.Data{"event_count_received": receivedCount})
86+
timer.Reset(config.StatusMonitorInterval)
87+
receivedCount = 0
88+
default:
89+
}
90+
select {
91+
case e := <-v2msgs:
92+
atomic.AddUint64(&receivedCount, 1)
93+
//// ToV1 converts v2 envelopes down to v1 envelopes.
94+
for _, v1e := range conversion.ToV1(e) {
95+
v1msgs <- v1e
96+
}
97+
default:
98+
}
99+
}
100+
} else {
101+
for ctx.Err() == nil {
102+
e := <-v2msgs
103+
for _, v1e := range conversion.ToV1(e) {
104+
v1msgs <- v1e
105+
}
79106
}
80107
}
81108
}()

eventsource/v2adapter_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"code.cloudfoundry.org/go-loggregator/v8"
55
"code.cloudfoundry.org/go-loggregator/v8/conversion"
66
"code.cloudfoundry.org/go-loggregator/v8/rpc/loggregator_v2"
7+
"code.cloudfoundry.org/lager"
78
"context"
89

910
. "github.com/onsi/ginkgo"
@@ -30,7 +31,9 @@ var _ = Describe("V2adapter", func() {
3031
stubStreamer := newStubStreamer()
3132
stubStreamer.envs = []*loggregator_v2.Envelope{v2Env}
3233
config := &eventsource.FirehoseConfig{
33-
SubscriptionID: "test-subscription",
34+
SubscriptionID: "test-subscription",
35+
StatusMonitorInterval: time.Second * 10,
36+
Logger: lager.NewLogger("test"),
3437
}
3538
firehoseAdapter := eventsource.NewV2Adapter(stubStreamer)
3639
messages := firehoseAdapter.Firehose(config)

eventwriter/splunk.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@ package eventwriter
22

33
import (
44
"bytes"
5-
"code.cloudfoundry.org/cfhttp"
6-
"code.cloudfoundry.org/lager"
75
"crypto/tls"
86
"encoding/json"
97
"errors"
108
"fmt"
119
"io/ioutil"
1210
"net/http"
11+
"sync/atomic"
12+
13+
"code.cloudfoundry.org/cfhttp"
14+
"code.cloudfoundry.org/lager"
1315
)
1416

1517
type SplunkConfig struct {
@@ -40,10 +42,10 @@ func NewSplunk(config *SplunkConfig) Writer {
4042
}
4143
}
4244

43-
func (s *splunkClient) Write(events []map[string]interface{}) error {
45+
func (s *splunkClient) Write(events []map[string]interface{}) (error, uint64) {
4446
bodyBuffer := new(bytes.Buffer)
47+
var count uint64 = 0
4548
for i, event := range events {
46-
4749
if event["event"].(map[string]interface{})["info_splunk_index"] != nil {
4850
event["index"] = event["event"].(map[string]interface{})["info_splunk_index"]
4951
} else if s.config.Index != "" {
@@ -57,6 +59,7 @@ func (s *splunkClient) Write(events []map[string]interface{}) error {
5759
eventJson, err := json.Marshal(event)
5860
if err == nil {
5961
bodyBuffer.Write(eventJson)
62+
atomic.AddUint64(&count, 1)
6063
if i < len(events)-1 {
6164
bodyBuffer.Write([]byte("\n\n"))
6265
}
@@ -70,7 +73,7 @@ func (s *splunkClient) Write(events []map[string]interface{}) error {
7073
}
7174
bodyBytes := bodyBuffer.Bytes()
7275

73-
return s.send(&bodyBytes)
76+
return s.send(&bodyBytes), count
7477
}
7578

7679
func (s *splunkClient) send(postBody *[]byte) error {
@@ -93,6 +96,5 @@ func (s *splunkClient) send(postBody *[]byte) error {
9396
responseBody, _ := ioutil.ReadAll(resp.Body)
9497
return errors.New(fmt.Sprintf("Non-ok response code [%d] from splunk: %s", resp.StatusCode, responseBody))
9598
}
96-
9799
return nil
98100
}

eventwriter/splunk_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ var _ = Describe("Splunk", func() {
6565

6666
client := NewSplunk(config)
6767
events := []map[string]interface{}{}
68-
err := client.Write(events)
68+
err, _ := client.Write(events)
6969

7070
Expect(err).To(BeNil())
7171
Expect(capturedRequest).NotTo(BeNil())
@@ -79,7 +79,7 @@ var _ = Describe("Splunk", func() {
7979
It("sets content type to json", func() {
8080
client := NewSplunk(config)
8181
events := []map[string]interface{}{}
82-
err := client.Write(events)
82+
err, _ := client.Write(events)
8383

8484
Expect(err).To(BeNil())
8585
Expect(capturedRequest).NotTo(BeNil())
@@ -101,10 +101,11 @@ var _ = Describe("Splunk", func() {
101101
}}
102102

103103
events := []map[string]interface{}{event1, event2, event3}
104-
err := client.Write(events)
104+
err, sentCount := client.Write(events)
105105

106106
Expect(err).To(BeNil())
107107
Expect(capturedRequest).NotTo(BeNil())
108+
Expect(sentCount).To(Equal(uint64(3)))
108109

109110
expectedPayload := strings.TrimSpace(`
110111
{"event":{"greeting":"hello world"}}
@@ -127,7 +128,7 @@ var _ = Describe("Splunk", func() {
127128
}}
128129

129130
events := []map[string]interface{}{event1, event2}
130-
err := client.Write(events)
131+
err, _ := client.Write(events)
131132

132133
Expect(err).To(BeNil())
133134
Expect(capturedRequest).NotTo(BeNil())
@@ -156,7 +157,7 @@ var _ = Describe("Splunk", func() {
156157
}}
157158

158159
events := []map[string]interface{}{event1, event2}
159-
err := client.Write(events)
160+
err, _ := client.Write(events)
160161

161162
Expect(err).To(BeNil())
162163
Expect(capturedRequest).NotTo(BeNil())
@@ -173,7 +174,7 @@ var _ = Describe("Splunk", func() {
173174
It("Writes to correct endpoint", func() {
174175
client := NewSplunk(config)
175176
events := []map[string]interface{}{}
176-
err := client.Write(events)
177+
err, _ := client.Write(events)
177178

178179
Expect(err).To(BeNil())
179180
Expect(capturedRequest.URL.Path).To(Equal("/services/collector"))
@@ -184,7 +185,7 @@ var _ = Describe("Splunk", func() {
184185
config.Host = ":"
185186
client := NewSplunk(config)
186187
events := []map[string]interface{}{}
187-
err := client.Write(events)
188+
err, _ := client.Write(events)
188189

189190
Expect(err).NotTo(BeNil())
190191
Expect(err.Error()).To(ContainSubstring("protocol"))
@@ -199,7 +200,7 @@ var _ = Describe("Splunk", func() {
199200
config.Host = testServer.URL
200201
client := NewSplunk(config)
201202
events := []map[string]interface{}{}
202-
err := client.Write(events)
203+
err, _ := client.Write(events)
203204

204205
Expect(err).NotTo(BeNil())
205206
Expect(err.Error()).To(ContainSubstring("500"))
@@ -209,7 +210,7 @@ var _ = Describe("Splunk", func() {
209210
config.Host = "foo://example.com"
210211
client := NewSplunk(config)
211212
events := []map[string]interface{}{}
212-
err := client.Write(events)
213+
err, _ := client.Write(events)
213214

214215
Expect(err).NotTo(BeNil())
215216
Expect(err.Error()).To(ContainSubstring("foo"))

eventwriter/writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package eventwriter
22

33
type Writer interface {
4-
Write([]map[string]interface{}) error
4+
Write([]map[string]interface{}) (error, uint64)
55
}

0 commit comments

Comments
 (0)