Skip to content

Commit 8ef8ea8

Browse files
[bug] Fix late-join buffering for reverse shell (#1422)
* Fix reverse shell websocket buffering by implementing Late Join logic Previously, the reverse shell stream's `sessionBuffer` implicitly expected message indices to start at 0. If a user connected to an active shell session (where the agent had already sent messages, advancing the index), the buffer would block incoming messages (e.g., index 100) while waiting for index 0, eventually stalling until the buffer overflowed. This change introduces an `initialized` flag to `sessionBuffer`. The buffer now anchors its `nextToSend` counter to the index of the first message it receives. This allows clients to join an active stream and immediately receive output, resolving the "buffering" issue where output only appeared after sufficient subsequent traffic. Tests were updated to respect this new "first-message-anchor" behavior, and a new `TestStream_LateJoin` test was added to verify the fix. * Fix reverse shell websocket buffering by implementing Late Join logic Previously, the reverse shell stream's `sessionBuffer` implicitly expected message indices to start at 0. If a user connected to an active shell session (where the agent had already sent messages, advancing the index), the buffer would block incoming messages (e.g., index 100) while waiting for index 0, eventually stalling until the buffer overflowed. This change introduces an `initialized` flag to `sessionBuffer`. The buffer now anchors its `nextToSend` counter to the index of the first message it receives. This allows clients to join an active stream and immediately receive output, resolving the "buffering" issue where output only appeared after sufficient subsequent traffic. Additionally, test flakiness in `TestMuxHistoryOrdering` and other stream tests was resolved by using unique topic names (`newTopicName`) to prevent collisions during parallel execution with the `mem://` pubsub driver. Tests were also updated to respect the new "first-message-anchor" behavior. --------- Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com> Co-authored-by: KCarretto <Kcarretto@gmail.com>
1 parent 0083b13 commit 8ef8ea8

File tree

3 files changed

+73
-26
lines changed

3 files changed

+73
-26
lines changed

tavern/internal/http/stream/mux_test.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package stream_test
33
import (
44
"context"
55
"fmt"
6+
"math/rand"
67
"testing"
78
"time"
89

@@ -13,15 +14,20 @@ import (
1314
"realm.pub/tavern/internal/http/stream"
1415
)
1516

17+
func newTopicName(base string) string {
18+
return fmt.Sprintf("mem://%s-%d", base, rand.Int())
19+
}
20+
1621
func TestMux(t *testing.T) {
22+
t.Parallel()
1723
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1824
defer cancel()
1925

20-
// Setup Topic and Subscription
21-
topic, err := pubsub.OpenTopic(ctx, "mem://mux-test")
26+
topicName := newTopicName("mux-test")
27+
topic, err := pubsub.OpenTopic(ctx, topicName)
2228
require.NoError(t, err)
2329
defer topic.Shutdown(ctx)
24-
sub, err := pubsub.OpenSubscription(ctx, "mem://mux-test")
30+
sub, err := pubsub.OpenSubscription(ctx, topicName)
2531
require.NoError(t, err)
2632
defer sub.Shutdown(ctx)
2733

@@ -78,14 +84,15 @@ func TestMux(t *testing.T) {
7884
}
7985

8086
func TestMuxHistory(t *testing.T) {
87+
t.Parallel()
8188
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
8289
defer cancel()
8390

84-
// Setup Topic and Subscription
85-
topic, err := pubsub.OpenTopic(ctx, "mem://mux-history-test")
91+
topicName := newTopicName("mux-history-test")
92+
topic, err := pubsub.OpenTopic(ctx, topicName)
8693
require.NoError(t, err)
8794
defer topic.Shutdown(ctx)
88-
sub, err := pubsub.OpenSubscription(ctx, "mem://mux-history-test")
95+
sub, err := pubsub.OpenSubscription(ctx, topicName)
8996
require.NoError(t, err)
9097
defer sub.Shutdown(ctx)
9198

@@ -150,14 +157,15 @@ func TestMuxHistory(t *testing.T) {
150157
}
151158

152159
func TestMuxHistoryOrdering(t *testing.T) {
160+
t.Parallel()
153161
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
154162
defer cancel()
155163

156-
// Setup Topic and Subscription
157-
topic, err := pubsub.OpenTopic(ctx, "mem://mux-history-ordering-test")
164+
topicName := newTopicName("mux-history-ordering-test")
165+
topic, err := pubsub.OpenTopic(ctx, topicName)
158166
require.NoError(t, err)
159167
defer topic.Shutdown(ctx)
160-
sub, err := pubsub.OpenSubscription(ctx, "mem://mux-history-ordering-test")
168+
sub, err := pubsub.OpenSubscription(ctx, topicName)
161169
require.NoError(t, err)
162170
defer sub.Shutdown(ctx)
163171

@@ -170,15 +178,16 @@ func TestMuxHistoryOrdering(t *testing.T) {
170178
mux.Register(monitor)
171179
defer mux.Unregister(monitor)
172180

173-
// Send messages out of order: 2, 0, 1
181+
// Send messages in an order that respects the new "Late Join" logic.
182+
// We must send the anchor (0) first so the stream knows where it starts.
174183
orderKey := "session1"
175184
messages := []struct {
176185
body string
177186
index int
178187
}{
179-
{"C", 2},
180-
{"A", 0},
181-
{"B", 1},
188+
{"A", 0}, // Anchor
189+
{"C", 2}, // Out of order, will buffer
190+
{"B", 1}, // Fills gap
182191
}
183192

184193
for _, m := range messages {
@@ -196,8 +205,6 @@ func TestMuxHistoryOrdering(t *testing.T) {
196205
}
197206

198207
// Wait for monitor to receive all 3 messages.
199-
// Monitor (Stream) performs its own reordering, so it should see A, B, C.
200-
// But we really care about what's in Mux History.
201208
received := ""
202209
for i := 0; i < 3; i++ {
203210
select {

tavern/internal/http/stream/stream.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,9 @@ func newOrderKey() string {
164164
// In this example, websockets could race to add data, but all the data they add will be in order
165165
// with respect to the websocket.
166166
type sessionBuffer struct {
167-
nextToSend uint64
168-
data map[uint64]*pubsub.Message
167+
nextToSend uint64
168+
data map[uint64]*pubsub.Message
169+
initialized bool
169170
}
170171

171172
func (buf *sessionBuffer) writeMessage(ctx context.Context, msg *pubsub.Message, emit func(*pubsub.Message)) {
@@ -176,6 +177,11 @@ func (buf *sessionBuffer) writeMessage(ctx context.Context, msg *pubsub.Message,
176177
return
177178
}
178179

180+
if !buf.initialized {
181+
buf.nextToSend = index
182+
buf.initialized = true
183+
}
184+
179185
if index < buf.nextToSend {
180186
slog.ErrorContext(ctx, "dropping message because subsequent message has already been sent",
181187
"msg_log_id", msg.LoggableID,

tavern/internal/http/stream/stream_test.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package stream
33
import (
44
"context"
55
"fmt"
6+
"math/rand"
67
"testing"
78
"time"
89

@@ -12,15 +13,20 @@ import (
1213
_ "gocloud.dev/pubsub/mempubsub"
1314
)
1415

16+
func newTopicName(base string) string {
17+
return fmt.Sprintf("mem://%s-%d", base, rand.Int())
18+
}
19+
1520
func TestStream_SendMessage(t *testing.T) {
1621
t.Parallel()
1722
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1823
defer cancel()
1924

20-
topic, err := pubsub.OpenTopic(ctx, "mem://stream-test-send")
25+
topicName := newTopicName("stream-test-send")
26+
topic, err := pubsub.OpenTopic(ctx, topicName)
2127
require.NoError(t, err)
2228
defer topic.Shutdown(ctx)
23-
sub, err := pubsub.OpenSubscription(ctx, "mem://stream-test-send")
29+
sub, err := pubsub.OpenSubscription(ctx, topicName)
2430
require.NoError(t, err)
2531
defer sub.Shutdown(ctx)
2632

@@ -46,29 +52,31 @@ func TestStream_MessageOrdering(t *testing.T) {
4652

4753
stream := New("ordering-stream")
4854
go func() {
49-
// Send messages out of order
55+
// Send 0 first to anchor the stream
5056
stream.processOneMessage(ctx, &pubsub.Message{
51-
Body: []byte("message 2"),
57+
Body: []byte("message 0"),
5258
Metadata: map[string]string{
5359
"id": "ordering-stream",
5460
metadataOrderKey: "test-key",
55-
metadataOrderIndex: "2",
61+
metadataOrderIndex: "0",
5662
},
5763
})
64+
// Then send 2 (buffered)
5865
stream.processOneMessage(ctx, &pubsub.Message{
59-
Body: []byte("message 1"),
66+
Body: []byte("message 2"),
6067
Metadata: map[string]string{
6168
"id": "ordering-stream",
6269
metadataOrderKey: "test-key",
63-
metadataOrderIndex: "1",
70+
metadataOrderIndex: "2",
6471
},
6572
})
73+
// Then send 1 (fills gap)
6674
stream.processOneMessage(ctx, &pubsub.Message{
67-
Body: []byte("message 0"),
75+
Body: []byte("message 1"),
6876
Metadata: map[string]string{
6977
"id": "ordering-stream",
7078
metadataOrderKey: "test-key",
71-
metadataOrderIndex: "0",
79+
metadataOrderIndex: "1",
7280
},
7381
})
7482
}()
@@ -85,6 +93,32 @@ func TestStream_MessageOrdering(t *testing.T) {
8593
}
8694
}
8795

96+
func TestStream_LateJoin(t *testing.T) {
97+
t.Parallel()
98+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
99+
defer cancel()
100+
101+
stream := New("late-join-stream")
102+
go func() {
103+
// Simulate joining late: receive message 10 first
104+
stream.processOneMessage(ctx, &pubsub.Message{
105+
Body: []byte("message 10"),
106+
Metadata: map[string]string{
107+
"id": "late-join-stream",
108+
metadataOrderKey: "test-key",
109+
metadataOrderIndex: "10",
110+
},
111+
})
112+
}()
113+
114+
select {
115+
case msg := <-stream.Messages():
116+
assert.Equal(t, "message 10", string(msg.Body))
117+
case <-ctx.Done():
118+
t.Fatal("timed out waiting for message 10 (late join failed)")
119+
}
120+
}
121+
88122
func TestStream_Close(t *testing.T) {
89123
t.Parallel()
90124
stream := New("closable-stream")

0 commit comments

Comments
 (0)