Skip to content

Commit 88c25d2

Browse files
committed
TUN-7133: Add sampling support for streaming logs
In addition to adding sampling support for streaming logs, cloudflared tail also supports this via `--sample 0.5` to sample 50% of your log events.
1 parent 38cd455 commit 88c25d2

File tree

6 files changed

+137
-37
lines changed

6 files changed

+137
-37
lines changed

cmd/cloudflared/tail/cmd.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,16 @@ func buildTailCommand(subcommands []*cli.Command) *cli.Command {
8383
},
8484
&cli.StringFlag{
8585
Name: "level",
86-
Usage: "Filter by specific log levels (debug, info, warn, error)",
86+
Usage: "Filter by specific log levels (debug, info, warn, error). Filters by debug log level by default.",
8787
EnvVars: []string{"TUNNEL_MANAGEMENT_FILTER_LEVEL"},
8888
Value: "debug",
8989
},
90+
&cli.Float64Flag{
91+
Name: "sample",
92+
Usage: "Sample log events by percentage (0.0 .. 1.0). No sampling by default.",
93+
EnvVars: []string{"TUNNEL_MANAGEMENT_FILTER_SAMPLE"},
94+
Value: 1.0,
95+
},
9096
&cli.StringFlag{
9197
Name: "token",
9298
Usage: "Access token for a specific tunnel",
@@ -172,9 +178,11 @@ func createLogger(c *cli.Context) *zerolog.Logger {
172178
func parseFilters(c *cli.Context) (*management.StreamingFilters, error) {
173179
var level *management.LogLevel
174180
var events []management.LogEventType
181+
var sample float64
175182

176183
argLevel := c.String("level")
177184
argEvents := c.StringSlice("event")
185+
argSample := c.Float64("sample")
178186

179187
if argLevel != "" {
180188
l, ok := management.ParseLogLevel(argLevel)
@@ -192,14 +200,20 @@ func parseFilters(c *cli.Context) (*management.StreamingFilters, error) {
192200
events = append(events, t)
193201
}
194202

195-
if level == nil && len(events) == 0 {
203+
if argSample <= 0.0 || argSample > 1.0 {
204+
return nil, fmt.Errorf("invalid --sample value provided, please make sure it is in the range (0.0 .. 1.0)")
205+
}
206+
sample = argSample
207+
208+
if level == nil && len(events) == 0 && argSample == 1.0 {
196209
// When no filters are provided, do not return a StreamingFilters struct
197210
return nil, nil
198211
}
199212

200213
return &management.StreamingFilters{
201-
Level: level,
202-
Events: events,
214+
Level: level,
215+
Events: events,
216+
Sampling: sample,
203217
}, nil
204218
}
205219

component-tests/test_tail.py

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,9 @@ async def test_start_stop_streaming(self, tmp_path, component_tests_config):
2121
print("test_start_stop_streaming")
2222
config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False)
2323
LOGGER.debug(config)
24-
headers = {}
25-
headers["Content-Type"] = "application/json"
2624
config_path = write_config(tmp_path, config.full_config)
2725
with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True):
28-
wait_tunnel_ready(tunnel_url=config.get_url(),
29-
require_min_connections=1)
26+
wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
3027
cfd_cli = CloudflaredCli(config, config_path, LOGGER)
3128
url = cfd_cli.get_management_wsurl("logs", config, config_path)
3229
async with connect(url, open_timeout=5, close_timeout=3) as websocket:
@@ -43,19 +40,16 @@ async def test_streaming_logs(self, tmp_path, component_tests_config):
4340
print("test_streaming_logs")
4441
config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False)
4542
LOGGER.debug(config)
46-
headers = {}
47-
headers["Content-Type"] = "application/json"
4843
config_path = write_config(tmp_path, config.full_config)
4944
with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True):
50-
wait_tunnel_ready(tunnel_url=config.get_url(),
51-
require_min_connections=1)
45+
wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
5246
cfd_cli = CloudflaredCli(config, config_path, LOGGER)
5347
url = cfd_cli.get_management_wsurl("logs", config, config_path)
5448
async with connect(url, open_timeout=5, close_timeout=5) as websocket:
5549
# send start_streaming
5650
await websocket.send('{"type": "start_streaming"}')
5751
# send some http requests to the tunnel to trigger some logs
58-
await asyncio.wait_for(generate_and_validate_log_event(websocket, config.get_url()), 10)
52+
await generate_and_validate_http_events(websocket, config.get_url(), 10)
5953
# send stop_streaming
6054
await websocket.send('{"type": "stop_streaming"}')
6155

