Skip to content
This repository was archived by the owner on Dec 17, 2025. It is now read-only.

Commit 800cc5a

Browse files
authored
feat: add unsubscribe (#15)
* feat: add unsubscribe * fix: fix unit_test * chore: optimize license header * chore: optimize license header
1 parent 688b860 commit 800cc5a

File tree

12 files changed

+58
-21
lines changed

12 files changed

+58
-21
lines changed

README_CN.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func main() {
156156
}
157157
}()
158158

159-
select {}
159+
wg.Wait()
160160
}
161161

162162
```

client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 CloudWeGo Authors
2+
* Copyright 2024 CloudWeGo Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -124,7 +124,7 @@ func (c *Client) startReadLoop(ctx context.Context, reader *EventStreamReader) (
124124
func (c *Client) readLoop(ctx context.Context, reader *EventStreamReader, outCh chan *Event, erChan chan error) {
125125
for {
126126
// Read each new line and process the type of event
127-
event, err := reader.ReadEvent()
127+
event, err := reader.ReadEvent(ctx)
128128
if err != nil {
129129
if err == io.EOF {
130130
erChan <- nil

client_test.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 CloudWeGo Authors
2+
* Copyright 2024 CloudWeGo Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -197,6 +197,31 @@ func TestClientSubscribe(t *testing.T) {
197197
assert.Nil(t, cErr)
198198
}
199199

200+
func TestClientUnSubscribe(t *testing.T) {
201+
go newServer(false, "8887")
202+
time.Sleep(time.Second)
203+
c := NewClient("http://127.0.0.1:8887/sse")
204+
205+
events := make(chan *Event)
206+
ctx, cancel := context.WithCancel(context.Background())
207+
var cErr error
208+
go func() {
209+
cErr = c.SubscribeWithContext(ctx, func(msg *Event) {
210+
if msg.Data != nil {
211+
events <- msg
212+
return
213+
}
214+
})
215+
assert.Nil(t, cErr)
216+
}()
217+
cancel()
218+
time.Sleep(5 * time.Second)
219+
for i := 0; i < 5; i++ {
220+
_, err := wait(events, time.Second*1)
221+
assert.DeepEqual(t, errors.New("timeout"), err)
222+
}
223+
}
224+
200225
func TestClientSubscribeMultiline(t *testing.T) {
201226
go newMultilineServer("9007")
202227
time.Sleep(time.Second)

encoder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 CloudWeGo Authors
2+
* Copyright 2024 CloudWeGo Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

encoder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 CloudWeGo Authors
2+
* Copyright 2024 CloudWeGo Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

event.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 CloudWeGo Authors
2+
* Copyright 2024 CloudWeGo Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,6 +40,7 @@ package sse
4040
import (
4141
"bufio"
4242
"bytes"
43+
"context"
4344
"io"
4445
)
4546

@@ -119,10 +120,15 @@ func minPosInt(a, b int) int {
119120
}
120121

121122
// ReadEvent scans the EventStream for events.
122-
func (e *EventStreamReader) ReadEvent() ([]byte, error) {
123+
func (e *EventStreamReader) ReadEvent(ctx context.Context) ([]byte, error) {
123124
if e.scanner.Scan() {
124-
event := e.scanner.Bytes()
125-
return event, nil
125+
select {
126+
case <-ctx.Done():
127+
return nil, io.EOF
128+
default:
129+
event := e.scanner.Bytes()
130+
return event, nil
131+
}
126132
}
127133
if err := e.scanner.Err(); err != nil {
128134
return nil, err

examples/client/quickstart/main.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 CloudWeGo Authors
2+
* Copyright 2024 CloudWeGo Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,11 +38,11 @@ package main
3838

3939
import (
4040
"context"
41-
"sync"
42-
43-
"github.com/hertz-contrib/sse"
44-
41+
"fmt"
4542
"github.com/cloudwego/hertz/pkg/common/hlog"
43+
"github.com/hertz-contrib/sse"
44+
"sync"
45+
"time"
4646
)
4747

4848
var wg sync.WaitGroup
@@ -64,15 +64,21 @@ func main() {
6464

6565
events := make(chan *sse.Event)
6666
errChan := make(chan error)
67+
ctx, cancel := context.WithCancel(context.Background())
6768
go func() {
68-
cErr := c.Subscribe(func(msg *sse.Event) {
69+
cErr := c.SubscribeWithContext(ctx, func(msg *sse.Event) {
6970
if msg.Data != nil {
7071
events <- msg
7172
return
7273
}
7374
})
7475
errChan <- cErr
7576
}()
77+
go func() {
78+
time.Sleep(5 * time.Second)
79+
cancel()
80+
fmt.Println("client1 subscribe cancel")
81+
}()
7682
for {
7783
select {
7884
case e := <-events:

examples/server/chat/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 CloudWeGo Authors
2+
* Copyright 2024 CloudWeGo Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

examples/server/quickstart/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 CloudWeGo Authors
2+
* Copyright 2024 CloudWeGo Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

examples/server/stockprice/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 CloudWeGo Authors
2+
* Copyright 2024 CloudWeGo Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

0 commit comments

Comments
 (0)