Skip to content

Commit ea75b50

Browse files
authored
[extension/awslogsencodingextension] Fix concurrent usage of gzip reader (open-telemetry#40838)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description The gzip reader was returned to the pool before we finished using it, which could cause multiple go routines using the same reader. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue N/A. <!--Describe what testing was performed and which tests were added.--> #### Testing There was one unit test added to check this isn't happening anymore. <!--Describe the documentation added.--> #### Documentation N/A <!--Please delete paragraphs that you did not use before submitting.-->
1 parent b8746a9 commit ea75b50

File tree

4 files changed

+123
-18
lines changed

4 files changed

+123
-18
lines changed

.chloggen/gzip-concurrent.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awslogsencodingextension
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix bug in which concurrent go routines can end up using the same gzip reader
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40838]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

extension/encoding/awslogsencodingextension/extension.go

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ import (
2323
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/unmarshaler/waf"
2424
)
2525

26+
const (
27+
gzipEncoding = "gzip"
28+
bytesEncoding = "bytes"
29+
parquetEncoding = "parquet"
30+
)
31+
2632
var _ encoding.LogsUnmarshalerExtension = (*encodingExtension)(nil)
2733

2834
type encodingExtension struct {
@@ -80,57 +86,62 @@ func (*encodingExtension) Shutdown(_ context.Context) error {
8086
}
8187

8288
func (e *encodingExtension) getGzipReader(buf []byte) (io.Reader, error) {
83-
var errGzipReader error
89+
var err error
8490
gzipReader, ok := e.gzipPool.Get().(*gzip.Reader)
8591
if !ok {
86-
gzipReader, errGzipReader = gzip.NewReader(bytes.NewReader(buf))
92+
gzipReader, err = gzip.NewReader(bytes.NewReader(buf))
8793
} else {
88-
errGzipReader = gzipReader.Reset(bytes.NewReader(buf))
94+
err = gzipReader.Reset(bytes.NewBuffer(buf))
8995
}
90-
if errGzipReader != nil {
96+
if err != nil {
9197
if gzipReader != nil {
9298
e.gzipPool.Put(gzipReader)
9399
}
94-
return nil, fmt.Errorf("failed to decompress content: %w", errGzipReader)
100+
return nil, fmt.Errorf("failed to decompress content: %w", err)
95101
}
96-
defer func() {
97-
_ = gzipReader.Close()
98-
e.gzipPool.Put(gzipReader)
99-
}()
100102
return gzipReader, nil
101103
}
102104

103-
func (e *encodingExtension) getReaderFromFormat(buf []byte) (io.Reader, error) {
105+
func (e *encodingExtension) getReaderFromFormat(buf []byte) (string, io.Reader, error) {
104106
switch e.format {
105107
case formatWAFLog, formatCloudWatchLogsSubscriptionFilter:
106-
return e.getGzipReader(buf)
108+
reader, err := e.getGzipReader(buf)
109+
return gzipEncoding, reader, err
107110
case formatS3AccessLog:
108-
return bytes.NewReader(buf), nil
111+
return bytesEncoding, bytes.NewReader(buf), nil
109112
case formatVPCFlowLog:
110113
switch e.vpcFormat {
111114
case fileFormatParquet:
112-
return nil, fmt.Errorf("%q still needs to be implemented", e.vpcFormat)
115+
return parquetEncoding, nil, fmt.Errorf("%q still needs to be implemented", e.vpcFormat)
113116
case fileFormatPlainText:
114-
return e.getGzipReader(buf)
117+
reader, err := e.getGzipReader(buf)
118+
return gzipEncoding, reader, err
115119
default:
116120
// should not be possible
117-
return nil, fmt.Errorf(
121+
return "", nil, fmt.Errorf(
118122
"unsupported file fileFormat %q for VPC flow log, expected one of %q",
119123
e.vpcFormat,
120124
supportedVPCFlowLogFileFormat,
121125
)
122126
}
123127
default:
124128
// should not be possible
125-
return nil, fmt.Errorf("unimplemented: format %q has no reader", e.format)
129+
return "", nil, fmt.Errorf("unimplemented: format %q has no reader", e.format)
126130
}
127131
}
128132

129133
func (e *encodingExtension) UnmarshalLogs(buf []byte) (plog.Logs, error) {
130-
reader, err := e.getReaderFromFormat(buf)
134+
encodingReader, reader, err := e.getReaderFromFormat(buf)
131135
if err != nil {
132136
return plog.Logs{}, fmt.Errorf("failed to get reader for %q logs: %w", e.format, err)
133137
}
138+
defer func() {
139+
if encodingReader == gzipEncoding {
140+
r := reader.(*gzip.Reader)
141+
_ = r.Close()
142+
e.gzipPool.Put(r)
143+
}
144+
}()
134145

135146
logs, err := e.unmarshaler.UnmarshalAWSLogs(reader)
136147
if err != nil {

extension/encoding/awslogsencodingextension/extension_test.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,17 @@ package awslogsencodingextension
55

66
import (
77
"bytes"
8+
"os"
9+
"sync"
810
"testing"
911

1012
"github.com/klauspost/compress/gzip"
1113
"github.com/stretchr/testify/assert"
1214
"github.com/stretchr/testify/require"
15+
"go.opentelemetry.io/collector/component"
1316
"go.opentelemetry.io/collector/extension/extensiontest"
17+
18+
subscriptionfilter "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/unmarshaler/subscription-filter"
1419
)
1520

1621
func TestNew_CloudWatchLogsSubscriptionFilter(t *testing.T) {
@@ -91,7 +96,7 @@ func TestGetReaderFromFormat(t *testing.T) {
9196
for name, test := range tests {
9297
t.Run(name, func(t *testing.T) {
9398
e := &encodingExtension{format: test.format}
94-
reader, err := e.getReaderFromFormat(test.buf)
99+
_, reader, err := e.getReaderFromFormat(test.buf)
95100
if test.expectedErr != "" {
96101
require.ErrorContains(t, err, test.expectedErr)
97102
return
@@ -101,3 +106,49 @@ func TestGetReaderFromFormat(t *testing.T) {
101106
})
102107
}
103108
}
109+
110+
// readAndCompressLogFile reads the data inside it, compresses it
111+
// and returns a GZIP reader for it.
112+
func readAndCompressLogFile(t *testing.T, file string) []byte {
113+
data, err := os.ReadFile(file)
114+
require.NoError(t, err)
115+
var compressedData bytes.Buffer
116+
gzipWriter := gzip.NewWriter(&compressedData)
117+
_, err = gzipWriter.Write(data)
118+
require.NoError(t, err)
119+
err = gzipWriter.Close()
120+
require.NoError(t, err)
121+
return compressedData.Bytes()
122+
}
123+
124+
func TestConcurrentGzipReaderUsage(t *testing.T) {
125+
// Create an encoding extension for cloudwatch format to test the
126+
// gzip reader and check that it works as expected for non concurrent
127+
// and concurrent usage
128+
ext := &encodingExtension{
129+
unmarshaler: subscriptionfilter.NewSubscriptionFilterUnmarshaler(component.BuildInfo{}),
130+
format: formatCloudWatchLogsSubscriptionFilter,
131+
gzipPool: sync.Pool{},
132+
}
133+
134+
cloudwatchData := readAndCompressLogFile(t, "testdata/cloudwatch_log.json")
135+
testUnmarshall := func() {
136+
_, err := ext.UnmarshalLogs(cloudwatchData)
137+
require.NoError(t, err)
138+
}
139+
140+
// non concurrent
141+
testUnmarshall()
142+
143+
// concurrent usage
144+
concurrent := 20
145+
wg := sync.WaitGroup{}
146+
for i := 0; i < concurrent; i++ {
147+
wg.Add(1)
148+
go func() {
149+
defer wg.Done()
150+
testUnmarshall()
151+
}()
152+
}
153+
wg.Wait()
154+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"owner": "123456789012",
3+
"logGroup": "CloudTrail",
4+
"logStream": "123456789012_CloudTrail_us-east-1",
5+
"subscriptionFilters": [
6+
"Destination"
7+
],
8+
"messageType": "DATA_MESSAGE",
9+
"logEvents": [
10+
{
11+
"id": "31953106606966983378809025079804211143289615424298221568",
12+
"timestamp": 1432826855000,
13+
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
14+
}
15+
]
16+
}

0 commit comments

Comments
 (0)