Skip to content

Commit 8dc0697

Browse files
committed
TUN-7132 TUN-7136: Add filter support for streaming logs
Additionally adds similar support in cloudflared tail to provide filters for events and log level.
1 parent 5dbf76a commit 8dc0697

File tree

6 files changed

+376
-52
lines changed

6 files changed

+376
-52
lines changed

cmd/cloudflared/tail/cmd.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ func Command() *cli.Command {
3939
Value: "",
4040
EnvVars: []string{"TUNNEL_MANAGEMENT_CONNECTOR"},
4141
},
42+
&cli.StringSliceFlag{
43+
Name: "event",
44+
Usage: "Filter by specific Events (cloudflared, http, tcp, udp) otherwise, defaults to send all events",
45+
EnvVars: []string{"TUNNEL_MANAGEMENT_FILTER_EVENTS"},
46+
},
47+
&cli.StringFlag{
48+
Name: "level",
49+
Usage: "Filter by specific log levels (debug, info, warn, error)",
50+
EnvVars: []string{"TUNNEL_MANAGEMENT_FILTER_LEVEL"},
51+
Value: "debug",
52+
},
4253
&cli.StringFlag{
4354
Name: "token",
4455
Usage: "Access token for a specific tunnel",
@@ -61,7 +72,7 @@ func Command() *cli.Command {
6172
&cli.StringFlag{
6273
Name: logger.LogLevelFlag,
6374
Value: "info",
64-
Usage: "Application logging level {debug, info, warn, error, fatal}. ",
75+
Usage: "Application logging level {debug, info, warn, error, fatal}",
6576
EnvVars: []string{"TUNNEL_LOGLEVEL"},
6677
},
6778
},
@@ -113,6 +124,41 @@ func createLogger(c *cli.Context) *zerolog.Logger {
113124
return &log
114125
}
115126

127+
// parseFilters will attempt to parse provided filters to send to with the EventStartStreaming
128+
func parseFilters(c *cli.Context) (*management.StreamingFilters, error) {
129+
var level *management.LogLevel
130+
var events []management.LogEventType
131+
132+
argLevel := c.String("level")
133+
argEvents := c.StringSlice("event")
134+
135+
if argLevel != "" {
136+
l, ok := management.ParseLogLevel(argLevel)
137+
if !ok {
138+
return nil, fmt.Errorf("invalid --level filter provided, please use one of the following Log Levels: debug, info, warn, error")
139+
}
140+
level = &l
141+
}
142+
143+
for _, v := range argEvents {
144+
t, ok := management.ParseLogEventType(v)
145+
if !ok {
146+
return nil, fmt.Errorf("invalid --event filter provided, please use one of the following EventTypes: cloudflared, http, tcp, udp")
147+
}
148+
events = append(events, t)
149+
}
150+
151+
if level == nil && len(events) == 0 {
152+
// When no filters are provided, do not return a StreamingFilters struct
153+
return nil, nil
154+
}
155+
156+
return &management.StreamingFilters{
157+
Level: level,
158+
Events: events,
159+
}, nil
160+
}
161+
116162
// Run implements a foreground runner
117163
func Run(c *cli.Context) error {
118164
log := createLogger(c)
@@ -121,6 +167,12 @@ func Run(c *cli.Context) error {
121167
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
122168
defer signal.Stop(signals)
123169

170+
filters, err := parseFilters(c)
171+
if err != nil {
172+
log.Error().Err(err).Msgf("invalid filters provided")
173+
return nil
174+
}
175+
124176
managementHostname := c.String("management-hostname")
125177
token := c.String("token")
126178
u := url.URL{Scheme: "wss", Host: managementHostname, Path: "/logs", RawQuery: "access_token=" + token}
@@ -148,6 +200,7 @@ func Run(c *cli.Context) error {
148200
// Once connection is established, send start_streaming event to begin receiving logs
149201
err = management.WriteEvent(conn, ctx, &management.EventStartStreaming{
150202
ClientEvent: management.ClientEvent{Type: management.StartStreaming},
203+
Filters: filters,
151204
})
152205
if err != nil {
153206
log.Error().Err(err).Msg("unable to request logs from management tunnel")

management/events.go

Lines changed: 87 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,12 @@ type ClientEvent struct {
4848
// Additional filters can be provided to augment the log events requested.
4949
type EventStartStreaming struct {
5050
ClientEvent
51-
Filters []string `json:"filters"`
51+
Filters *StreamingFilters `json:"filters,omitempty"`
52+
}
53+
54+
type StreamingFilters struct {
55+
Events []LogEventType `json:"events,omitempty"`
56+
Level *LogLevel `json:"level,omitempty"`
5257
}
5358

5459
// EventStopStreaming signifies that the client wishes to halt receiving log events.
@@ -65,7 +70,7 @@ type EventLog struct {
6570
// LogEventType is the way that logging messages are able to be filtered.
6671
// Example: assigning LogEventType.Cloudflared to a zerolog event will allow the client to filter for only
6772
// the Cloudflared-related events.
68-
type LogEventType int
73+
type LogEventType int8
6974

7075
const (
7176
// Cloudflared events are signficant to cloudflared operations like connection state changes.
@@ -76,6 +81,20 @@ const (
7681
UDP
7782
)
7883

84+
func ParseLogEventType(s string) (LogEventType, bool) {
85+
switch s {
86+
case "cloudflared":
87+
return Cloudflared, true
88+
case "http":
89+
return HTTP, true
90+
case "tcp":
91+
return TCP, true
92+
case "udp":
93+
return UDP, true
94+
}
95+
return -1, false
96+
}
97+
7998
func (l LogEventType) String() string {
8099
switch l {
81100
case Cloudflared:
@@ -91,18 +110,79 @@ func (l LogEventType) String() string {
91110
}
92111
}
93112

113+
func (l LogEventType) MarshalJSON() ([]byte, error) {
114+
return json.Marshal(l.String())
115+
}
116+
117+
func (e *LogEventType) UnmarshalJSON(data []byte) error {
118+
var s string
119+
if err := json.Unmarshal(data, &s); err != nil {
120+
return errors.New("unable to unmarshal LogEventType string")
121+
}
122+
if event, ok := ParseLogEventType(s); ok {
123+
*e = event
124+
return nil
125+
}
126+
return errors.New("unable to unmarshal LogEventType")
127+
}
128+
94129
// LogLevel corresponds to the zerolog logging levels
95130
// "panic", "fatal", and "trace" are exempt from this list as they are rarely used and, at least
96131
// the the first two are limited to failure conditions that lead to cloudflared shutting down.
97-
type LogLevel string
132+
type LogLevel int8
98133

99134
const (
100-
Debug LogLevel = "debug"
101-
Info LogLevel = "info"
102-
Warn LogLevel = "warn"
103-
Error LogLevel = "error"
135+
Debug LogLevel = 0
136+
Info LogLevel = 1
137+
Warn LogLevel = 2
138+
Error LogLevel = 3
104139
)
105140

141+
func ParseLogLevel(l string) (LogLevel, bool) {
142+
switch l {
143+
case "debug":
144+
return Debug, true
145+
case "info":
146+
return Info, true
147+
case "warn":
148+
return Warn, true
149+
case "error":
150+
return Error, true
151+
}
152+
return -1, false
153+
}
154+
155+
func (l LogLevel) String() string {
156+
switch l {
157+
case Debug:
158+
return "debug"
159+
case Info:
160+
return "info"
161+
case Warn:
162+
return "warn"
163+
case Error:
164+
return "error"
165+
default:
166+
return ""
167+
}
168+
}
169+
170+
func (l LogLevel) MarshalJSON() ([]byte, error) {
171+
return json.Marshal(l.String())
172+
}
173+
174+
func (l *LogLevel) UnmarshalJSON(data []byte) error {
175+
var s string
176+
if err := json.Unmarshal(data, &s); err != nil {
177+
return errors.New("unable to unmarshal LogLevel string")
178+
}
179+
if level, ok := ParseLogLevel(s); ok {
180+
*l = level
181+
return nil
182+
}
183+
return fmt.Errorf("unable to unmarshal LogLevel")
184+
}
185+
106186
const (
107187
// TimeKey aligns with the zerolog.TimeFieldName
108188
TimeKey = "time"

management/events_test.go

Lines changed: 75 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,83 @@ import (
1111
"github.com/cloudflare/cloudflared/internal/test"
1212
)
1313

14+
var (
15+
debugLevel *LogLevel
16+
infoLevel *LogLevel
17+
warnLevel *LogLevel
18+
errorLevel *LogLevel
19+
)
20+
21+
func init() {
22+
// created here because we can't do a reference to a const enum, i.e. &Info
23+
debugLevel := new(LogLevel)
24+
*debugLevel = Debug
25+
infoLevel := new(LogLevel)
26+
*infoLevel = Info
27+
warnLevel := new(LogLevel)
28+
*warnLevel = Warn
29+
errorLevel := new(LogLevel)
30+
*errorLevel = Error
31+
}
32+
1433
func TestIntoClientEvent_StartStreaming(t *testing.T) {
15-
event := ClientEvent{
16-
Type: StartStreaming,
17-
event: []byte(`{"type": "start_streaming"}`),
34+
for _, test := range []struct {
35+
name string
36+
expected EventStartStreaming
37+
}{
38+
{
39+
name: "no filters",
40+
expected: EventStartStreaming{ClientEvent: ClientEvent{Type: StartStreaming}},
41+
},
42+
{
43+
name: "level filter",
44+
expected: EventStartStreaming{
45+
ClientEvent: ClientEvent{Type: StartStreaming},
46+
Filters: &StreamingFilters{
47+
Level: infoLevel,
48+
},
49+
},
50+
},
51+
{
52+
name: "events filter",
53+
expected: EventStartStreaming{
54+
ClientEvent: ClientEvent{Type: StartStreaming},
55+
Filters: &StreamingFilters{
56+
Events: []LogEventType{Cloudflared, HTTP},
57+
},
58+
},
59+
},
60+
{
61+
name: "level and events filters",
62+
expected: EventStartStreaming{
63+
ClientEvent: ClientEvent{Type: StartStreaming},
64+
Filters: &StreamingFilters{
65+
Level: infoLevel,
66+
Events: []LogEventType{Cloudflared},
67+
},
68+
},
69+
},
70+
} {
71+
t.Run(test.name, func(t *testing.T) {
72+
data, err := json.Marshal(test.expected)
73+
require.NoError(t, err)
74+
event := ClientEvent{}
75+
err = json.Unmarshal(data, &event)
76+
require.NoError(t, err)
77+
event.event = data
78+
ce, ok := IntoClientEvent[EventStartStreaming](&event, StartStreaming)
79+
require.True(t, ok)
80+
require.Equal(t, test.expected.ClientEvent, ce.ClientEvent)
81+
if test.expected.Filters != nil {
82+
f := ce.Filters
83+
ef := test.expected.Filters
84+
if ef.Level != nil {
85+
require.Equal(t, *ef.Level, *f.Level)
86+
}
87+
require.ElementsMatch(t, ef.Events, f.Events)
88+
}
89+
})
1890
}
19-
ce, ok := IntoClientEvent[EventStartStreaming](&event, StartStreaming)
20-
require.True(t, ok)
21-
require.Equal(t, EventStartStreaming{ClientEvent: ClientEvent{Type: StartStreaming}}, *ce)
2291
}
2392

2493
func TestIntoClientEvent_StopStreaming(t *testing.T) {

0 commit comments

Comments
 (0)