Skip to content

Commit 38cd455

Browse files
committed
TUN-7373: Streaming logs override for same actor
To help accommodate web browser interactions with websockets, when a streaming logs session is requested for the same actor while already serving a session for that user in a separate request, the original request will be closed and the new request start streaming logs instead. This should help with rogue sessions holding on for too long with no client on the other side (before idle timeout or connection close).
1 parent ee5e447 commit 38cd455

File tree

109 files changed

+12692
-1799
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

109 files changed

+12692
-1799
lines changed

component-tests/test_tail.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
import pytest
55
import requests
6+
import websockets
67
from websockets.client import connect, WebSocketClientProtocol
78
from conftest import CfdModes
89
from constants import MAX_RETRIES, BACKOFF_SECS
@@ -88,6 +89,46 @@ async def test_streaming_logs_filters(self, tmp_path, component_tests_config):
8889
# send stop_streaming
8990
await websocket.send('{"type": "stop_streaming"}')
9091

92+
@pytest.mark.asyncio
93+
async def test_streaming_logs_actor_override(self, tmp_path, component_tests_config):
94+
"""
95+
Validates that a streaming logs session can be overriden by the same actor
96+
"""
97+
print("test_streaming_logs_actor_override")
98+
config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False)
99+
LOGGER.debug(config)
100+
headers = {}
101+
headers["Content-Type"] = "application/json"
102+
config_path = write_config(tmp_path, config.full_config)
103+
with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True):
104+
wait_tunnel_ready(tunnel_url=config.get_url(),
105+
require_min_connections=1)
106+
cfd_cli = CloudflaredCli(config, config_path, LOGGER)
107+
url = cfd_cli.get_management_wsurl("logs", config, config_path)
108+
task = asyncio.ensure_future(start_streaming_to_be_remotely_closed(url))
109+
override_task = asyncio.ensure_future(start_streaming_override(url))
110+
await asyncio.wait([task, override_task])
111+
assert task.exception() == None, task.exception()
112+
assert override_task.exception() == None, override_task.exception()
113+
114+
async def start_streaming_to_be_remotely_closed(url):
115+
async with connect(url, open_timeout=5, close_timeout=5) as websocket:
116+
try:
117+
await websocket.send(json.dumps({"type": "start_streaming"}))
118+
await asyncio.sleep(10)
119+
assert websocket.closed, "expected this request to be forcibly closed by the override"
120+
except websockets.ConnectionClosed:
121+
# we expect the request to be closed
122+
pass
123+
124+
async def start_streaming_override(url):
125+
# wait for the first connection to be established
126+
await asyncio.sleep(1)
127+
async with connect(url, open_timeout=5, close_timeout=5) as websocket:
128+
await websocket.send(json.dumps({"type": "start_streaming"}))
129+
await asyncio.sleep(1)
130+
await websocket.send(json.dumps({"type": "stop_streaming"}))
131+
await asyncio.sleep(1)
91132

92133
# Every http request has two log lines sent
93134
async def generate_and_validate_log_event(websocket: WebSocketClientProtocol, url: str):

