Skip to content
This repository was archived by the owner on Dec 17, 2025. It is now read-only.

Commit f372f22

Browse files
authored
Merge pull request #19 from DMwangnima/feat/specify_request
feat: support specifying the request to be used each time the SSE stream is created
2 parents e083267 + 7cded64 commit f372f22

File tree

6 files changed

+794
-266
lines changed

6 files changed

+794
-266
lines changed

README.md

Lines changed: 143 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -90,129 +90,160 @@ see: [examples/client/quickstart/main.go](examples/client/quickstart/main.go)
9090
package main
9191

9292
import (
93-
"context"
94-
"sync"
95-
"time"
93+
"context"
94+
"sync"
95+
"time"
9696

97-
"github.com/cloudwego/hertz/pkg/common/hlog"
97+
"github.com/cloudwego/hertz/pkg/app/client"
98+
"github.com/cloudwego/hertz/pkg/common/hlog"
99+
"github.com/cloudwego/hertz/pkg/protocol"
98100

99-
"github.com/hertz-contrib/sse"
101+
"github.com/hertz-contrib/sse"
100102
)
101103

102104
var wg sync.WaitGroup
103105

104106
func main() {
105-
wg.Add(2)
106-
go func() {
107-
c := sse.NewClient("http://127.0.0.1:8888/sse")
108-
109-
// touch off when connected to the server
110-
c.SetOnConnectCallback(func(ctx context.Context, client *sse.Client) {
111-
hlog.Infof("client1 connect to server %s success with %s method", c.GetURL(), c.GetMethod())
112-
})
113-
114-
// touch off when the connection is shutdown
115-
c.SetDisconnectCallback(func(ctx context.Context, client *sse.Client) {
116-
hlog.Infof("client1 disconnect to server %s success with %s method", c.GetURL(), c.GetMethod())
117-
})
118-
119-
events := make(chan *sse.Event)
120-
errChan := make(chan error)
121-
ctx, cancel := context.WithCancel(context.Background())
122-
go func() {
123-
cErr := c.SubscribeWithContext(ctx, func(msg *sse.Event) {
124-
if msg.Data != nil {
125-
events <- msg
126-
return
127-
}
128-
})
129-
errChan <- cErr
130-
}()
131-
go func() {
132-
time.Sleep(5 * time.Second)
133-
cancel()
134-
hlog.Info("client1 subscribe cancel")
135-
}()
136-
for {
137-
select {
138-
case e := <-events:
139-
hlog.Infof("client1, %+v", e)
140-
case err := <-errChan:
141-
if err == nil {
142-
hlog.Info("client1, ctx done, read stop")
143-
} else {
144-
hlog.CtxErrorf(ctx, "client1, err = %s", err.Error())
145-
}
146-
wg.Done()
147-
return
148-
}
149-
}
150-
}()
151-
152-
go func() {
153-
c := sse.NewClient("http://127.0.0.1:8888/sse")
154-
155-
// touch off when connected to the server
156-
c.SetOnConnectCallback(func(ctx context.Context, client *sse.Client) {
157-
hlog.Infof("client2 %s connect to server success with %s method", c.GetURL(), c.GetMethod())
158-
})
159-
160-
// touch off when the connection is shutdown
161-
c.SetDisconnectCallback(func(ctx context.Context, client *sse.Client) {
162-
hlog.Infof("client2 %s disconnect to server success with %s method", c.GetURL(), c.GetMethod())
163-
})
164-
165-
events := make(chan *sse.Event, 10)
166-
errChan := make(chan error)
167-
go func() {
168-
cErr := c.Subscribe(func(msg *sse.Event) {
169-
if msg.Data != nil {
170-
events <- msg
171-
return
172-
}
173-
})
174-
errChan <- cErr
175-
}()
176-
177-
streamClosed := false
178-
for {
179-
select {
180-
case e := <-events:
181-
hlog.Infof("client2, %+v", e)
182-
time.Sleep(2 * time.Second) // do something blocked
183-
// When the event ends, you should break out of the loop.
184-
if checkEventEnd(e) {
185-
wg.Done()
186-
return
187-
}
188-
case err := <-errChan:
189-
if err == nil {
190-
// err is nil means read io.EOF, stream is closed
191-
streamClosed = true
192-
hlog.Info("client2, stream closed")
193-
// continue read channel events
194-
continue
195-
}
196-
hlog.CtxErrorf(context.Background(), "client2, err = %s", err.Error())
197-
wg.Done()
198-
return
199-
default:
200-
if streamClosed {
201-
hlog.Info("client2, events is empty and stream closed")
202-
wg.Done()
203-
return
204-
}
205-
}
206-
}
207-
}()
208-
209-
wg.Wait()
107+
wg.Add(2)
108+
go func() {
109+
// create Hertz client
110+
hCli, err := client.NewClient()
111+
if err != nil {
112+
hlog.Errorf("create Hertz client failed, err: %v", err)
113+
return
114+
}
115+
// inject Hertz client to create SSE client
116+
c, err := sse.NewClientWithOptions(sse.WithHertzClient(hCli))
117+
if err != nil {
118+
hlog.Errorf("create SSE client failed, err: %v", err)
119+
return
120+
}
121+
122+
// touch off when connected to the server
123+
c.SetOnConnectCallback(func(ctx context.Context, client *sse.Client) {
124+
hlog.Infof("client1 connect to server success")
125+
})
126+
127+
// touch off when the connection is shutdown
128+
c.SetDisconnectCallback(func(ctx context.Context, client *sse.Client) {
129+
hlog.Infof("client1 disconnect to server success")
130+
})
131+
132+
events := make(chan *sse.Event)
133+
errChan := make(chan error)
134+
ctx, cancel := context.WithCancel(context.Background())
135+
go func() {
136+
// build the req sent with each SSE request
137+
req := &protocol.Request{}
138+
req.SetRequestURI("http://127.0.0.1:8888/sse")
139+
cErr := c.SubscribeWithContext(ctx, func(msg *sse.Event) {
140+
if msg.Data != nil {
141+
events <- msg
142+
return
143+
}
144+
}, sse.WithRequest(req))
145+
errChan <- cErr
146+
}()
147+
go func() {
148+
time.Sleep(5 * time.Second)
149+
cancel()
150+
hlog.Info("client1 subscribe cancel")
151+
}()
152+
for {
153+
select {
154+
case e := <-events:
155+
hlog.Infof("client1, %+v", e)
156+
case err := <-errChan:
157+
if err == nil {
158+
hlog.Info("client1, ctx done, read stop")
159+
} else {
160+
hlog.CtxErrorf(ctx, "client1, err = %s", err.Error())
161+
}
162+
wg.Done()
163+
return
164+
}
165+
}
166+
}()
167+
168+
go func() {
169+
// create Hertz client
170+
hCli, err := client.NewClient()
171+
if err != nil {
172+
hlog.Errorf("create Hertz client failed, err: %v", err)
173+
return
174+
}
175+
// inject Hertz client to create SSE client
176+
c, err := sse.NewClientWithOptions(sse.WithHertzClient(hCli))
177+
if err != nil {
178+
hlog.Errorf("create SSE client failed, err: %v", err)
179+
return
180+
}
181+
182+
// touch off when connected to the server
183+
c.SetOnConnectCallback(func(ctx context.Context, client *sse.Client) {
184+
hlog.Infof("client2 connect to server success")
185+
})
186+
187+
// touch off when the connection is shutdown
188+
c.SetDisconnectCallback(func(ctx context.Context, client *sse.Client) {
189+
hlog.Infof("client2 disconnect to server success")
190+
})
191+
192+
events := make(chan *sse.Event, 10)
193+
errChan := make(chan error)
194+
go func() {
195+
// build the req sent with each SSE request
196+
req := &protocol.Request{}
197+
req.SetRequestURI("http://127.0.0.1:8888/sse")
198+
cErr := c.Subscribe(func(msg *sse.Event) {
199+
if msg.Data != nil {
200+
events <- msg
201+
return
202+
}
203+
}, sse.WithRequest(req))
204+
errChan <- cErr
205+
}()
206+
207+
streamClosed := false
208+
for {
209+
select {
210+
case e := <-events:
211+
hlog.Infof("client2, %+v", e)
212+
time.Sleep(2 * time.Second) // do something blocked
213+
// When the event ends, you should break out of the loop.
214+
if checkEventEnd(e) {
215+
wg.Done()
216+
return
217+
}
218+
case err := <-errChan:
219+
if err == nil {
220+
// err is nil means read io.EOF, stream is closed
221+
streamClosed = true
222+
hlog.Info("client2, stream closed")
223+
// continue read channel events
224+
continue
225+
}
226+
hlog.CtxErrorf(context.Background(), "client2, err = %s", err.Error())
227+
wg.Done()
228+
return
229+
default:
230+
if streamClosed {
231+
hlog.Info("client2, events is empty and stream closed")
232+
wg.Done()
233+
return
234+
}
235+
}
236+
}
237+
}()
238+
239+
wg.Wait()
210240
}
211241

