@@ -23,6 +23,8 @@ go get github.com/hertz-contrib/sse
2323
2424### Server
2525
26+ see: [ examples/server/quickstart/main.go] ( examples/server/quickstart/main.go )
27+
2628``` go
2729package main
2830
@@ -31,11 +33,11 @@ import (
3133 " net/http"
3234 " time"
3335
34- " github.com/hertz-contrib/sse"
35-
3636 " github.com/cloudwego/hertz/pkg/app"
3737 " github.com/cloudwego/hertz/pkg/app/server"
3838 " github.com/cloudwego/hertz/pkg/common/hlog"
39+
40+ " github.com/hertz-contrib/sse"
3941)
4042
4143func main () {
@@ -49,6 +51,9 @@ func main() {
4951 // you must set status code and response headers before first render call
5052 c.SetStatusCode (http.StatusOK )
5153 s := sse.NewStream (c)
54+
55+ count := 0
56+ sendCountLimit := 10
5257 for t := range time.NewTicker (1 * time.Second ).C {
5358 event := &sse.Event {
5459 Event: " timestamp" ,
@@ -58,34 +63,47 @@ func main() {
5863 if err != nil {
5964 return
6065 }
66+ count++
67+ if count >= sendCountLimit {
68+ // send end flag to client
69+ err := s.Publish (&sse.Event {
70+ Event: " end" ,
71+ Data: []byte (" end flag" ),
72+ })
73+ if err != nil {
74+ return
75+ }
76+ break
77+ }
6178 }
6279 })
6380
6481 h.Spin ()
6582}
66-
6783```
6884
6985### Client
7086
87+ see: [ examples/client/quickstart/main.go] ( examples/client/quickstart/main.go )
88+
7189``` go
7290package main
7391
7492import (
7593 " context"
7694 " sync"
77-
78- " github.com/hertz-contrib/sse"
95+ " time"
7996
8097 " github.com/cloudwego/hertz/pkg/common/hlog"
98+
99+ " github.com/hertz-contrib/sse"
81100)
82101
83102var wg sync.WaitGroup
84103
85104func main () {
86105 wg.Add (2 )
87106 go func () {
88- // pass in the server-side URL to initialize the client
89107 c := sse.NewClient (" http://127.0.0.1:8888/sse" )
90108
91109 // touch off when connected to the server
@@ -100,29 +118,38 @@ func main() {
100118
101119 events := make (chan *sse.Event )
102120 errChan := make (chan error )
121+ ctx , cancel := context.WithCancel (context.Background ())
103122 go func () {
104- cErr := c.Subscribe ( func (msg *sse.Event ) {
123+ cErr := c.SubscribeWithContext (ctx, func (msg *sse.Event ) {
105124 if msg.Data != nil {
106125 events <- msg
107126 return
108127 }
109128 })
110129 errChan <- cErr
111130 }()
131+ go func () {
132+ time.Sleep (5 * time.Second )
133+ cancel ()
134+ hlog.Info (" client1 subscribe cancel" )
135+ }()
112136 for {
113137 select {
114138 case e := <- events:
115- hlog.Info ( e)
139+ hlog.Infof ( " client1, %+v " , e)
116140 case err := <- errChan:
117- hlog.CtxErrorf (context.Background (), " err = %s " , err.Error ())
141+ if err == nil {
142+ hlog.Info (" client1, ctx done, read stop" )
143+ } else {
144+ hlog.CtxErrorf (ctx, " client1, err = %s " , err.Error ())
145+ }
118146 wg.Done ()
119147 return
120148 }
121149 }
122150 }()
123151
124152 go func () {
125- // pass in the server-side URL to initialize the client
126153 c := sse.NewClient (" http://127.0.0.1:8888/sse" )
127154
128155 // touch off when connected to the server
@@ -135,32 +162,57 @@ func main() {
135162 hlog.Infof (" client2 %s disconnect to server success with %s method" , c.GetURL (), c.GetMethod ())
136163 })
137164
138- events := make (chan *sse.Event )
165+ events := make (chan *sse.Event , 10 )
139166 errChan := make (chan error )
140167 go func () {
141- cErr := c.Subscribe ( func (msg *sse.Event ) {
168+ cErr := c.Subscribe (func (msg *sse.Event ) {
142169 if msg.Data != nil {
143170 events <- msg
144171 return
145172 }
146173 })
147174 errChan <- cErr
148175 }()
176+
177+ streamClosed := false
149178 for {
150179 select {
151180 case e := <- events:
152- hlog.Info (e)
181+ hlog.Infof (" client2, %+v " , e)
182+ time.Sleep (2 * time.Second ) // do something blocked
183+ // When the event ends, you should break out of the loop.
184+ if checkEventEnd (e) {
185+ wg.Done ()
186+ return
187+ }
153188 case err := <- errChan:
154- hlog.CtxErrorf (context.Background (), " err = %s " , err.Error ())
189+ if err == nil {
190+ // err is nil means read io.EOF, stream is closed
191+ streamClosed = true
192+ hlog.Info (" client2, stream closed" )
193+ // continue read channel events
194+ continue
195+ }
196+ hlog.CtxErrorf (context.Background (), " client2, err = %s " , err.Error ())
155197 wg.Done ()
156198 return
199+ default :
200+ if streamClosed {
201+ hlog.Info (" client2, events is empty and stream closed" )
202+ wg.Done ()
203+ return
204+ }
157205 }
158206 }
159207 }()
160208
161209 wg.Wait ()
162210}
163211
212+ func checkEventEnd (e *sse .Event ) bool {
213+ // check e.Data or e.Event. It depends on the definition of the server
214+ return e.Event == " end" || string (e.Data ) == " end flag"
215+ }
164216```
165217
166218## Real-world examples
0 commit comments