File tree Expand file tree Collapse file tree 1 file changed +50
-5
lines changed
Expand file tree Collapse file tree 1 file changed +50
-5
lines changed Original file line number Diff line number Diff line change 11package server
22
33import (
4+ "bytes"
5+ "context"
46 "fmt"
7+ "runtime"
8+ "sync"
59
610 "github.com/iwanhae/kuview/pkg/controller"
711 "github.com/labstack/echo/v4"
@@ -43,11 +47,8 @@ func (s *Server) subscribe(c echo.Context) error {
4347 }()
4448
4549 // 2. Send the snapshot to the client.
46- for _ , v := range cache {
47- evt := Event {
48- Data : eventAsJSON (v ),
49- }
50- if err := evt .MarshalTo (w ); err != nil {
50+ for v := range s .encodeEventsParallel (c .Request ().Context (), cache ) {
51+ if _ , err := w .Write (v ); err != nil {
5152 return err
5253 }
5354 }
@@ -92,3 +93,47 @@ func (s *Server) Emit(v *controller.Event) {
9293 // it must be sent after the cache is updated
9394 s .evtCh <- v
9495}
96+
97+ func (s * Server ) encodeEventsParallel (ctx context.Context , cache []* controller.Event ) <- chan []byte {
98+ ch := make (chan []byte , 10 * runtime .NumCPU ())
99+
100+ q := make (chan * controller.Event , 10 * runtime .NumCPU ())
101+ go func () {
102+ defer close (q )
103+ for _ , v := range cache {
104+ q <- v
105+ }
106+ }()
107+
108+ var wg sync.WaitGroup
109+ for i := 0 ; i < runtime .NumCPU (); i ++ {
110+ wg .Add (1 )
111+ go func () {
112+ defer wg .Done ()
113+ for {
114+ select {
115+ case <- ctx .Done ():
116+ return
117+ case v , ok := <- q :
118+ if ! ok {
119+ return
120+ }
121+ evt := Event {Data : eventAsJSON (v )}
122+ buf := bytes .NewBuffer (nil )
123+
124+ if err := evt .MarshalTo (buf ); err != nil {
125+ log .Error ().Err (err ).Msg ("failed to marshal event to buffer" )
126+ continue
127+ }
128+ ch <- buf .Bytes ()
129+ }
130+ }
131+ }()
132+ }
133+ go func () {
134+ wg .Wait ()
135+ close (ch )
136+ }()
137+
138+ return ch
139+ }
You can’t perform that action at this time.
0 commit comments