Skip to content

Commit bb69a3d

Browse files
committed
sdjournal: Fixed race conditions in JournalReader.Follow
JournalReader.Follow needs to wait for r.Journal.Wait to return, otherwise we're opening the door for several race conditions. The JournalReader could either be re-used or closed while Wait is still running, which seems to result in undefined behavior. Commonly Wait will return a non-existing, negative status code in such cases. Since Wait might return after we stopped reading from waitCh, it's now a buffered channel of size 1, so it'll never dead-lock in such situations. Read errors are now correctly propagated to the caller, since err isn't shadowing the named return value any longer. We also abort when receiving unknown status codes from Wait now, to prevent infinitely looping over it.
1 parent d7b55e1 commit bb69a3d

File tree

1 file changed

+16
-13
lines changed

1 file changed

+16
-13
lines changed

sdjournal/read.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"io"
2222
"log"
2323
"strings"
24+
"sync"
2425
"time"
2526
)
2627

@@ -195,20 +196,20 @@ func (r *JournalReader) Rewind() error {
195196

196197
// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The
197198
// follow will continue until a single time.Time is received on the until channel.
198-
func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) (err error) {
199+
func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) error {
199200

200201
// Process journal entries and events. Entries are flushed until the tail or
201202
// timeout is reached, and then we wait for new events or the timeout.
202203
var msg = make([]byte, 64*1<<(10))
203-
var waitCh = make(chan int)
204-
var waitStop = make(chan bool)
205-
defer close(waitStop)
204+
var waitCh = make(chan int, 1)
205+
var waitGroup sync.WaitGroup
206+
defer waitGroup.Wait()
206207

207208
process:
208209
for {
209210
c, err := r.Read(msg)
210211
if err != nil && err != io.EOF {
211-
break process
212+
return err
212213
}
213214

214215
select {
@@ -218,7 +219,7 @@ process:
218219
}
219220
if c > 0 {
220221
if _, err = writer.Write(msg[:c]); err != nil {
221-
break process
222+
return err
222223
}
223224
continue process
224225
}
@@ -227,11 +228,11 @@ process:
227228
// Holds journal events to process. Tightly bounded for now unless there's a
228229
// reason to unblock the journal watch routine more quickly.
229230
for {
231+
waitGroup.Add(1)
230232
go func() {
231-
select {
232-
case <-waitStop:
233-
case waitCh <- r.journal.Wait(1 * time.Second):
234-
}
233+
status := r.journal.Wait(100 * time.Millisecond)
234+
waitCh <- status
235+
waitGroup.Done()
235236
}()
236237

237238
select {
@@ -244,13 +245,15 @@ process:
244245
case SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
245246
continue process
246247
default:
247-
log.Printf("Received unknown event: %d\n", e)
248+
if e < 0 {
249+
return fmt.Errorf("received error event: %d", e)
250+
}
251+
252+
log.Printf("received unknown event: %d\n", e)
248253
}
249254
}
250255
}
251256
}
252-
253-
return
254257
}
255258

256259
// simpleMessageFormatter is the default formatter.

0 commit comments

Comments
 (0)