Skip to content

Commit 3b96fe7

Browse files
authored
Improve efficiency in event source monitoring and AddAppInfo param clean up (#254)
* clean event source and restructure AddAppInfo param
1 parent 72ac4e9 commit 3b96fe7

File tree

7 files changed

+47
-49
lines changed

7 files changed

+47
-49
lines changed

.circleci/ci_nozzle_manifest.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ applications:
1818
JOB_NAME: splunk-nozzle
1919
JOB_INDEX: -1
2020
JOB_HOST: localhost
21-
ADD_APP_INFO: true
21+
ADD_APP_INFO: AppName,OrgName,OrgGuid,SpaceName,SpaceGuid
2222
IGNORE_MISSING_APP: true
2323
MISSING_APP_CACHE_INVALIDATE_TTL: 3600s
2424
APP_CACHE_INVALIDATE_TTL: 86440s

events/events.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,21 @@ type Event struct {
2020

2121
type Config struct {
2222
SelectedEvents string
23-
AddAppName bool
23+
AddAppName bool
2424
AddOrgName bool
2525
AddOrgGuid bool
2626
AddSpaceName bool
2727
AddSpaceGuid bool
2828
}
2929

30+
var AppMetadata = []string{
31+
"AppName",
32+
"OrgName",
33+
"OrgGuid",
34+
"SpaceName",
35+
"SpaceGuid",
36+
}
37+
3038
func HttpStartStop(msg *events.Envelope) *Event {
3139
httpStartStop := msg.GetHttpStartStop()
3240

@@ -258,3 +266,7 @@ func ParseExtraFields(extraEventsString string) (map[string]string, error) {
258266
}
259267
return extraEvents, nil
260268
}
269+
270+
func AuthorizedMetadata() string {
271+
return strings.Join(AppMetadata, ", ")
272+
}

eventsource/v2adapter.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func NewV2Adapter(s Streamer) V2Adapter {
3333
func (a V2Adapter) Firehose(config *FirehoseConfig) chan *events.Envelope {
3434
ctx := context.Background()
3535
var v1msgs = make(chan *events.Envelope, 10000)
36-
var v2msgs = make(chan *loggregator_v2.Envelope, 10000)
36+
var v2msgs = make(chan []*loggregator_v2.Envelope, 10000)
3737
es := a.streamer.Stream(ctx, &loggregator_v2.EgressBatchRequest{
3838
ShardId: config.SubscriptionID,
3939
Selectors: []*loggregator_v2.Selector{
@@ -67,9 +67,8 @@ func (a V2Adapter) Firehose(config *FirehoseConfig) chan *events.Envelope {
6767

6868
go func() {
6969
for ctx.Err() == nil {
70-
for _, e := range es() {
71-
v2msgs <- e
72-
}
70+
e := es()
71+
v2msgs <- e
7372
}
7473
}()
7574

@@ -81,27 +80,30 @@ func (a V2Adapter) Firehose(config *FirehoseConfig) chan *events.Envelope {
8180
for ctx.Err() == nil {
8281
select {
8382
case <-timer.C:
84-
config.Logger.Info("Data_Flow_Monitoring", lager.Data{"events_pre_processing": len(v2msgs), "events_in_process": len(v1msgs)})
8583
config.Logger.Info("Event_Count", lager.Data{"event_count_received": receivedCount})
8684
timer.Reset(config.StatusMonitorInterval)
8785
receivedCount = 0
8886
default:
8987
}
9088
select {
91-
case e := <-v2msgs:
92-
atomic.AddUint64(&receivedCount, 1)
93-
//// ToV1 converts v2 envelopes down to v1 envelopes.
94-
for _, v1e := range conversion.ToV1(e) {
95-
v1msgs <- v1e
89+
case eArray := <-v2msgs:
90+
atomic.AddUint64(&receivedCount, uint64(len(eArray)))
91+
for _, e := range eArray {
92+
//// ToV1 converts v2 envelopes down to v1 envelopes.
93+
for _, v1e := range conversion.ToV1(e) {
94+
v1msgs <- v1e
95+
}
9696
}
9797
default:
9898
}
9999
}
100100
} else {
101101
for ctx.Err() == nil {
102-
e := <-v2msgs
103-
for _, v1e := range conversion.ToV1(e) {
104-
v1msgs <- v1e
102+
eArray := <-v2msgs
103+
for _, e := range eArray {
104+
for _, v1e := range conversion.ToV1(e) {
105+
v1msgs <- v1e
106+
}
105107
}
106108
}
107109
}

splunknozzle/config.go

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,13 @@ type Config struct {
3131
SubscriptionID string `json:"subscription-id"`
3232
KeepAlive time.Duration `json:"keep-alive"`
3333

34-
AddAppInfo bool `json:"add-app-info"`
34+
AddAppInfo string `json:"add-app-info"`
3535
IgnoreMissingApps bool `json:"ignore-missing-apps"`
3636
MissingAppCacheTTL time.Duration `json:"missing-app-cache-ttl"`
3737
AppCacheTTL time.Duration `json:"app-cache-ttl"`
3838
OrgSpaceCacheTTL time.Duration `json:"org-space-cache-ttl"`
3939
AppLimits int `json:"app-limits"`
4040

41-
// Add configuration to select interested fields from app info.
42-
AddAppName bool `json:"add-app-name"`
43-
AddOrgName bool `json:"add-org-name"`
44-
AddOrgGuid bool `json:"add-org-guid"`
45-
AddSpaceName bool `json:"add-space-name"`
46-
AddSpaceGuid bool `json:"add-space-guid"`
47-
4841
BoltDBPath string `json:"boltdb-path"`
4942
WantedEvents string `json:"wanted-events"`
5043
ExtraFields string `json:"extra-fields"`
@@ -111,18 +104,8 @@ func NewConfigFromCmdFlags(version, branch, commit, buildos string) *Config {
111104
kingpin.Flag("firehose-keep-alive", "Keep Alive duration for the firehose consumer").
112105
OverrideDefaultFromEnvar("FIREHOSE_KEEP_ALIVE").Default("25s").DurationVar(&c.KeepAlive)
113106

114-
kingpin.Flag("add-app-info", "Query API to fetch app details").
115-
OverrideDefaultFromEnvar("ADD_APP_INFO").Default("false").BoolVar(&c.AddAppInfo)
116-
kingpin.Flag("add-app-name", "Add app name from app cache").
117-
OverrideDefaultFromEnvar("ADD_APP_NAME").Default("true").BoolVar(&c.AddAppName)
118-
kingpin.Flag("add-org-name", "Add org name from app cache").
119-
OverrideDefaultFromEnvar("ADD_ORG_NAME").Default("true").BoolVar(&c.AddOrgName)
120-
kingpin.Flag("add-org-guid", "Add org guid from app cache").
121-
OverrideDefaultFromEnvar("ADD_ORG_GUID").Default("true").BoolVar(&c.AddOrgGuid)
122-
kingpin.Flag("add-space-name", "Add space name from app cache").
123-
OverrideDefaultFromEnvar("ADD_SPACE_NAME").Default("true").BoolVar(&c.AddSpaceName)
124-
kingpin.Flag("add-space-guid", "Add space guid from app cache").
125-
OverrideDefaultFromEnvar("ADD_SPACE_GUID").Default("true").BoolVar(&c.AddSpaceGuid)
107+
kingpin.Flag("add-app-info", fmt.Sprintf("Comma separated list of app metadata to enrich event. Valid options are %s", events.AuthorizedMetadata())).
108+
OverrideDefaultFromEnvar("ADD_APP_INFO").Default("").StringVar(&c.AddAppInfo)
126109

127110
kingpin.Flag("ignore-missing-app", "If app is missing, stop repeatedly querying app info from PCF").
128111
OverrideDefaultFromEnvar("IGNORE_MISSING_APP").Default("true").BoolVar(&c.IgnoreMissingApps)

splunknozzle/config_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ var _ = Describe("Config", func() {
4747
os.Setenv("FIREHOSE_SUBSCRIPTION_ID", "my-nozzle")
4848
os.Setenv("FIREHOSE_KEEP_ALIVE", "42s")
4949

50-
os.Setenv("ADD_APP_INFO", "true")
50+
os.Setenv("ADD_APP_INFO", "AppName")
5151
os.Setenv("IGNORE_MISSING_APP", "true")
5252
os.Setenv("MISSING_APP_CACHE_INVALIDATE_TTL", "100s")
5353
os.Setenv("APP_CACHE_INVALIDATE_TTL", "100s")
@@ -89,7 +89,7 @@ var _ = Describe("Config", func() {
8989
Expect(c.SubscriptionID).To(Equal("my-nozzle"))
9090
Expect(c.KeepAlive).To(Equal(42 * time.Second))
9191

92-
Expect(c.AddAppInfo).To(BeTrue())
92+
Expect(c.AddAppInfo).To(Equal("AppName"))
9393
Expect(c.IgnoreMissingApps).To(BeTrue())
9494
Expect(c.MissingAppCacheTTL).To(Equal(100 * time.Second))
9595
Expect(c.AppCacheTTL).To(Equal(100 * time.Second))
@@ -128,7 +128,7 @@ var _ = Describe("Config", func() {
128128
Expect(c.SubscriptionID).To(Equal("splunk-firehose"))
129129
Expect(c.KeepAlive).To(Equal(25 * time.Second))
130130

131-
Expect(c.AddAppInfo).To(BeFalse())
131+
Expect(c.AddAppInfo).To(Equal(""))
132132
Expect(c.IgnoreMissingApps).To(BeTrue())
133133
Expect(c.MissingAppCacheTTL).To(Equal(0 * time.Second))
134134
Expect(c.AppCacheTTL).To(Equal(0 * time.Second))
@@ -179,7 +179,7 @@ var _ = Describe("Config", func() {
179179
"--skip-ssl-validation-splunk",
180180
"--subscription-id=my-nozzlec",
181181
"--firehose-keep-alive=24s",
182-
"--add-app-info",
182+
"--add-app-info=OrgName",
183183
"--ignore-missing-app",
184184
"--missing-app-cache-invalidate-ttl=15s",
185185
"--app-cache-invalidate-ttl=15s",
@@ -222,7 +222,7 @@ var _ = Describe("Config", func() {
222222
Expect(c.SubscriptionID).To(Equal("my-nozzlec"))
223223
Expect(c.KeepAlive).To(Equal(24 * time.Second))
224224

225-
Expect(c.AddAppInfo).To(BeTrue())
225+
Expect(c.AddAppInfo).To(Equal("OrgName"))
226226
Expect(c.IgnoreMissingApps).To(BeTrue())
227227
Expect(c.MissingAppCacheTTL).To(Equal(15 * time.Second))
228228
Expect(c.AppCacheTTL).To(Equal(15 * time.Second))

splunknozzle/nozzle.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@ func NewSplunkFirehoseNozzle(config *Config, logger lager.Logger) *SplunkFirehos
3939

4040
// EventRouter creates EventRouter object and setup routes for interested events
4141
func (s *SplunkFirehoseNozzle) EventRouter(cache cache.Cache, eventSink eventsink.Sink) (eventrouter.Router, error) {
42+
LowerAddAppInfo := strings.ToLower(s.config.AddAppInfo)
4243
config := &eventrouter.Config{
4344
SelectedEvents: s.config.WantedEvents,
44-
AddAppName: s.config.AddAppName,
45-
AddOrgName: s.config.AddOrgName,
46-
AddOrgGuid: s.config.AddOrgGuid,
47-
AddSpaceName: s.config.AddSpaceName,
48-
AddSpaceGuid: s.config.AddSpaceGuid,
45+
AddAppName: strings.Contains(LowerAddAppInfo, "appname"),
46+
AddOrgName: strings.Contains(LowerAddAppInfo, "orgname"),
47+
AddOrgGuid: strings.Contains(LowerAddAppInfo, "orgguid"),
48+
AddSpaceName: strings.Contains(LowerAddAppInfo, "spacename"),
49+
AddSpaceGuid: strings.Contains(LowerAddAppInfo, "spaceguid"),
4950
}
5051
return eventrouter.New(cache, eventSink, config)
5152
}
@@ -66,7 +67,7 @@ func (s *SplunkFirehoseNozzle) PCFClient() (*cfclient.Client, error) {
6667

6768
// AppCache creates in-memory cache or boltDB cache
6869
func (s *SplunkFirehoseNozzle) AppCache(client cache.AppClient) (cache.Cache, error) {
69-
if s.config.AddAppInfo {
70+
if s.config.AddAppInfo != "" {
7071
c := cache.BoltdbConfig{
7172
Path: s.config.BoltDBPath,
7273
IgnoreMissingApps: s.config.IgnoreMissingApps,

splunknozzle/nozzle_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func newConfig() *Config {
3434
SubscriptionID: "splunk-sub",
3535
KeepAlive: time.Second * 25,
3636

37-
AddAppInfo: true,
37+
AddAppInfo: "AppName,OrgName,OrgGuid,SpaceName,SpaceGuid",
3838
IgnoreMissingApps: true,
3939
MissingAppCacheTTL: time.Second * 30,
4040
AppCacheTTL: time.Second * 30,
@@ -106,7 +106,7 @@ var _ = Describe("SplunkFirehoseNozzle", func() {
106106
_, err := noz.AppCache(client)
107107
Ω(err).ShouldNot(HaveOccurred())
108108

109-
config.AddAppInfo = false
109+
config.AddAppInfo = ""
110110
_, err = noz.AppCache(client)
111111
Ω(err).ShouldNot(HaveOccurred())
112112
})
@@ -146,7 +146,7 @@ var _ = Describe("SplunkFirehoseNozzle", func() {
146146
})
147147

148148
It("Run with cloudcontroller", func() {
149-
config.AddAppInfo = false
149+
config.AddAppInfo = ""
150150
port := 9911
151151
cc := testing.NewCloudControllerMock(port)
152152
started := make(chan struct{})

0 commit comments

Comments
 (0)