Skip to content

Commit 1fe58eb

Browse files
committed
Add watch subscription checking for event flush when not subscribing
1 parent c37f005 commit 1fe58eb

File tree

2 files changed

+14
-14
lines changed

2 files changed

+14
-14
lines changed

client/client.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"os"
2929
"path/filepath"
3030
"strings"
31+
"sync/atomic"
3132

3233
"connectrpc.com/connect"
3334
"github.com/rs/xid"
@@ -72,13 +73,18 @@ var (
7273

7374
// ErrInitializationNotReceived occurs when the first response of the watch stream is not received.
7475
ErrInitializationNotReceived = errors.New("initialization is not received")
76+
77+
// ErrAlreadySubscribed occurs when the client is already subscribed to the document.
78+
ErrAlreadySubscribed = errors.New("already subscribed")
7579
)
7680

7781
// Attachment represents the document attached.
7882
type Attachment struct {
7983
doc *document.Document
8084
docID types.ID
8185

86+
isSubscribed atomic.Bool
87+
8288
rch <-chan WatchResponse
8389
ctx context.Context
8490
cancelFunc context.CancelFunc
@@ -395,10 +401,8 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document, options ...
395401
if doc.Status() != document.StatusRemoved {
396402
doc.SetStatus(document.StatusDetached)
397403
}
398-
delete(c.attachments, doc.Key())
399-
400404
attachment.cancelFunc()
401-
attachment.rch = nil
405+
delete(c.attachments, doc.Key())
402406

403407
return nil
404408
}
@@ -432,6 +436,11 @@ func (c *Client) Subscribe(
432436
return nil, nil, ErrDocumentNotAttached
433437
}
434438

439+
if attachment.isSubscribed.Load() {
440+
return nil, nil, ErrAlreadySubscribed
441+
}
442+
attachment.isSubscribed.Store(true)
443+
435444
return attachment.rch, attachment.cancelFunc, nil
436445
}
437446

@@ -485,13 +494,14 @@ func (c *Client) runWatchLoop(
485494
close(rch)
486495
return
487496
}
488-
if resp == nil {
497+
if resp == nil || !attachment.isSubscribed.Load() {
489498
continue
490499
}
491500

492501
rch <- *resp
493502
}
494503
if err = stream.Err(); err != nil {
504+
attachment.isSubscribed.Store(false)
495505
rch <- WatchResponse{Err: err}
496506
close(rch)
497507
return

test/integration/document_test.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ func TestDocument(t *testing.T) {
166166
rch, _, err := c1.Subscribe(d1)
167167
assert.NoError(t, err)
168168

169-
isPresenceChangedEvent := true
170169
go func() {
171170
defer wg.Done()
172171

@@ -178,16 +177,7 @@ func TestDocument(t *testing.T) {
178177
}
179178
assert.NoError(t, resp.Err)
180179

181-
// TODO(krapie): event includes presence changes in the document too.
182-
// therefore, there will be
183-
// 1. DocumentChanged from client 2 (Presence change)
184-
// 2. DocumentWatched from client 2 (Watch event)
185-
// 3. DocumentChanged from client 2 (Document change)
186180
if resp.Type == client.DocumentChanged {
187-
if isPresenceChangedEvent {
188-
isPresenceChangedEvent = false
189-
continue
190-
}
191181
err := c1.Sync(ctx, client.WithDocKey(d1.Key()))
192182
assert.NoError(t, err)
193183
return

0 commit comments

Comments
 (0)