Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cmd/syslog-agent/app/syslog_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ func NewSyslogAgent(
cfg.WarnOnInvalidDrains,
l,
)
cupsFetcher = bindings.NewDrainParamParser(cupsFetcher, cfg.DefaultDrainMetadata)
cupsFetcher = bindings.NewDrainParamParser(cupsFetcher, cfg.DefaultDrainMetadata, l)
}

aggregateFetcher := bindings.NewAggregateDrainFetcher(cfg.AggregateDrainURLs, cacheClient)
bindingManager := binding.NewManager(
cupsFetcher,
bindings.NewDrainParamParser(aggregateFetcher, cfg.DefaultDrainMetadata),
bindings.NewDrainParamParser(aggregateFetcher, cfg.DefaultDrainMetadata, l),
connector,
m,
cfg.Cache.PollingInterval,
Expand Down
45 changes: 37 additions & 8 deletions src/pkg/egress/syslog/filtering_drain_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package syslog

import (
"errors"
"strings"

"code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
Expand Down Expand Up @@ -50,7 +51,12 @@ func (w *FilteringDrainWriter) Write(env *loggregator_v2.Envelope) error {
}
}
if env.GetLog() != nil {
if sendsLogs(w.binding.DrainData) {
sourceType, ok := env.GetTags()["source_type"]
if !ok {
// Default to sending logs if no source_type tag is present
sourceType = ""
}
if sendsLogs(w.binding.DrainData, w.binding.LogFilter, sourceType) {
return w.writer.Write(env)
}
}
Expand All @@ -63,17 +69,40 @@ func (w *FilteringDrainWriter) Write(env *loggregator_v2.Envelope) error {
return nil
}

func sendsLogs(drainData DrainData) bool {
switch drainData {
case LOGS:
// shouldIncludeLog determines if a log with the given sourceTypeTag should be forwarded
func shouldIncludeLog(logFilter *LogTypeSet, sourceTypeTag string) bool {
// Empty filter or missing source type means no filtering
if logFilter == nil || sourceTypeTag == "" {
return true
case LOGS_AND_METRICS:
return true
case LOGS_NO_EVENTS:
}

// Find the first "/" to extract prefix
idx := strings.IndexByte(sourceTypeTag, '/')
prefix := sourceTypeTag
if idx != -1 {
prefix = sourceTypeTag[:idx]
}

// Prefer map lookup over switch for performance
logType := LogType(prefix)
if !logType.IsValid() {
// Unknown log type, default to not filtering
return true
default:
}

return logFilter.Contains(logType)
}

func sendsLogs(drainData DrainData, logFilter *LogTypeSet, sourceTypeTag string) bool {
if drainData != LOGS && drainData != LOGS_AND_METRICS && drainData != LOGS_NO_EVENTS {
return false
}

if shouldIncludeLog(logFilter, sourceTypeTag) {
return true
}

return false
}

func sendsMetrics(drainData DrainData) bool {
Expand Down
137 changes: 137 additions & 0 deletions src/pkg/egress/syslog/filtering_drain_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,143 @@ var _ = Describe("Filtering Drain Writer", func() {
_, err := syslog.NewFilteringDrainWriter(binding, &fakeWriter{})
Expect(err).To(HaveOccurred())
})

It("sends logs when source_type tag is missing", func() {
binding := syslog.Binding{
DrainData: syslog.LOGS,
}
fakeWriter := &fakeWriter{}
drainWriter, err := syslog.NewFilteringDrainWriter(binding, fakeWriter)
Expect(err).NotTo(HaveOccurred())

envelope := &loggregator_v2.Envelope{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{
Payload: []byte("test log"),
},
},
Tags: map[string]string{
// source_type tag is intentionally missing
},
}

err = drainWriter.Write(envelope)

Expect(err).NotTo(HaveOccurred())
Expect(fakeWriter.received).To(Equal(1))
})

It("filters logs based on include filter - includes only APP logs", func() {
appFilter := syslog.LogTypeSet{syslog.LOG_APP: struct{}{}}
binding := syslog.Binding{
DrainData: syslog.LOGS,
LogFilter: &appFilter,
}
fakeWriter := &fakeWriter{}
drainWriter, err := syslog.NewFilteringDrainWriter(binding, fakeWriter)
Expect(err).NotTo(HaveOccurred())

envelopes := []*loggregator_v2.Envelope{
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("app log")},
},
Tags: map[string]string{"source_type": "APP/PROC/WEB/0"},
},
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("rtr log")},
},
Tags: map[string]string{"source_type": "RTR/1"},
},
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("stg log")},
},
Tags: map[string]string{"source_type": "STG/0"},
},
}