@@ -68,26 +62,51 @@ async def test_streaming_logs_filters(self, tmp_path, component_tests_config):
6862
print("test_streaming_logs_filters")
6963
config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False)
7064
LOGGER.debug(config)
71-
headers = {}
72-
headers["Content-Type"] = "application/json"
7365
config_path = write_config(tmp_path, config.full_config)
7466
with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True):
75-
wait_tunnel_ready(tunnel_url=config.get_url(),
76-
require_min_connections=1)
67+
wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
7768
cfd_cli = CloudflaredCli(config, config_path, LOGGER)
7869
url = cfd_cli.get_management_wsurl("logs", config, config_path)
7970
async with connect(url, open_timeout=5, close_timeout=5) as websocket:
80-
# send start_streaming with info logs only
71+
# send start_streaming with tcp logs only
8172
await websocket.send(json.dumps({
8273
"type": "start_streaming",
8374
"filters": {
84-
"events": ["tcp"]
75+
"events": ["tcp"],
76+
"level": "debug"
8577
}
8678
}))
8779
# don't expect any http logs
8880
await generate_and_validate_no_log_event(websocket, config.get_url())
8981
# send stop_streaming
9082
await websocket.send('{"type": "stop_streaming"}')
83+
84+
@pytest.mark.asyncio
85+
async def test_streaming_logs_sampling(self, tmp_path, component_tests_config):
86+
"""
87+
Validates that a streaming logs connection will stream logs with sampling.
88+
"""
89+
print("test_streaming_logs_sampling")
90+
config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False)
91+
LOGGER.debug(config)
92+
config_path = write_config(tmp_path, config.full_config)
93+
with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True):
94+
wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
95+
cfd_cli = CloudflaredCli(config, config_path, LOGGER)
96+
url = cfd_cli.get_management_wsurl("logs", config, config_path)
97+
async with connect(url, open_timeout=5, close_timeout=5) as websocket:
98+
# send start_streaming with a sampling filter
99+
await websocket.send(json.dumps({
100+
"type": "start_streaming",
101+
"filters": {
102+
"sampling": 0.5
103+
}
104+
}))
105+
# expect fewer than the full set of http log lines due to sampling
106+
count = await generate_and_validate_http_events(websocket, config.get_url(), 10)
107+
assert count < (10 * 2) # There are typically always two log lines for http requests (request and response)
108+
# send stop_streaming
109+
await websocket.send('{"type": "stop_streaming"}')
91110

92111
@pytest.mark.asyncio
93112
async def test_streaming_logs_actor_override(self, tmp_path, component_tests_config):
@@ -97,12 +116,9 @@ async def test_streaming_logs_actor_override(self, tmp_path, component_tests_con
97116
print("test_streaming_logs_actor_override")
98117
config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False)
99118
LOGGER.debug(config)
100-
headers = {}
101-
headers["Content-Type"] = "application/json"
102119
config_path = write_config(tmp_path, config.full_config)
103120
with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True):
104-
wait_tunnel_ready(tunnel_url=config.get_url(),
105-
require_min_connections=1)
121+
wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
106122
cfd_cli = CloudflaredCli(config, config_path, LOGGER)
107123
url = cfd_cli.get_management_wsurl("logs", config, config_path)
108124
task = asyncio.ensure_future(start_streaming_to_be_remotely_closed(url))
@@ -131,16 +147,22 @@ async def start_streaming_override(url):
131147
await asyncio.sleep(1)
132148

133149
# Every http request has two log lines sent
134-
async def generate_and_validate_log_event(websocket: WebSocketClientProtocol, url: str):
135-
send_request(url)
136-
req_line = await websocket.recv()
137-
log_line = json.loads(req_line)
138-
assert log_line["type"] == "logs"
139-
assert log_line["logs"][0]["event"] == "http"
140-
req_line = await websocket.recv()
141-
log_line = json.loads(req_line)
142-
assert log_line["type"] == "logs"
143-
assert log_line["logs"][0]["event"] == "http"
150+
async def generate_and_validate_http_events(websocket: WebSocketClientProtocol, url: str, count_send: int):
151+
for i in range(count_send):
152+
send_request(url)
153+
# There are typically always two log lines for http requests (request and response)
154+
count = 0
155+
while True:
156+
try:
157+
req_line = await asyncio.wait_for(websocket.recv(), 2)
158+
log_line = json.loads(req_line)
159+
assert log_line["type"] == "logs"
160+
assert log_line["logs"][0]["event"] == "http"
161+
count += 1
162+
except asyncio.TimeoutError:
163+
# ignore timeout from waiting for recv
164+
break
165+
return count
144166

