@@ -20,6 +20,7 @@ import (
2020 "fmt"
2121 "io"
2222 "log"
23+ "strings"
2324 "time"
2425)
2526
@@ -46,9 +47,10 @@ type JournalReaderConfig struct {
4647}
4748
4849// JournalReader is an io.ReadCloser which provides a simple interface for iterating through the
49- // systemd journal.
50+ // systemd journal. A JournalReader is not safe for concurrent use by multiple goroutines.
5051type JournalReader struct {
51- journal * Journal
52+ journal * Journal
53+ msgReader * strings.Reader
5254}
5355
5456// NewJournalReader creates a new JournalReader with configuration options that are similar to the
@@ -101,35 +103,60 @@ func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) {
101103 return r , nil
102104}
103105
106+ // Read reads entries from the journal. Read follows the Reader interface so
107+ // it must be able to read a specific amount of bytes. Journald on the other
108+ // hand only allows us to read full entries of arbitrary size (without byte
109+ // granularity). JournalReader is therefore internally buffering entries that
110+ // don't fit in the read buffer. Callers should keep calling until 0 and/or an
111+ // error is returned.
104112func (r * JournalReader ) Read (b []byte ) (int , error ) {
105113 var err error
106- var c int
107114
108- // Advance the journal cursor
109- c , err = r . journal . Next ()
115+ if r . msgReader == nil {
116+ var c int
110117
111- // An unexpected error
112- if err != nil {
113- return 0 , err
114- }
118+ // Advance the journal cursor. It has to be called at least one time
119+ // before reading
120+ c , err = r .journal .Next ()
115121
116- // EOF detection
117- if c == 0 {
118- return 0 , io . EOF
119- }
122+ // An unexpected error
123+ if err != nil {
124+ return 0 , err
125+ }
120126
121- // Build a message
122- var msg string
123- msg , err = r .buildMessage ()
127+ // EOF detection
128+ if c == 0 {
129+ return 0 , io .EOF
130+ }
124131
125- if err != nil {
126- return 0 , err
132+ // Build a message
133+ var msg string
134+ msg , err = r .buildMessage ()
135+
136+ if err != nil {
137+ return 0 , err
138+ }
139+ r .msgReader = strings .NewReader (msg )
127140 }
128141
129142 // Copy and return the message
130- copy (b , []byte (msg ))
143+ var sz int
144+ sz , err = r .msgReader .Read (b )
145+ if err == io .EOF {
146+ // The current entry has been fully read. Don't propagate this
147+ // EOF, so the next entry can be read at the next Read()
148+ // iteration.
149+ r .msgReader = nil
150+ return sz , nil
151+ }
152+ if err != nil {
153+ return sz , err
154+ }
155+ if r .msgReader .Len () == 0 {
156+ r .msgReader = nil
157+ }
131158
132- return len ( msg ) , nil
159+ return sz , nil
133160}
134161
135162// Close closes the JournalReader's handle to the journal.
@@ -139,6 +166,7 @@ func (r *JournalReader) Close() error {
139166
140167// Rewind attempts to rewind the JournalReader to the first entry.
141168func (r * JournalReader ) Rewind () error {
169+ r .msgReader = nil
142170 return r .journal .SeekHead ()
143171}
144172
0 commit comments