Skip to content

Commit 6993b99

Browse files
authored
RLP gateway logging, error channel, configurable retry. change es channel size to 10000 (#239)
1 parent 7c35279 commit 6993b99

File tree

110 files changed

+2178
-894
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

110 files changed

+2178
-894
lines changed

.circleci/ci_nozzle_manifest.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ applications:
77
cmd: splunk-firehose-nozzle
88
env:
99
GOPACKAGENAME: main
10-
API_ENDPOINT:
11-
CLIENT_ID:
12-
CLIENT_SECRET:
13-
SPLUNK_HOST:
14-
SPLUNK_TOKEN:
15-
SPLUNK_INDEX:
10+
API_ENDPOINT:
11+
CLIENT_ID:
12+
CLIENT_SECRET:
13+
SPLUNK_HOST:
14+
SPLUNK_TOKEN:
15+
SPLUNK_INDEX: main
1616
SKIP_SSL_VALIDATION_CF: true
1717
SKIP_SSL_VALIDATION_SPLUNK: true
1818
JOB_NAME: splunk-nozzle

.circleci/config.yml

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,6 @@ workflows:
141141
- deploy-nozzle:
142142
requires:
143143
- build
144-
filters:
145-
branches:
146-
only:
147-
- develop
148-
- master
149144
# - tile-builder:
150145
# requires:
151146
# - deploy-nozzle
@@ -155,11 +150,7 @@ workflows:
155150
- execute_tests:
156151
requires:
157152
- deploy-nozzle
158-
filters:
159-
branches:
160-
only:
161-
- develop
162-
- master
153+
163154

164155
# - execute_perf_tests:
165156
# requires:

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,5 @@ testing/integration/venv/
1818
testing/integration/config/local.ini
1919
*.pyc
2020
testing/integration/config/env.json
21+
testing/manual_perf/data_gen_manifest.yml
22+

cache/boltdb.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,13 @@ func (c *Boltdb) Open() error {
7878
// Open bolt db
7979
db, err := bolt.Open(c.config.Path, 0600, &bolt.Options{Timeout: 5 * time.Second})
8080
if err != nil {
81-
c.config.Logger.Error("Fail to open boltdb: ", err)
81+
c.config.Logger.Error("Failed to open boltdb: ", err)
8282
return err
8383
}
8484
c.appdb = db
8585

8686
if err := c.createBucket(); err != nil {
87-
c.config.Logger.Error("Fail to create bucket: ", err)
87+
c.config.Logger.Error("Failed to create bucket: ", err)
8888
return err
8989
}
9090

eventsource/firehose.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
package eventsource
22

33
import (
4-
"code.cloudfoundry.org/go-loggregator"
4+
"code.cloudfoundry.org/go-loggregator/v8"
5+
"code.cloudfoundry.org/lager"
56
"github.com/cloudfoundry/sonde-go/events"
7+
"log"
68
"net/http"
79
"time"
810
)
911

1012
// FirehoseConfig struct with 4 fields of different types.
1113
type FirehoseConfig struct {
12-
KeepAlive time.Duration
13-
SkipSSL bool
14-
Endpoint string
15-
SubscriptionID string
14+
KeepAlive time.Duration
15+
SkipSSL bool
16+
Endpoint string
17+
SubscriptionID string
18+
GatewayErrChanAddr *chan error
19+
GatewayLoggerAddr *log.Logger
20+
GatewayMaxRetries int
21+
Logger lager.Logger
1622
}
1723

1824
// Doer is used to make HTTP requests to the RLP Gateway.
@@ -32,6 +38,9 @@ func NewFirehose(tokenClient doer, config *FirehoseConfig) *Firehose {
3238
c := loggregator.NewRLPGatewayClient(
3339
config.Endpoint,
3440
loggregator.WithRLPGatewayHTTPClient(tokenClient),
41+
loggregator.WithRLPGatewayClientLogger(config.GatewayLoggerAddr),
42+
loggregator.WithRLPGatewayErrChan(*config.GatewayErrChanAddr),
43+
loggregator.WithRLPGatewayMaxRetries(config.GatewayMaxRetries),
3544
)
3645

3746
f := &Firehose{
@@ -55,5 +64,5 @@ func (f *Firehose) Close() error {
5564

5665
// Read reads envelope stream
5766
func (f *Firehose) Read() <-chan *events.Envelope {
58-
return f.v2.Firehose(f.config.SubscriptionID)
67+
return f.v2.Firehose(f.config)
5968
}

eventsource/v2adapter.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package eventsource
22

33
import (
4-
"code.cloudfoundry.org/go-loggregator"
5-
"code.cloudfoundry.org/go-loggregator/conversion"
6-
"code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2"
4+
"code.cloudfoundry.org/go-loggregator/v8"
5+
"code.cloudfoundry.org/go-loggregator/v8/conversion"
6+
"code.cloudfoundry.org/go-loggregator/v8/rpc/loggregator_v2"
77
"context"
88
"github.com/cloudfoundry/sonde-go/events"
99
)
@@ -27,11 +27,12 @@ func NewV2Adapter(s Streamer) V2Adapter {
2727
}
2828

2929
// Firehose returns only selected event stream
30-
func (a V2Adapter) Firehose(subscriptionID string) chan *events.Envelope {
30+
func (a V2Adapter) Firehose(config *FirehoseConfig) chan *events.Envelope {
3131
ctx := context.Background()
32-
32+
var v1msgs = make(chan *events.Envelope, 10000)
33+
var v2msgs = make(chan *loggregator_v2.Envelope, 10000)
3334
es := a.streamer.Stream(ctx, &loggregator_v2.EgressBatchRequest{
34-
ShardId: subscriptionID,
35+
ShardId: config.SubscriptionID,
3536
Selectors: []*loggregator_v2.Selector{
3637
{
3738
Message: &loggregator_v2.Selector_Log{
@@ -61,17 +62,23 @@ func (a V2Adapter) Firehose(subscriptionID string) chan *events.Envelope {
6162
},
6263
})
6364

64-
var msgs = make(chan *events.Envelope, 100)
6565
go func() {
6666
for ctx.Err() == nil {
6767
for _, e := range es() {
68-
// ToV1 converts v2 envelopes down to v1 envelopes.
69-
for _, v1e := range conversion.ToV1(e) {
70-
msgs <- v1e
71-
}
68+
v2msgs <- e
69+
}
70+
}
71+
}()
72+
73+
go func() {
74+
for ctx.Err() == nil {
75+
e := <-v2msgs
76+
//// ToV1 converts v2 envelopes down to v1 envelopes.
77+
for _, v1e := range conversion.ToV1(e) {
78+
v1msgs <- v1e
7279
}
7380
}
7481
}()
7582

76-
return msgs
83+
return v1msgs
7784
}

eventsource/v2adapter_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package eventsource_test
22

33
import (
4-
"code.cloudfoundry.org/go-loggregator"
5-
"code.cloudfoundry.org/go-loggregator/conversion"
6-
"code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2"
4+
"code.cloudfoundry.org/go-loggregator/v8"
5+
"code.cloudfoundry.org/go-loggregator/v8/conversion"
6+
"code.cloudfoundry.org/go-loggregator/v8/rpc/loggregator_v2"
77
"context"
88

99
. "github.com/onsi/ginkgo"
@@ -29,9 +29,11 @@ var _ = Describe("V2adapter", func() {
2929

3030
stubStreamer := newStubStreamer()
3131
stubStreamer.envs = []*loggregator_v2.Envelope{v2Env}
32-
32+
config := &eventsource.FirehoseConfig{
33+
SubscriptionID: "test-subscription",
34+
}
3335
firehoseAdapter := eventsource.NewV2Adapter(stubStreamer)
34-
messages := firehoseAdapter.Firehose("test-subscription")
36+
messages := firehoseAdapter.Firehose(config)
3537

3638
expected := conversion.ToV1(v2Env)
3739
Eventually(messages).Should(Receive(Equal(expected[0])))

eventwriter/splunk.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@ package eventwriter
22

33
import (
44
"bytes"
5+
"code.cloudfoundry.org/cfhttp"
6+
"code.cloudfoundry.org/lager"
57
"crypto/tls"
68
"encoding/json"
79
"errors"
810
"fmt"
911
"io/ioutil"
1012
"net/http"
11-
12-
"code.cloudfoundry.org/cfhttp"
13-
"code.cloudfoundry.org/lager"
1413
)
1514

1615
type SplunkConfig struct {

glide.lock

Lines changed: 5 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ func main() {
3131
logger.Info("Apps are not being cached. When apps are not cached, the org and space caching TTL is ineffective")
3232
}
3333

34-
splunkNozzle := splunknozzle.NewSplunkFirehoseNozzle(config)
35-
err := splunkNozzle.Run(shutdownChan, logger)
34+
splunkNozzle := splunknozzle.NewSplunkFirehoseNozzle(config, logger)
35+
err := splunkNozzle.Run(shutdownChan)
3636
if err != nil {
3737
logger.Error("Failed to run splunk-firehose-nozzle", err)
3838
}

0 commit comments

Comments
 (0)