Skip to content

Commit e217890

Browse files
committed
fix #119
1 parent bdbbc1f commit e217890

File tree

1 file changed

+72
-17
lines changed

1 file changed

+72
-17
lines changed

statsviz.go

Lines changed: 72 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"path/filepath"
5050
"strconv"
5151
"strings"
52+
"sync"
5253
"time"
5354

5455
"github.com/arl/statsviz/internal/plot"
@@ -90,6 +91,8 @@ type Server struct {
9091
root string // HTTP path root
9192
plots *plot.List // plots shown on the user interface
9293
userPlots []plot.UserPlot
94+
lock sync.Mutex
95+
onData []func([]byte) error
9396
}
9497

9598
// NewServer constructs a new Statsviz Server with the provided options, or the
@@ -272,32 +275,84 @@ func (s *Server) Ws() http.HandlerFunc {
272275
}
273276

274277
func (s *Server) startTransfer(w io.Writer) {
275-
buffer := bytes.Buffer{}
276-
buffer.WriteString("data: ")
277-
callData := func() error {
278-
if err := s.plots.WriteValues(&buffer); err == nil {
279-
_, err = w.Write(buffer.Bytes())
280-
if err != nil {
281-
return err
282-
}
283-
if f, ok := w.(http.Flusher); ok {
284-
f.Flush()
285-
}
286-
} else {
278+
s.lock.Lock()
279+
c := make(chan struct{})
280+
s.onData = append(s.onData, func(data []byte) error {
281+
_, err := w.Write(data)
282+
if err != nil {
283+
close(c)
287284
return err
288285
}
286+
if f, ok := w.(http.Flusher); ok {
287+
f.Flush()
288+
}
289289
return nil
290+
})
291+
if len(s.onData) == 1 {
292+
go s.callData()
290293
}
291-
//the first time it was sent immediately
292-
err := callData()
293-
if err != nil {
294-
return
294+
s.lock.Unlock()
295+
<-c
296+
}
297+
func (s *Server) callData() {
298+
{
299+
//the first time it was sent immediately
300+
buffer := bytes.Buffer{}
301+
buffer.WriteString("data: ")
302+
if err := s.plots.WriteValues(&buffer); err == nil {
303+
buffer.WriteString("\n\n")
304+
err = s.onData[0](buffer.Bytes())
305+
if err != nil {
306+
s.lock.Lock()
307+
if len(s.onData) == 1 {
308+
s.onData = nil
309+
s.lock.Unlock()
310+
return
311+
} else {
312+
s.onData = s.onData[1:]
313+
}
314+
s.lock.Unlock()
315+
}
316+
}
295317
}
296318
tick := time.NewTicker(s.intv)
297319
defer tick.Stop()
298320
for range tick.C {
299-
if callData() != nil {
321+
buffer := bytes.Buffer{}
322+
buffer.WriteString("data: ")
323+
if err := s.plots.WriteValues(&buffer); err != nil {
324+
//maybe all connections should be closed?
325+
//fmt.Println("Error plots.WriteValues:", err)
326+
continue
327+
}
328+
buffer.WriteString("\n\n")
329+
onData := s.onData
330+
del := false
331+
for i, f := range onData {
332+
if err := f(buffer.Bytes()); err != nil {
333+
del = true
334+
onData[i] = nil
335+
continue
336+
}
337+
}
338+
if del {
339+
s.lock.Lock()
340+
l := len(onData)
341+
for i := 0; i < l; i++ {
342+
if onData[i] == nil {
343+
onData = append(onData[:i], onData[i+1:]...)
344+
i--
345+
l--
346+
}
347+
}
348+
s.onData = onData
349+
s.lock.Unlock()
350+
}
351+
s.lock.Lock()
352+
if len(s.onData) == 0 {
353+
s.lock.Unlock()
300354
return
301355
}
356+
s.lock.Unlock()
302357
}
303358
}

0 commit comments

Comments
 (0)