145167
# Every http request has two log lines sent
146168
async def generate_and_validate_no_log_event(websocket: WebSocketClientProtocol, url: str):

management/events.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ type EventStartStreaming struct {
5252
}
5353

5454
type StreamingFilters struct {
55-
Events []LogEventType `json:"events,omitempty"`
56-
Level *LogLevel `json:"level,omitempty"`
55+
Events []LogEventType `json:"events,omitempty"`
56+
Level *LogLevel `json:"level,omitempty"`
57+
Sampling float64 `json:"sampling,omitempty"`
5758
}
5859

5960
// EventStopStreaming signifies that the client wishes to halt receiving log events.

management/events_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,23 @@ func TestIntoClientEvent_StartStreaming(t *testing.T) {
5757
},
5858
},
5959
},
60+
{
61+
name: "sampling filter",
62+
expected: EventStartStreaming{
63+
ClientEvent: ClientEvent{Type: StartStreaming},
64+
Filters: &StreamingFilters{
65+
Sampling: 0.5,
66+
},
67+
},
68+
},
6069
{
6170
name: "level and events filters",
6271
expected: EventStartStreaming{
6372
ClientEvent: ClientEvent{Type: StartStreaming},
6473
Filters: &StreamingFilters{
65-
Level: infoLevel,
66-
Events: []LogEventType{Cloudflared},
74+
Level: infoLevel,
75+
Events: []LogEventType{Cloudflared},
76+
Sampling: 0.5,
6777
},
6878
},
6979
},

management/session.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package management
22

33
import (
44
"context"
5+
"math/rand"
56
"sync/atomic"
67
)
78

@@ -25,6 +26,8 @@ type session struct {
2526
listener chan *Log
2627
// Types of log events that this session will provide through the listener
2728
filters *StreamingFilters
29+
// Sampling of the log events this session will send (runs after all other filters if available)
30+
sampler *sampler
2831
}
2932

3033
// NewSession creates a new session.
@@ -43,6 +46,20 @@ func newSession(size int, actor actor, cancel context.CancelFunc) *session {
4346
func (s *session) Filters(filters *StreamingFilters) {
4447
if filters != nil {
4548
s.filters = filters
49+
sampling := filters.Sampling
50+
// clamp the sampling values between 0 and 1
51+
if sampling < 0 {
52+
sampling = 0
53+
}
54+
if sampling > 1 {
55+
sampling = 1
56+
}
57+
s.filters.Sampling = sampling
58+
if sampling > 0 && sampling < 1 {
59+
s.sampler = &sampler{
60+
p: int(sampling * 100),
61+
}
62+
}
4663
} else {
4764
s.filters = &StreamingFilters{}
4865
}
@@ -61,6 +78,10 @@ func (s *session) Insert(log *Log) {
6178
if len(s.filters.Events) != 0 && !contains(s.filters.Events, log.Event) {
6279
return
6380
}
81+
// Sampling is also optional
82+
if s.sampler != nil && !s.sampler.Sample() {
83+
return
84+
}
6485
select {
6586
case s.listener <- log:
6687
default:
@@ -86,3 +107,14 @@ func contains(array []LogEventType, t LogEventType) bool {
86107
}
87108
return false
88109
}
110+
111+
// sampler will send approximately p percent of log events out of every 100.
112+
type sampler struct {
113+
p int
114+
}
115+
116+
// Sample returns true if the event should be part of the sample, false if the event should be dropped.
117+
func (s *sampler) Sample() bool {
118+
return rand.Intn(100) <= s.p
119+
120+
}

management/session_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,27 @@ func TestSession_Insert(t *testing.T) {
6767
},
6868
expectLog: false,
6969
},
70+
{
71+
name: "sampling",
72+
filters: StreamingFilters{
73+
Sampling: 0.9999999,
74+
},
75+
expectLog: true,
76+
},
77+
{
78+
name: "sampling (invalid negative)",
79+
filters: StreamingFilters{
80+
Sampling: -1.0,
81+
},
82+
expectLog: true,
83+
},
84+
{
85+
name: "sampling (invalid too large)",
86+
filters: StreamingFilters{
87+
Sampling: 2.0,
88+
},
89+
expectLog: true,
90+
},
7091
{
7192
name: "filter and event",
7293
filters: StreamingFilters{

0 commit comments

Comments
 (0)