Skip to content

Commit 3fccc89

Browse files
committed
Events are now Clone-able to prevent read races
1 parent 58905d8 commit 3fccc89

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (c *Client) Send(ev *Event) error {
5454
if c.closed {
5555
return io.ErrClosedPipe
5656
}
57-
c.events <- ev
57+
c.events <- ev.Clone()
5858
return nil
5959
}
6060

@@ -65,7 +65,7 @@ func (c *Client) Shutdown() {
6565
}
6666

6767
// Wait blocks and waits for the client to be shutdown.
68-
// Call this is http handler threads to prevent the server from closing
68+
// Call this in http handler threads to prevent the server from closing
6969
// the client connection.
7070
func (c *Client) Wait() {
7171
c.waiter.Wait()

event.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,15 @@ func (e *Event) String() string {
132132
fullEvent, _ := ioutil.ReadAll(e)
133133
return string(fullEvent)
134134
}
135+
136+
// Clone returns a deep copy of the event
137+
func (e *Event) Clone() *Event {
138+
clone := &Event{
139+
id: e.id,
140+
event: e.event,
141+
retry: e.retry,
142+
}
143+
144+
clone.data = append(clone.data, e.data...)
145+
return clone
146+
}

stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (s *Stream) Unsubscribe(topic string, c *Client) {
8484
// Publish sends the event to clients that have subscribed to the given topic.
8585
func (s *Stream) Publish(topic string, e *Event) {
8686
s.listLock.RLock()
87-
defer s.listLock.Unlock()
87+
defer s.listLock.RUnlock()
8888

8989
for element := s.clients.Front(); element != nil; element = element.Next() {
9090
cli := element.Value.(*registeredClient)

0 commit comments

Comments
 (0)