Skip to content

Commit 38d7c89

Browse files
authored
Dont close stdout (#289)
* Let the (code)user handle the closing of passed writers when using loginterceptor * Simplify loginterceptor and fix tests * Unblock run goroutine when channels are full * Filter in scan to preserve channel resources * Log interception buffer limit errors with Debugf
1 parent f70d0a5 commit 38d7c89

File tree

2 files changed

+131
-137
lines changed

2 files changed

+131
-137
lines changed

loginterceptor/loginterceptor.go

Lines changed: 59 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -11,31 +11,55 @@ import (
1111

1212
// PrefixInterceptor intercept writes: if a line begins with prefix, it will be written to
1313
// both writers. Partial writes without newline are buffered until a newline.
14+
//
15+
// Note: Callers are responsible for closing intercepted and target writers that implement io.Closer
1416
type PrefixInterceptor struct {
15-
prefixRegexp *regexp.Regexp
16-
intercepted *NonBlockingWriter
17-
target *NonBlockingWriter
18-
logger log.Logger
17+
prefixRegexp *regexp.Regexp
18+
targetCh chan string
19+
interceptedCh chan string
20+
logger log.Logger
1921

2022
// internal pipe and goroutine to scan and route
2123
internalReader *io.PipeReader
2224
internalWriter *io.PipeWriter
2325

24-
// close once
26+
// closing
2527
closeOnce sync.Once
2628
closeErr error
29+
30+
// TargetDelivered receives a single value when the target goroutine
31+
// finishes processing all messages. Callers should consume from this
32+
// channel to avoid goroutine leaks, or use Close() and ignore it.
33+
TargetDelivered <-chan struct{}
34+
// InterceptedDelivered receives a single value when the intercepted goroutine
35+
// finishes processing all messages. Callers should consume from this
36+
// channel to avoid goroutine leaks, or use Close() and ignore it.
37+
InterceptedDelivered <-chan struct{}
2738
}
2839

2940
// NewPrefixInterceptor returns an io.WriteCloser. Writes are based on line prefix.
41+
//
42+
// Note: Callers are responsible for closing intercepted and target writers that implement io.Closer
3043
func NewPrefixInterceptor(prefixRegexp *regexp.Regexp, intercepted, target io.Writer, logger log.Logger) *PrefixInterceptor {
3144
pipeReader, pipeWriter := io.Pipe()
45+
46+
targetCh := make(chan string, 10000)
47+
targetDoneCh := make(chan struct{}, 1)
48+
interceptedCh := make(chan string, 10000)
49+
interceptedDoneCh := make(chan struct{}, 1)
50+
51+
go sendingTo(targetCh, targetDoneCh, target, logger)
52+
go sendingTo(interceptedCh, interceptedDoneCh, intercepted, logger)
53+
3254
interceptor := &PrefixInterceptor{
33-
prefixRegexp: prefixRegexp,
34-
intercepted: NewNonBlockingWriter(intercepted, logger),
35-
target: NewNonBlockingWriter(target, logger),
36-
logger: logger,
37-
internalReader: pipeReader,
38-
internalWriter: pipeWriter,
55+
prefixRegexp: prefixRegexp,
56+
targetCh: targetCh,
57+
interceptedCh: interceptedCh,
58+
logger: logger,
59+
internalReader: pipeReader,
60+
internalWriter: pipeWriter,
61+
TargetDelivered: targetDoneCh,
62+
InterceptedDelivered: interceptedDoneCh,
3963
}
4064
go interceptor.run()
4165
return interceptor
@@ -55,15 +79,11 @@ func (i *PrefixInterceptor) Close() error {
5579
}
5680

5781
func (i *PrefixInterceptor) closeAfterRun() {
58-
if err := i.intercepted.Close(); err != nil {
59-
i.logger.Errorf("intercepted writer: %v", err)
60-
}
61-
if err := i.target.Close(); err != nil {
62-
i.logger.Errorf("target writer: %v", err)
63-
}
6482
if err := i.internalReader.Close(); err != nil {
6583
i.logger.Errorf("internal reader: %v", err)
6684
}
85+
close(i.targetCh)
86+
close(i.interceptedCh)
6787
}
6888

6989
// run reads lines (and partial final chunk) and writes them.
@@ -79,18 +99,21 @@ func (i *PrefixInterceptor) run() {
7999
for scanner.Scan() {
80100
line := scanner.Text() // note: newline removed
81101
// re-append newline to preserve same output format
82-
outLine := line + "\n"
102+
logLine := line + "\n"
103+
104+
select {
105+
case i.targetCh <- logLine:
106+
default:
107+
i.logger.Debugf("target channel full, dropping message")
108+
}
83109

84-
// Write to intercepted channel if matching regexp
85110
if i.prefixRegexp.MatchString(line) {
86-
if _, err := io.WriteString(i.intercepted, outLine); err != nil {
87-
i.logger.Errorf("intercept writer error: %v", err)
111+
select {
112+
case i.interceptedCh <- logLine:
113+
default:
114+
i.logger.Debugf("intercepted channel full, dropping message")
88115
}
89116
}
90-
// Always write to target channel
91-
if _, err := io.WriteString(i.target, outLine); err != nil {
92-
i.logger.Errorf("writer error: %v", err)
93-
}
94117
}
95118

96119
// handle any scanner error
@@ -99,52 +122,18 @@ func (i *PrefixInterceptor) run() {
99122
}
100123
}
101124

102-
// NonBlockingWriter is an io.Writer that writes to a wrapped io.Writer in a non-blocking way.
103-
type NonBlockingWriter struct {
104-
channel chan []byte
105-
wrapped io.Writer
106-
logger log.Logger
107-
}
108-
109-
// NewNonBlockingWriter creates a new NonBlockingWriter.
110-
func NewNonBlockingWriter(w io.Writer, logger log.Logger) *NonBlockingWriter {
111-
writer := &NonBlockingWriter{
112-
channel: make(chan []byte, 10000), // buffered channel to avoid blocking
113-
wrapped: w,
114-
logger: logger,
115-
}
116-
go writer.Run()
117-
return writer
118-
}
119-
120-
// Write implements io.Writer. It writes into an internal pipe which the interceptor goroutine consumes.
121-
func (i *NonBlockingWriter) Write(p []byte) (int, error) {
122-
select {
123-
case i.channel <- p:
124-
return len(p), nil
125-
default:
126-
i.logger.Debugf("buffer full, dropping log")
127-
return 0, nil
128-
}
129-
}
130-
131-
// Close stops the interceptor and closes the pipe.
132-
func (i *NonBlockingWriter) Close() error {
133-
close(i.channel)
134-
return nil
135-
}
136-
137-
// Run consumes the channel and writes to the wrapped writer.
138-
func (i *NonBlockingWriter) Run() {
139-
for msg := range i.channel {
140-
if _, err := i.wrapped.Write(msg); err != nil {
141-
i.logger.Errorf("NonBlockingWriter: wrapped writer error: %v", err)
125+
func sendingTo(
126+
srcCh <-chan string,
127+
done chan<- struct{},
128+
writer io.Writer,
129+
logger log.Logger,
130+
) {
131+
for msg := range srcCh {
132+
if _, err := io.WriteString(writer, msg); err != nil {
133+
logger.Errorf(" writer error: %v", err)
142134
}
143135
}
144136

145-
if closer, ok := i.wrapped.(io.Closer); ok {
146-
if err := closer.Close(); err != nil {
147-
i.logger.Errorf("NonBlockingWriter: closing wrapped writer: %v", err)
148-
}
149-
}
137+
done <- struct{}{}
138+
close(done)
150139
}
Lines changed: 72 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package loginterceptor_test
22

33
import (
4-
"bytes"
54
"io"
65
"regexp"
76
"sync"
@@ -12,45 +11,42 @@ import (
1211
"github.com/stretchr/testify/assert"
1312
)
1413

14+
const (
15+
msg1 = "Log message without prefix\n"
16+
msg2 = "[Bitrise Analytics] Log message with prefixs\n"
17+
msg3 = "[Bitrise Build Cache] Log message with prefix\n"
18+
msg4 = "Stuff [Bitrise Build Cache] Log message without prefix\n"
19+
)
20+
1521
func TestPrefixInterceptor(t *testing.T) {
16-
interceptReader, interceptWriter := io.Pipe()
17-
targetReader, targetWriter := io.Pipe()
22+
interceptedMsgs := NewChanWriterCloser()
23+
targetMsgs := NewChanWriterCloser()
1824
re := regexp.MustCompile(`^\[Bitrise.*\].*`)
1925

20-
sut := loginterceptor.NewPrefixInterceptor(re, interceptWriter, targetWriter, log.NewLogger())
21-
22-
msg1 := "Log message without prefix\n"
23-
msg2 := "[Bitrise Analytics] Log message with prefix\n"
24-
msg3 := "[Bitrise Build Cache] Log message with prefix\n"
25-
msg4 := "Stuff [Bitrise Build Cache] Log message without prefix\n"
26+
sut := loginterceptor.NewPrefixInterceptor(re, &interceptedMsgs, &targetMsgs, log.NewLogger())
2627

2728
go func() {
28-
//nolint:errCheck
29-
defer sut.Close()
30-
29+
defer func() { _ = sut.Close() }()
3130
_, _ = sut.Write([]byte(msg1))
3231
_, _ = sut.Write([]byte(msg2))
3332
_, _ = sut.Write([]byte(msg3))
3433
_, _ = sut.Write([]byte(msg4))
3534
}()
3635

37-
intercepted, target, err := readTwo(interceptReader, targetReader)
38-
assert.NoError(t, err)
39-
assert.Equal(t, msg2+msg3, string(intercepted))
40-
assert.Equal(t, msg1+msg2+msg3+msg4, string(target))
36+
waitForBoth(sut)
37+
_ = interceptedMsgs.Close()
38+
_ = targetMsgs.Close()
39+
40+
assert.Equal(t, msg2+msg3, interceptedMsgs.Messages())
41+
assert.Equal(t, msg1+msg2+msg3+msg4, targetMsgs.Messages())
4142
}
4243

4344
func TestPrefixInterceptorWithPrematureClose(t *testing.T) {
44-
interceptReader, interceptWriter := io.Pipe()
45-
targetReader, targetWriter := io.Pipe()
45+
interceptedMsgs := NewChanWriterCloser()
46+
targetMsgs := NewChanWriterCloser()
4647
re := regexp.MustCompile(`^\[Bitrise.*\].*`)
4748

48-
sut := loginterceptor.NewPrefixInterceptor(re, interceptWriter, targetWriter, log.NewLogger())
49-
50-
msg1 := "Log message without prefix\n"
51-
msg2 := "[Bitrise Analytics] Log message with prefix\n"
52-
msg3 := "[Bitrise Build Cache] Log message with prefix\n"
53-
msg4 := "Last message that won't be sent\n"
49+
sut := loginterceptor.NewPrefixInterceptor(re, &interceptedMsgs, &targetMsgs, log.NewLogger())
5450

5551
go func() {
5652
_, _ = sut.Write([]byte(msg1))
@@ -60,71 +56,80 @@ func TestPrefixInterceptorWithPrematureClose(t *testing.T) {
6056
_, _ = sut.Write([]byte(msg4))
6157
}()
6258

63-
intercepted, target, err := readTwo(interceptReader, targetReader)
64-
assert.NoError(t, err)
65-
assert.Equal(t, msg2+msg3, string(intercepted))
66-
assert.Equal(t, msg1+msg2+msg3, string(target))
59+
waitForBoth(sut)
60+
_ = interceptedMsgs.Close()
61+
_ = targetMsgs.Close()
62+
63+
assert.Equal(t, msg2+msg3, interceptedMsgs.Messages())
64+
assert.Equal(t, msg1+msg2+msg3, targetMsgs.Messages())
6765
}
6866

6967
func TestPrefixInterceptorWithBlockedPipe(t *testing.T) {
70-
interceptReader, interceptWriter := io.Pipe()
71-
targetReader, targetWriter := io.Pipe()
68+
_, interceptWriter := io.Pipe()
69+
targetMsgs := NewChanWriterCloser()
7270
re := regexp.MustCompile(`^\[Bitrise.*\].*`)
7371

74-
sut := loginterceptor.NewPrefixInterceptor(re, interceptWriter, targetWriter, log.NewLogger())
75-
76-
msg1 := "Log message without prefix\n"
77-
msg2 := "[Bitrise Analytics] Log message with prefix\n"
78-
msg3 := "[Bitrise Build Cache] Log message with prefix\n"
79-
msg4 := "Stuff [Bitrise Build Cache] Log message without prefix\n"
72+
sut := loginterceptor.NewPrefixInterceptor(re, interceptWriter, &targetMsgs, log.NewLogger())
8073

8174
go func() {
82-
//nolint:errCheck
83-
defer sut.Close()
84-
8575
_, _ = sut.Write([]byte(msg1))
8676
_, _ = sut.Write([]byte(msg2))
8777
_, _ = sut.Write([]byte(msg3))
78+
_ = sut.Close()
8879
_, _ = sut.Write([]byte(msg4))
8980
}()
9081

91-
target, err := io.ReadAll(targetReader)
92-
assert.NoError(t, err)
93-
assert.Equal(t, msg1+msg2+msg3+msg4, string(target))
82+
<-sut.TargetDelivered
83+
_ = targetMsgs.Close()
9484

95-
// Reading from interceptReader should block until targetWriter is read
96-
intercepted, err := io.ReadAll(interceptReader)
97-
assert.NoError(t, err)
98-
assert.Equal(t, msg2+msg3, string(intercepted))
85+
assert.Equal(t, msg1+msg2+msg3, targetMsgs.Messages())
9986
}
10087

101-
func readTwo(r1, r2 io.Reader) (out1, out2 []byte, err error) {
102-
var (
103-
wg sync.WaitGroup
104-
e1, e2 error
105-
)
88+
// --------------------------------
89+
// Helpers
90+
// --------------------------------
91+
func waitForBoth(sut *loginterceptor.PrefixInterceptor) {
92+
var wg sync.WaitGroup
10693
wg.Add(2)
10794

108-
var b1, b2 bytes.Buffer
109-
110-
go func() {
111-
defer wg.Done()
112-
_, e1 = io.Copy(&b1, r1)
113-
}()
95+
go func(wg *sync.WaitGroup) {
96+
<-sut.TargetDelivered
97+
wg.Done()
98+
}(&wg)
11499

115-
go func() {
116-
defer wg.Done()
117-
_, e2 = io.Copy(&b2, r2)
118-
}()
100+
go func(wg *sync.WaitGroup) {
101+
<-sut.InterceptedDelivered
102+
wg.Done()
103+
}(&wg)
119104

120105
wg.Wait()
106+
}
107+
108+
type ChanWriterCloser struct {
109+
channel chan string
110+
}
121111

122-
// prefer to return the first non-nil error
123-
if e1 != nil {
124-
return b1.Bytes(), b2.Bytes(), e1
112+
func NewChanWriterCloser() ChanWriterCloser {
113+
return ChanWriterCloser{
114+
channel: make(chan string, 1000),
125115
}
126-
if e2 != nil {
127-
return b1.Bytes(), b2.Bytes(), e2
116+
}
117+
118+
func (ch *ChanWriterCloser) Write(p []byte) (int, error) {
119+
ch.channel <- string(p)
120+
return len(p), nil
121+
}
122+
123+
// Close stops the interceptor and closes the pipe.
124+
func (ch *ChanWriterCloser) Close() error {
125+
close(ch.channel)
126+
return nil
127+
}
128+
129+
func (ch *ChanWriterCloser) Messages() string {
130+
var result string
131+
for msg := range ch.channel {
132+
result += msg
128133
}
129-
return b1.Bytes(), b2.Bytes(), nil
134+
return result
130135
}

0 commit comments

Comments
 (0)