Skip to content

Commit d1b7d05

Browse files
authored
Merge pull request #273 from muesli/sdjournal-race-conditions
sdjournal: Fixed race conditions in JournalReader.Follow
2 parents 597ab4b + bb69a3d commit d1b7d05

File tree

1 file changed

+16
-13
lines changed

1 file changed

+16
-13
lines changed

sdjournal/read.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"io"
2222
"log"
2323
"strings"
24+
"sync"
2425
"time"
2526
)
2627

@@ -195,20 +196,20 @@ func (r *JournalReader) Rewind() error {
195196

196197
// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The
197198
// follow will continue until a single time.Time is received on the until channel.
198-
func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) (err error) {
199+
func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) error {
199200

200201
// Process journal entries and events. Entries are flushed until the tail or
201202
// timeout is reached, and then we wait for new events or the timeout.
202203
var msg = make([]byte, 64*1<<(10))
203-
var waitCh = make(chan int)
204-
var waitStop = make(chan bool)
205-
defer close(waitStop)
204+
var waitCh = make(chan int, 1)
205+
var waitGroup sync.WaitGroup
206+
defer waitGroup.Wait()
206207

207208
process:
208209
for {
209210
c, err := r.Read(msg)
210211
if err != nil && err != io.EOF {
211-
break process
212+
return err
212213
}
213214

214215
select {
@@ -218,7 +219,7 @@ process:
218219
}
219220
if c > 0 {
220221
if _, err = writer.Write(msg[:c]); err != nil {
221-
break process
222+
return err
222223
}
223224
continue process
224225
}
@@ -227,11 +228,11 @@ process:
227228
// Holds journal events to process. Tightly bounded for now unless there's a
228229
// reason to unblock the journal watch routine more quickly.
229230
for {
231+
waitGroup.Add(1)
230232
go func() {
231-
select {
232-
case <-waitStop:
233-
case waitCh <- r.journal.Wait(1 * time.Second):
234-
}
233+
status := r.journal.Wait(100 * time.Millisecond)
234+
waitCh <- status
235+
waitGroup.Done()
235236
}()
236237

237238
select {
@@ -244,13 +245,15 @@ process:
244245
case SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
245246
continue process
246247
default:
247-
log.Printf("Received unknown event: %d\n", e)
248+
if e < 0 {
249+
return fmt.Errorf("received error event: %d", e)
250+
}
251+
252+
log.Printf("received unknown event: %d\n", e)
248253
}
249254
}
250255
}
251256
}
252-
253-
return
254257
}
255258

256259
// simpleMessageFormatter is the default formatter.

0 commit comments

Comments
 (0)