go.mod

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ require (
1313
github.com/getsentry/raven-go v0.2.0
1414
github.com/getsentry/sentry-go v0.16.0
1515
github.com/go-chi/chi/v5 v5.0.8
16+
github.com/go-jose/go-jose/v3 v3.0.0
1617
github.com/gobwas/ws v1.0.4
1718
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
1819
github.com/google/gopacket v1.1.19
@@ -36,8 +37,8 @@ require (
3637
go.opentelemetry.io/otel/trace v1.6.3
3738
go.opentelemetry.io/proto/otlp v0.15.0
3839
go.uber.org/automaxprocs v1.4.0
39-
golang.org/x/crypto v0.5.0
40-
golang.org/x/net v0.7.0
40+
golang.org/x/crypto v0.8.0
41+
golang.org/x/net v0.9.0
4142
golang.org/x/sync v0.1.0
4243
golang.org/x/sys v0.7.0
4344
golang.org/x/term v0.7.0
@@ -96,8 +97,8 @@ require (
9697
github.com/russross/blackfriday/v2 v2.1.0 // indirect
9798
golang.org/x/mod v0.8.0 // indirect
9899
golang.org/x/oauth2 v0.4.0 // indirect
99-
golang.org/x/text v0.7.0 // indirect
100-
golang.org/x/tools v0.1.12 // indirect
100+
golang.org/x/text v0.9.0 // indirect
101+
golang.org/x/tools v0.6.0 // indirect
101102
google.golang.org/appengine v1.6.7 // indirect
102103
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect
103104
google.golang.org/grpc v1.51.0 // indirect

go.sum

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxI
178178
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
179179
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
180180
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
181+
github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo=
182+
github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8=
181183
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
182184
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
183185
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
@@ -542,12 +544,13 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
542544
golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
543545
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
544546
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
547+
golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
545548
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
546549
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
547550
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
548551
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
549-
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
550-
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
552+
golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ=
553+
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
551554
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
552555
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
553556
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -640,8 +643,8 @@ golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su
640643
golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
641644
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
642645
golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
643-
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
644-
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
646+
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
647+
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
645648
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
646649
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
647650
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -777,8 +780,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
777780
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
778781
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
779782
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
780-
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
781-
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
783+
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
784+
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
782785
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
783786
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
784787
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -839,8 +842,8 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
839842
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
840843
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
841844
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
842-
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
843-
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
845+
golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
846+
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
844847
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
845848
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
846849
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

management/logger.go

Lines changed: 28 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,9 @@ import (
1111

1212
var json = jsoniter.ConfigFastest
1313

14-
const (
15-
// Indicates how many log messages the listener will hold before dropping.
16-
// Provides a throttling mechanism to drop latest messages if the sender
17-
// can't keep up with the influx of log messages.
18-
logWindow = 30
19-
)
20-
2114
// Logger manages the number of management streaming log sessions
2215
type Logger struct {
23-
sessions []*Session
16+
sessions []*session
2417
mu sync.RWMutex
2518

2619
// Unique logger that isn't a io.Writer of the list of zerolog writers. This helps prevent management log
@@ -40,69 +33,47 @@ func NewLogger() *Logger {
4033
}
4134

4235
type LoggerListener interface {
43-
Listen(*StreamingFilters) *Session
44-
Close(*Session)
45-
}
46-
47-
type Session struct {
48-
// Buffered channel that holds the recent log events
49-
listener chan *Log
50-
// Types of log events that this session will provide through the listener
51-
filters *StreamingFilters
52-
}
53-
54-
func newSession(size int, filters *StreamingFilters) *Session {
55-
s := &Session{
56-
listener: make(chan *Log, size),
57-
}
58-
if filters != nil {
59-
s.filters = filters
60-
} else {
61-
s.filters = &StreamingFilters{}
62-
}
63-
return s
36+
// ActiveSession returns the first active session for the requested actor.
37+
ActiveSession(actor) *session
38+
// ActiveSession returns the count of active sessions.
39+
ActiveSessions() int
40+
// Listen appends the session to the list of sessions that receive log events.
41+
Listen(*session)
42+
// Remove a session from the available sessions that were receiving log events.
43+
Remove(*session)
6444
}
6545

66-
// Insert attempts to insert the log to the session. If the log event matches the provided session filters, it
67-
// will be applied to the listener.
68-
func (s *Session) Insert(log *Log) {
69-
// Level filters are optional
70-
if s.filters.Level != nil {
71-
if *s.filters.Level > log.Level {
72-
return
46+
func (l *Logger) ActiveSession(actor actor) *session {
47+
l.mu.RLock()
48+
defer l.mu.RUnlock()
49+
for _, session := range l.sessions {
50+
if session.actor.ID == actor.ID && session.active.Load() {
51+
return session
7352
}
7453
}
75-
// Event filters are optional
76-
if len(s.filters.Events) != 0 && !contains(s.filters.Events, log.Event) {
77-
return
78-
}
79-
select {
80-
case s.listener <- log:
81-
default:
82-
// buffer is full, discard
83-
}
54+
return nil
8455
}
8556

86-
func contains(array []LogEventType, t LogEventType) bool {
87-
for _, v := range array {
88-
if v == t {
89-
return true
57+
func (l *Logger) ActiveSessions() int {
58+
l.mu.RLock()
59+
defer l.mu.RUnlock()
60+
count := 0
61+
for _, session := range l.sessions {
62+
if session.active.Load() {
63+
count += 1
9064
}
9165
}
92-
return false
66+
return count
9367
}
9468

95-
// Listen creates a new Session that will append filtered log events as they are created.
96-
func (l *Logger) Listen(filters *StreamingFilters) *Session {
69+
func (l *Logger) Listen(session *session) {
9770
l.mu.Lock()
9871
defer l.mu.Unlock()
99-
listener := newSession(logWindow, filters)
100-
l.sessions = append(l.sessions, listener)
101-
return listener
72+
session.active.Store(true)
73+
l.sessions = append(l.sessions, session)
10274
}
10375

104-
// Close will remove a Session from the available sessions that were receiving log events.
105-
func (l *Logger) Close(session *Session) {
76+
func (l *Logger) Remove(session *session) {
10677
l.mu.Lock()
10778
defer l.mu.Unlock()
10879
index := -1

0 commit comments

Comments
 (0)