Skip to content

Commit f70d0a5

Browse files
authored
[ACI-4154] Log interceptor potential deadlock fix (#286)
* Resolve deadlock by write calls with independent timeouts * Cleanup comments, imports * Cancel context of writes sooner * Reduce expectations in case of slow writes * Use input timeout in both writer calls * Make more concise non blocking writer * Use byte array in NonBlockingWriter * Change the write method to match change in channel type * Make buffer full errors debug only and raise buffer size
1 parent cdc586f commit f70d0a5

File tree

6 files changed

+124
-20
lines changed

6 files changed

+124
-20
lines changed

autocodesign/devportalclient/appstoreconnect/appstoreconnect.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,6 @@ func (c *Client) Do(req *http.Request, v interface{}) (*http.Response, error) {
283283

284284
c.tracker.TrackAPIRequest(req.Method, req.URL.Host, req.URL.Path, resp.StatusCode, duration)
285285

286-
287286
if v != nil {
288287
decErr := json.NewDecoder(resp.Body).Decode(v)
289288
if decErr == io.EOF {

destination/device_finder.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"encoding/json"
55
"errors"
66
"fmt"
7-
"github.com/bitrise-io/go-utils/errorutil"
8-
"github.com/bitrise-io/go-utils/retry"
97
"os"
108
"time"
119

10+
"github.com/bitrise-io/go-utils/errorutil"
11+
"github.com/bitrise-io/go-utils/retry"
12+
1213
"github.com/bitrise-io/go-utils/v2/command"
1314
"github.com/bitrise-io/go-utils/v2/log"
1415
"github.com/bitrise-io/go-xcode/v2/xcodeversion"

destination/parse.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ package destination
22

33
import (
44
"fmt"
5+
"os"
6+
"strings"
7+
58
"github.com/bitrise-io/go-utils/errorutil"
69
"github.com/bitrise-io/go-utils/v2/command"
710
"github.com/hashicorp/go-version"
8-
"os"
9-
"strings"
1011
)
1112

1213
// DeviceList ...

loginterceptor/loginterceptor.go

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
// both writers. Partial writes without newline are buffered until a newline.
1414
type PrefixInterceptor struct {
1515
prefixRegexp *regexp.Regexp
16-
intercepted io.Writer
17-
target io.Writer
16+
intercepted *NonBlockingWriter
17+
target *NonBlockingWriter
1818
logger log.Logger
1919

2020
// internal pipe and goroutine to scan and route
@@ -31,8 +31,8 @@ func NewPrefixInterceptor(prefixRegexp *regexp.Regexp, intercepted, target io.Wr
3131
pipeReader, pipeWriter := io.Pipe()
3232
interceptor := &PrefixInterceptor{
3333
prefixRegexp: prefixRegexp,
34-
intercepted: intercepted,
35-
target: target,
34+
intercepted: NewNonBlockingWriter(intercepted, logger),
35+
target: NewNonBlockingWriter(target, logger),
3636
logger: logger,
3737
internalReader: pipeReader,
3838
internalWriter: pipeWriter,
@@ -55,18 +55,12 @@ func (i *PrefixInterceptor) Close() error {
5555
}
5656

5757
func (i *PrefixInterceptor) closeAfterRun() {
58-
// Close writers if able
59-
if interceptedCloser, ok := i.intercepted.(io.Closer); ok {
60-
if err := interceptedCloser.Close(); err != nil {
61-
i.logger.Errorf("closing intercepted writer: %v", err)
62-
}
58+
if err := i.intercepted.Close(); err != nil {
59+
i.logger.Errorf("intercepted writer: %v", err)
6360
}
64-
if originalCloser, ok := i.target.(io.Closer); ok {
65-
if err := originalCloser.Close(); err != nil {
66-
i.logger.Errorf("closing original writer: %v", err)
67-
}
61+
if err := i.target.Close(); err != nil {
62+
i.logger.Errorf("target writer: %v", err)
6863
}
69-
7064
if err := i.internalReader.Close(); err != nil {
7165
i.logger.Errorf("internal reader: %v", err)
7266
}
@@ -104,3 +98,53 @@ func (i *PrefixInterceptor) run() {
10498
i.logger.Errorf("router scanner error: %v\n", err)
10599
}
106100
}
101+
102+
// NonBlockingWriter is an io.Writer that writes to a wrapped io.Writer in a non-blocking way.
103+
type NonBlockingWriter struct {
104+
channel chan []byte
105+
wrapped io.Writer
106+
logger log.Logger
107+
}
108+
109+
// NewNonBlockingWriter creates a new NonBlockingWriter.
110+
func NewNonBlockingWriter(w io.Writer, logger log.Logger) *NonBlockingWriter {
111+
writer := &NonBlockingWriter{
112+
channel: make(chan []byte, 10000), // buffered channel to avoid blocking
113+
wrapped: w,
114+
logger: logger,
115+
}
116+
go writer.Run()
117+
return writer
118+
}
119+
120+
// Write implements io.Writer. It writes into an internal pipe which the interceptor goroutine consumes.
121+
func (i *NonBlockingWriter) Write(p []byte) (int, error) {
122+
select {
123+
case i.channel <- p:
124+
return len(p), nil
125+
default:
126+
i.logger.Debugf("buffer full, dropping log")
127+
return 0, nil
128+
}
129+
}
130+
131+
// Close stops the interceptor and closes the pipe.
132+
func (i *NonBlockingWriter) Close() error {
133+
close(i.channel)
134+
return nil
135+
}
136+
137+
// Run consumes the channel and writes to the wrapped writer.
138+
func (i *NonBlockingWriter) Run() {
139+
for msg := range i.channel {
140+
if _, err := i.wrapped.Write(msg); err != nil {
141+
i.logger.Errorf("NonBlockingWriter: wrapped writer error: %v", err)
142+
}
143+
}
144+
145+
if closer, ok := i.wrapped.(io.Closer); ok {
146+
if err := closer.Close(); err != nil {
147+
i.logger.Errorf("NonBlockingWriter: closing wrapped writer: %v", err)
148+
}
149+
}
150+
}

loginterceptor/loginterceptor_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,64 @@ func TestPrefixInterceptor(t *testing.T) {
4040
assert.Equal(t, msg1+msg2+msg3+msg4, string(target))
4141
}
4242

43+
func TestPrefixInterceptorWithPrematureClose(t *testing.T) {
44+
interceptReader, interceptWriter := io.Pipe()
45+
targetReader, targetWriter := io.Pipe()
46+
re := regexp.MustCompile(`^\[Bitrise.*\].*`)
47+
48+
sut := loginterceptor.NewPrefixInterceptor(re, interceptWriter, targetWriter, log.NewLogger())
49+
50+
msg1 := "Log message without prefix\n"
51+
msg2 := "[Bitrise Analytics] Log message with prefix\n"
52+
msg3 := "[Bitrise Build Cache] Log message with prefix\n"
53+
msg4 := "Last message that won't be sent\n"
54+
55+
go func() {
56+
_, _ = sut.Write([]byte(msg1))
57+
_, _ = sut.Write([]byte(msg2))
58+
_, _ = sut.Write([]byte(msg3))
59+
_ = sut.Close()
60+
_, _ = sut.Write([]byte(msg4))
61+
}()
62+
63+
intercepted, target, err := readTwo(interceptReader, targetReader)
64+
assert.NoError(t, err)
65+
assert.Equal(t, msg2+msg3, string(intercepted))
66+
assert.Equal(t, msg1+msg2+msg3, string(target))
67+
}
68+
69+
func TestPrefixInterceptorWithBlockedPipe(t *testing.T) {
70+
interceptReader, interceptWriter := io.Pipe()
71+
targetReader, targetWriter := io.Pipe()
72+
re := regexp.MustCompile(`^\[Bitrise.*\].*`)
73+
74+
sut := loginterceptor.NewPrefixInterceptor(re, interceptWriter, targetWriter, log.NewLogger())
75+
76+
msg1 := "Log message without prefix\n"
77+
msg2 := "[Bitrise Analytics] Log message with prefix\n"
78+
msg3 := "[Bitrise Build Cache] Log message with prefix\n"
79+
msg4 := "Stuff [Bitrise Build Cache] Log message without prefix\n"
80+
81+
go func() {
82+
//nolint:errCheck
83+
defer sut.Close()
84+
85+
_, _ = sut.Write([]byte(msg1))
86+
_, _ = sut.Write([]byte(msg2))
87+
_, _ = sut.Write([]byte(msg3))
88+
_, _ = sut.Write([]byte(msg4))
89+
}()
90+
91+
target, err := io.ReadAll(targetReader)
92+
assert.NoError(t, err)
93+
assert.Equal(t, msg1+msg2+msg3+msg4, string(target))
94+
95+
// Reading from interceptReader should block until targetWriter is read
96+
intercepted, err := io.ReadAll(interceptReader)
97+
assert.NoError(t, err)
98+
assert.Equal(t, msg2+msg3, string(intercepted))
99+
}
100+
43101
func readTwo(r1, r2 io.Reader) (out1, out2 []byte, err error) {
44102
var (
45103
wg sync.WaitGroup

xcodecache/swiftpm_cache.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ import (
1212
// SwiftPackagesStateInvalid is the partial error message printed out if swift packages cache is invalid.
1313
// Can be used to detect invalid state and clear the path returned by SwiftPackagesPath.
1414
// xcodebuild: error: Could not resolve package dependencies:
15-
// The repository at [path] is invalid; try resetting package caches
15+
//
16+
// The repository at [path] is invalid; try resetting package caches
1617
const SwiftPackagesStateInvalid = "Could not resolve package dependencies:"
1718

1819
// SwiftPackageCache ...

0 commit comments

Comments
 (0)