for _, envelope := range envelopes {
err = drainWriter.Write(envelope)
Expect(err).NotTo(HaveOccurred())
}

// Only APP log should be sent
Expect(fakeWriter.received).To(Equal(1))
})

It("filters logs based on exclude filter - excludes RTR logs", func() {
// Include APP and STG, effectively excluding RTR
includeFilter := syslog.LogTypeSet{
syslog.LOG_APP: struct{}{},
syslog.LOG_STG: struct{}{},
}
binding := syslog.Binding{
DrainData: syslog.LOGS,
LogFilter: &includeFilter,
}
fakeWriter := &fakeWriter{}
drainWriter, err := syslog.NewFilteringDrainWriter(binding, fakeWriter)
Expect(err).NotTo(HaveOccurred())

envelopes := []*loggregator_v2.Envelope{
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("app log")},
},
Tags: map[string]string{"source_type": "APP/PROC/WEB/0"},
},
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("rtr log")},
},
Tags: map[string]string{"source_type": "RTR/1"},
},
{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{Payload: []byte("stg log")},
},
Tags: map[string]string{"source_type": "STG/0"},
},
}

for _, envelope := range envelopes {
err = drainWriter.Write(envelope)
Expect(err).NotTo(HaveOccurred())
}

// APP and STG logs should be sent, RTR should be filtered out
Expect(fakeWriter.received).To(Equal(2))
})

It("sends logs with unknown source_type prefix when filter is set", func() {
appFilter := syslog.LogTypeSet{syslog.LOG_APP: struct{}{}}
binding := syslog.Binding{
DrainData: syslog.LOGS,
LogFilter: &appFilter,
}
fakeWriter := &fakeWriter{}
drainWriter, err := syslog.NewFilteringDrainWriter(binding, fakeWriter)
Expect(err).NotTo(HaveOccurred())

envelope := &loggregator_v2.Envelope{
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{
Payload: []byte("test log"),
},
},
Tags: map[string]string{
"source_type": "UNKNOWN/some/path",
},
}

err = drainWriter.Write(envelope)

// Should send the log because unknown types default to being included
Expect(err).NotTo(HaveOccurred())
Expect(fakeWriter.received).To(Equal(1))
})
})

type fakeWriter struct {
Expand Down
55 changes: 55 additions & 0 deletions src/pkg/egress/syslog/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package syslog

// LogType defines the log types used within Cloud Foundry
// Their order in the code is as documented in https://docs.cloudfoundry.org/devguide/deploy-apps/streaming-logs.html#format
type LogType string

const (
LOG_API LogType = "API"
LOG_STG LogType = "STG"
LOG_RTR LogType = "RTR"
LOG_LGR LogType = "LGR"
LOG_APP LogType = "APP"
LOG_SSH LogType = "SSH"
LOG_CELL LogType = "CELL"
)

// validLogTypes contains LogType prefixes for efficient lookup
var validLogTypes = map[LogType]struct{}{
LOG_API: {},
LOG_STG: {},
LOG_RTR: {},
LOG_LGR: {},
LOG_APP: {},
LOG_SSH: {},
LOG_CELL: {},
}

// IsValid checks if the provided LogType is valid
func (lt LogType) IsValid() bool {
_, ok := validLogTypes[lt]
return ok
}

// AllLogTypes returns all valid log types
func AllLogTypes() []LogType {
types := make([]LogType, 0, len(validLogTypes))
for t := range validLogTypes {
types = append(types, t)
}
return types
}

// LogTypeSet is a set of LogTypes for efficient membership checking
type LogTypeSet map[LogType]struct{}

// Add adds a LogType to the set
func (s LogTypeSet) Add(lt LogType) {
s[lt] = struct{}{}
}

// Contains checks if the set contains a LogType
func (s LogTypeSet) Contains(lt LogType) bool {
_, exists := s[lt]
return exists
}
1 change: 1 addition & 0 deletions src/pkg/egress/syslog/syslog_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Binding struct {
DrainData DrainData `json:"type,omitempty"`
OmitMetadata bool
InternalTls bool
LogFilter *LogTypeSet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that this has to be a pointer.

Copy link
Contributor Author

@jorbaum jorbaum Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that at first, but in manager.go line 45 Binding is used as a map key:

	sourceDrainMap    map[string]map[syslog.Binding]drainHolder

My understanding is that maps in Go are not comparable, so adding LogTypeSet (which is a map[LogType]struct{}) to the Binding struct makes it non-comparable.

AFAICS the alternative to using a pointer would be to implement a custom comparable key for the map, but using a pointer seemed simpler to me.

}

type Drain struct {
Expand Down
Loading
Loading