212242
func checkEventEnd(e *sse.Event) bool {
213-
// check e.Data or e.Event. It depends on the definition of the server
214-
return e.Event == "end" || string(e.Data) == "end flag"
243+
// check e.Data or e.Event. It depends on the definition of the server
244+
return e.Event == "end" || string(e.Data) == "end flag"
215245
}
246+
216247
```
217248

218249
## Real-world examples

README_CN.md

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ import (
9292
"sync"
9393
"time"
9494

95+
"github.com/cloudwego/hertz/pkg/app/client"
9596
"github.com/cloudwego/hertz/pkg/common/hlog"
97+
"github.com/cloudwego/hertz/pkg/protocol"
9698

9799
"github.com/hertz-contrib/sse"
98100
)
@@ -102,28 +104,42 @@ var wg sync.WaitGroup
102104
func main() {
103105
wg.Add(2)
104106
go func() {
105-
c := sse.NewClient("http://127.0.0.1:8888/sse")
107+
// 创建 Hertz client
108+
hCli, err := client.NewClient()
109+
if err != nil {
110+
hlog.Errorf("create Hertz client failed, err: %v", err)
111+
return
112+
}
113+
// 传入 Hertz client 构建 SSE client
114+
c, err := sse.NewClientWithOptions(sse.WithHertzClient(hCli))
115+
if err != nil {
116+
hlog.Errorf("create SSE client failed, err: %v", err)
117+
return
118+
}
106119

107120
// 连接到服务端的时候触发
108121
c.SetOnConnectCallback(func(ctx context.Context, client *sse.Client) {
109-
hlog.Infof("client1 connect to server %s success with %s method", c.GetURL(), c.GetMethod())
122+
hlog.Infof("client1 connect to server success")
110123
})
111124

112125
// 服务端断开连接的时候触发
113126
c.SetDisconnectCallback(func(ctx context.Context, client *sse.Client) {
114-
hlog.Infof("client1 disconnect to server %s success with %s method", c.GetURL(), c.GetMethod())
127+
hlog.Infof("client1 disconnect to server success")
115128
})
116129

117130
events := make(chan *sse.Event)
118131
errChan := make(chan error)
119132
ctx, cancel := context.WithCancel(context.Background())
120133
go func() {
134+
// 构建每次 SSE 请求发送的 req
135+
req := &protocol.Request{}
136+
req.SetRequestURI("http://127.0.0.1:8888/sse")
121137
cErr := c.SubscribeWithContext(ctx, func(msg *sse.Event) {
122138
if msg.Data != nil {
123139
events <- msg
124140
return
125141
}
126-
})
142+
}, sse.WithRequest(req))
127143
errChan <- cErr
128144
}()
129145
go func() {
@@ -148,27 +164,41 @@ func main() {
148164
}()
149165

150166
go func() {
151-
c := sse.NewClient("http://127.0.0.1:8888/sse")
167+
// 创建 Hertz client
168+
hCli, err := client.NewClient()
169+
if err != nil {
170+
hlog.Errorf("create Hertz client failed, err: %v", err)
171+
return
172+
}
173+
// 传入 Hertz client 构建 SSE client
174+
c, err := sse.NewClientWithOptions(sse.WithHertzClient(hCli))
175+
if err != nil {
176+
hlog.Errorf("create SSE client failed, err: %v", err)
177+
return
178+
}
152179

153180
// 连接到服务端的时候触发
154181
c.SetOnConnectCallback(func(ctx context.Context, client *sse.Client) {
155-
hlog.Infof("client2 %s connect to server success with %s method", c.GetURL(), c.GetMethod())
182+
hlog.Infof("client2 connect to server success")
156183
})
157184

158185
// 服务端断开连接的时候触发
159186
c.SetDisconnectCallback(func(ctx context.Context, client *sse.Client) {
160-
hlog.Infof("client2 %s disconnect to server success with %s method", c.GetURL(), c.GetMethod())
187+
hlog.Infof("client2 disconnect to server success")
161188
})
162189

163190
events := make(chan *sse.Event, 10)
164191
errChan := make(chan error)
165192
go func() {
193+
// 构建每次 SSE 请求发送的 req
194+
req := &protocol.Request{}
195+
req.SetRequestURI("http://127.0.0.1:8888/sse")
166196
cErr := c.Subscribe(func(msg *sse.Event) {
167197
if msg.Data != nil {
168198
events <- msg
169199
return
170200
}
171-
})
201+
}, sse.WithRequest(req))
172202
errChan <- cErr
173203
}()
174204

@@ -211,6 +241,7 @@ func checkEventEnd(e *sse.Event) bool {
211241
// 可以检查 e.Data 或者 e.Event, 取决于服务端的定义
212242
return e.Event == "end" || string(e.Data) == "end flag"
213243
}
244+
214245
```
215246

216247
## 真实场景示例

0 commit comments

Comments
 (0)